diff --git a/conf/broker.conf b/conf/broker.conf index 1ef68a0395cef..2a9641b5b90b8 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -1211,6 +1211,9 @@ managedLedgerDataReadPriority=tiered-storage-first # (default is -1, which is disabled) managedLedgerOffloadThresholdInSeconds=-1 +# Trigger offload on topic load or not. Default is false. +# triggerOffloadOnTopicLoad=false + # Max number of entries to append to a cursor ledger managedLedgerCursorMaxEntriesPerLedger=50000 diff --git a/conf/standalone.conf b/conf/standalone.conf index a8615b70293d6..7c6aeb6815d6b 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -835,6 +835,9 @@ managedLedgerPrometheusStatsLatencyRolloverSeconds=60 # Whether trace managed ledger task execution time managedLedgerTraceTaskExecution=true +# Trigger offload on topic load or not. Default is false. +# triggerOffloadOnTopicLoad=false + # If you want to custom bookie ID or use a dynamic network address for the bookie, # you can set this option. # Bookie advertises itself using bookieId rather than diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java index 0c93a5b642cf6..fb2c6de3c7423 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java @@ -85,6 +85,7 @@ public class ManagedLedgerConfig { private int minimumBacklogCursorsForCaching = 0; private int minimumBacklogEntriesForCaching = 1000; private int maxBacklogBetweenCursorsForCaching = 1000; + private boolean triggerOffloadOnTopicLoad = false; @Getter @Setter @@ -748,6 +749,22 @@ public void setMaxBacklogBetweenCursorsForCaching(int maxBacklogBetweenCursorsFo this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching; } + /** + * Trigger offload on topic load. + * @return + */ + public boolean isTriggerOffloadOnTopicLoad() { + return triggerOffloadOnTopicLoad; + } + + /** + * Set trigger offload on topic load. + * @param triggerOffloadOnTopicLoad + */ + public void setTriggerOffloadOnTopicLoad(boolean triggerOffloadOnTopicLoad) { + this.triggerOffloadOnTopicLoad = triggerOffloadOnTopicLoad; + } + public String getShadowSource() { return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index 5ce84b3ed850a..d867f2f4c0221 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException; +import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE; import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; import com.google.common.base.Predicates; import com.google.common.collect.Maps; @@ -395,6 +396,10 @@ public void initializeComplete() { // May need to update the cursor position newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger(); + // May need to trigger offloading + if (config.isTriggerOffloadOnTopicLoad()) { + newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE); + } } @Override diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index b3426692df308..681441bf73839 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -212,7 +212,7 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback { private final CallbackMutex trimmerMutex = new CallbackMutex(); private final CallbackMutex offloadMutex = new CallbackMutex(); - private static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture + public static final CompletableFuture NULL_OFFLOAD_PROMISE = CompletableFuture .completedFuture(PositionImpl.LATEST); protected volatile LedgerHandle currentLedger; protected volatile long currentLedgerEntries = 0; @@ -2469,7 +2469,7 @@ private void scheduleDeferredTrimming(boolean isTruncate, CompletableFuture p 100, TimeUnit.MILLISECONDS); } - private void maybeOffloadInBackground(CompletableFuture promise) { + public void maybeOffloadInBackground(CompletableFuture promise) { if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE || config.getLedgerOffloader().getOffloadPolicies() == null) { return; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 9efe185650969..6f03bef30548e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -2090,10 +2090,15 @@ The max allowed delay for delayed delivery (in milliseconds). If the broker rece ) private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L; @FieldContext( - category = CATEGORY_STORAGE_OFFLOADING, - doc = "The threshold to triggering automatic offload to long term storage" + category = CATEGORY_STORAGE_OFFLOADING, + doc = "The threshold to triggering automatic offload to long term storage" ) private long managedLedgerOffloadThresholdInSeconds = -1L; + @FieldContext( + category = CATEGORY_STORAGE_OFFLOADING, + doc = "Trigger offload on topic load or not. Default is false" + ) + private boolean triggerOffloadOnTopicLoad = false; @FieldContext( category = CATEGORY_STORAGE_ML, doc = "Max number of entries to append to a cursor ledger" diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 6603e240ee7d9..9a08578ee4088 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -1986,6 +1986,7 @@ private CompletableFuture getManagedLedgerConfig(@Nonnull T .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); } } + managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad()); managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());