-
Notifications
You must be signed in to change notification settings - Fork 3.6k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[fix][java-client] Fix performance regression with message listener #15162
[fix][java-client] Fix performance regression with message listener #15162
Conversation
### Motivation apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer which using listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger in the internal thread pool to fixes the orderding issue, so this is the root cause of the performance regression. Here is the framegraph to to show the thread frame of internal thread and external thread. And also fix the performance issue for multiple topic consumer and key-shared subscription which enabled message listener. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring ordering. ### Modification - Remove the isListenerHandlingMessage control - Move the messages from receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener After the above changes, we don't need to call triggerListener from the external executor. Here is the thread framegraph after apply this change Before this change, the consumer can't reach 50000 messages/s. After this change, the consumer can reach 400000 messages/s ``` 2022-04-14T02:14:58,208+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 9723124 msg --- 470142.670 msg/s --- 3.587 Mbit/s --- Latency: mean: 9476.742 ms - med: 9441 - 95pct: 11908 - 99pct: 12152 - 99.9pct: 12239 - 99.99pct: 12247 - Max: 12247 2022-04-14T02:15:08,222+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 14411888 msg --- 468147.684 msg/s --- 3.572 Mbit/s --- Latency: mean: 15262.627 ms - med: 15253 - 95pct: 18023 - 99pct: 18258 - 99.9pct: 18315 - 99.99pct: 18317 - Max: 18318 2022-04-14T02:15:18,236+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 18841513 msg --- 442446.540 msg/s --- 3.376 Mbit/s --- Latency: mean: 21164.401 ms - med: 21094 - 95pct: 23664 - 99pct: 23899 - 99.9pct: 23939 - 99.99pct: 23955 - Max: 23955 2022-04-14T02:15:28,253+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 23082078 msg --- 423212.525 msg/s --- 3.229 Mbit/s --- Latency: mean: 27174.714 ms - med: 27272 - 95pct: 29453 - 99pct: 29698 - 99.9pct: 29725 - 99.99pct: 29736 - Max: 29736 2022-04-14T02:15:38,268+0800 [main] INFO org.apache.pulsar.testclient.PerformanceConsumer - Throughput received: 27647013 msg --- 455823.127 msg/s --- 3.478 Mbit/s --- Latency: mean: 32438.418 ms - med: 32410 - 95pct: 34870 - 99pct: 35098 - 99.9pct: 35130 - 99.99pct: 35133 - Max: 35134 ```
### Motivation apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer which using listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger in the internal thread pool to fixes the orderding issue, so this is the root cause of the performance regression. Here is the framegraph to to show the thread frame of internal thread and external thread. ### Modification - Remove the isListenerHandlingMessage control - Move the messages from receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener After the above changes, we don't need to call triggerListener from the external executor. Here is the thread framegraph after apply this change Before this change, the consumer can't reach 50000 messages/s. After this change, the consumer can reach
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I left one question
PTAL
} | ||
} | ||
} while (msg != null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding is that this way we are going to occupy the thread forever
other Consumers won't be able to use this thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, it will not occupy the thread forever. After enabling the message listener, the incoming messages will trigger here, so we will only have a few messages in the receiver queue. Here just want to make sure all the messages in the receiver queue can be moved out to the external executor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
learned a lot.
final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); | ||
Message<T> msg; | ||
do { | ||
msg = internalReceive(0, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@codelipenghui here we are doing a "take" operation, without conditions, this will wait forever.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a take operation, it's a poll operation, if no more messages are in the receiver queue, we will get null here immediately.
final Message<T> msg = internalReceive(0, TimeUnit.MILLISECONDS); | ||
Message<T> msg; | ||
do { | ||
msg = internalReceive(0, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the role of ReceiveQueue disappear in this way? Regardless of whether the Consumer has finished consumption, the client will continue to pull messages, and finally make the client OOM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is controlled by the flow permits, we only increase the permits after triggering the listener.
}); | ||
} | ||
} else { | ||
if (log.isDebugEnabled()) { | ||
log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seem there will be more logs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@Technoboy- It's debugging level logs, it will help for troubleshooting problems if we see this log but there are messages in the queue.
@eolivelli Could you please help review again? |
…pache#15162) ### Motivation apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer using a listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression. Here is the frame graph to show the thread frame of the internal thread and external thread. [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt) And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order. ### Modification - Remove the isListenerHandlingMessage control - Move the messages from the receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener
…15162) #13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer using a listener, so after #11455, the message will not move from the receiver queue to the external executor. And #13023 changed the listener trigger in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression. Here is the frame graph to show the thread frame of the internal thread and external thread. [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt) And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order. - Remove the isListenerHandlingMessage control - Move the messages from the receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener (cherry picked from commit 83cd791)
…15162) #13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer using a listener, so after #11455, the message will not move from the receiver queue to the external executor. And #13023 changed the listener trigger in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression. Here is the frame graph to show the thread frame of the internal thread and external thread. [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt) And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order. - Remove the isListenerHandlingMessage control - Move the messages from the receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener (cherry picked from commit 83cd791)
…15162) #13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer using a listener, so after #11455, the message will not move from the receiver queue to the external executor. And #13023 changed the listener trigger in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression. Here is the frame graph to show the thread frame of the internal thread and external thread. [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt) And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order. - Remove the isListenerHandlingMessage control - Move the messages from the receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener (cherry picked from commit 83cd791)
…pache#15162) apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer using a listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression. Here is the frame graph to show the thread frame of the internal thread and external thread. [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt) And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order. - Remove the isListenerHandlingMessage control - Move the messages from the receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener (cherry picked from commit 83cd791) (cherry picked from commit c712441)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry, for the late review. Great work @codelipenghui! LGTM.
…pache#15162) apache#13023 has introduced a performance regression. For each message, we are switching from external thread pool -> internal thread poll -> external thread pool. Previously we want to control the outstanding messages of a consumer using a listener, so after apache#11455, the message will not move from the receiver queue to the external executor. And apache#13023 changed the listener trigger in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression. Here is the frame graph to show the thread frame of the internal thread and external thread. [framegraph.html.txt](https://github.com/apache/pulsar/files/8483765/framegraph.html.txt) And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order. - Remove the isListenerHandlingMessage control - Move the messages from the receiver queue to the queue of external executor but not increase permits - Increase permits before call message listener (cherry picked from commit 83cd791) (cherry picked from commit c712441)
Has this been cherry picked to 2.10, 2.9, and 2.8 yet? |
Yes, see the |
Motivation
#13023 has introduced a performance regression.
For each message, we are switching from external thread pool -> internal thread poll -> external thread pool.
Previously we want to control the outstanding messages of a consumer using a listener, so after #11455,
the message will not move from the receiver queue to the external executor. And #13023 changed the listener trigger
in the internal thread pool to fix the ordering issue, so this is the root cause of the performance regression.
Here is the frame graph to show the thread frame of the internal thread and external thread.
framegraph.html.txt
And also fix the performance issue for multiple topic consumers and key-shared subscriptions which enabled message listeners. Before this change, the messages are processed serially. After this change, We can improve parallelism on the premise of ensuring order.
Modification
After the above changes, we don't need to call triggerListener from the external executor.
Here is the thread frame graph after applying this change
framegraph2.html.txt
Before this change, the consumer can't reach 50000 messages/s.
After this change, the consumer can reach 400000 messages/s
Documentation
Check the box below or label this PR directly.
Need to update docs?
doc-required
(Your PR needs to update docs and you will update later)
no-need-doc
(Please explain why)
doc
(Your PR contains doc changes)
doc-added
(Docs have been already added)