Skip to content

Commit

Permalink
[improve] [broker] Trigger offload on topic load (#22652)
Browse files Browse the repository at this point in the history
  • Loading branch information
hangc0276 committed May 17, 2024
1 parent e35c00e commit 400a286
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 4 deletions.
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1211,6 +1211,9 @@ managedLedgerDataReadPriority=tiered-storage-first
# (default is -1, which is disabled)
managedLedgerOffloadThresholdInSeconds=-1

# Trigger offload on topic load or not. Default is false.
# triggerOffloadOnTopicLoad=false

# Max number of entries to append to a cursor ledger
managedLedgerCursorMaxEntriesPerLedger=50000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -835,6 +835,9 @@ managedLedgerPrometheusStatsLatencyRolloverSeconds=60
# Whether trace managed ledger task execution time
managedLedgerTraceTaskExecution=true

# Trigger offload on topic load or not. Default is false.
# triggerOffloadOnTopicLoad=false

# If you want to custom bookie ID or use a dynamic network address for the bookie,
# you can set this option.
# Bookie advertises itself using bookieId rather than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class ManagedLedgerConfig {
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;

@Getter
@Setter
Expand Down Expand Up @@ -748,6 +749,22 @@ public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsFo
this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
}

/**
* Trigger offload on topic load.
* @return
*/
public boolean isTriggerOffloadOnTopicLoad() {
return triggerOffloadOnTopicLoad;
}

/**
* Set trigger offload on topic load.
* @param triggerOffloadOnTopicLoad
*/
public void setTriggerOffloadOnTopicLoad(boolean triggerOffloadOnTopicLoad) {
this.triggerOffloadOnTopicLoad = triggerOffloadOnTopicLoad;
}

public String getShadowSource() {
return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -395,6 +396,10 @@ public void initializeComplete() {

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex trimmerMutex = new CallbackMutex();

private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
public static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
Expand Down Expand Up @@ -2469,7 +2469,7 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
100, TimeUnit.MILLISECONDS);
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
public void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
|| config.getLedgerOffloader().getOffloadPolicies() == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2090,10 +2090,15 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The threshold to triggering automatic offload to long term storage"
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The threshold to triggering automatic offload to long term storage"
)
private long managedLedgerOffloadThresholdInSeconds = -1L;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Trigger offload on topic load or not. Default is false"
)
private boolean triggerOffloadOnTopicLoad = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of entries to append to a cursor ledger"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1986,6 +1986,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
}
managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());

managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
Expand Down

0 comments on commit 400a286

Please sign in to comment.