Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improved in max-pending-bytes mechanism for broker #7406

Merged
merged 3 commits into from
May 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -474,10 +474,6 @@ replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10
# Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
maxMessagePublishBufferSizeInMB=

# Interval between checks to see if message publish buffer size is exceed the max message publish buffer size
# Use 0 or negative number to disable the max publish buffer limiting.
messagePublishBufferCheckIntervalInMillis=100

# Check between intervals to see if consumed ledgers need to be trimmed
# Use 0 or negative number to disable the check
retentionCheckIntervalInSeconds=120
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,7 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ "requests in memory. Default: 1000"
)
private int maxPendingPublishRequestsPerConnection = 1000;

@FieldContext(
category = CATEGORY_POLICIES,
doc = "How frequently to proactively check and purge expired messages"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,12 @@ protected void addConsumerToSubscription(Subscription subscription, Consumer con

@Override
public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}

@Override
public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}

protected boolean hasLocalProducers() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -67,7 +66,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -232,7 +230,6 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private final ScheduledExecutorService consumedLedgersMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
Expand Down Expand Up @@ -269,18 +266,13 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private Channel listenChannelTls;

private boolean preciseTopicPublishRateLimitingEnable;
private final long maxMessagePublishBufferBytes;
private final long resumeProducerReadMessagePublishBufferBytes;
private volatile boolean reachMessagePublishBufferThreshold;
private final LongAdder pausedConnections = new LongAdder();
private BrokerInterceptor interceptor;

private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors;

public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0
? pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1;
this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
this.preciseTopicPublishRateLimitingEnable =
pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable();
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
Expand Down Expand Up @@ -322,9 +314,6 @@ public BrokerService(PulsarService pulsar) throws Exception {
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-compaction-monitor"));
this.messagePublishBufferMonitor =
Executors.newSingleThreadScheduledExecutor(
new DefaultThreadFactory("pulsar-publish-buffer-monitor"));
this.consumedLedgersMonitor = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor"));

Expand Down Expand Up @@ -474,7 +463,6 @@ public void start() throws Exception {
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
Expand Down Expand Up @@ -552,14 +540,6 @@ protected void startCompactionMonitor() {
}
}

protected void startMessagePublishBufferMonitor() {
int interval = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMillis();
if (interval > 0 && maxMessagePublishBufferBytes > 0) {
messagePublishBufferMonitor.scheduleAtFixedRate(safeRun(this::checkMessagePublishBuffer),
interval, interval, TimeUnit.MILLISECONDS);
}
}

protected void startConsumedLedgersMonitor() {
int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds();
if (interval > 0) {
Expand Down Expand Up @@ -757,7 +737,6 @@ public CompletableFuture<Void> closeAsync() {
inactivityMonitor,
messageExpiryMonitor,
compactionMonitor,
messagePublishBufferMonitor,
consumedLedgersMonitor,
backlogQuotaChecker,
topicOrderedExecutor,
Expand Down Expand Up @@ -2487,22 +2466,6 @@ public Optional<Integer> getListenPortTls() {
}
}

@VisibleForTesting
void checkMessagePublishBuffer() {
AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
if (currentMessagePublishBufferBytes.get() >= maxMessagePublishBufferBytes
&& !reachMessagePublishBufferThreshold) {
reachMessagePublishBufferThreshold = true;
forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead());
}
if (currentMessagePublishBufferBytes.get() < resumeProducerReadMessagePublishBufferBytes
&& reachMessagePublishBufferThreshold) {
reachMessagePublishBufferThreshold = false;
forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting());
}
}

private void foreachCnx(Consumer<TransportCnx> consumer) {
Set<TransportCnx> cnxSet = new HashSet<>();
topics.forEach((n, t) -> {
Expand All @@ -2512,17 +2475,6 @@ private void foreachCnx(Consumer<TransportCnx> consumer) {
cnxSet.forEach(consumer);
}

public boolean isReachMessagePublishBufferThreshold() {
return reachMessagePublishBufferThreshold;
}

@VisibleForTesting
long getCurrentMessagePublishBufferSize() {
AtomicLong currentMessagePublishBufferBytes = new AtomicLong();
foreachCnx(cnx -> currentMessagePublishBufferBytes.addAndGet(cnx.getMessagePublishBufferSize()));
return currentMessagePublishBufferBytes.get();
}

public boolean isAllowAutoTopicCreation(final String topic) {
TopicName topicName = TopicName.get(topic);
return isAllowAutoTopicCreation(topicName);
Expand Down Expand Up @@ -2691,7 +2643,18 @@ public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors()
}

public boolean isBrokerEntryMetadataEnabled() {
return brokerEntryMetadataInterceptors.size() > 0;
return !brokerEntryMetadataInterceptors.isEmpty();
}

public void pausedConnections(int numberOfConnections) {
pausedConnections.add(numberOfConnections);
}

public void resumedConnections(int numberOfConnections) {
pausedConnections.add(-numberOfConnections);
}

public long getPausedConnections() {
return pausedConnections.longValue();
}
}
Loading