diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index f618ccb49ab34..1b2fe720b707c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -259,6 +259,11 @@ private void receiveMessageFromConsumer(ConsumerImpl consumer) { // mark this consumer to be resumed later: if No more space left in shared queue, // or if any consumer is already paused (to create fair chance for already paused consumers) pausedConsumers.add(consumer); + + // Since we din't get a mutex, the condition on the incoming queue might have changed after + // we have paused the current consumer. We need to re-check in order to avoid this consumer + // from getting stalled. + resumeReceivingFromPausedConsumersIfNeeded(); } else { // Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid // recursion and stack overflow