-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize #6862
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should introduce an exception when the maxNumMessages greater than queue size when creating consumers. This will be more transparent to users.
.build(); | ||
} else { | ||
this.batchReceivePolicy = conf.getBatchReceivePolicy(); | ||
} | ||
} else { | ||
this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We also need to add a check for DEFAULT_POLICY
since users may specify the queue size that lower that maxNumMessages
of the DEFAULT_POLICY
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The above solution is correct but users can get confused seeing their batchRecieve policy configuration has changed
Why not changing the function hasEnoughMessagesForBatchReceive to return true when
incomingMessages.size() >= maxReceiverQueueSize ?
protected boolean hasEnoughMessagesForBatchReceive() {
if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) {
return false;
}
return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages())
|| (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()) || (incomingMessages.size() >= maxReceiverQueueSize);
}
What do you think ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will cause batch message return to the user earlier right? This will introduce another problem, users will get the batch message before reaching maxNumMessage
. It's better to tell users the receiver queue size needs to greater than the maxNumMessage
of the batch receive policy.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I completely agree that throwing an exception is a better choice
@@ -95,7 +95,16 @@ protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurat | |||
this.schema = schema; | |||
this.interceptors = interceptors; | |||
if (conf.getBatchReceivePolicy() != null) { | |||
this.batchReceivePolicy = conf.getBatchReceivePolicy(); | |||
if (conf.getBatchReceivePolicy().getMaxNumMessages() > this.maxReceiverQueueSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we also add unit tests with the desired behaviour ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your feedback, I have add the unit tests for all kinds of exception case, please take a look.
@hangc0276 Jia has started a discussion in the email thread for cutting 2.5.2 and it's better to include this fix in 2.5.2. If you have time, please help take a look at the comments, so that we can merge this PR. |
sorry,I will fix this pr soon. When i add test case for the batchReceivePolicy, I found another bugs, and the consumer still stuck when reset the maxNumMessages to receiverQueueSize. I will fix it soon. |
I have throw a log when reset the batch receive policy, please take a look, thanks. |
@codelipenghui When batch receive triggered by timeout, it should trigger send permit to broker server in |
/pulsarbot run-failure-checks |
…erQueueSize (#6862) Fix #6854 ### Bug description The consumer stuck due to `hasEnoughMessagesForBatchReceive` checking: ``` protected boolean hasEnoughMessagesForBatchReceive() { if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); } ``` ### Changes When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking `hasEnoughMessagesForBatchReceive` * fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize * throw log warn and add test case (cherry picked from commit 561868d)
…erQueueSize (apache#6862) Fix apache#6854 ### Bug description The consumer stuck due to `hasEnoughMessagesForBatchReceive` checking: ``` protected boolean hasEnoughMessagesForBatchReceive() { if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); } ``` ### Changes When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking `hasEnoughMessagesForBatchReceive` * fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize * throw log warn and add test case
…erQueueSize (apache#6862) Fix apache#6854 ### Bug description The consumer stuck due to `hasEnoughMessagesForBatchReceive` checking: ``` protected boolean hasEnoughMessagesForBatchReceive() { if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); } ``` ### Changes When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking `hasEnoughMessagesForBatchReceive` * fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize * throw log warn and add test case (cherry picked from commit 561868d)
…erQueueSize (apache#6862) Fix apache#6854 ### Bug description The consumer stuck due to `hasEnoughMessagesForBatchReceive` checking: ``` protected boolean hasEnoughMessagesForBatchReceive() { if (batchReceivePolicy.getMaxNumMessages() <= 0 && batchReceivePolicy.getMaxNumBytes() <= 0) { return false; } return (batchReceivePolicy.getMaxNumMessages() > 0 && incomingMessages.size() >= batchReceivePolicy.getMaxNumMessages()) || (batchReceivePolicy.getMaxNumBytes() > 0 && INCOMING_MESSAGES_SIZE_UPDATER.get(this) >= batchReceivePolicy.getMaxNumBytes()); } ``` ### Changes When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking `hasEnoughMessagesForBatchReceive` * fix consumer stuck when batchReceivePolicy maxNumMessages > maxReceiverQueueSize * throw log warn and add test case
Fix #6854
Bug description
The consumer stuck due to
hasEnoughMessagesForBatchReceive
checking:Changes
When batchReceivePolicy maxNumMessages > maxReceiverQueueSize, we force batch receivePolicy maxNumMessages to maxReceiverQueueSize to avoid consumer stuck when checking
hasEnoughMessagesForBatchReceive