From f1d66d1a635dfcdc3335a5c8d5ea6d68e38ae71c Mon Sep 17 00:00:00 2001
From: Matteo Merli
Date: Wed, 25 Aug 2021 00:33:35 -0700
Subject: [PATCH] Fixed race condition on multi-topic consumer (#11764)

### Motivation

Under certain conditions, applications using multi-topic consumers might see consumption stall.

The conditions to reproduce the issue are:

 * Consumer is subscribed to multiple topics, but only 1 topic has traffic
 * Messages are published in batches (no repro if no batches)
 * Receiver queue size == 1 (or small, in order to exercise the race condition)

The problem is a race condition between 2 threads when we decide whether to put one of the individual consumers in the "paused" state because the shared queue is full.

What happens is that, just after we have checked the conditions and decided to mark the consumer as paused, the application empties the shared queue completely. From that point on, there is no re-attempt to check whether we need to unblock that consumer.

### Modification

Instead of introducing a sync block (contended by many consumers), we just double check the state of the shared queue after marking the consumer as "paused". If the other thread has emptied the queue in the meantime, we are guaranteed to unblock the consumer.
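
The interleaving described above can be illustrated with a small single-threaded simulation. This is only a sketch under stated assumptions, not the actual `MultiTopicsConsumerImpl` logic: the class `PauseRecheckSketch`, the `recheckAfterPause` flag, the `betweenCheckAndPause` hook and the simplified resume condition are all hypothetical, and only the names `pausedConsumers`, `incomingMessages`, `maxReceiverQueueSize` and `resumeReceivingFromPausedConsumersIfNeeded` mirror identifiers from the real code. The hook stands in for the application thread draining the queue inside the race window.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the multi-topic consumer; not the Pulsar implementation.
class PauseRecheckSketch {
    private final Queue<String> incomingMessages = new ConcurrentLinkedQueue<>();
    private final Queue<String> pausedConsumers = new ConcurrentLinkedQueue<>();
    private final int maxReceiverQueueSize;
    private final boolean recheckAfterPause; // assumption: toggles the fix for comparison
    int resumedCount = 0;

    PauseRecheckSketch(int maxReceiverQueueSize, boolean recheckAfterPause) {
        this.maxReceiverQueueSize = maxReceiverQueueSize;
        this.recheckAfterPause = recheckAfterPause;
    }

    // Application side: take one message, then try to resume paused consumers.
    String receive() {
        String msg = incomingMessages.poll();
        resumeReceivingFromPausedConsumersIfNeeded();
        return msg;
    }

    // Receiver side: an individual consumer delivered a message to the shared queue.
    // betweenCheckAndPause simulates what another thread does inside the race window.
    void messageDelivered(String consumer, String msg, Runnable betweenCheckAndPause) {
        incomingMessages.add(msg);
        if (incomingMessages.size() >= maxReceiverQueueSize) {
            betweenCheckAndPause.run();          // the queue may be emptied here
            pausedConsumers.add(consumer);
            if (recheckAfterPause) {
                // No mutex was taken, so the queue state may be stale: check again.
                resumeReceivingFromPausedConsumersIfNeeded();
            }
        }
    }

    // Simplified resume condition (assumption): resume while the queue has room.
    void resumeReceivingFromPausedConsumersIfNeeded() {
        while (incomingMessages.size() < maxReceiverQueueSize) {
            String consumer = pausedConsumers.poll();
            if (consumer == null) {
                break;
            }
            resumedCount++;                      // stands in for scheduling the next receive
        }
    }

    // A consumer is stuck if it is paused while the shared queue is empty.
    boolean isStalled() {
        return !pausedConsumers.isEmpty() && incomingMessages.isEmpty();
    }

    public static void main(String[] args) {
        PauseRecheckSketch buggy = new PauseRecheckSketch(1, false);
        buggy.messageDelivered("c1", "m1", () -> buggy.receive());
        System.out.println("stalled without re-check: " + buggy.isStalled()); // true

        PauseRecheckSketch fixed = new PauseRecheckSketch(1, true);
        fixed.messageDelivered("c1", "m1", () -> fixed.receive());
        System.out.println("stalled with re-check: " + fixed.isStalled());    // false
    }
}
```

Without the re-check, the application's `receive()` runs before the consumer is added to `pausedConsumers`, finds nothing to resume, and no later event triggers another check. With the re-check, the receiver thread itself notices that the queue has room and resumes the consumer, at the cost of one extra size check rather than a lock shared by all consumers.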
---
 .../apache/pulsar/client/impl/MultiTopicsConsumerImpl.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index f618ccb49ab34..1b2fe720b707c 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -259,6 +259,11 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) {
                 // mark this consumer to be resumed later: if No more space left in shared queue,
                 // or if any consumer is already paused (to create fair chance for already paused consumers)
                 pausedConsumers.add(consumer);
+
+                // Since we didn't get a mutex, the condition on the incoming queue might have changed after
+                // we have paused the current consumer. We need to re-check in order to avoid this consumer
+                // from getting stalled.
+                resumeReceivingFromPausedConsumersIfNeeded();
             } else {
                 // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
                 // recursion and stack overflow