[Bug] consumer.pause() is paused after 1 message is consumed even with receiverQueueSize=0 #19320

Closed
1 of 2 tasks
Croway opened this issue Jan 24, 2023 · 7 comments
Labels
type/bug The PR fixed a bug or issue reported a bug

Comments

@Croway

Croway commented Jan 24, 2023

Search before asking

  • I searched in the issues and found nothing similar.

Version

Fedora, 2.11.0

Minimal reproduce step

  • With the java client 2.11.0 configure a consumer with receiverQueueSize=0
  • start the consumer
  • pause the consumer
  • send 2 or more messages

What did you expect to see?

0 messages consumed

What did you see instead?

1 message consumed

Anything else?

With pulsar-client version 2.10.3 and pulsar-client-admin version 2.10.3 the reproducer behaves as expected, so something changed in the client between 2.10.3 and 2.11.0.

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@Croway Croway added the type/bug The PR fixed a bug or issue reported a bug label Jan 24, 2023
@nicoloboschi
Contributor

This might be related to #14494. @Jason918?

@michaeljmarshall
Member

@Croway - thanks for opening the issue. I was not able to reproduce it on tag v2.11.0 or on a recent version of master branch. Here is the unit test I used, but it failed due to the 30 second timeout. Would you take a look to see if there is something off between my test and your steps to reproduce the issue?

    @Test(timeOut = 30000L)
    public void zeroQueueSizePausedConsumer() throws PulsarClientException {
        String key = "zeroQueueSizePausedConsumer";

        // 1. Config
        final String topicName = "persistent://prop/use/ns-abc/topic-" + key;
        final String subscriptionName = "my-ex-subscription-" + key;
        final String messagePredicate = "my-message-" + key + "-";

        // 2. Create Producer
        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();

        // 3. Create Consumer
        ConsumerImpl<byte[]> consumer = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
                .subscriptionName(subscriptionName).receiverQueueSize(0).subscribe();

        consumer.pause();

        // 4. Producer publishes messages
        for (int i = 0; i < 2; i++) {
            String message = messagePredicate + i;
            log.info("Producer produced: " + message);
            producer.send(message.getBytes());
        }

        // 5. Consumer tries to receive the messages
        Message<byte[]> message;
        for (int i = 0; i < totalMessages; i++) {
            assertEquals(consumer.numMessagesInQueue(), 0);
            message = consumer.receive();
            assertEquals(new String(message.getData()), messagePredicate + i);
            assertEquals(consumer.numMessagesInQueue(), 0);
            log.info("Consumer received : " + new String(message.getData()));
        }
    }

When I run that test, it fails due to the 30 second timeout.

Additionally, I tested with startPaused(true) and got the same results.

michaeljmarshall added a commit to michaeljmarshall/pulsar that referenced this issue Jan 25, 2023
@Croway
Author

Croway commented Jan 25, 2023

Hello @nicoloboschi, @michaeljmarshall, I created a reproducer at https://github.com/Croway/pulsar-client-test since I noticed that my steps to reproduce were not good enough. The original failing test is https://github.com/apache/camel/blob/main/components/camel-pulsar/src/test/java/org/apache/camel/component/pulsar/integration/PulsarSuspendRouteIT.java#L220 . I just removed the Camel overhead, so the logic should be the same. The test fails with 2.11.0 and passes with 2.10.3.

@michaeljmarshall
Member

Thanks for the reproducer @Croway. I reproduced it on my end. I'll work on a fix.

@michaeljmarshall
Member

michaeljmarshall commented Jan 25, 2023

After digging into the reproducer, I found an explanation for the behavior. The difference between 2.10.3 and 2.11.0 arises because we stopped wrapping single messages in batch message envelopes as of PIP 189 #16619, which was introduced in 2.11.0.

In the 2.10.3 test, note that the logs contain this log line:

2023-01-25T13:07:25,151-0600 [pulsar-client-io-36-1] WARN org.apache.pulsar.client.impl.ZeroQueueConsumerImpl - Closing consumer [camel-subscription]-[camel-consumer] due to unsupported received batch-message with zero receiver queue size

In the 2.11.0 test, this warning is not logged because the message arrives as a single message rather than inside a batch envelope.
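To make the version difference concrete, here is a minimal sketch of the behavior implied by the log line above. It is a toy model, not the code in `ZeroQueueConsumerImpl`: the `Envelope` and `ToyZeroQueueConsumer` types and the `wrapSingles` flag are hypothetical names introduced only for illustration. The one assumption taken from the thread is that a zero-queue consumer closes itself when it receives a batch-wrapped message.

```java
import java.util.Collections;
import java.util.List;
import java.util.Optional;

public class ZeroQueueBatchModel {

    /** Hypothetical envelope: the payloads plus a flag saying whether they are batch-wrapped. */
    static final class Envelope {
        final boolean batchWrapped;
        final List<String> payloads;

        Envelope(boolean batchWrapped, List<String> payloads) {
            this.batchWrapped = batchWrapped;
            this.payloads = payloads;
        }
    }

    /** Toy zero-queue consumer: closes on a batch envelope, otherwise hands over the message. */
    static final class ToyZeroQueueConsumer {
        private boolean closed;

        Optional<String> onEnvelope(Envelope envelope) {
            if (closed) {
                return Optional.empty();
            }
            if (envelope.batchWrapped) {
                // Mirrors the warning in the log: batch messages are unsupported here.
                closed = true;
                return Optional.empty();
            }
            return Optional.of(envelope.payloads.get(0));
        }

        boolean isClosed() {
            return closed;
        }
    }

    /** wrapSingles=true imitates 2.10.3 (single message in a batch envelope), false imitates 2.11.0. */
    static Envelope send(String message, boolean wrapSingles) {
        return new Envelope(wrapSingles, Collections.singletonList(message));
    }

    /** Number of messages the toy consumer hands to the application for one published message. */
    static int deliveredCount(boolean wrapSingles) {
        ToyZeroQueueConsumer consumer = new ToyZeroQueueConsumer();
        return consumer.onEnvelope(send("m0", wrapSingles)).isPresent() ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println("2.10.3-style (batch-wrapped): " + deliveredCount(true));
        System.out.println("2.11.0-style (single):        " + deliveredCount(false));
    }
}
```

In this model the batch-wrapped case delivers nothing because the consumer closes, which matches the observation that the 2.10.3 run appeared to consume zero messages.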

A few notes:

  1. The first thing I notice in the reproducer is that the consumer is started, receive() is called, and then pause() is called. This ordering results in the consumer sending a request for messages to the server in both 2.10.3 and 2.11.0. In both versions, the server responds with a message.
  2. The test for 2.10.3 fails when the producer is configured with .enableBatching(false).
  3. Perhaps the meaning of pause is ambiguous. From reading the code, the current meaning is best described as "do not send any more permits to the broker". As such, in-flight requests for messages and messages already in the internal receiveQueue can still be received (this logic applies to both ZeroQueue consumers and consumers with a receive queue).
  4. The 2.11.0 test passes when the consumer is started with startPaused(true). This is because the consumer never sends any requests for messages.
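Notes 1, 3, and 4 can be illustrated with a small permit model. This is a sketch under stated assumptions, not Pulsar's implementation: `ToyBroker`, `ToyConsumer`, and `requestOne()` are hypothetical names, and the model assumes only what is described above, namely that a pause stops new permits from being sent but does not cancel a permit that is already outstanding.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class PausePermitModel {

    /** Toy broker: dispatches a message only while it holds a permit from the consumer. */
    static final class ToyBroker {
        private final Queue<String> backlog = new ArrayDeque<>();
        private ToyConsumer consumer;
        private int permits;

        void attach(ToyConsumer consumer) {
            this.consumer = consumer;
        }

        void addPermit() {
            permits++;
            dispatch();
        }

        void publish(String message) {
            backlog.add(message);
            dispatch();
        }

        private void dispatch() {
            while (permits > 0 && !backlog.isEmpty()) {
                permits--;
                consumer.onMessage(backlog.poll());
            }
        }
    }

    /** Toy zero-queue consumer: pausing only suppresses future permit requests. */
    static final class ToyConsumer {
        private final ToyBroker broker;
        private final List<String> received = new ArrayList<>();
        private boolean paused;

        ToyConsumer(ToyBroker broker, boolean startPaused) {
            this.broker = broker;
            this.paused = startPaused;
            broker.attach(this);
        }

        /** Stands in for a receive() call: asks the broker for exactly one message unless paused. */
        void requestOne() {
            if (!paused) {
                broker.addPermit();
            }
        }

        void pause() {
            paused = true;
        }

        /** A message answering an already-sent permit is accepted even while paused. */
        void onMessage(String message) {
            received.add(message);
        }

        int receivedCount() {
            return received.size();
        }
    }

    /** Runs the reproducer ordering: request, pause, then publish two messages. */
    static int receivedAfterPause(boolean startPaused) {
        ToyBroker broker = new ToyBroker();
        ToyConsumer consumer = new ToyConsumer(broker, startPaused);
        consumer.requestOne();
        consumer.pause();
        broker.publish("m0");
        broker.publish("m1");
        return consumer.receivedCount();
    }

    public static void main(String[] args) {
        System.out.println("request sent before pause: " + receivedAfterPause(false));
        System.out.println("consumer started paused:   " + receivedAfterPause(true));
    }
}
```

In this model the first ordering yields one received message and the started-paused case yields none, which matches the difference between calling pause() after receive() and using startPaused(true).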

@Croway - I think the underlying issue is that the test described in https://issues.apache.org/jira/projects/CAMEL/issues/CAMEL-18974 was implicitly relying on the zero queue consumer failing due to message batching, and the test would have failed if the producer had not batched the message. It might be the case that the test needs to be improved.

@michaeljmarshall
Member

michaeljmarshall commented Jan 25, 2023

Anecdotally, we do have a unit test to verify the behavior I described above:

    public void testPauseAndResume() throws Exception {
        final String topicName = "persistent://prop/ns-abc/zero-queue-pause-and-resume";
        final String subName = "sub";

        AtomicReference<CountDownLatch> latch = new AtomicReference<>(new CountDownLatch(1));
        AtomicInteger received = new AtomicInteger();

        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
                .receiverQueueSize(0).messageListener((c1, msg) -> {
                    assertNotNull(msg, "Message cannot be null");
                    c1.acknowledgeAsync(msg);
                    received.incrementAndGet();
                    latch.get().countDown();
                }).subscribe();
        consumer.pause();

        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).enableBatching(false).create();

        for (int i = 0; i < 2; i++) {
            producer.send(("my-message-" + i).getBytes());
        }

        // Paused consumer receives only one message
        assertTrue(latch.get().await(2, TimeUnit.SECONDS), "Timed out waiting for message listener acks");
        Thread.sleep(2000);
        assertEquals(received.intValue(), 1, "Consumer received messages while paused");

        latch.set(new CountDownLatch(1));
        consumer.resume();
        assertTrue(latch.get().await(2, TimeUnit.SECONDS), "Timed out waiting for message listener acks");

        consumer.unsubscribe();
        producer.close();
    }

In that test, the consumer uses a listener, but the semantics are the same:

  1. Consumer sends flow permit requesting a message.
  2. Consumer is paused.
  3. Producer sends two messages.
  4. Consumer receives a single message.
  5. Consumer is resumed.
  6. Consumer receives next message.
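The six steps above can be traced with a small stand-alone model. This is a sketch under stated assumptions rather than the client's actual logic: `PauseResumeTrace` and its methods are hypothetical, the broker and consumer are folded into one class for brevity, and the model assumes that a listener-based consumer requests one message on subscribe and one more after each delivery unless it is paused.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

public class PauseResumeTrace {
    private final Deque<String> backlog = new ArrayDeque<>();
    private final Consumer<String> listener;
    private int permits;
    private boolean paused;

    PauseResumeTrace(Consumer<String> listener) {
        this.listener = listener;
        permits = 1; // step 1: the consumer requests one message when it subscribes
    }

    void pause() {
        paused = true; // step 2: no further permits are sent from here on
    }

    void resume() {
        paused = false;
        permits++; // step 5: resuming sends a fresh permit
        dispatch();
    }

    void publish(String message) {
        backlog.add(message);
        dispatch();
    }

    private void dispatch() {
        while (permits > 0 && !backlog.isEmpty()) {
            permits--;
            listener.accept(backlog.poll());
            if (!paused) {
                permits++; // ask for the next message only when not paused
            }
        }
    }

    /** Replays the sequence and returns the observed events in order. */
    static List<String> run() {
        List<String> events = new ArrayList<>();
        PauseResumeTrace consumer = new PauseResumeTrace(m -> events.add("received " + m));
        consumer.pause();
        events.add("paused");
        consumer.publish("m0"); // steps 3 and 4: only the outstanding permit is honored
        consumer.publish("m1");
        events.add("resuming");
        consumer.resume();      // step 6: the second message arrives after resume
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the event order is: paused, received m0, resuming, received m1. The first message is delivered while paused because its permit was sent before pause() was called.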

@Croway
Author

Croway commented Jan 27, 2023

Thanks a lot for the explanation @michaeljmarshall , I've updated our test so that it reflects yours. Thanks for the analysis and sorry for bothering.

@Croway Croway closed this as completed Jan 27, 2023
3 participants