Skip to content

Commit

Permalink
Introduce maxMessagePublishBufferSizeInMB configuration to avoid brok…
Browse files Browse the repository at this point in the history
…er OOM (apache#6178)

Motivation
Introduce maxMessagePublishBufferSizeInMB configuration to avoid broker OOM.

Modifications
If the total size of the messages being processed exceeds this value, the broker will stop reading data from the connection. When the available size becomes greater than half of maxMessagePublishBufferSizeInMB, the broker resumes auto-reading data from the connection.
  • Loading branch information
codelipenghui committed Feb 16, 2020
1 parent e9083f5 commit 91dfa1a
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 17 deletions.
12 changes: 12 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,18 @@ replicatedSubscriptionsSnapshotTimeoutSeconds=30
# Max number of snapshot to be cached per subscription.
replicatedSubscriptionsSnapshotMaxCachedPerSubscription=10

# Max memory size for the broker to handle messages sent from producers.
# If the size of the messages being processed exceeds this value, the broker will stop reading data
# from the connection. Messages being processed are messages that have been sent to the broker
# but for which the broker has not yet sent a response to the client, usually because they are
# waiting to be written to bookies.
# The limit is shared across all the topics running in the same broker.
# Use -1 to disable the memory limitation. Default is 1/2 of direct memory.
maxMessagePublishBufferSizeInMB=

# Interval between checks to see whether the message publish buffer size exceeds the max message publish buffer size.
# Use 0 or a negative number to disable the max publish buffer limiting.
messagePublishBufferCheckIntervalInMillis=100

### --- Authentication --- ###
# Role names that are treated as "proxy roles". If the broker sees a request with
#role as proxyRoles - it will demand to see a valid original principal.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,23 @@ public class ServiceConfiguration implements PulsarConfiguration {
doc = "Max number of snapshot to be cached per subscription.")
private int replicatedSubscriptionsSnapshotMaxCachedPerSubscription = 10;

// Upper bound, in MB, on the memory used by messages that producers have sent but that
// the broker has not yet answered (see the doc string below).
// The initializer takes half of PlatformDependent.maxDirectMemory(), converts it to MB and
// floors it at 64, so the effective default is never below 64 MB.
// BrokerService treats any value <= 0 as "limit disabled".
@FieldContext(
category = CATEGORY_SERVER,
doc = "Max memory size for broker handling messages sending from producers.\n\n"
+ " If the processing message size exceed this value, broker will stop read data"
+ " from the connection. The processing messages means messages are sends to broker"
+ " but broker have not send response to client, usually waiting to write to bookies.\n\n"
+ " It's shared across all the topics running in the same broker.\n\n"
+ " Use -1 to disable the memory limitation. Default is 1/2 of direct memory.\n\n")
private int maxMessagePublishBufferSizeInMB = Math.max(64,
(int) (PlatformDependent.maxDirectMemory() / 2 / (1024 * 1024)));

// Period, in milliseconds, of the publish-buffer check.
// BrokerService only schedules the check when this value is > 0 and the buffer limit
// itself is enabled, so 0 or a negative value turns the check off.
@FieldContext(
category = CATEGORY_SERVER,
doc = "Interval between checks to see if message publish buffer size is exceed the max message publish buffer size"
)
private int messagePublishBufferCheckIntervalInMillis = 100;

/**** --- Messaging Protocols --- ****/

@FieldContext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,24 +277,36 @@ public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
public void resetTopicPublishCountAndEnableReadIfRequired() {
// broker rate not exceeded. and completed topic limiter reset.
if (!getBrokerPublishRateLimiter().isPublishRateExceeded() && topicPublishRateLimiter.resetPublishCount()) {
enableProducerRead();
enableProducerReadForPublishRateLimiting();
}
}

@Override
public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
// topic rate not exceeded, and completed broker limiter reset.
if (!topicPublishRateLimiter.isPublishRateExceeded() && doneBrokerReset) {
enableProducerRead();
enableProducerReadForPublishRateLimiting();
}
}

/**
* it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling
*/
protected void enableProducerRead() {
protected void enableProducerReadForPublishRateLimiting() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
producers.values().forEach(producer -> {
producer.getCnx().cancelPublishRateLimiting();
producer.getCnx().enableCnxAutoRead();
});
}
}

/**
 * Clears the publish-buffer limiting flag on the connection of every producer of this topic
 * and then asks that connection to resume auto-read.
 */
protected void enableProducerReadForPublishBufferLimiting() {
    if (producers == null) {
        // Nothing to resume when the producer collection is absent.
        return;
    }
    producers.values().forEach(p -> {
        // Order matters: the flag has to be cleared before enableCnxAutoRead() inspects it.
        p.getCnx().cancelPublishBufferLimiting();
        p.getCnx().enableCnxAutoRead();
    });
}

Expand Down Expand Up @@ -389,7 +401,7 @@ private void updatePublishDispatcher(Policies policies) {
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerRead();
enableProducerReadForPublishRateLimiting();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT;
import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
Expand Down Expand Up @@ -58,6 +59,7 @@
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReadWriteLock;
Expand Down Expand Up @@ -185,6 +187,7 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private final ScheduledExecutorService inactivityMonitor;
private final ScheduledExecutorService messageExpiryMonitor;
private final ScheduledExecutorService compactionMonitor;
private final ScheduledExecutorService messagePublishBufferMonitor;
private ScheduledExecutorService topicPublishRateLimiterMonitor;
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
Expand Down Expand Up @@ -216,8 +219,15 @@ public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies
private Channel listenChannel;
private Channel listenChannelTls;

// Publish-buffer limit in bytes, derived from maxMessagePublishBufferSizeInMB in the
// constructor; -1 when that setting is <= 0.
private final long maxMessagePublishBufferBytes;
// Half of the limit: once the summed buffer size drops below this level, producers'
// connections are allowed to read again.
private final long resumeProducerReadMessagePublishBufferBytes;
// True from the moment the summed buffer size reaches the limit until it drops below the
// resume level; written by checkMessagePublishBuffer().
private volatile boolean reachMessagePublishBufferThreshold;

public BrokerService(PulsarService pulsar) throws Exception {
this.pulsar = pulsar;
// Convert the configured limit from MB to bytes. The getter returns an int, so the value is
// widened to long BEFORE multiplying: with int arithmetic any setting >= 2048 MB overflows
// (e.g. 4096 MB wraps to 0, which would silently disable the limit).
// Any non-positive setting is normalized to -1, meaning "no limit".
this.maxMessagePublishBufferBytes = pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() > 0 ?
(long) pulsar.getConfiguration().getMaxMessagePublishBufferSizeInMB() * 1024L * 1024L : -1L;
this.resumeProducerReadMessagePublishBufferBytes = this.maxMessagePublishBufferBytes / 2;
this.managedLedgerFactory = pulsar.getManagedLedgerFactory();
this.topics = new ConcurrentOpenHashMap<>();
this.replicationClients = new ConcurrentOpenHashMap<>();
Expand Down Expand Up @@ -257,6 +267,8 @@ public BrokerService(PulsarService pulsar) throws Exception {
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor"));
this.compactionMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor"));
this.messagePublishBufferMonitor =
Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-publish-buffer-monitor"));

this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
Expand Down Expand Up @@ -386,6 +398,7 @@ public void start() throws Exception {
this.startInactivityMonitor();
this.startMessageExpiryMonitor();
this.startCompactionMonitor();
this.startMessagePublishBufferMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
// register listener to capture zk-latency
Expand Down Expand Up @@ -438,6 +451,14 @@ protected void startCompactionMonitor() {
}
}

/**
 * Schedules the periodic publish-buffer check on the dedicated monitor executor.
 * Nothing is scheduled when the configured interval or the buffer limit is not positive.
 */
protected void startMessagePublishBufferMonitor() {
    final int checkIntervalMillis = pulsar().getConfiguration().getMessagePublishBufferCheckIntervalInMillis();
    if (checkIntervalMillis <= 0 || maxMessagePublishBufferBytes <= 0) {
        // Either the check or the limit itself is disabled.
        return;
    }
    messagePublishBufferMonitor.scheduleAtFixedRate(
            safeRun(this::checkMessagePublishBuffer),
            checkIntervalMillis,
            checkIntervalMillis,
            TimeUnit.MILLISECONDS);
}

protected void startBacklogQuotaChecker() {
if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) {
final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds();
Expand Down Expand Up @@ -2011,4 +2032,36 @@ public Optional<Integer> getListenPortTls() {
return Optional.empty();
}
}

/**
 * Sums the publish buffer size reported by the connection of every producer and updates
 * {@code reachMessagePublishBufferThreshold} accordingly:
 * the flag is raised once the total reaches the limit, and it is lowered (re-enabling reads
 * on every topic's producers) once the total falls below the resume level.
 */
private void checkMessagePublishBuffer() {
    AtomicLong bufferedBytes = new AtomicLong();
    foreachProducer(producer -> bufferedBytes.addAndGet(producer.getCnx().getMessagePublishBufferSize()));
    final long totalBuffered = bufferedBytes.get();

    if (!reachMessagePublishBufferThreshold && totalBuffered >= maxMessagePublishBufferBytes) {
        reachMessagePublishBufferThreshold = true;
    }
    if (reachMessagePublishBufferThreshold && totalBuffered < resumeProducerReadMessagePublishBufferBytes) {
        // Lower the flag first so connections are not immediately paused again.
        reachMessagePublishBufferThreshold = false;
        forEachTopic(topic -> ((AbstractTopic) topic).enableProducerReadForPublishBufferLimiting());
    }
}

/**
 * Applies {@code consumer} to every producer of every entry in {@code topics} for which
 * {@code extractTopic} yields a topic.
 *
 * @param consumer action to run for each producer
 */
private void foreachProducer(Consumer<Producer> consumer) {
    topics.forEach((name, topicFuture) ->
            extractTopic(topicFuture)
                    .ifPresent(loadedTopic -> loadedTopic.getProducers().values().forEach(consumer)));
}

/**
 * @return the current value of the flag maintained by the publish-buffer check; {@code true}
 *         while the summed publish buffer size is considered over the limit
 */
public boolean isReachMessagePublishBufferThreshold() {
    return this.reachMessagePublishBufferThreshold;
}

/**
 * Computes, on demand, the sum of the publish buffer sizes reported by the connections of
 * all producers.
 *
 * @return the summed publish buffer size in bytes
 */
@VisibleForTesting
long getCurrentMessagePublishBufferSize() {
    final AtomicLong total = new AtomicLong();
    foreachProducer(producer -> {
        total.addAndGet(producer.getCnx().getMessagePublishBufferSize());
    });
    return total.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void publishMessage(long producerId, long lowestSequenceId, long highestS
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, highestSequenceId, ServerError.MetadataError,
"Invalid lowest or highest sequence id"));
cnx.completedSendOperation(isNonPersistentTopic);
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
Expand All @@ -160,7 +160,7 @@ public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPa
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.PersistenceError,
"Producer is closed"));
cnx.completedSendOperation(isNonPersistentTopic);
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});

return;
Expand All @@ -170,7 +170,7 @@ public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPa
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(
Commands.newSendError(producerId, sequenceId, ServerError.ChecksumError, "Checksum failed on the broker"));
cnx.completedSendOperation(isNonPersistentTopic);
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
Expand All @@ -187,7 +187,7 @@ public void beforePublish(long producerId, long sequenceId, ByteBuf headersAndPa
cnx.ctx().channel().eventLoop().execute(() -> {
cnx.ctx().writeAndFlush(Commands.newSendError(producerId, sequenceId, ServerError.MetadataError,
"Messages must be encrypted"));
cnx.completedSendOperation(isNonPersistentTopic);
cnx.completedSendOperation(isNonPersistentTopic, headersAndPayload.readableBytes());
});
return;
}
Expand Down Expand Up @@ -353,7 +353,7 @@ public void completed(Exception exception, long ledgerId, long entryId) {
producer.cnx.ctx().writeAndFlush(Commands.newSendError(producer.producerId, callBackSequenceId,
serverError, exception.getMessage()));
}
producer.cnx.completedSendOperation(producer.isNonPersistentTopic);
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
producer.publishOperationCompleted();
recycle();
});
Expand Down Expand Up @@ -385,7 +385,7 @@ public void run() {
producer.cnx.ctx().writeAndFlush(
Commands.newSendReceipt(producer.producerId, sequenceId, highestSequenceId, ledgerId, entryId),
producer.cnx.ctx().voidPromise());
producer.cnx.completedSendOperation(producer.isNonPersistentTopic);
producer.cnx.completedSendOperation(producer.isNonPersistentTopic, msgSize);
producer.publishOperationCompleted();
recycle();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import static org.apache.pulsar.common.protocol.Commands.newLookupErrorResponse;
import static org.apache.pulsar.common.api.proto.PulsarApi.ProtocolVersion.v5;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;

import io.netty.buffer.ByteBuf;
Expand Down Expand Up @@ -151,6 +152,9 @@ public class ServerCnx extends PulsarHandler {
// Flag to manage throttling-rate by atomically enable/disable read-channel.
private volatile boolean autoReadDisabledRateLimiting = false;
private FeatureFlags features;
// Flag to manage publish-buffer throttling: set when channel auto-read was turned off because
// the broker reported that the publish buffer threshold was reached; cleared by
// cancelPublishBufferLimiting().
private volatile boolean autoReadDisabledPublishBufferLimiting = false;
// Bytes of message data for send operations started on this connection that have not completed
// yet: increased in startSendOperation and decreased in completedSendOperation.
private volatile long messagePublishBufferSize = 0;

enum State {
Start, Connected, Failed, Connecting
Expand Down Expand Up @@ -1154,7 +1158,7 @@ protected void handleSend(CommandSend send, ByteBuf headersAndPayload) {
}
}

startSendOperation(producer);
startSendOperation(producer, headersAndPayload.readableBytes());

// Persist the message
if (send.hasHighestSequenceId() && send.getSequenceId() <= send.getHighestSequenceId()) {
Expand Down Expand Up @@ -1677,17 +1681,24 @@ public boolean isWritable() {
return ctx.channel().isWritable();
}

public void startSendOperation(Producer producer) {
private void startSendOperation(Producer producer, int msgSize) {
messagePublishBufferSize += msgSize;
boolean isPublishRateExceeded = producer.getTopic().isPublishRateExceeded();
if (++pendingSendRequest == MaxPendingSendRequests || isPublishRateExceeded) {
// When the quota of pending send requests is reached, stop reading from socket to cause backpressure on
// client connection, possibly shared between multiple producers
ctx.channel().config().setAutoRead(false);
autoReadDisabledRateLimiting = isPublishRateExceeded;

}
if (getBrokerService().isReachMessagePublishBufferThreshold()) {
ctx.channel().config().setAutoRead(false);
autoReadDisabledPublishBufferLimiting = true;
}
}

public void completedSendOperation(boolean isNonPersistentTopic) {
void completedSendOperation(boolean isNonPersistentTopic, int msgSize) {
messagePublishBufferSize -= msgSize;
if (--pendingSendRequest == ResumeReadsThreshold) {
// Resume reading from socket
ctx.channel().config().setAutoRead(true);
Expand All @@ -1699,19 +1710,32 @@ public void completedSendOperation(boolean isNonPersistentTopic) {
}
}

public void enableCnxAutoRead() {
void enableCnxAutoRead() {
// we can add check (&& pendingSendRequest < MaxPendingSendRequests) here but then it requires
// pendingSendRequest to be volatile and it can be expensive while writing. also this will be called on if
// throttling is enable on the topic. so, avoid pendingSendRequest check will be fine.
if (!ctx.channel().config().isAutoRead() && autoReadDisabledRateLimiting) {
if (!ctx.channel().config().isAutoRead() && !autoReadDisabledRateLimiting && !autoReadDisabledPublishBufferLimiting) {
// Resume reading from socket if pending-request is not reached to threshold
ctx.channel().config().setAutoRead(true);
// triggers channel read
ctx.read();
}
}

/**
 * Resets the flag recording that auto-read was disabled because of publish-rate limiting.
 * The field is only written when it is currently set.
 */
@VisibleForTesting
void cancelPublishRateLimiting() {
    if (!autoReadDisabledRateLimiting) {
        return;
    }
    autoReadDisabledRateLimiting = false;
}

/**
 * Resets the flag recording that auto-read was disabled because of publish-buffer limiting.
 * The field is only written when it is currently set.
 */
@VisibleForTesting
void cancelPublishBufferLimiting() {
    if (!autoReadDisabledPublishBufferLimiting) {
        return;
    }
    autoReadDisabledPublishBufferLimiting = false;
}

private <T> ServerError getErrorCode(CompletableFuture<T> future) {
ServerError error = ServerError.UnknownError;
try {
Expand All @@ -1724,7 +1748,7 @@ private <T> ServerError getErrorCode(CompletableFuture<T> future) {
return error;
}

private final void disableTcpNoDelayIfNeeded(String topic, String producerName) {
private void disableTcpNoDelayIfNeeded(String topic, String producerName) {
if (producerName != null && producerName.startsWith(replicatorPrefix)) {
// Re-enable nagle algorithm on connections used for replication purposes
try {
Expand Down Expand Up @@ -1799,4 +1823,18 @@ boolean supportsAuthenticationRefresh() {
public String getClientVersion() {
return clientVersion;
}

/**
 * @return the number of bytes currently accounted to this connection's publish buffer
 */
public long getMessagePublishBufferSize() {
    return messagePublishBufferSize;
}

/**
 * Overwrites the publish buffer accounting of this connection.
 *
 * @param bufferSize new buffer size in bytes
 */
@VisibleForTesting
void setMessagePublishBufferSize(long bufferSize) {
    messagePublishBufferSize = bufferSize;
}

/**
 * Overwrites the flag recording that auto-read was disabled because of publish-rate limiting.
 *
 * @param isLimiting new value of the flag
 */
@VisibleForTesting
void setAutoReadDisabledRateLimiting(boolean isLimiting) {
    autoReadDisabledRateLimiting = isLimiting;
}
}

0 comments on commit 91dfa1a

Please sign in to comment.