Batch message serialization ignores batch size #9557

Closed
Lanayx opened this issue Feb 10, 2021 · 6 comments · Fixed by #9855
Labels
release/2.7.2 type/bug

Comments

@Lanayx
Contributor

Lanayx commented Feb 10, 2021

Describe the bug
Since batch index acknowledgement was added, the seek method has been updated to support more precise seeking using ack sets (source code). However, when seek is performed with a message ID that has been serialized and deserialized, batchSize is set to zero (because it is ignored during serialization), which leads to a discrepancy between the two forms of the message ID and their seek results.

To Reproduce
Compare a BatchMessageIdImpl at runtime before and after serialization and deserialization: batchSize is always 0 in the latter case. This leads to inconsistent behavior of Consumer seek by messageId.
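To make the round trip concrete, here is a minimal Java sketch. `BatchIdSketch`, `toBytes` and `fromBytes` are hypothetical stand-ins, not Pulsar's `BatchMessageIdImpl` or its protobuf encoding; the only behavior carried over from the report is that `batchSize` is not written during serialization, so it comes back as 0.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class LossyRoundTrip {
    // Hypothetical stand-in for a batch message ID (not Pulsar's BatchMessageIdImpl).
    record BatchIdSketch(long ledgerId, long entryId, int batchIndex, int batchSize) {}

    // Mirrors the reported behavior: batchSize is NOT written.
    static byte[] toBytes(BatchIdSketch id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.ledgerId());
            out.writeLong(id.entryId());
            out.writeInt(id.batchIndex());
        }
        return bytes.toByteArray();
    }

    static BatchIdSketch fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            long ledgerId = in.readLong();
            long entryId = in.readLong();
            int batchIndex = in.readInt();
            // Nothing was written for batchSize, so it falls back to 0.
            return new BatchIdSketch(ledgerId, entryId, batchIndex, 0);
        }
    }

    public static void main(String[] args) throws IOException {
        // 3rd message (index 2) of a 5-message batch
        BatchIdSketch original = new BatchIdSketch(10L, 3L, 2, 5);
        BatchIdSketch restored = fromBytes(toBytes(original));
        System.out.println("original: " + original);
        System.out.println("restored: " + restored);
        System.out.println("equal:    " + original.equals(restored));
    }
}
```

Everything except the batch size survives, so two IDs that point at the same message no longer compare equal.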

Expected behavior
There should be no difference between a regular BatchMessageIdImpl and one recovered from a byte array.

@codelipenghui
Contributor

@Lanayx Sorry for the late response. I think #8285 fixed this problem. Currently, we don't use the toByteArray method for the seek operation; it is implemented by https://github.com/apache/pulsar/pull/8285/files#diff-f6e4c1c4091aa10525f331e48e66b29f22b9f7987755c1b4fb887e24f198bed6R1934-R1942.

The fix was released in 2.6.2 and 2.7.0. Which Pulsar version did you see this issue on?

@Lanayx
Contributor Author

Lanayx commented Mar 2, 2021

Hi, @codelipenghui. It seems you've misunderstood the issue :) My concern is that this line will give an incorrect result if the batch message ID passed to the method was taken from a byte array (serialized and deserialized), since the batch size is missing from the byte representation of a batch message ID.
The repro is as follows:

  1. Consumer receives a batch of 5 messages.
  2. Consumer notes the ID of the 3rd message.
  3. Consumer seeks to the 3rd message ID: everything works as expected.
  4. Consumer serializes the 3rd message ID to a byte array.
  5. Consumer restores the 3rd message ID from the byte array.
  6. Consumer seeks to the restored 3rd message ID: the seek logic breaks because the batch size is incorrect.

Rather than storing the batch size, I would expect this to be fixed by changing the new seek logic so that it does not depend on the batch size.
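Why the batch size matters here: a precise seek into a batch needs an ack set describing which messages of the target entry should still be delivered. The sketch below assumes the ack set is built roughly the way the linked seek code appears to build it (set one bit per message in the batch, then clear the bits before `batchIndex`), with `java.util.BitSet` standing in for Pulsar's internal bitset type. `ackSetForSeek` is a hypothetical helper, not a Pulsar API.

```java
import java.util.BitSet;

class SeekAckSetSketch {
    // Assumption: bits set = messages of the target batch that should still be delivered.
    static BitSet ackSetForSeek(int batchIndex, int batchSize) {
        BitSet bits = new BitSet();
        bits.set(0, batchSize);                 // one bit per message in the batch
        bits.clear(0, Math.max(batchIndex, 0)); // drop messages before the seek point
        return bits;
    }

    public static void main(String[] args) {
        // Original ID: index 2 of a 5-message batch
        System.out.println(ackSetForSeek(2, 5)); // {2, 3, 4}
        // Restored ID: same index, but batchSize came back as 0
        System.out.println(ackSetForSeek(2, 0)); // {}
    }
}
```

With the restored ID the set comes out empty, so it no longer describes the batch at all, which is the discrepancy described in step 6.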

@codelipenghui
Contributor

@Lanayx I see, thanks for the clarification. We will push a fix soon.

@zymap
Member

zymap commented Mar 3, 2021

@Lanayx I tried to reproduce this issue with the following test case, but everything looks good. Could you please take a look at it?

    @Test
    public void testSeekBatchMessageWithSerializedMessageId() throws PulsarClientException, InterruptedException {
        final String topic = "public/default/seek-with-batch-message-" + UUID.randomUUID();
        final String messagePrefix = "message";
        @Cleanup
        PulsarClient pulsarClient = PulsarClient.builder().serviceUrl(lookupUrl.toString()).build();
        @Cleanup
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
            .topic(topic)
            .enableBatching(true)
            .batchingMaxMessages(20)
            .create();
        @Cleanup
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
            .topic(topic)
            .subscriptionName("seek-batch")
            .subscribe();

        ArrayList<String> messages = new ArrayList<>();
        // send messages and batch the messages
        CountDownLatch latch = new CountDownLatch(10);
        IntStream.range(0, 10)
            .forEach(i -> {
                String msg = String.format("%s-%d", messagePrefix, i);
                producer.sendAsync(msg);
                messages.add(msg);
                latch.countDown();
            });
        latch.await();
        producer.flush();

        Message<String> msg1 = consumer.receive();
        assertEquals(msg1.getValue(), messages.get(0));
        Message<String> msg2 = consumer.receive();
        assertEquals(msg2.getValue(), messages.get(1));
        Message<String> msg3 = consumer.receive();
        assertEquals(msg3.getValue(), messages.get(2));
        Message<String> msg4 = consumer.receive();
        assertEquals(msg4.getValue(), messages.get(3));

        consumer.seek(msg3.getMessageId());
        Message<String> seekMsg4 = consumer.receive();
        assertEquals(seekMsg4.getValue(), messages.get(3));

        byte[] msg3IdBytes = msg3.getMessageId().toByteArray();
        MessageId msg3Id = null;
        try {
            msg3Id = MessageId.fromByteArray(msg3IdBytes);
        } catch (IOException e) {
            fail("Failed parse message id from byte array");
        }
        consumer.seek(msg3Id);
        seekMsg4 = consumer.receive();
        assertEquals(seekMsg4.getValue(), messages.get(3));
    }

@Lanayx
Contributor Author

Lanayx commented Mar 3, 2021

@zymap Hi! This test passes, but not because the seek logic is correct: it passes because the seek message ID is stored as startMessageId in the consumer, and the extra messages are skipped on receive. So if you have a batch of 10000 messages and seek to the last message in the batch, all 10000 are returned by the broker, but 9999 of them are ignored because of startMessageId. My understanding is that this logic was meant to provide a precise reset, so that the extra messages are not sent to the consumer at all.
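The client-side filtering described in this comment can be sketched as follows, assuming the consumer compares each message's batch index with the batch index of `startMessageId` and drops the earlier ones. `deliveredIndexes` is a hypothetical helper; the `inclusive` flag models whether the seek target itself is delivered (in the test above, seeking to msg3 yields msg4 next, i.e. the exclusive case).

```java
import java.util.ArrayList;
import java.util.List;

class StartMessageIdFilterSketch {
    // Batch indexes the client hands to the application after a seek.
    // Every index is still sent by the broker; this filter runs on the client.
    static List<Integer> deliveredIndexes(int batchSize, int startBatchIndex, boolean inclusive) {
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            boolean prior = inclusive ? i < startBatchIndex : i <= startBatchIndex;
            if (!prior) {
                delivered.add(i);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // Seek to msg3 (index 2) of a 10-message batch, exclusive as in the test above
        System.out.println(deliveredIndexes(10, 2, false)); // [3, 4, 5, 6, 7, 8, 9]

        int batchSize = 10_000;
        int kept = deliveredIndexes(batchSize, batchSize - 1, true).size();
        System.out.println("broker sent " + batchSize + ", client dropped " + (batchSize - kept));
    }
}
```

Because the filter only decides what reaches the application, the test's assertions hold even when the position sent to the broker is imprecise.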

@codelipenghui
Contributor

@Lanayx The broker does not deserialize a batch and filter the messages inside it; this is expected behavior. But yes, this issue should be fixed: because the consumer does not pass the right position to the broker, it leads to message acknowledgment issues (messages filtered out by the client never get a chance to be acked, so the mark-delete position stops moving forward).

@zymap You can try closing the consumer after the seek operation and then consuming again; I think you will receive msg1 in your test.
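The acknowledgment consequence mentioned above can be illustrated with a simplified model in which the mark-delete position only advances past an entry once every message in it has been acknowledged. This is a hypothetical sketch (`markDeleteIndex` and the per-entry `BitSet` of unacked indexes are illustrative, not broker code): messages the client filtered out are never acked, so their bits stay set and block progress.

```java
import java.util.BitSet;
import java.util.List;

class MarkDeleteSketch {
    // Each BitSet is one batch entry; bit i set means message i is still unacked.
    // Returns the index of the last entry the mark-delete position can reach (-1 if none).
    static int markDeleteIndex(List<BitSet> unackedPerEntry) {
        int markDelete = -1;
        for (BitSet unacked : unackedPerEntry) {
            if (!unacked.isEmpty()) {
                break; // the first entry with pending messages blocks further progress
            }
            markDelete++;
        }
        return markDelete;
    }

    // A fresh entry of the given size with every message still unacked.
    static BitSet batch(int size) {
        BitSet bits = new BitSet();
        bits.set(0, size);
        return bits;
    }

    public static void main(String[] args) {
        BitSet first = batch(5);
        BitSet second = batch(5);
        first.clear(3, 5);  // client filtered indexes 0..2 and acks only 3 and 4
        second.clear(0, 5); // second entry fully acked
        System.out.println(markDeleteIndex(List.of(first, second))); // -1
    }
}
```

Even though the second entry is fully acknowledged, the filtered messages in the first entry keep the mark-delete position from moving.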

zymap added a commit to zymap/pulsar that referenced this issue Mar 10, 2021
---

Fixes: apache#9557

**Motivation**

The batch size is not included in the serialization and
deserialization process.
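The direction stated in this commit message is to keep the batch size in the serialized form. A minimal sketch with a hypothetical fixed-width encoding (Pulsar actually serializes message IDs through its protobuf `MessageIdData`, whose layout this does not reproduce): writing `batchSize` alongside the other fields makes the restored ID equal to the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class BatchIdWithSizeSketch {
    // Hypothetical stand-in for a batch message ID (not Pulsar's BatchMessageIdImpl).
    record BatchId(long ledgerId, long entryId, int batchIndex, int batchSize) {}

    static byte[] toBytes(BatchId id) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(id.ledgerId());
            out.writeLong(id.entryId());
            out.writeInt(id.batchIndex());
            out.writeInt(id.batchSize()); // the field that was previously dropped
        }
        return bytes.toByteArray();
    }

    static BatchId fromBytes(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // Arguments are evaluated left to right, matching the write order.
            return new BatchId(in.readLong(), in.readLong(), in.readInt(), in.readInt());
        }
    }

    public static void main(String[] args) throws IOException {
        BatchId original = new BatchId(10L, 3L, 2, 5);
        BatchId restored = fromBytes(toBytes(original));
        System.out.println("equal: " + original.equals(restored));
    }
}
```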
codelipenghui pushed a commit that referenced this issue Mar 12, 2021
---

Fixes: #9557

**Motivation**

The batch size is not included in the serialization and
deserialization process.
fmiguelez pushed a commit to fmiguelez/pulsar that referenced this issue Mar 16, 2021
---

Fixes: apache#9557

**Motivation**

The batch size is not included in the serialization and
deserialization process.
codelipenghui pushed a commit that referenced this issue Mar 23, 2021
---

Fixes: #9557

**Motivation**

The batch size is not included in the serialization and
deserialization process.

(cherry picked from commit cb5a933)