-
Notifications
You must be signed in to change notification settings - Fork 12k
[ISSUE #6205] Support Async pull for DefaultLitePullConsumer #6417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Codecov Report
@@ Coverage Diff @@
## develop #6417 +/- ##
=============================================
+ Coverage 43.05% 43.14% +0.09%
- Complexity 8911 9014 +103
=============================================
Files 1105 1107 +2
Lines 77970 78321 +351
Branches 10146 10211 +65
=============================================
+ Hits 33570 33794 +224
- Misses 40190 40290 +100
- Partials 4210 4237 +27
... and 73 files with indirect coverage changes 📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more |
| public PullTaskImpl(final MessageQueue messageQueue, CommunicationMode communicationMode) { | ||
| this.messageQueue = messageQueue; | ||
| this.communicationMode = communicationMode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good job! How do users set whether the pullTask is asynchronous or synchronous?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your comment! I have the same confusion about the entrance of asynchronous pullTask.
As you see, I create another constructor here. When user creates the PullTaskImpl with "Async", the pullTask will run in the asynchronous way. However, I find that the PullTaskImpl only be created in other methods (e.g. seek(), startPullTask()) in this class.
If we let user decide which way to pull the tasks, we need to add a new parameter "communicationMode" for all upstream functions or in the class DefaultLitePullConsumerImpl (maybe better? just a private parameter with a set() function). I think either of them will make the code more complex.
This is my thought on the entrance design, but as I'm a freshman in RocketMQ, there may be some areas where I have not considered thoroughly. I would appreciate your advice and guidance on this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It is not appropriate for users to decide whether to synchronize or asynchronously. IMO, we should replace the current synchronous pull implementation after the asynchronous pull is stable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your approval! Based on your suggestion, I think we can keep this constructor to facilitate our testing of asynchronous pull. Besides, what else can I do for this PR?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for your approval! Based on your suggestion, I think we can keep this constructor to facilitate our testing of asynchronous pull. Besides, what else can I do for this PR?
Go ahead, you can fix the problem of the comments, then we should do some tests. :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have fixed the problem mentioned in comments. Thanks for your effort! Please let me know if there is anything I can do for it.
| updatePullOffset(messageQueue, pullResult.getNextBeginOffset(), processQueue); | ||
| } catch (InterruptedException interruptedException) { | ||
| log.warn("Polling thread was interrupted.", interruptedException); | ||
| } catch (Throwable e) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If CommunicationMode is ASYNC, these code shoud be performed in callback.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
THX! I'll correct it
| public PullTaskImpl(final MessageQueue messageQueue, CommunicationMode communicationMode) { | ||
| this.messageQueue = messageQueue; | ||
| this.communicationMode = communicationMode; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It is not appropriate for users to decide whether to synchronize or asynchronously. IMO, we should replace the current synchronous pull implementation after the asynchronous pull is stable.
|
You have successfully added a new CodeQL configuration |
| if (e instanceof MQBrokerException && ((MQBrokerException) e).getResponseCode() == ResponseCode.FLOW_CONTROL) { | ||
| pullDelayTimeMills = PULL_TIME_DELAY_MILLS_WHEN_BROKER_FLOW_CONTROL; | ||
| } else { | ||
| pullDelayTimeMills = pullTimeDelayMillsWhenException; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The time for the next scheduling is changed here, but there is no scheduling
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pointing it out. I will correct it as soon as possible.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for pointing it out. I will correct it as soon as possible.
Go ahead! :-)
| this.pullAsyncImpl(mq, subscriptionData, offset, maxNums, true, timeout, pullCallback); | ||
| } | ||
|
|
||
| private void pullAsyncImpl(MessageQueue mq, SubscriptionData subscriptionData, long offset, int maxNums, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is it possible to extract the same code as existed method pullSyncImpl()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your comment! In fact, there are 11 lines of duplicated code in pullSyncImpl() and pullAsyncImpl(). So we can extract a method for them. However, I notice that in DefaultMQPullConsumer, these two functions are written in the same way(duplicated code).
"I retain a little. I think retain some flavor of RocketMQ can let you know you are reading RocketMQ's code." 🐶
|
Hi @buptxxb65, could you do some tests and post a report?If you need any help, plz feel free to contact me. |
Sorry for late reply. Sure, I will do some tests for it. However, there is an examination on Tuesday(4.4), so I need to work on the tests and reports after it. I hope it won't be too late. |
Of course it doesn't matter. Look forward to your test. |
|
Hi @RongtongJin , I created an example for the DefaultLitePullConsumer. To use this example testing the Async pull, we need to change the line 680 in DefaultLitePullConsumerImpl into Here is the conclusion of its performance.
That's all of the test. I would appreciate if you have any better idea of the test and I'm happy to do anything I can to improve it. |
| } | ||
| } | ||
|
|
||
| private void executeThreadWithDelay() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about rename to schedulePullTaskWithDelay?
| @@ -0,0 +1,42 @@ | |||
| package org.apache.rocketmq.example.simple; | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IMO, no need for a new example.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I totally agree with you so I'll remove this example tonight. And I want to make sure my understanding is correct.
Since we haven't figure out a suitable way to start an Async pull, we can only change the constructor method of PullTaskImpl in seek(). Therefore, in users' view (example view), they just call the seek() function. Maybe we can add a new example after we separate the Async pull and Sync pull.
Firstly, testing needs to verify whether it can solve the problem described in the issue. |
In achievement aspect, I think the testing has verified that it can successfully asynchronous pull messages. Do you mean I need to verify whether the Async pull will behave better than Sync pull under the special situation? Like the number of partitions/queues assigned to the client is much more greater than 20. |
I think @RongtongJin means to verify the performance in some abnormal scenarios, such as restarting the broker during the process of pulling messages to see if it works normally |
Got it! I will try to achieve it. |
|
This PR is stale because it has been open for 365 days with no activity. It will be closed in 3 days if no further activity occurs. If you wish not to mark it as stale, please leave a comment in this PR. |
|
This PR was closed because it has been inactive for 3 days since being marked as stale. |

Make sure set the target branch to
developWhat is the purpose of the change
fixes #6205
Support the new feature mentioned in it.
fix #
Brief changelog
Support the Async pull function by using PullCallback.
Currently, it has not been actually invoked. Since I'm confused when to use it(When the number of partitions/queues assigned to the client is greater than a specific number?).
I'll further modified it in the future. Looking forward to your suggestions : )
Verifying this change
XXXX
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.[ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.mvn -B clean apache-rat:check findbugs:findbugs checkstyle:checkstyleto make sure basic checks pass. Runmvn clean install -DskipITsto make sure unit-test pass. Runmvn clean test-compile failsafe:integration-testto make sure integration-test pass.