-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix consumer stuck issue due to reuse entry wrapper. #10824
Conversation
1. Add wrapperOffset to make sure get the correct batch size from the metadata 2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer it should be (messages / avgBatchSizePerMsg)
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. I added 2 code review comments which are more like questions where I'm trying to understand the overall code base around message dispatching. Good work Penghui!
The change makes sense to me. Thank you very much ! |
I have tested the fix. The situation looks better, but all messages don't get delivered from the backlog to the consumer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
The fix makes sense to me. |
* According the batch average size dispatching, the broker will dispatch all the batches to the consumer | ||
*/ | ||
@Test | ||
public void testFlowPermitsWithMultiBatchesDispatch() throws PulsarAdminException, PulsarClientException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the main difference between my test and this test is that here we do not have a partitioned topic.
a good follow up work is to add a test for partitioned topics
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@eolivelli I have merged the PR, will try to investigate the problem with the partitioned topic. I have test 100 partitions on the standalone, seem not able to reproduce the issue. Will try to deploy to a real cluster for testing. Will try to write a UT for partitioned case if it can be reproduced
let's merge the patch as soon as CI is green |
@codelipenghui I would like to add that with this fix my test arrived to 99.85% ! thanks ! |
Fixes apache#10813 The issue is introduced by apache#7266, it only affects the master branch. ### Motivation 1. Add wrapperOffset to make sure get the correct batch size from the metadata 2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer it should be (messages / avgBatchSizePerMsg) ### Verifying this change * The test case is to simulate dispatch batches with different batch size to the consumer. * 1. The consumer has 1000 available permits * 2. The producer send batches with different batch size * * According the batch average size dispatching, the broker will dispatch all the batches to the consumer (cherry picked from commit 4f23767)
Fixes apache#10813 The issue is introduced by apache#7266, it only affects the master branch. ### Motivation 1. Add wrapperOffset to make sure get the correct batch size from the metadata 2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer it should be (messages / avgBatchSizePerMsg) ### Verifying this change * The test case is to simulate dispatch batches with different batch size to the consumer. * 1. The consumer has 1000 available permits * 2. The producer send batches with different batch size * * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
Fixes apache#10813 The issue is introduced by apache#7266, it only affects the master branch. ### Motivation 1. Add wrapperOffset to make sure get the correct batch size from the metadata 2. Fix the issue that using (batch count / avgBatchSizePerMsg) to calculate messages for the consumer it should be (messages / avgBatchSizePerMsg) ### Verifying this change * The test case is to simulate dispatch batches with different batch size to the consumer. * 1. The consumer has 1000 available permits * 2. The producer send batches with different batch size * * According the batch average size dispatching, the broker will dispatch all the batches to the consumer
Fixes #10813
The issue is introduced by #7266, it only affects the master branch.
Motivation
it should be (messages / avgBatchSizePerMsg)
Verifying this change
Does this pull request potentially affect one of the following parts:
If
yes
was chosen, please highlight the changesDocumentation