diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java index 1b6d80df64579..ea78987eb3ea7 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java @@ -26,6 +26,7 @@ import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback; +import org.apache.bookkeeper.mledger.impl.EntryCacheManager; /** * A factory to open/create managed ledgers and delete them. @@ -179,4 +180,21 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M */ CompletableFuture asyncExists(String ledgerName); + /** + * @return return EntryCacheManager. + */ + EntryCacheManager getEntryCacheManager(); + + /** + * update cache evictionTimeThreshold. + * + * @param cacheEvictionTimeThresholdNanos time threshold for eviction. + */ + void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos); + + /** + * @return time threshold for eviction. + * */ + long getCacheEvictionTimeThreshold(); + } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java index ab71ea55efeec..132feca6de8fe 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/EntryCacheManager.java @@ -45,9 +45,9 @@ @SuppressWarnings("checkstyle:javadoctype") public class EntryCacheManager { - private final long maxSize; - private final long evictionTriggerThreshold; - private final double cacheEvictionWatermark; + private volatile long maxSize; + private volatile long evictionTriggerThreshold; + private volatile double cacheEvictionWatermark; private final AtomicLong currentSize = new AtomicLong(0); private final ConcurrentMap caches = Maps.newConcurrentMap(); private final EntryCacheEvictionPolicy evictionPolicy; @@ -88,6 +88,15 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) { } } + public void updateCacheSizeAndThreshold(long maxSize) { + this.maxSize = maxSize; + this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent); + } + + public void updateCacheEvictionWatermark(double cacheEvictionWatermark) { + this.cacheEvictionWatermark = cacheEvictionWatermark; + } + void removeEntryCache(String name) { EntryCache entryCache = caches.remove(name); if (entryCache == null) { @@ -149,6 +158,10 @@ public long getMaxSize() { return maxSize; } + public double getCacheEvictionWatermark() { + return cacheEvictionWatermark; + } + public void clear() { caches.values().forEach(EntryCache::clear); } diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java index f183d1ce8414a..68420f19450ca 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java @@ -104,7 +104,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory { private final ScheduledFuture statsTask; private final ScheduledFuture flushCursorsTask; - private final long cacheEvictionTimeThresholdNanos; + private volatile long cacheEvictionTimeThresholdNanos; private final MetadataStore metadataStore; //indicate whether shutdown() is called. @@ -942,10 +942,21 @@ public ManagedLedgerFactoryConfig getConfig() { return config; } + @Override public EntryCacheManager getEntryCacheManager() { return entryCacheManager; } + @Override + public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos){ + this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos; + } + + @Override + public long getCacheEvictionTimeThreshold(){ + return cacheEvictionTimeThresholdNanos; + } + public ManagedLedgerFactoryMXBean getCacheStats() { return this.mbean; } diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index d6e08b81faa94..3bec4a4f9af7d 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -1678,16 +1678,19 @@ public class ServiceConfiguration implements PulsarConfiguration { private int managedLedgerMaxAckQuorum = 5; @FieldContext( category = CATEGORY_STORAGE_ML, + dynamic = true, doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis" + " memory is allocated from JVM direct memory and it's shared across all the topics" + " running in the same broker. By default, uses 1/5th of available direct memory") private int managedLedgerCacheSizeMB = Math.max(64, (int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024))); + @FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when " + "inserting in cache") private boolean managedLedgerCacheCopyEntries = false; @FieldContext( category = CATEGORY_STORAGE_ML, + dynamic = true, doc = "Threshold to which bring down the cache level when eviction is triggered" ) private double managedLedgerCacheEvictionWatermark = 0.9; @@ -1695,6 +1698,7 @@ public class ServiceConfiguration implements PulsarConfiguration { doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s") private double managedLedgerCacheEvictionFrequency = 100.0; @FieldContext(category = CATEGORY_STORAGE_ML, + dynamic = true, doc = "All entries that have stayed in cache for more than the configured time, will be evicted") private long managedLedgerCacheEvictionTimeThresholdMillis = 1000; @FieldContext(category = CATEGORY_STORAGE_ML, diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java index e638392eed8cd..884171e0c4753 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java @@ -2145,6 +2145,28 @@ private void updateConfigurationAndRegisterListeners() { } }); }); + + // add listener to notify broker managedLedgerCacheSizeMB dynamic config + registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> { + managedLedgerFactory.getEntryCacheManager() + .updateCacheSizeAndThreshold(((int) managedLedgerCacheSizeMB) * 1024L * 1024L); + }); + + // add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config + registerConfigurationListener( + "managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> { + managedLedgerFactory.getEntryCacheManager() + .updateCacheEvictionWatermark((double) cacheEvictionWatermark); + }); + + // add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config + registerConfigurationListener( + "managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> { + managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS + .toNanos((long) cacheEvictionTimeThresholdMills)); + }); + + // add listener to update message-dispatch-rate in msg for topic registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> { updateTopicMessageDispatchRate(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 63bf10b4a1278..64de71ac488ea 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -500,6 +500,28 @@ public void brokers() throws Exception { assertEquals(admin.clusters().getClusters(), Lists.newArrayList()); } + @Test + public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception { + // update configuration + admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB", "1"); + admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark", "0.8"); + admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis", "2000"); + + // wait config to be updated + Awaitility.await().until(() -> { + return pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 1024L * 1024L + && pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark() == 0.8 + && pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == TimeUnit.MILLISECONDS + .toNanos(2000); + }); + + // verify value is updated + assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L); + assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8); + assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS + .toNanos(2000)); + } + /** *
      * Verifies: zk-update configuration updates service-config