[Issue 11689][Client] Fixed block forever bug in Consumer.batchReceive #11691
LGTM
Good work @Vanlightly
0064ef6 to b507be1
The main reason for this to be locked is the following two pieces of logic:
Can we add an abstract batchReceiveTimeout() to the base class, so that peek and poll can run in the internalPinnedExecutor and there is no need to lock multiple times? Or are there any other suggestions?
LGTM
@315157973 we could look at something like that. The ConsumerImpl has its
@315157973 @Vanlightly Thanks for your explanation. I like the approach @315157973 mentioned. We can create an abstract method in ConsumerBase and, for all the consumer implementations, ensure the method is called by
@315157973 @codelipenghui I have an implementation based on the suggestion, there was just one complication: On The issues here:
I have an implementation, and I am in the process of running some nasty multi-threaded tests against it with concurrent calls to batchReceive and close, using one client, multiple shared consumers and multiple threads per consumer. I am running them for both non-partitioned and partitioned topics. I will force push when I am happy there are no race conditions.
b507be1 to a3fc6fe
Ensure that all poll() calls to pendingBatchReceives are made within the pinnedInternalExecutor to avoid a race condition where a peek and a subsequent poll get different pending receives. Moved the pinnedInternalExecutor into the ConsumerBase as both ConsumerImpl and MultiTopicsConsumerImpl require it. failPendingReceive() now always submits its work to the internal executor, returning a CompletableFuture, and all callers treat it as an asynchronous operation.
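To make the idea in this commit message concrete, here is a minimal sketch of confining every peek and poll on a pending-receive queue to one single-threaded executor. It is not the Pulsar code: the class ConfinedPendingReceives, the method completeNext() and the String payload are hypothetical stand-ins, and only the names internalPinnedExecutor and failPendingReceive() are borrowed from the discussion.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a consumer; not the Pulsar ConsumerBase.
class ConfinedPendingReceives {
    // Only ever touched from the executor thread, so a plain queue is enough.
    private final Queue<CompletableFuture<String>> pending = new ArrayDeque<>();

    // One thread only: tasks run one at a time, in submission order.
    // A daemon thread is used here so the sketch never keeps the JVM alive.
    private final ExecutorService internalPinnedExecutor =
            Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, "internal-pinned");
                t.setDaemon(true);
                return t;
            });

    // Registers a pending receive; the queue insert happens on the executor.
    CompletableFuture<String> batchReceiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        internalPinnedExecutor.execute(() -> pending.add(result));
        return result;
    }

    // Peek and poll run inside one task, so nothing can slip in between them.
    CompletableFuture<Boolean> completeNext(String batch) {
        return CompletableFuture.supplyAsync(() -> {
            CompletableFuture<String> head = pending.peek();
            if (head == null) {
                return false; // nothing is waiting
            }
            CompletableFuture<String> removed = pending.poll();
            return removed == head && removed.complete(batch);
        }, internalPinnedExecutor);
    }

    // Fails everything still pending; callers get a future and treat it as async.
    CompletableFuture<Void> failPendingReceive() {
        return CompletableFuture.runAsync(() -> {
            CompletableFuture<String> next;
            while ((next = pending.poll()) != null) {
                next.completeExceptionally(new IllegalStateException("consumer closed"));
            }
        }, internalPinnedExecutor);
    }

    void shutdown() {
        internalPinnedExecutor.shutdown();
    }
}
```

Because the queue is only reached through tasks on the one executor thread, no lock is needed around the peek and poll pair, at the cost of making every operation on the queue asynchronous for its callers.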
a3fc6fe to bb9b4c6
@315157973 @codelipenghui @lhotari I have pushed the change that ensures all poll() calls go through the pinnedInternalExecutor to avoid threading issues. I didn't have to make an abstract method, as I could simply move the pinnedInternalExecutor to ConsumerBase, given that both ConsumerImpl and MultiTopicsConsumerImpl require it. The batch timer in ConsumerBase then simply submits its work to the executor from there. I also moved failPendingReceive() to ConsumerBase, as it is the same for both. Finally, I renamed the pinnedExecutor to externalPinnedExecutor, as that is what it is and the new name makes that clearer.
Needed a real executor service to run the failPendingReceive() method.
LGTM
@@ -305,7 +305,7 @@ private void resumeReceivingFromPausedConsumersIfNeeded() {
                 break;
             }

-            client.getInternalExecutorService().execute(() -> {
+            internalPinnedExecutor.execute(() -> {
This seems to change the previous behavior. Before, a different thread could be selected for each execution; now everything runs on a single thread.
Yes, I mentioned this in my first comment on the suggested approach. If we're going to avoid all multi-threaded calls to `pendingBatchReceives.poll()` then we can't use the internal pool here; we must use a single thread.
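As a small illustration of why a single pinned thread is enough here, the sketch below submits many tasks that mutate a deliberately non-thread-safe list to a single-threaded executor. The class and method names are hypothetical and nothing in it is Pulsar code; it relies only on the documented behaviour of Executors.newSingleThreadExecutor(), which runs tasks sequentially with no more than one task active at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class; it only illustrates the executor guarantee.
class SingleThreadOrdering {
    static List<Integer> runOnSingleThread(int tasks) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Deliberately not thread safe: it is only touched by executor tasks.
            List<Integer> order = new ArrayList<>();
            for (int i = 0; i < tasks; i++) {
                final int id = i;
                executor.execute(() -> order.add(id));
            }
            // The snapshot is itself a task, so it runs after all the adds.
            return CompletableFuture
                    .supplyAsync(() -> new ArrayList<Integer>(order), executor)
                    .join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Integer> order = runOnSingleThread(1000);
        System.out.println("tasks run: " + order.size());
    }
}
```

With a multi-threaded pool the same tasks could run concurrently and in any order, which is exactly the situation the peek and poll pair cannot tolerate.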
@codelipenghui PTAL again
Thanks @Vanlightly The change looks good, I just left some minor comments about the
Ensure that the externalPinnedExecutor is only used for user code and that the internalPinnedExecutor is used for internal tasks. Some test refactoring to manage creation of executors.
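The split described in this commit message can be sketched roughly as follows: internal state is only touched on one executor, and user-supplied code is handed off to a second one so that a slow callback cannot stall internal work. This is an assumption-laden illustration, not the Pulsar implementation; TwoExecutorConsumer, onMessage() and the IntConsumer listener are hypothetical, and only the two executor names come from the discussion.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.IntConsumer;

// Hypothetical consumer-like class; not the Pulsar ConsumerBase.
class TwoExecutorConsumer {
    private final ExecutorService internalPinnedExecutor = daemonExecutor("internal");
    private final ExecutorService externalPinnedExecutor = daemonExecutor("external");

    // Internal state: only read or written on the internal executor thread.
    private int receivedCount;

    private static ExecutorService daemonExecutor(String name) {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, name);
            t.setDaemon(true); // keeps the sketch from holding the JVM open
            return t;
        });
    }

    // Returns the names of the threads that ran the state update and the listener.
    CompletableFuture<String[]> onMessage(IntConsumer userListener) {
        CompletableFuture<String[]> done = new CompletableFuture<>();
        internalPinnedExecutor.execute(() -> {
            int count = ++receivedCount; // internal bookkeeping
            String stateThread = Thread.currentThread().getName();
            // User code is handed off so it cannot block internal tasks.
            externalPinnedExecutor.execute(() -> {
                try {
                    userListener.accept(count);
                    done.complete(new String[] {stateThread, Thread.currentThread().getName()});
                } catch (RuntimeException e) {
                    done.completeExceptionally(e);
                }
            });
        });
        return done;
    }

    void shutdown() {
        internalPinnedExecutor.shutdown();
        externalPinnedExecutor.shutdown();
    }
}
```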
3faca72 to 9b5eeff
/pulsarbot run-failure-checks
@codelipenghui I've made those updates, PTAL.
#11691)
* Fixed block forever bug in Consumer.batchReceive
Ensure that all poll() calls to pendingBatchReceives are made within the pinnedInternalExecutor to avoid a race condition where a peek and a subsequent poll get different pending receives. Moved the pinnedInternalExecutor into the ConsumerBase as both ConsumerImpl and MultiTopicsConsumerImpl require it. failPendingReceive() now always submits its work to the internal executor, returning a CompletableFuture, and all callers treat it as an asynchronous operation.
* Fix broken MultiTopicsConsumerImplTest
Needed a real executor service to run the failPendingReceive() method.
* Ensure all calls to messageReceived happen on internal executor
* Readd missing return statement in ConsumerImpl.closeAsync()
* Ensure correct usage of consumer internal executors
Ensure that the externalPinnedExecutor is only used for user code and that the internalPinnedExecutor is used for internal tasks. Some test refactoring to manage creation of executors.
(cherry picked from commit bd942e1)
Fixes #11689
Motivation
When Consumer.batchReceive() is called concurrently by different threads, there is a race condition in ConsumerBase.java which, when triggered, causes a CompletableFuture in the pendingBatchReceives queue to be removed from the queue but never completed, so the consumer blocks forever. This has occurred a few times in production recently.
The issue is that there are concurrent calls to peek and poll in peekNextBatchReceive, and the code is only correct when the entry that is peeked is the entry that is polled. If another thread calls poll between a peek and the following poll, this bug occurs. There is an error message when this occurs: Bug: Removed entry wasn't the expected one.
Modifications
pendingBatchReceives in ConsumerImpl and MultiTopicsConsumerImpl, given that the lazy instantiation was not thread safe.
nextBatchReceive now.
failPendingBatchReceives and failPendingReceives as they were superfluous.
Verifying this change
All existing tests that cover the consumer pass.
Reproduction was difficult, but I have seen it in production. The only way to make it trigger within a few seconds or minutes was to add a short thread sleep between the peek and poll while calling batchReceive concurrently from many threads with a non-zero maxNumMessages. I have run the repro steps in the issue with this fix and a temporary thread sleep added between peek and poll, and have verified that the bug no longer occurs.
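The interleaving behind the bug can also be replayed deterministically, without threads or sleeps, by performing the steps of the two competing threads by hand. The sketch below is a simplified illustration under that assumption and is not the ConsumerBase code: PeekPollRace, replay() and the String payloads are hypothetical, and only the error text is taken from the description above.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical illustration of the peek/poll race; not the Pulsar code.
class PeekPollRace {
    /** Replays the bad interleaving; returns {firstDone, secondDone, queueEmpty}. */
    static boolean[] replay() {
        Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();
        CompletableFuture<String> first = new CompletableFuture<>();
        CompletableFuture<String> second = new CompletableFuture<>();
        pending.add(first);
        pending.add(second);

        // "Thread A" peeks the head and decides to complete it.
        CompletableFuture<String> peekedByA = pending.peek();

        // "Thread B" runs in between: it polls the same head and completes it.
        CompletableFuture<String> polledByB = pending.poll();
        polledByB.complete("batch for first");

        // "Thread A" resumes and polls, expecting to remove what it peeked.
        CompletableFuture<String> polledByA = pending.poll();
        if (polledByA == peekedByA) {
            polledByA.complete("batch for first");
        } else {
            // Mismatch: the entry is already out of the queue and nobody completes it.
            System.out.println("Bug: Removed entry wasn't the expected one");
        }
        return new boolean[] {first.isDone(), second.isDone(), pending.isEmpty()};
    }

    public static void main(String[] args) {
        boolean[] r = replay();
        System.out.println("first done=" + r[0] + ", second done=" + r[1]
                + ", queue empty=" + r[2]);
    }
}
```

After the replay the second future has left the queue without being completed, which corresponds to a batchReceive() caller that waits forever.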
Documentation
For contributor
Nothing to document here, purely internal implementation details.