Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support dynamic update cache config #13679

Merged
merged 16 commits into from Mar 14, 2022
Merged
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.api.LedgerEntry;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.client.impl.LedgerEntryImpl;
Expand All @@ -45,9 +47,12 @@
@SuppressWarnings("checkstyle:javadoctype")
public class EntryCacheManager {

private final long maxSize;
private final long evictionTriggerThreshold;
private final double cacheEvictionWatermark;
@Setter
private volatile long maxSize;
@Setter
aloyszhang marked this conversation as resolved.
Show resolved Hide resolved
private volatile long evictionTriggerThreshold;
@Setter
private volatile double cacheEvictionWatermark;
private final AtomicLong currentSize = new AtomicLong(0);
private final ConcurrentMap<String, EntryCache> caches = Maps.newConcurrentMap();
private final EntryCacheEvictionPolicy evictionPolicy;
Expand All @@ -59,6 +64,7 @@ public class EntryCacheManager {

protected static final double MB = 1024 * 1024;

@Getter
private static final double evictionTriggerThresholdPercent = 0.98;


Expand Down
Expand Up @@ -40,6 +40,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;
import lombok.Getter;
import lombok.Setter;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.common.util.OrderedScheduler;
Expand Down Expand Up @@ -104,7 +105,8 @@ public class ManagedLedgerFactoryImpl implements ManagedLedgerFactory {
private final ScheduledFuture<?> statsTask;
private final ScheduledFuture<?> flushCursorsTask;

private final long cacheEvictionTimeThresholdNanos;
@Setter
private volatile long cacheEvictionTimeThresholdNanos;
private final MetadataStore metadataStore;

//indicate whether shutdown() is called.
Expand Down
Expand Up @@ -1678,23 +1678,27 @@ public class ServiceConfiguration implements PulsarConfiguration {
private int managedLedgerMaxAckQuorum = 5;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Amount of memory to use for caching data payload in managed ledger. \n\nThis"
+ " memory is allocated from JVM direct memory and it's shared across all the topics"
+ " running in the same broker. By default, uses 1/5th of available direct memory")
private int managedLedgerCacheSizeMB = Math.max(64,
(int) (io.netty.util.internal.PlatformDependent.maxDirectMemory() / 5 / (1024 * 1024)));

@FieldContext(category = CATEGORY_STORAGE_ML, doc = "Whether we should make a copy of the entry payloads when "
+ "inserting in cache")
private boolean managedLedgerCacheCopyEntries = false;
@FieldContext(
category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "Threshold to which bring down the cache level when eviction is triggered"
)
private double managedLedgerCacheEvictionWatermark = 0.9;
@FieldContext(category = CATEGORY_STORAGE_ML,
doc = "Configure the cache eviction frequency for the managed ledger cache. Default is 100/s")
private double managedLedgerCacheEvictionFrequency = 100.0;
@FieldContext(category = CATEGORY_STORAGE_ML,
dynamic = true,
doc = "All entries that have stayed in cache for more than the configured time, will be evicted")
private long managedLedgerCacheEvictionTimeThresholdMillis = 1000;
@FieldContext(category = CATEGORY_STORAGE_ML,
Expand Down
Expand Up @@ -88,6 +88,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerFactoryImpl;
import org.apache.bookkeeper.mledger.util.Futures;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy;
Expand Down Expand Up @@ -2132,6 +2133,35 @@ private void updateConfigurationAndRegisterListeners() {
}
});
});

// add listener to notify broker managedLedgerCacheSizeMB dynamic config
registerConfigurationListener("managedLedgerCacheSizeMB", (managedLedgerCacheSizeMB) -> {
ManagedLedgerFactoryImpl managedLedgerFactory =
(ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
long maxSize = (long) managedLedgerCacheSizeMB * 1024L * 1024L;

updateCacheSizeAndThreshold(maxSize);
});

// add listener to notify broker managedLedgerCacheEvictionWatermark dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionWatermark", (cacheEvictionWatermark) -> {
ManagedLedgerFactoryImpl managedLedgerFactory =
(ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
managedLedgerFactory.getEntryCacheManager()
.setCacheEvictionWatermark((double) cacheEvictionWatermark);
});

// add listener to notify broker managedLedgerCacheEvictionTimeThresholdMillis dynamic config
registerConfigurationListener(
"managedLedgerCacheEvictionTimeThresholdMillis", (cacheEvictionTimeThresholdMills) -> {
ManagedLedgerFactoryImpl managedLedgerFactory =
(ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
managedLedgerFactory.setCacheEvictionTimeThresholdNanos(TimeUnit.MILLISECONDS
.toNanos((long) cacheEvictionTimeThresholdMills));
});


// add listener to update message-dispatch-rate in msg for topic
registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> {
updateTopicMessageDispatchRate();
Expand Down Expand Up @@ -2233,6 +2263,13 @@ private void updateBrokerPublisherThrottlingMaxRate() {
}
}

private void updateCacheSizeAndThreshold(long maxSize) {
ManagedLedgerFactoryImpl managedLedgerFactory = (ManagedLedgerFactoryImpl) pulsar().getManagedLedgerFactory();
double thresholdPercent = managedLedgerFactory.getEntryCacheManager().getEvictionTriggerThresholdPercent();
managedLedgerFactory.getEntryCacheManager().setMaxSize(maxSize);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should move these lines into method in ManagedLedgerFactoryImpl
Otherwise we are breaking incapsulation

Probably it is better to add the new method to the interface and not only in the Impl classe

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The same applies to setCacheEvictionTimeThresholdNanos

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK I will fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed

managedLedgerFactory.getEntryCacheManager().setEvictionTriggerThreshold((long) (maxSize * thresholdPercent));
}

private void updateTopicMessageDispatchRate() {
this.pulsar().getExecutor().execute(() -> {
// update message-rate for each topic
Expand Down