diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java index c7f1a6d6dafdd..360f0bc550eaf 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PartitionedProducerConsumerTest.java @@ -28,6 +28,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.apache.pulsar.client.impl.PartitionedProducerImpl; import org.apache.pulsar.common.naming.TopicName; @@ -216,6 +218,58 @@ private void testKeyBasedOrder(Set messageSet, String message) { Assert.assertTrue(messageSet.add(message), "Received duplicate message " + message); } + @Test(timeOut = 100000) + public void testPauseAndResume() throws Exception { + log.info("-- Starting {} test --", methodName); + + int numPartitions = 2; + String topicName = TopicName + .get("persistent://my-property/my-ns/my-partitionedtopic-pr-" + System.currentTimeMillis()).toString(); + + admin.topics().createPartitionedTopic(topicName, numPartitions); + + int receiverQueueSize = 20; // number of permits broker has per partition when consumer initially subscribes + int numMessages = receiverQueueSize * numPartitions; + + AtomicReference latch = new AtomicReference<>(new CountDownLatch(numMessages)); + AtomicInteger received = new AtomicInteger(); + + Consumer consumer = pulsarClient.newConsumer().receiverQueueSize(receiverQueueSize) + .topic(topicName) + .subscriptionName("my-partitioned-subscriber").messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + c1.acknowledgeAsync(msg); + received.incrementAndGet(); + latch.get().countDown(); + }).subscribe(); + + Producer producer = pulsarClient.newProducer().topic(topicName).create(); + + consumer.pause(); + + for (int i = 0; i < numMessages * 2; i++) producer.send(("my-message-" + i).getBytes()); + + log.info("Waiting for message listener to ack " + numMessages + " messages"); + assertEquals(latch.get().await(numMessages, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks"); + + log.info("Giving message listener an opportunity to receive messages while paused"); + Thread.sleep(2000); // hopefully this is long enough + assertEquals(received.intValue(), numMessages, "Consumer received messages while paused"); + + latch.set(new CountDownLatch(numMessages)); + + consumer.resume(); + + log.info("Waiting for message listener to ack all messages"); + assertEquals(latch.get().await(numMessages, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks"); + + consumer.close(); + producer.close(); + log.info("-- Exiting {} test --", methodName); + } + @Test(timeOut = 30000) public void testInvalidSequence() throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java index 5af0088d27c9b..f6b8e1a518457 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java @@ -49,6 +49,7 @@ import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import java.util.stream.Collectors; import org.apache.bookkeeper.mledger.impl.EntryCacheImpl; @@ -238,6 +239,52 @@ public void testMessageListener(int batchMessageDelayMs) throws Exception { log.info("-- Exiting {} test --", methodName); } + @Test(timeOut = 100000) + public void testPauseAndResume() throws Exception { + log.info("-- Starting {} test --", methodName); + + int receiverQueueSize = 20; // number of permits broker has when consumer initially subscribes + + AtomicReference latch = new AtomicReference<>(new CountDownLatch(receiverQueueSize)); + AtomicInteger received = new AtomicInteger(); + + Consumer consumer = pulsarClient.newConsumer().receiverQueueSize(receiverQueueSize) + .topic("persistent://my-property/my-ns/my-topic-pr") + .subscriptionName("my-subscriber-name").messageListener((c1, msg) -> { + Assert.assertNotNull(msg, "Message cannot be null"); + String receivedMessage = new String(msg.getData()); + log.debug("Received message [{}] in the listener", receivedMessage); + c1.acknowledgeAsync(msg); + received.incrementAndGet(); + latch.get().countDown(); + }).subscribe(); + + Producer producer = pulsarClient.newProducer() + .topic("persistent://my-property/my-ns/my-topic-pr").create(); + + consumer.pause(); + + for (int i = 0; i < receiverQueueSize * 2; i++) producer.send(("my-message-" + i).getBytes()); + + log.info("Waiting for message listener to ack " + receiverQueueSize + " messages"); + assertEquals(latch.get().await(receiverQueueSize, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks"); + + log.info("Giving message listener an opportunity to receive messages while paused"); + Thread.sleep(2000); // hopefully this is long enough + assertEquals(received.intValue(), receiverQueueSize, "Consumer received messages while paused"); + + latch.set(new CountDownLatch(receiverQueueSize)); + + consumer.resume(); + + log.info("Waiting for message listener to ack all messages"); + assertEquals(latch.get().await(receiverQueueSize, TimeUnit.SECONDS), true, "Timed out waiting for message listener acks"); + + consumer.close(); + producer.close(); + log.info("-- Exiting {} test --", methodName); + } + @Test(dataProvider = "batch") public void testBackoffAndReconnect(int batchMessageDelayMs) throws Exception { log.info("-- Starting {} test --", methodName); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java index f5a926602df66..12de077ebb43e 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/api/Consumer.java @@ -294,4 +294,15 @@ public interface Consumer extends Closeable { * @return consumer name. */ String getConsumerName(); + + /** + * Stop requesting new messages from the broker until {@link #resume()} is called. Note that this might cause + * {@link #receive()} to block until {@link #resume()} is called and new messages are pushed by the broker. + */ + void pause(); + + /** + * Resume requesting messages from the broker. + */ + void resume(); } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 67b9854da266a..a4ab48c3cf830 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -144,6 +144,8 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private Producer deadLetterProducer; + protected volatile boolean paused; + enum SubscriptionMode { // Make the subscription to be backed by a durable cursor that will retain messages and persist the current // position @@ -1073,7 +1075,7 @@ void increaseAvailablePermits(ClientCnx currentCnx) { private void increaseAvailablePermits(ClientCnx currentCnx, int delta) { int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta); - while (available >= receiverQueueRefillThreshold) { + while (available >= receiverQueueRefillThreshold && !paused) { if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) { sendFlowPermitsToBroker(currentCnx, available); break; @@ -1083,6 +1085,19 @@ private void increaseAvailablePermits(ClientCnx currentCnx, int delta) { } } + @Override + public void pause() { + paused = true; + } + + @Override + public void resume() { + if (paused) { + paused = false; + increaseAvailablePermits(cnx(), 0); + } + } + private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload, ClientCnx currentCnx) { diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java index 8b8556adafb21..41821ad93a613 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java @@ -853,5 +853,15 @@ public List> getConsumers() { return consumers.values().stream().collect(Collectors.toList()); } + @Override + public void pause() { + consumers.forEach((name, consumer) -> consumer.pause()); + } + + @Override + public void resume() { + consumers.forEach((name, consumer) -> consumer.resume()); + } + private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class); } diff --git a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java index ad8cd08a58ff8..e20b0f18f116e 100644 --- a/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java +++ b/pulsar-flink/src/test/java/org/apache/flink/streaming/connectors/pulsar/PulsarConsumerSourceTests.java @@ -504,6 +504,14 @@ public boolean isConnected() { public String getConsumerName() { return "test-consumer-0"; } + + @Override + public void pause() { + } + + @Override + public void resume() { + } } private static List createMessages(int startIndex, int numMessages) {