Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Trigger offload on topic load #22652

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -1207,6 +1207,9 @@ managedLedgerDataReadPriority=tiered-storage-first
# (default is -1, which is disabled)
managedLedgerOffloadThresholdInSeconds=-1

# Trigger offload on topic load or not. Default is false.
# triggerOffloadOnTopicLoad=false

# Max number of entries to append to a cursor ledger
managedLedgerCursorMaxEntriesPerLedger=50000

Expand Down
3 changes: 3 additions & 0 deletions conf/standalone.conf
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,9 @@ managedLedgerPrometheusStatsLatencyRolloverSeconds=60
# Whether trace managed ledger task execution time
managedLedgerTraceTaskExecution=true

# Trigger offload on topic load or not. Default is false.
# triggerOffloadOnTopicLoad=false

# If you want to custom bookie ID or use a dynamic network address for the bookie,
# you can set this option.
# Bookie advertises itself using bookieId rather than
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public class ManagedLedgerConfig {
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
private boolean triggerOffloadOnTopicLoad = false;

@Getter
@Setter
Expand Down Expand Up @@ -748,6 +749,22 @@ public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsFo
this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
}

/**
* Trigger offload on topic load.
* @return
*/
public boolean isTriggerOffloadOnTopicLoad() {
return triggerOffloadOnTopicLoad;
}

/**
* Set trigger offload on topic load.
* @param triggerOffloadOnTopicLoad
*/
public void setTriggerOffloadOnTopicLoad(boolean triggerOffloadOnTopicLoad) {
this.triggerOffloadOnTopicLoad = triggerOffloadOnTopicLoad;
}

public String getShadowSource() {
return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
Expand Down Expand Up @@ -395,6 +396,10 @@ public void initializeComplete() {

// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
// May need to trigger offloading
if (config.isTriggerOffloadOnTopicLoad()) {
newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex trimmerMutex = new CallbackMutex();

private final CallbackMutex offloadMutex = new CallbackMutex();
private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
public static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
Expand Down Expand Up @@ -2472,7 +2472,7 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture<?> p
100, TimeUnit.MILLISECONDS);
}

private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
public void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
|| config.getLedgerOffloader().getOffloadPolicies() == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2083,10 +2083,15 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece
)
private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The threshold to triggering automatic offload to long term storage"
category = CATEGORY_STORAGE_OFFLOADING,
doc = "The threshold to triggering automatic offload to long term storage"
)
private long managedLedgerOffloadThresholdInSeconds = -1L;
@FieldContext(
category = CATEGORY_STORAGE_OFFLOADING,
doc = "Trigger offload on topic load or not. Default is false"
)
private boolean triggerOffloadOnTopicLoad = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of entries to append to a cursor ledger"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1980,6 +1980,7 @@ private CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(@Nonnull T
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
}
managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());

managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());
Expand Down
Loading