-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[broker] Fix issue where StackOverflowError occurs when trying to redeliver a large number of already acked messages #10696
Conversation
readMoreEntries(); | ||
// We should not call readMoreEntries() recursively in the same thread | ||
// as there is a risk of StackOverflowError | ||
topic.getBrokerService().executor().execute(() -> readMoreEntries()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have two high level questions about this change:
- is it possible/does it make sense to not schedule the execution of
readMoreEntries
if there is already another pending request of executingreadModeEntries
? - is this changing the semantics of how this method works ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eolivelli Those are difficult questions...
Even now, if the message dispatch rate reaches the threshold, another thread can execute readMoreEntries()
recursively after a certain period of time, so it seems to me that this change doesn't break the behavior of the dispatcher. However, there is no clear evidence for it.
If anyone has any knowledge, I would like to hear opinions.
Line 225 in a6aed55
int messagesToRead = calculateNumOfMessageToRead(currentTotalAvailablePermits); |
Lines 275 to 308 in a6aed55
protected int calculateNumOfMessageToRead(int currentTotalAvailablePermits) { | |
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize); | |
Consumer c = getRandomConsumer(); | |
// if turn on precise dispatcher flow control, adjust the record to read | |
if (c != null && c.isPreciseDispatcherFlowControl()) { | |
messagesToRead = Math.min( | |
(int) Math.ceil(currentTotalAvailablePermits * 1.0 / c.getAvgMessagesPerEntry()), | |
readBatchSize); | |
} | |
if (!isConsumerWritable()) { | |
// If the connection is not currently writable, we issue the read request anyway, but for a single | |
// message. The intent here is to keep use the request as a notification mechanism while avoiding to | |
// read and dispatch a big batch of messages which will need to wait before getting written to the | |
// socket. | |
messagesToRead = 1; | |
} | |
// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz | |
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate | |
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS | |
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { | |
if (topic.getDispatchRateLimiter().isPresent() | |
&& topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) { | |
DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get(); | |
if (!topicRateLimiter.hasMessageDispatchPermit()) { | |
if (log.isDebugEnabled()) { | |
log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name, | |
topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(), | |
MESSAGE_RATE_BACKOFF_MS); | |
} | |
topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS, | |
TimeUnit.MILLISECONDS); |
…eliver a large number of already acked messages (apache#10696) ### Motivation The other day, some of our broker servers got the following StackOverflowError: ``` 13:44:17.410 [pulsar-io-21-6] WARN o.a.pulsar.broker.service.ServerCnx - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null java.lang.StackOverflowError: null at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) ``` This phenomenon can be reproduced by the following procedure: 1. Store a large number of messages in the backlog of a topic 2. Connect some Shared consumers to the topic. These consumers receive messages but do not acknowledge at all 3. Run skip-all to remove all messages from the backlog 4. Add another consumer whose receiver queue size is small 5. Close all the consumers added in step 2 6. StackOverflowError occurs on the broker If broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively many times. As a result, we get a StackOverflowError. https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252 ### Modifications - Avoid recursive calls of `readMoreEntries()` on the same thread - If the dispatcher receives redelivery requests for messages whose positions are earlier than the mark delete position, it does not need to add them to `messagesToRedeliver`
…eliver a large number of already acked messages (#10696) The other day, some of our broker servers got the following StackOverflowError: ``` 13:44:17.410 [pulsar-io-21-6] WARN o.a.pulsar.broker.service.ServerCnx - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null java.lang.StackOverflowError: null at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) ``` This phenomenon can be reproduced by the following procedure: 1. Store a large number of messages in the backlog of a topic 2. Connect some Shared consumers to the topic. These consumers receive messages but do not acknowledge at all 3. Run skip-all to remove all messages from the backlog 4. Add another consumer whose receiver queue size is small 5. Close all the consumers added in step 2 6. StackOverflowError occurs on the broker If broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively many times. As a result, we get a StackOverflowError. https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252 - Avoid recursive calls of `readMoreEntries()` on the same thread - If the dispatcher receives redelivery requests for messages whose positions are earlier than the mark delete position, it does not need to add them to `messagesToRedeliver` (cherry picked from commit 894d92b)
…eliver a large number of already acked messages (apache#10696) ### Motivation The other day, some of our broker servers got the following StackOverflowError: ``` 13:44:17.410 [pulsar-io-21-6] WARN o.a.pulsar.broker.service.ServerCnx - [/xxx.xxx.xxx.xxx:58438] Got exception StackOverflowError : null java.lang.StackOverflowError: null at java.util.stream.ReferencePipeline$2$1.accept(ReferencePipeline.java:174) at java.util.TreeMap$KeySpliterator.forEachRemaining(TreeMap.java:2746) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:150) at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:173) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1086) at org.apache.bookkeeper.mledger.impl.ManagedCursorImpl.asyncReplayEntries(ManagedCursorImpl.java:1066) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.asyncReplayEntries(PersistentDispatcherMultipleConsumers.java:341) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:309) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.readMoreEntries(PersistentDispatcherMultipleConsumers.java:318) ``` This phenomenon can be reproduced by the following procedure: 1. Store a large number of messages in the backlog of a topic 2. Connect some Shared consumers to the topic. These consumers receive messages but do not acknowledge at all 3. Run skip-all to remove all messages from the backlog 4. Add another consumer whose receiver queue size is small 5. Close all the consumers added in step 2 6. StackOverflowError occurs on the broker If broker receives a large number of redelivery requests for messages that have already been deleted, `PersistentDispatcherMultipleConsumers#readMoreEntries()` is called recursively many times. As a result, we get a StackOverflowError. https://github.com/apache/pulsar/blob/a6aed551026825ef2de6b1ac5916d17daf1af5c3/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L232-L252 ### Modifications - Avoid recursive calls of `readMoreEntries()` on the same thread - If the dispatcher receives redelivery requests for messages whose positions are earlier than the mark delete position, it does not need to add them to `messagesToRedeliver`
Motivation
The other day, some of our broker servers got the following StackOverflowError:
This phenomenon can be reproduced by the following procedure:
If broker receives a large number of redelivery requests for messages that have already been deleted,
PersistentDispatcherMultipleConsumers#readMoreEntries()
is called recursively many times. As a result, we get a StackOverflowError.pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Lines 232 to 252 in a6aed55
Modifications
readMoreEntries()
on the same threadmessagesToRedeliver