-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fixed race condition on multi-topic consumer #11764
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
great catch!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
pendingReceives also seems to have a race condition, two threads will operate pendingReceives :
receiveMessageFromConsumer is executed by client.getInternalExecutorService().execute
internalReceiveAsync is executed by the user's thread
It is better to put them all in pinnedExecutor, I am working on it. And it can solve the above scenario at the same time
I think the usage on
My concern there is that if you have a multi-topic consumer pulling from many topics, we shouldn't be bottlenecked by 1 single thread. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍🏻
### Motivation Under certain conditions applications using the multi-topic consumers might get the consumption stalled: The conditions to reproduce the issue are: * Consumer is subscribed to multiple topics, but only 1 topic has traffic * Messages are published in batches (no repro if no batches) * Receiver queue size == 1 (or small, in order to exercise race condition) The problem is that there is race condition between 2 threads when we're deciding to put one of the individual consumers in "paused" state, when the shared queue is full. What happens is that, just after we checked the conditions and we decide to mark the consumer as paused, the application has emptied the shared queue completely. From that point on, there is no re-attempt to check whether we need to unblock that consumer. ### Modification Instead of introducing a sync block (contended by many consumers), we just double check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, we'll be guaranteed to unblock the consumer. (cherry picked from commit f1d66d1)
### Motivation Under certain conditions applications using the multi-topic consumers might get the consumption stalled: The conditions to reproduce the issue are: * Consumer is subscribed to multiple topics, but only 1 topic has traffic * Messages are published in batches (no repro if no batches) * Receiver queue size == 1 (or small, in order to exercise race condition) The problem is that there is race condition between 2 threads when we're deciding to put one of the individual consumers in "paused" state, when the shared queue is full. What happens is that, just after we checked the conditions and we decide to mark the consumer as paused, the application has emptied the shared queue completely. From that point on, there is no re-attempt to check whether we need to unblock that consumer. ### Modification Instead of introducing a sync block (contended by many consumers), we just double check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, we'll be guaranteed to unblock the consumer. (cherry picked from commit f1d66d1)
### Motivation Under certain conditions applications using the multi-topic consumers might get the consumption stalled: The conditions to reproduce the issue are: * Consumer is subscribed to multiple topics, but only 1 topic has traffic * Messages are published in batches (no repro if no batches) * Receiver queue size == 1 (or small, in order to exercise race condition) The problem is that there is race condition between 2 threads when we're deciding to put one of the individual consumers in "paused" state, when the shared queue is full. What happens is that, just after we checked the conditions and we decide to mark the consumer as paused, the application has emptied the shared queue completely. From that point on, there is no re-attempt to check whether we need to unblock that consumer. ### Modification Instead of introducing a sync block (contended by many consumers), we just double check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, we'll be guaranteed to unblock the consumer.
Motivation
Under certain conditions applications using the multi-topic consumers might get the consumption stalled:
The conditions to reproduce the issue are:
The problem is that there is race condition between 2 threads when we're deciding to put one of the individual consumers in "paused" state, when the shared queue is full.
What happens is that, just after we checked the conditions and we decide to mark the consumer as paused, the application has emptied the shared queue completely. From that point on, there is no re-attempt to check whether we need to unblock that consumer.
Modification
Instead of introducing a sync block (contended by many consumers), we just double check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, we'll be guaranteed to unblock the consumer.