diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index ab71ea55efeec..2785af3651426 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -30,6 +30,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; + +import lombok.Setter; import org.apache.bookkeeper.client.api.LedgerEntry; import org.apache.bookkeeper.client.api.ReadHandle; import org.apache.bookkeeper.client.impl.LedgerEntryImpl; @@ -45,9 +47,12 @@ @SuppressWarnings("checkstyle:javadoctype") public class EntryCacheManager { - private final long maxSize; - private final long evictionTriggerThreshold; - private final double cacheEvictionWatermark; + @Setter + private volatile long maxSize; + @Setter + private volatile long evictionTriggerThreshold; + @Setter + private volatile double cacheEvictionWatermark; private final AtomicLong currentSize = new AtomicLong(0); private final ConcurrentMap caches = Maps.newConcurrentMap(); private final EntryCacheEvictionPolicy evictionPolicy; @@ -59,7 +64,8 @@ public class EntryCacheManager { protected static final double MB = 1024 * 1024; - private static final double evictionTriggerThresholdPercent = 0.98; + @Setter + private volatile double evictionTriggerThresholdPercent = 0.98; public EntryCacheManager(ManagedLedgerFactoryImpl factory) { diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f183d1ce8414a..2b58a2b8853b1 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -40,6 +40,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; import lombok.Getter; +import lombok.Setter; import org.apache.bookkeeper.client.BKException; import org.apache.bookkeeper.client.BookKeeper; import org.apache.bookkeeper.common.util.OrderedScheduler; @@ -104,7 +105,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final ScheduledFuture statsTask; private final ScheduledFuture flushCursorsTask; - private final long cacheEvictionTimeThresholdNanos; + @Setter + private volatile long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; //indicate whether shutdown() is called. diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 19ba26857534f..2129eb22b405e 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1678,16 +1678,26 @@ public class ServiceConfiguration implements PulsarConfiguration { private int managedLedgerMaxAckQuorum = 5; @FieldContext( category = CATEGORY_STORAGE_ML, + dynamic = true, doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis" + " memory is allocated from JVM direct memory and it's shared across all the topics" + " running in the same broker. By default, uses 1/5th of available direct memory") private int managedLedgerCacheSizeMB = Math.max(64, (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024))); + + @FieldContext( + category = CATEGORY_STORAGE_ML, + dynamic = true, + doc = "Threshold to evicteis triggered" + ) + private double evictionTriggerThresholdPercent = 0.98; + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when " + "inserting in cache") private boolean managedLedgerCacheCopyEntries = false; @FieldContext( category = CATEGORY_STORAGE_ML, + dynamic = true, doc = "Threshold to which bring down the cache level when eviction is triggered" ) private double managedLedgerCacheEvictionWatermark = 0.9; @@ -1695,6 +1705,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s") private double managedLedgerCacheEvictionFrequency = 100.0; @FieldContext(category = CATEGORY_STORAGE_ML, + dynamic = true, doc = "All entries that have stayed in cache for more than the configured time, will be evicted") private long managedLedgerCacheEvictionTimeThresholdMillis = 1000; @FieldContext(category = CATEGORY_STORAGE_ML, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index 40caab783d071..95eb65772085b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -88,6 +88,7 @@ import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; import org.apache.bookkeeper.mledger.ManagedLedgerFactory; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl; import org.apache.bookkeeper.mledger.util.Futures; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; @@ -2132,6 +2133,37 @@ private void updateConfigurationAndRegisterListeners() { } }); }); + + // add listener to notify broker managedLedgerCacheSizeMB dynamic config + registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> { + long maxSize = (long) managedLedgerCacheSizeMB * 1024L * 1024L; + double thresholdPercent = pulsar().getConfiguration().getEvictionTriggerThresholdPercent(); + + updateCacheSizeAndThreshold(maxSize, thresholdPercent); + }); + + // add listener to notify broker evictionTriggerThresholdPercent dynamic config + registerConfigurationListener("evictionTriggerThresholdPercent", (evictionTriggerThresholdPercent) -> { + long maxSize = pulsar().getConfiguration().getManagedLedgerCacheSizeMB() * 1024L * 1024L; + double thresholdPercent = (double) evictionTriggerThresholdPercent; + + updateCacheSizeAndThreshold(maxSize, thresholdPercent); + }); + + // add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config + registerConfigurationListener("managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> { + ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(); + managedLedgerFactory.getEntryCacheManager().setCacheEvictionWatermark((double) cacheEvictionWatermark); + }); + + // add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config + registerConfigurationListener("managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdNanos) -> { + ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(); + managedLedgerFactory.setCacheEvictionTimeThresholdNanos(TimeUnit.MILLISECONDS + .toNanos((long) cacheEvictionTimeThresholdNanos)); + }); + + // add listener to update message-dispatch-rate in msg for topic registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> { updateTopicMessageDispatchRate(); @@ -2233,6 +2265,12 @@ private void updateBrokerPublisherThrottlingMaxRate() { } } + private void updateCacheSizeAndThreshold(long maxSize, double thresholdPercent) { + ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory(); + managedLedgerFactory.getEntryCacheManager().setMaxSize(maxSize); + managedLedgerFactory.getEntryCacheManager().setEvictionTriggerThreshold((long) (maxSize * thresholdPercent)); + } + private void updateTopicMessageDispatchRate() { this.pulsar().getExecutor().execute(() -> { // update message-rate for each topic