Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic update cache config #13679

Merged
merged 16 commits into from Mar 14, 2022
Merged
Expand Up @@ -26,6 +26,7 @@
import org.apache.bookkeeper.mledger.AsyncCallbacks.ManagedLedgerInfoCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenReadOnlyCursorCallback;
import org.apache.bookkeeper.mledger.impl.EntryCacheManager;

/**
* A factory to open/create managed ledgers and delete them.
Expand Down Expand Up @@ -179,4 +180,22 @@ void asyncOpenReadOnlyCursor(String managedLedgerName, Position startPosition, M
*/
CompletableFuture<Boolean> asyncExists(String ledgerName);

/**
* @return return EntryCacheManager.
*/
EntryCacheManager getEntryCacheManager();

/**
* update cache size and evictionTriggerThreshold.
*
* @param maxSize max cache size.
*/
void updateCacheSizeAndThreshold(long maxSize);
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved

/**
* update cache evictionTimeThreshold.
*
* @param cacheEvictionTimeThresholdNanos time threshold for eviction.
*/
void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos);
}
Expand Up @@ -45,9 +45,9 @@
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {

private final long maxSize;
private final long evictionTriggerThreshold;
private final double cacheEvictionWatermark;
private volatile long maxSize;
private volatile long evictionTriggerThreshold;
private volatile double cacheEvictionWatermark;
Comment on lines +48 to +50
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned about the cost of this change. These three variables are read each time that a message is written to the cache. While I see the generic benefit of increasing configurability, I don't think we should do so in a way that adds cost (even small costs) to the write path. @lordcheng10 - do you have a use case that shows why the benefits out way the costs here?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@merlimat - do you have any thoughts here? I remember that you have raised concerns about volatile in the metadatastore.

I know that we are talking about minor optimizations here, but since this is on the write path, I feel it makes sense to discuss now.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you have a use case that shows why the benefits out way the costs here?

If we need to adjust the cache parameters, we need to restart the online cluster in turn. Restarting the cluster will have a certain impact on online business.

For different business scenarios, we cannot give a more reasonable cache configuration at once. May need to adjust the cache configuration multiple times

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am concerned about the cost of this change. These three variables are read each time that a message is written to the cache. While I see the generic benefit of increasing configurability, I don't think we should do so in a way that adds cost (even small costs) to the write path.

Do you have some good ideas? Regarding the need to provide a dynamic configuration cache, but also to solve the problems you mentioned.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lordcheng10 - thanks for the additional context. It would be interesting to see a flame graph for the write path of the broker to identify hot spots. The only reason this PR caught my attention is because of the general heuristic that I've seen @merlimat employ: avoid volatile when possible. Without actual metrics about performance impact, I think we should leave this PR in place.

private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
Expand Down Expand Up @@ -88,6 +88,15 @@ public EntryCache getEntryCache(ManagedLedgerImpl ml) {
}
}

public void updateCacheSizeAndThreshold(long maxSize) {
this.maxSize = maxSize;
this.evictionTriggerThreshold = (long) (maxSize * evictionTriggerThresholdPercent);
}

public void updateCacheEvictionWatermark(double cacheEvictionWatermark){
this.cacheEvictionWatermark = cacheEvictionWatermark;
}

void removeEntryCache(String name) {
EntryCache entryCache = caches.remove(name);
if (entryCache == null) {
Expand Down
Expand Up @@ -104,7 +104,7 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;

private final long cacheEvictionTimeThresholdNanos;
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

//indicate whether shutdown() is called.
Expand Down Expand Up @@ -942,10 +942,21 @@ public ManagedLedgerFactoryConfig getConfig() {
return config;
}

@Override
public EntryCacheManager getEntryCacheManager() {
return entryCacheManager;
}

@Override
public void updateCacheSizeAndThreshold(long maxSize) {
entryCacheManager.updateCacheSizeAndThreshold(maxSize);
}

@Override
public void updateCacheEvictionTimeThreshold(long cacheEvictionTimeThresholdNanos){
this.cacheEvictionTimeThresholdNanos = cacheEvictionTimeThresholdNanos;
}

public ManagedLedgerFactoryMXBean getCacheStats() {
return this.mbean;
}
Expand Down
Expand Up @@ -1678,23 +1678,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
private int managedLedgerCacheSizeMB = Math.max(64,
(int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));

@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
+ "inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
Expand Down
Expand Up @@ -2132,6 +2132,28 @@ private void updateConfigurationAndRegisterListeners() {
}
});
});

// add listener to notify broker managedLedgerCacheSizeMB dynamic config
registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
managedLedgerFactory
.updateCacheSizeAndThreshold((long) managedLedgerCacheSizeMB * 1024L * 1024L);
});

// add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
managedLedgerFactory.getEntryCacheManager()
.updateCacheEvictionWatermark((double) cacheEvictionWatermark);
});

// add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
managedLedgerFactory.updateCacheEvictionTimeThreshold(TimeUnit.MILLISECONDS
.toNanos((long) cacheEvictionTimeThresholdMills));
});


// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
Expand Down