[Bug] Deserialized BatchMessageIdImpl cannot be used for acknowledgment #19030

@BewareMyPower

Search before asking

  • I searched in the issues and found nothing similar.

Version

OS: Ubuntu 20.04
Pulsar: master (41edd2e)

Minimal reproduce step

Add a unit test that does these things:

  1. Create a producer to send N messages in the same batch.
  2. Create a consumer to receive these messages and store the MessageId objects. They share the same ledger id, entry id, and batch size; only the batch indexes differ.
  3. Round-trip each MessageId object through serialization (MessageId#toByteArray) and deserialization (MessageId#fromByteArray).
  4. Acknowledge the deserialized MessageId objects.
  5. Restart the consumer. It still receives the 1st message.

    @Test
    public void testSerialization() throws Exception {
        var topic = "test-serialization-origin";
        @Cleanup var producer = pulsarClient.newProducer(Schema.INT32)
                .topic(topic)
                .batchingMaxMessages(100)
                .batchingMaxPublishDelay(1, TimeUnit.DAYS)
                .create();
        @Cleanup var consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topic)
                .subscriptionName("sub")
                .isAckReceiptEnabled(true)
                .subscribe();

        final var numMessages = 10;
        for (int i = 0; i < numMessages; i++) {
            producer.sendAsync(i);
        }
        producer.flush();
        final var msgIds = new ArrayList<MessageId>();
        for (int i = 0; i < numMessages; i++) {
            msgIds.add(consumer.receive().getMessageId());
        }
        for (int i = 1; i < numMessages; i++) {
            final var lhs = (BatchMessageIdImpl) msgIds.get(0);
            final var rhs = (BatchMessageIdImpl) msgIds.get(i);
            assertEquals(lhs.getLedgerId(), rhs.getLedgerId());
            assertEquals(lhs.getEntryId(), rhs.getEntryId());
            assertEquals(lhs.getBatchSize(), rhs.getBatchSize());
            assertEquals(lhs.getBatchSize(), numMessages);
        }

        var deserializedMsgIds = new ArrayList<MessageId>();
        for (var msgId : msgIds) {
            var deserialized = MessageId.fromByteArray(msgId.toByteArray());
            assertTrue(deserialized instanceof BatchMessageIdImpl);
            deserializedMsgIds.add(deserialized);
        }
        for (var msgId : deserializedMsgIds) {
            consumer.acknowledge(msgId);
        }
        consumer.close();

        consumer = pulsarClient.newConsumer(Schema.INT32)
                .topic(topic)
                .subscriptionName("sub")
                .isAckReceiptEnabled(true)
                .subscribe();
        final var msg = consumer.receive(3, TimeUnit.SECONDS);
        assertNotNull(msg);
        assertEquals(msg.getValue(), 0);
    }

What did you expect to see?

The restarted consumer should receive nothing.

What did you see instead?

The restarted consumer received the 1st message.

Anything else?

The root cause is #1424, which makes all MessageId instances in the same batch share the same BatchMessageAcker object. However, when MessageId instances are created by deserialization, it's impossible to make them share the same BatchMessageAcker, so their acknowledgments are tracked separately and the batch is never seen as fully acknowledged.
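
To make the failure mode concrete, here is a minimal, self-contained sketch of the sharing problem. It does not use the Pulsar classes: `Acker` and `BatchId` are hypothetical stand-ins for BatchMessageAcker and BatchMessageIdImpl, and the sketch assumes that each deserialized ID ends up with its own fresh acker and that a batch is only treated as acknowledged once a single acker has seen every index.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

public class SharedAckerSketch {

    /** Hypothetical stand-in for BatchMessageAcker: tracks unacknowledged batch indexes. */
    static final class Acker {
        private final BitSet pending;

        Acker(int batchSize) {
            pending = new BitSet(batchSize);
            pending.set(0, batchSize); // every index starts as unacknowledged
        }

        /** Clears one index and reports whether the whole batch is now acknowledged. */
        boolean ackIndividual(int batchIndex) {
            pending.clear(batchIndex);
            return pending.isEmpty();
        }
    }

    /** Hypothetical stand-in for a batch message ID: a position plus an acker reference. */
    static final class BatchId {
        final long ledgerId;
        final long entryId;
        final int batchIndex;
        final int batchSize;
        final Acker acker;

        BatchId(long ledgerId, long entryId, int batchIndex, int batchSize, Acker acker) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.batchIndex = batchIndex;
            this.batchSize = batchSize;
            this.acker = acker;
        }
    }

    /** IDs as handed out on receive: one acker shared by the whole batch. */
    static List<BatchId> received(int batchSize) {
        Acker shared = new Acker(batchSize);
        List<BatchId> ids = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            ids.add(new BatchId(1L, 0L, i, batchSize, shared));
        }
        return ids;
    }

    /** Models a byte-array round trip: the acker is not carried over, so each ID gets its own. */
    static List<BatchId> roundTrip(List<BatchId> ids) {
        List<BatchId> copies = new ArrayList<>();
        for (BatchId id : ids) {
            copies.add(new BatchId(id.ledgerId, id.entryId, id.batchIndex,
                    id.batchSize, new Acker(id.batchSize)));
        }
        return copies;
    }

    /** Acknowledges every ID and returns true if any acker ever saw the batch complete. */
    static boolean ackAll(List<BatchId> ids) {
        boolean complete = false;
        for (BatchId id : ids) {
            complete |= id.acker.ackIndividual(id.batchIndex);
        }
        return complete;
    }

    public static void main(String[] args) {
        List<BatchId> original = received(10);
        List<BatchId> deserialized = roundTrip(original);
        System.out.println("shared acker completes batch: " + ackAll(original));
        System.out.println("separate ackers complete batch: " + ackAll(deserialized));
    }
}
```

In this model the shared acker reports completion after the last index is acknowledged, while each per-ID acker only ever sees one of its ten indexes cleared. That would match the observed behavior, where the restarted consumer receives the batch again starting from the 1st message.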

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Labels

Stale, type/bug
