Skip to content

Commit

Permalink
Support dynamic update cache config (#13679)
Browse files Browse the repository at this point in the history
  • Loading branch information
lordcheng10 committed Mar 14, 2022
1 parent 1993fd7 commit b0213b2
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 4 deletions.
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down Expand Up @@ -179,4 +180,21 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);

/**
* @return return EntryCacheManager.
*/
EntryCacheManager getEntryCacheManager();

/**
* update cache evictionTimeThreshold.
*
* @param cacheEvictionTimeThresholdNanos time threshold for eviction.
*/
void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);

/**
* @return time threshold for eviction.
* */
long getCacheEvictionTimeThreshold();

}
Expand Up @@ -45,9 +45,9 @@
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {

private final long maxSize;
private final long evictionTriggerThreshold;
private final double cacheEvictionWatermark;
private volatile long maxSize;
private volatile long evictionTriggerThreshold;
private volatile double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
Expand Down Expand Up @@ -88,6 +88,15 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) {
}
}

public void updateCacheSizeAndThreshold(long maxSize) {
this.maxSize = maxSize;
this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
}

public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
this.cacheEvictionWatermark = cacheEvictionWatermark;
}

void removeEntryCache(String name) {
EntryCache entryCache = caches.remove(name);
if (entryCache == null) {
Expand Down Expand Up @@ -149,6 +158,10 @@ public long getMaxSize() {
return maxSize;
}

public double getCacheEvictionWatermark() {
return cacheEvictionWatermark;
}

public void clear() {
caches.values().forEach(EntryCache::clear);
}
Expand Down
Expand Up @@ -104,7 +104,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;

private final long cacheEvictionTimeThresholdNanos;
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

//indicate whether shutdown() is called.
Expand Down Expand Up @@ -942,10 +942,21 @@ public ManagedLedgerFactoryConfig getConfig() {
return config;
}

@Override
public EntryCacheManager getEntryCacheManager() {
return entryCacheManager;
}

@Override
public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos){
this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos;
}

@Override
public long getCacheEvictionTimeThreshold(){
return cacheEvictionTimeThresholdNanos;
}

public ManagedLedgerFactoryMXBean getCacheStats() {
return this.mbean;
}
Expand Down
Expand Up @@ -1678,23 +1678,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
private int managedLedgerCacheSizeMB = Math.max(64,
(int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));

@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
+ "inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
Expand Down
Expand Up @@ -2145,6 +2145,28 @@ private void updateConfigurationAndRegisterListeners() {
}
});
});

// add listener to notify broker managedLedgerCacheSizeMB dynamic config
registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
managedLedgerFactory.getEntryCacheManager()
.updateCacheSizeAndThreshold(((int) managedLedgerCacheSizeMB) * 1024L * 1024L);
});

// add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
managedLedgerFactory.getEntryCacheManager()
.updateCacheEvictionWatermark((double) cacheEvictionWatermark);
});

// add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos((long) cacheEvictionTimeThresholdMills));
});


// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
Expand Down
Expand Up @@ -500,6 +500,28 @@ public void brokers() throws Exception {
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}

@Test
public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
// update configuration
admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB", "1");
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark", "0.8");
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis", "2000");

// wait config to be updated
Awaitility.await().until(() -> {
return pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 1024L * 1024L
&& pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark() == 0.8
&& pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == TimeUnit.MILLISECONDS
.toNanos(2000);
});

// verify value is updated
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
.toNanos(2000));
}

/**
* <pre>
* Verifies: zk-update configuration updates service-config
Expand Down

0 comments on commit b0213b2

Please sign in to comment.