Fix consume message order issue when use listener. #13023
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java
Lgtm
@michaeljmarshall @lhotari would you mind taking a look?
(cherry picked from commit e134e37)
This PR remediates the data race, but I think we might not want all of the extra thread switching that this introduces. We can also fix the data race using synchronization. Note that this PR attempts to fix a race introduced in #11455.
    if (executorQueueSize.get() < 1) {
        final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS);
        if (msg != null) {
            executorQueueSize.incrementAndGet();
Here is the race condition. Lines 922 and 925. The non-synchronized get and subsequent update would explain messages processed out of order. By putting this on the same thread, `internalPinnedExecutor`, the race is no longer possible. However, I wonder if we want the extra thread switching here and if we want the `internalReceive` method called on the `internalPinnedExecutor` for all method calls. It seems like we should have instead solved this data race.
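To make the check-then-act problem described above concrete, here is a minimal sketch. It is not the actual `ConsumerBase` code: the class `DispatchSlot` and its methods are hypothetical names, and only the `executorQueueSize` counter is borrowed from the snippet under review. It contrasts the unsynchronized get-then-increment with a single atomic compare-and-set, which is one way to remove the race without pinning the work to one thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; not the real Pulsar client code.
final class DispatchSlot {
    private final AtomicInteger executorQueueSize = new AtomicInteger();

    // Racy check-then-act: two threads can both observe 0 and both proceed.
    public boolean tryClaimRacy() {
        if (executorQueueSize.get() < 1) {
            executorQueueSize.incrementAndGet();
            return true;
        }
        return false;
    }

    // Atomic alternative: the check and the update happen as one step.
    public boolean tryClaimAtomic() {
        return executorQueueSize.compareAndSet(0, 1);
    }

    // Frees the slot so that the next claim can succeed.
    public void release() {
        executorQueueSize.decrementAndGet();
    }

    // Starts `threads` concurrent atomic claims and returns how many succeeded.
    public static int concurrentAtomicClaims(int threads) {
        DispatchSlot slot = new DispatchSlot();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all workers up before racing
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (slot.tryClaimAtomic()) {
                    winners.incrementAndGet();
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("successful claims: " + concurrentAtomicClaims(8));
    }
}
```

In the reviewed code the message is fetched between the check and the increment, so a compare-and-set version would also have to release the slot when `internalReceive` returns null. That extra bookkeeping is part of the trade-off against simply running everything on one thread.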
After looking at the client a bit more, we certainly follow this paradigm frequently. It is possible that this `internalPinnedExecutor` might become a bit of a bottleneck. However, I think we can keep the design as is for now. I opened #13273 to clean up this code block a bit.
While this may have fixed an ordering bug, it has caused degraded performance when the client is consuming a topic at a high rate. The evidence is that the change moved work between threads and under high load ... Where are the documents that explain the threading architecture? Also, I would like to understand why you think this fixed the ordering issue ...
I've verified the performance regression introduced by this PR. It's a really serious bug if we're using
From my test, the consumer throughput is limited to at most 50+ MiB/s for a single partition, even if the producer rate was downgraded to only 200 MiB/s. Here are my test results for the case that only 1 partition is used.

- Max throughput (10000 MB/s produce rate)
- 200 MB/s produce rate

/cc @merlimat @codelipenghui @dave2wave @Jason918 @eolivelli
I'm also confused about why this PR fixes the message order issue and when the message disordering would happen. Could it be reproduced?
@congbobo184 Do you remember how to reproduce the message disordering issue?
@BewareMyPower Thanks for these tests.
Updated: I've confirmed why this PR fixes the message order issue and it's safe to revert it. I'll push a PR soon to give more explanations and add a unit test for it.
### Motivation

apache#13023 introduced a performance regression, see details in apache#13023 (comment). It might be caused by the frequent thread switching: to ensure the message order, we must execute `ConsumerBase#triggerListener` in a single thread, because it delivers the received message to other threads, see
https://github.com/apache/pulsar/blob/81da8d3cd199fd6c1e4510a1c1c2ac71418efd5e/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java#L970-L977

However, it's necessary for correctness because `triggerListener` could be called in two places:
- `messageReceived`, which is called in the `pulsar-io-xxx` thread.
- `callMessageListener`, which is called in the `pulsar-external-listener-xxx` thread.

Then these two threads can retrieve the results concurrently, which leads to message disordering.

This PR adds a test that verifies the message order when a message listener is configured, so that the following PRs won't introduce a regression in correctness.

### Modifications

- Add `SimpleProducerConsumerTest#testListenerOrdering`.
- Make `triggerListener` private; the derived classes of `ConsumerBase` must call `tryTriggerListener`. This makes it clearer that the performance issue is related to `triggerListener`.

### Verifying this change

If we removed the `internalPinnedExecutor.execute` block in `triggerListener`, `testListenerOrdering` could fail easily.
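As a rough illustration of why running the poll-and-hand-off step on one thread preserves order, the sketch below funnels every trigger through a single-threaded executor before passing the message to a listener executor. All names here (`PinnedDispatchSketch`, `deliver`, the local executors) are hypothetical stand-ins, not the Pulsar client's classes, and the listener executor is assumed to be single-threaded per consumer.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch; it only mirrors the idea described in the PR text.
final class PinnedDispatchSketch {

    // Delivers messages 0..messageCount-1 and returns them in listener order.
    public static List<Integer> deliver(int messageCount, int callerThreads) {
        Queue<Integer> incoming = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messageCount; i++) {
            incoming.add(i);
        }
        List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pinned = Executors.newSingleThreadExecutor();   // stands in for the internal pinned executor
        ExecutorService listener = Executors.newSingleThreadExecutor(); // stands in for the external listener thread
        ExecutorService callers = Executors.newFixedThreadPool(callerThreads);

        // Poll and hand-off run together on the pinned thread, so no other
        // thread can slip in between taking a message and submitting it.
        Runnable triggerListener = () -> pinned.execute(() -> {
            Integer msg = incoming.poll();
            if (msg != null) {
                listener.execute(() -> received.add(msg));
            }
        });

        // Several threads request dispatch concurrently.
        for (int i = 0; i < messageCount; i++) {
            callers.execute(triggerListener);
        }

        awaitShutdown(callers);  // every trigger has been queued on `pinned`
        awaitShutdown(pinned);   // every message has been handed to `listener`
        awaitShutdown(listener); // every message has been recorded
        return new ArrayList<>(received);
    }

    private static void awaitShutdown(ExecutorService executor) {
        executor.shutdown();
        try {
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("executor did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        List<Integer> out = deliver(1000, 4);
        System.out.println("delivered " + out.size() + " messages");
    }
}
```

If the body of `triggerListener` ran directly on the caller threads instead of on `pinned`, two callers could each poll a message and then submit them to the listener in the opposite order, which is the disordering the test is meant to catch.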
Sorry, I found it's wrong. If we don't execute this logic in a separate executor, message disordering would happen. I've opened #15049 to add the test. I think we should find a way to improve the performance of the message listener in the future.
### Motivation

apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread pool -> external thread pool.

Previously we wanted to control the outstanding messages of a consumer that uses a listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger to run in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression.

Here is the framegraph to show the thread frame of the internal thread and the external thread.

This change also fixes the performance issue for multi-topic consumers and key-shared subscriptions with a message listener enabled. Before this change, the messages were processed serially. After this change, we can improve parallelism while still ensuring ordering.

### Modification

- Remove the isListenerHandlingMessage control
- Move the messages from the receiver queue to the queue of the external executor but do not increase permits
- Increase permits before calling the message listener

After the above changes, we don't need to call triggerListener from the external executor.

Here is the thread framegraph after applying this change.

Before this change, the consumer can't reach 50000 messages/s.

After this change, the consumer can reach 400000 messages/s

```
2022-04-14T02:14:58,208+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 9723124 msg --- 470142.670 msg/s --- 3.587 Mbit/s --- Latency: mean: 9476.742 ms - med: 9441 - 95pct: 11908 - 99pct: 12152 - 99.9pct: 12239 - 99.99pct: 12247 - Max: 12247
2022-04-14T02:15:08,222+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 14411888 msg --- 468147.684 msg/s --- 3.572 Mbit/s --- Latency: mean: 15262.627 ms - med: 15253 - 95pct: 18023 - 99pct: 18258 - 99.9pct: 18315 - 99.99pct: 18317 - Max: 18318
2022-04-14T02:15:18,236+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 18841513 msg --- 442446.540 msg/s --- 3.376 Mbit/s --- Latency: mean: 21164.401 ms - med: 21094 - 95pct: 23664 - 99pct: 23899 - 99.9pct: 23939 - 99.99pct: 23955 - Max: 23955
2022-04-14T02:15:28,253+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23082078 msg --- 423212.525 msg/s --- 3.229 Mbit/s --- Latency: mean: 27174.714 ms - med: 27272 - 95pct: 29453 - 99pct: 29698 - 99.9pct: 29725 - 99.99pct: 29736 - Max: 29736
2022-04-14T02:15:38,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 27647013 msg --- 455823.127 msg/s --- 3.478 Mbit/s --- Latency: mean: 32438.418 ms - med: 32410 - 95pct: 34870 - 99pct: 35098 - 99.9pct: 35130 - 99.99pct: 35133 - Max: 35134
```
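The modification list above can be sketched roughly as follows. This is an assumption-laden illustration, not the patch itself: `ListenerPermitSketch`, `messageReceived`, `availablePermits`, and `demo` are hypothetical names, and a plain counter stands in for the client's flow-permit accounting. It shows messages going straight to the external executor's queue, with a permit added only at the moment the listener is about to be called.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of "queue first, grant the permit just before the listener runs".
final class ListenerPermitSketch {
    private final ExecutorService externalExecutor = Executors.newSingleThreadExecutor();
    private final AtomicInteger availablePermits = new AtomicInteger();
    private final Consumer<String> listener;

    ListenerPermitSketch(Consumer<String> listener) {
        this.listener = listener;
    }

    // Hand the message to the external executor without touching permits.
    public void messageReceived(String msg) {
        externalExecutor.execute(() -> {
            availablePermits.incrementAndGet(); // permit is granted only now
            listener.accept(msg);
        });
    }

    public int availablePermits() {
        return availablePermits.get();
    }

    public void close() {
        externalExecutor.shutdown();
        try {
            if (!externalExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener executor did not terminate");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns {permits while the listener is stuck on the first message,
    //          permits after every message has been handled}.
    public static int[] demo(int messages) {
        if (messages < 1) {
            throw new IllegalArgumentException("need at least one message");
        }
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch gate = new CountDownLatch(1);
        ListenerPermitSketch consumer = new ListenerPermitSketch(msg -> {
            entered.countDown();  // signal that the listener has started
            awaitQuietly(gate);   // simulate a slow listener until released
        });
        for (int i = 0; i < messages; i++) {
            consumer.messageReceived("msg-" + i);
        }
        awaitQuietly(entered);
        int whileBlocked = consumer.availablePermits();
        gate.countDown();
        consumer.close();
        return new int[] {whileBlocked, consumer.availablePermits()};
    }

    public static void main(String[] args) {
        int[] permits = demo(5);
        System.out.println("while blocked: " + permits[0] + ", after: " + permits[1]);
    }
}
```

While the listener is stuck on the first message, only that message has contributed a permit; the queued ones have not. That keeps the number of outstanding messages bounded, which is the goal the PR text attributes to apache#11455, without a round trip through the internal thread for every message.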
…15162)

### Motivation

#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread pool -> external thread pool.

Previously we wanted to control the outstanding messages of a consumer using a listener, so after #11455, the message will not move from the receiver queue to the external executor. And #13023 changed the listener trigger to run in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression.

Here is the frame graph to show the thread frame of the internal thread and external thread.
[framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt)

This change also fixes the performance issue for multi-topic consumers and key-shared subscriptions with message listeners enabled. Before this change, the messages were processed serially. After this change, we can improve parallelism while still ensuring order.

### Modification

- Remove the isListenerHandlingMessage control
- Move the messages from the receiver queue to the queue of the external executor but do not increase permits
- Increase permits before calling the message listener
…15162)

#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread pool -> external thread pool.

Previously we wanted to control the outstanding messages of a consumer using a listener, so after #11455, the message will not move from the receiver queue to the external executor. And #13023 changed the listener trigger to run in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression.

Here is the frame graph to show the thread frame of the internal thread and external thread.
[framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt)

This change also fixes the performance issue for multi-topic consumers and key-shared subscriptions with message listeners enabled. Before this change, the messages were processed serially. After this change, we can improve parallelism while still ensuring order.

- Remove the isListenerHandlingMessage control
- Move the messages from the receiver queue to the queue of the external executor but do not increase permits
- Increase permits before calling the message listener

(cherry picked from commit 83cd791)
…pache#15162)

apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread pool -> external thread pool.

Previously we wanted to control the outstanding messages of a consumer using a listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger to run in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression.

Here is the frame graph to show the thread frame of the internal thread and external thread.
[framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt)

This change also fixes the performance issue for multi-topic consumers and key-shared subscriptions with message listeners enabled. Before this change, the messages were processed serially. After this change, we can improve parallelism while still ensuring order.

- Remove the isListenerHandlingMessage control
- Move the messages from the receiver queue to the queue of the external executor but do not increase permits
- Increase permits before calling the message listener

(cherry picked from commit 83cd791)
(cherry picked from commit c712441)
### Motivation

When `ConsumerImpl` tries to call `tryTriggerListener`, it is the internal thread (`internalPinnedExecutor`) that executes `triggerListener`. Then the external executor will call `callMessageListener` -> `tryTriggerListener`. This results in the message order issue. We should keep calling `triggerListener` in the `internalPinnedExecutor`.

### Modifications

`triggerListener` always in `internalPinnedExecutor`.

### Documentation

no-need-doc