Skip to content

Commit

Permalink
--story=872772869 支持cache动态配置(apache#13679) (merge request !66)
Browse files Browse the repository at this point in the history
Squash merge branch 'cache_config' into '2.8.1'
Fixes #<xyz>

### Motivation
--story=872772869 支持cache动态配置(apache#13679)


TAPD: --story=872772869
  • Loading branch information
leolinchen authored and mayozhang committed Nov 24, 2022
1 parent 8597b58 commit 56e541e
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 5 deletions.
Expand Up @@ -27,6 +27,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down Expand Up @@ -171,4 +172,21 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);

/**
* @return return EntryCacheManager.
*/
EntryCacheManager getEntryCacheManager();

/**
* update cache evictionTimeThreshold.
*
* @param cacheEvictionTimeThresholdNanos time threshold for eviction.
*/
void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);

/**
* @return time threshold for eviction.
* */
long getCacheEvictionTimeThreshold();

}
Expand Up @@ -44,9 +44,9 @@
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {

private final long maxSize;
private final long evictionTriggerThreshold;
private final double cacheEvictionWatermark;
private volatile long maxSize;
private volatile long evictionTriggerThreshold;
private volatile double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
Expand Down Expand Up @@ -87,6 +87,15 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) {
}
}

public void updateCacheSizeAndThreshold(long maxSize) {
this.maxSize = maxSize;
this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
}

public void updateCacheEvictionWatermark(double cacheEvictionWatermark) {
this.cacheEvictionWatermark = cacheEvictionWatermark;
}

void removeEntryCache(String name) {
EntryCache entryCache = caches.remove(name);
if (entryCache == null) {
Expand Down Expand Up @@ -148,6 +157,10 @@ public long getMaxSize() {
return maxSize;
}

public double getCacheEvictionWatermark() {
return cacheEvictionWatermark;
}

public void clear() {
caches.values().forEach(EntryCache::clear);
}
Expand Down
Expand Up @@ -100,7 +100,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;

private final long cacheEvictionTimeThresholdNanos;
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

public static final int StatsPeriodSeconds = 60;
Expand Down Expand Up @@ -817,10 +817,21 @@ public ManagedLedgerFactoryConfig getConfig() {
return config;
}

@Override
public EntryCacheManager getEntryCacheManager() {
return entryCacheManager;
}

@Override
public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos){
this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos;
}

@Override
public long getCacheEvictionTimeThreshold(){
return cacheEvictionTimeThresholdNanos;
}

public ManagedLedgerFactoryMXBean getCacheStats() {
return this.mbean;
}
Expand Down
Expand Up @@ -1484,6 +1484,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
Expand All @@ -1493,13 +1494,15 @@ public class ServiceConfiguration implements PulsarConfiguration {
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Threshold to which bring down the cache level when eviction is triggered"
dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9f;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
Expand Down
Expand Up @@ -2057,6 +2057,28 @@ private void updateConfigurationAndRegisterListeners() {
log.warn("Failed to change load manager", ex);
}
});

// add listener to notify broker managedLedgerCacheSizeMB dynamic config
registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
managedLedgerFactory.getEntryCacheManager()
.updateCacheSizeAndThreshold(((int) managedLedgerCacheSizeMB) * 1024L * 1024L);
});

// add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
managedLedgerFactory.getEntryCacheManager()
.updateCacheEvictionWatermark((double) cacheEvictionWatermark);
});

// add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos((long) cacheEvictionTimeThresholdMills));
});


// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
Expand Down
Expand Up @@ -502,6 +502,28 @@ public void brokers() throws Exception {
assertEquals(admin.clusters().getClusters(), Lists.newArrayList());
}

@Test
public void testUpdateDynamicCacheConfigurationWithZkWatch() throws Exception {
// update configuration
admin.brokers().updateDynamicConfiguration("managedLedgerCacheSizeMB", "1");
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionWatermark", "0.8");
admin.brokers().updateDynamicConfiguration("managedLedgerCacheEvictionTimeThresholdMillis", "2000");

// wait config to be updated
Awaitility.await().until(() -> {
return pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize() == 1 * 1024L * 1024L
&& pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark() == 0.8
&& pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold() == TimeUnit.MILLISECONDS
.toNanos(2000);
});

// verify value is updated
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getMaxSize(), 1 * 1024L * 1024L);
assertEquals(pulsar.getManagedLedgerFactory().getEntryCacheManager().getCacheEvictionWatermark(), 0.8);
assertEquals(pulsar.getManagedLedgerFactory().getCacheEvictionTimeThreshold(), TimeUnit.MILLISECONDS
.toNanos(2000));
}

/**
* <pre>
* Verifies: zk-update configuration updates service-config
Expand Down

0 comments on commit 56e541e

Please sign in to comment.