Skip to content

Commit

Permalink
use registerConfigurationListener for dynamic update cache configs
Browse files Browse the repository at this point in the history
  • Loading branch information
lordcheng10 committed Mar 13, 2022
1 parent 52a7018 commit a541c4c
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 5 deletions.
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

import lombok.Setter;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
Expand All @@ -45,9 +47,12 @@
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {

private final long maxSize;
private final long evictionTriggerThreshold;
private final double cacheEvictionWatermark;
@Setter
private volatile long maxSize;
@Setter
private volatile long evictionTriggerThreshold;
@Setter
private volatile double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
Expand All @@ -59,7 +64,8 @@ public class EntryCacheManager {

protected static final double MB = 1024 * 1024;

private static final double evictionTriggerThresholdPercent = 0.98;
@Setter
private volatile double evictionTriggerThresholdPercent = 0.98;


public EntryCacheManager(ManagedLedgerFactoryImpl factory) {
Expand Down
Expand Up @@ -40,6 +40,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand Down Expand Up @@ -104,7 +105,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;

private final long cacheEvictionTimeThresholdNanos;
@Setter
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

//indicate whether shutdown() is called.
Expand Down
Expand Up @@ -1678,23 +1678,34 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
private int managedLedgerCacheSizeMB = Math.max(64,
(int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));

@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Threshold to evicteis triggered"
)
private double evictionTriggerThresholdPercent = 0.98;

@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
+ "inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
Expand Down
Expand Up @@ -88,6 +88,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
Expand Down Expand Up @@ -2132,6 +2133,37 @@ private void updateConfigurationAndRegisterListeners() {
}
});
});

// add listener to notify broker managedLedgerCacheSizeMB dynamic config
registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
long maxSize = (long) managedLedgerCacheSizeMB * 1024L * 1024L;
double thresholdPercent = pulsar().getConfiguration().getEvictionTriggerThresholdPercent();

updateCacheSizeAndThreshold(maxSize, thresholdPercent);
});

// add listener to notify broker evictionTriggerThresholdPercent dynamic config
registerConfigurationListener("evictionTriggerThresholdPercent", (evictionTriggerThresholdPercent) -> {
long maxSize = pulsar().getConfiguration().getManagedLedgerCacheSizeMB() * 1024L * 1024L;
double thresholdPercent = (double) evictionTriggerThresholdPercent;

updateCacheSizeAndThreshold(maxSize, thresholdPercent);
});

// add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config
registerConfigurationListener("managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
managedLedgerFactory.getEntryCacheManager().setCacheEvictionWatermark((double) cacheEvictionWatermark);
});

// add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener("managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdNanos) -> {
ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
managedLedgerFactory.setCacheEvictionTimeThresholdNanos(TimeUnit.MILLISECONDS
.toNanos((long) cacheEvictionTimeThresholdNanos));
});


// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
Expand Down Expand Up @@ -2233,6 +2265,12 @@ private void updateBrokerPublisherThrottlingMaxRate() {
}
}

private void updateCacheSizeAndThreshold(long maxSize, double thresholdPercent) {
ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
managedLedgerFactory.getEntryCacheManager().setMaxSize(maxSize);
managedLedgerFactory.getEntryCacheManager().setEvictionTriggerThreshold((long) (maxSize * thresholdPercent));
}

private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
Expand Down

0 comments on commit a541c4c

Please sign in to comment.