diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java index 6b032370898e9..7c75f09977376 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/client/impl/RawReaderImpl.java @@ -116,6 +116,7 @@ static class RawConsumerImpl extends ConsumerImpl { client.externalExecutorProvider(), TopicName.getPartitionIndex(conf.getSingleTopic()), false, + false, consumerFuture, MessageId.earliest, 0 /* startMessageRollbackDurationInSec */, diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java index 7bc63d89f98c2..09b840fa502f5 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBase.java @@ -84,7 +84,6 @@ public abstract class ConsumerBase extends HandlerState implements Consumer conf, int receiverQueueSize, ExecutorProvider executorProvider, @@ -913,33 +912,34 @@ protected void tryTriggerListener() { } private void triggerListener() { + // The messages are added into the receiver queue by the internal pinned executor, + // so need to use internal pinned executor to avoid race condition which message + // might be added into the receiver queue but not able to read here. internalPinnedExecutor.execute(() -> { try { - // Listener should only have one pending/running executable to process a message - // See https://github.com/apache/pulsar/issues/11008 for context. - if (!isListenerHandlingMessage) { - final Message msg = internalReceive(0, TimeUnit.MILLISECONDS); + Message msg; + do { + msg = internalReceive(0, TimeUnit.MILLISECONDS); if (msg != null) { - isListenerHandlingMessage = true; // Trigger the notification on the message listener in a separate thread to avoid blocking the // internal pinned executor thread while the message processing happens + final Message finalMsg = msg; if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) { executorProvider.getExecutor(peekMessageKey(msg)).execute(() -> - callMessageListener(msg)); + callMessageListener(finalMsg)); } else { getExternalExecutor(msg).execute(() -> { - callMessageListener(msg); + callMessageListener(finalMsg); }); } + } else { + if (log.isDebugEnabled()) { + log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); + } } - } + } while (msg != null); } catch (PulsarClientException e) { log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e); - return; - } - - if (log.isDebugEnabled()) { - log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription); } }); } @@ -950,13 +950,16 @@ protected void callMessageListener(Message msg) { log.debug("[{}][{}] Calling message listener for message {}", topic, subscription, msg.getMessageId()); } + ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl) + ? ((TopicMessageImpl) msg).receivedByconsumer : (ConsumerImpl) this; + // Increase the permits here since we will not increase permits while receive messages from consumer + // after enabled message listener. + receivedConsumer.increaseAvailablePermits((MessageImpl) (msg instanceof TopicMessageImpl + ? ((TopicMessageImpl) msg).getMessage() : msg)); listener.received(ConsumerBase.this, msg); } catch (Throwable t) { log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription, msg.getMessageId(), t); - } finally { - isListenerHandlingMessage = false; - triggerListener(); } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index f680d1e4f8808..0d5e63a1e52bb 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -128,6 +128,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final long subscribeTimeout; private final int partitionIndex; private final boolean hasParentConsumer; + private final boolean parentConsumerHasListener; private final int receiverQueueRefillThreshold; @@ -202,8 +203,8 @@ static ConsumerImpl newConsumerImpl(PulsarClientImpl client, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { - return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture, - startMessageId, schema, interceptors, createTopicIfDoesNotExist, 0); + return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false, + subscribeFuture, startMessageId, schema, interceptors, createTopicIfDoesNotExist, 0); } static ConsumerImpl newConsumerImpl(PulsarClientImpl client, @@ -212,6 +213,7 @@ static ConsumerImpl newConsumerImpl(PulsarClientImpl client, ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, + boolean parentConsumerHasListener, CompletableFuture> subscribeFuture, MessageId startMessageId, Schema schema, @@ -225,14 +227,16 @@ static ConsumerImpl newConsumerImpl(PulsarClientImpl client, createTopicIfDoesNotExist); } else { return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, - subscribeFuture, startMessageId, startMessageRollbackDurationInSec /* rollback time in sec to start msgId */, + parentConsumerHasListener, + subscribeFuture, startMessageId, + startMessageRollbackDurationInSec /* rollback time in sec to start msgId */, schema, interceptors, createTopicIfDoesNotExist); } } protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData conf, ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer, - CompletableFuture> subscribeFuture, MessageId startMessageId, + boolean parentConsumerHasListener, CompletableFuture> subscribeFuture, MessageId startMessageId, long startMessageRollbackDurationInSec, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema, interceptors); @@ -246,6 +250,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.partitionIndex = partitionIndex; this.hasParentConsumer = hasParentConsumer; this.receiverQueueRefillThreshold = conf.getReceiverQueueSize() / 2; + this.parentConsumerHasListener = parentConsumerHasListener; this.priorityLevel = conf.getPriorityLevel(); this.readCompacted = conf.isReadCompacted(); this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition(); @@ -1342,7 +1347,9 @@ protected synchronized void messageProcessed(Message msg) { if (msgCnx != currentCnx) { // The processed message did belong to the old queue that was cleared after reconnection. } else { - increaseAvailablePermits(currentCnx); + if (listener == null && !parentConsumerHasListener) { + increaseAvailablePermits(currentCnx); + } stats.updateNumMsgsReceived(msg); trackMessage(msg); @@ -1373,6 +1380,14 @@ protected void trackMessage(MessageId messageId) { } } + void increaseAvailablePermits(MessageImpl msg) { + ClientCnx currentCnx = cnx(); + ClientCnx msgCnx = msg.getCnx(); + if (msgCnx == currentCnx) { + increaseAvailablePermits(currentCnx); + } + } + void increaseAvailablePermits(ClientCnx currentCnx) { increaseAvailablePermits(currentCnx, 1); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index bcccd09d7c7c1..d19f8b8e892a1 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -975,7 +975,7 @@ private void doSubscribeTopicPartitions(Schema schema, CompletableFuture> subFuture = new CompletableFuture<>(); ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName, configurationData, client.externalExecutorProvider(), - partitionIndex, true, subFuture, + partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); synchronized (pauseMutex) { @@ -1002,7 +1002,7 @@ private void doSubscribeTopicPartitions(Schema schema, } else { ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig, client.externalExecutorProvider(), -1, - true, subFuture, startMessageId, schema, interceptors, + true, listener != null, subFuture, startMessageId, schema, interceptors, createIfDoesNotExist, startMessageRollbackDurationInSec); synchronized (pauseMutex) { @@ -1298,7 +1298,7 @@ private CompletableFuture subscribeIncreasedTopicPartitions(String topicNa ConsumerImpl newConsumer = ConsumerImpl.newConsumerImpl( client, partitionName, configurationData, client.externalExecutorProvider(), - partitionIndex, true, subFuture, startMessageId, schema, interceptors, + partitionIndex, true, listener != null, subFuture, startMessageId, schema, interceptors, true /* createTopicIfDoesNotExist */, startMessageRollbackDurationInSec); synchronized (pauseMutex) { if (paused) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java index 884fe696a1002..d5719ddf016d4 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ReaderImpl.java @@ -103,7 +103,7 @@ public void reachedEndOfTopic(Consumer consumer) { final int partitionIdx = TopicName.getPartitionIndex(readerConfiguration.getTopicName()); consumer = new ConsumerImpl<>(client, readerConfiguration.getTopicName(), consumerConfiguration, - executorProvider, partitionIdx, false, consumerFuture, + executorProvider, partitionIdx, false, false, consumerFuture, readerConfiguration.getStartMessageId(), readerConfiguration.getStartMessageFromRollbackDurationInSec(), schema, null, true /* createTopicIfDoesNotExist */); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java index fca9cffcfdc23..66ba0d47ae754 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ZeroQueueConsumerImpl.java @@ -53,7 +53,7 @@ public ZeroQueueConsumerImpl(PulsarClientImpl client, String topic, ConsumerConf CompletableFuture> subscribeFuture, MessageId startMessageId, Schema schema, ConsumerInterceptors interceptors, boolean createTopicIfDoesNotExist) { - super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, subscribeFuture, + super(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false, subscribeFuture, startMessageId, 0 /* startMessageRollbackDurationInSec */, schema, interceptors, createTopicIfDoesNotExist); }