MessageID has always batch index 0 when sending messages in a batch #1455

Closed
zubchenok opened this issue Mar 27, 2018 · 0 comments
Labels
area/client, type/bug

Comments

zubchenok commented Mar 27, 2018

When I send messages in a batch, the MessageID returned by the sendAsync method always has 0 in its batchIndex component. Here is a unit test that reproduces the issue:

    @Test
    public void testSameBatchIndex() throws Exception
    {
        String topicName = "persistent://test-prop/vv/ns1/test-" + UUID.randomUUID();
        PulsarClient pulsarClient = PulsarClient.create("pulsar://vv:6650");

        Producer producer = pulsarClient.createProducer(topicName, new ProducerConfiguration()
                .setBatchingEnabled(true)
                .setBatchingMaxMessages(1000)
                .setBatchingMaxPublishDelay(1, TimeUnit.SECONDS)
        );

        Consumer consumer = pulsarClient.subscribe(topicName, "subscription", new ConsumerConfiguration()
                .setMessageListener((consumer1, message) -> System.out.println("Read: " + message.getMessageId()))
        );

        producer.sendAsync("A".getBytes()).whenCompleteAsync((messageId, throwable) -> {
            System.out.println("A: " + messageId);
        });
        producer.sendAsync("B".getBytes()).whenCompleteAsync((messageId, throwable) -> {
            System.out.println("B: " + messageId);
        });
        producer.sendAsync("C".getBytes()).whenCompleteAsync((messageId, throwable) -> {
            System.out.println("C: " + messageId);
        });

        Thread.sleep(3333);

        consumer.close();
        producer.close();
        pulsarClient.close();
    }

Actual output is

A: 1286:0:-1:0
B: 1286:0:-1:0
C: 1286:0:-1:0
Read: 1286:0:-1:0
Read: 1286:0:-1:1
Read: 1286:0:-1:2

Expected output is

A: 1286:0:-1:0
B: 1286:0:-1:1
C: 1286:0:-1:2
Read: 1286:0:-1:0
Read: 1286:0:-1:1
Read: 1286:0:-1:2
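
To make the difference between the two outputs easier to check, here is a minimal sketch that splits a printed ID into its components. It assumes the printed form is ledgerId:entryId:partitionIndex:batchIndex, which matches the outputs above but is an assumption here; MessageIdParts is a hypothetical helper, not a Pulsar class.

```java
public class MessageIdParts {
    final long ledgerId;
    final long entryId;
    final int partitionIndex;
    final int batchIndex;

    MessageIdParts(long ledgerId, long entryId, int partitionIndex, int batchIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
        this.batchIndex = batchIndex;
    }

    // Parses the printed form, ASSUMED to be ledgerId:entryId:partitionIndex:batchIndex.
    public static MessageIdParts parse(String printed) {
        String[] fields = printed.split(":");
        if (fields.length != 4) {
            throw new IllegalArgumentException("expected 4 fields, got " + fields.length);
        }
        return new MessageIdParts(
                Long.parseLong(fields[0]),
                Long.parseLong(fields[1]),
                Integer.parseInt(fields[2]),
                Integer.parseInt(fields[3]));
    }

    public static void main(String[] args) {
        // The third message of the batch, as printed by the consumer in the report.
        MessageIdParts id = parse("1286:0:-1:2");
        System.out.println("batchIndex = " + id.batchIndex);
    }
}
```

With this helper, the bug shows up as the producer-side IDs for A, B, and C all parsing to batchIndex 0, while the consumer-side IDs parse to 0, 1, and 2.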

version: 1.22.0-incubating

sijie added the type/bug, area/client, and triage/week-25 labels Jun 20, 2018
sijie added a commit to sijie/pulsar that referenced this issue Jul 6, 2018
…in a batch

*Motivation*

Fixes apache#1455.

Pulsar uses a callback chain to complete the list of callbacks for a batch. However, the chain doesn't keep a reference to the message instance that each callback belongs to, so when the chain is triggered, it always uses the first message ID to complete every callback in the chain.

*Changes*

Introduce a field that keeps the message instance in the callback chain, so that when the chain is invoked, each callback is completed with its own message instance.

Added an integration test to ensure it works correctly.
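
The bug and the fix described in this commit message can be illustrated with a minimal, self-contained sketch. This is not the Pulsar client code: Msg, Link, completeBuggy, completeFixed, and formatId are hypothetical names, and the ledger ID, entry ID, and partition index are taken from the output in the report purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class CallbackChainSketch {

    /** Hypothetical stand-in for a batched message; not a Pulsar class. */
    static final class Msg {
        final String payload;
        final int batchIndex;
        Msg(String payload, int batchIndex) {
            this.payload = payload;
            this.batchIndex = batchIndex;
        }
    }

    /** One link in the chain: a callback plus the message it belongs to. */
    static final class Link {
        final Msg msg; // the per-link message reference that the fix relies on
        final Consumer<String> callback;
        Link next;
        Link(Msg msg, Consumer<String> callback) {
            this.msg = msg;
            this.callback = callback;
        }
    }

    static String formatId(long ledgerId, long entryId, int batchIndex) {
        return ledgerId + ":" + entryId + ":-1:" + batchIndex;
    }

    /** Buggy variant: every callback is completed with the head's message. */
    static void completeBuggy(Link head, long ledgerId, long entryId) {
        for (Link l = head; l != null; l = l.next) {
            l.callback.accept(formatId(ledgerId, entryId, head.msg.batchIndex));
        }
    }

    /** Fixed variant: each callback is completed with its own message. */
    static void completeFixed(Link head, long ledgerId, long entryId) {
        for (Link l = head; l != null; l = l.next) {
            l.callback.accept(formatId(ledgerId, entryId, l.msg.batchIndex));
        }
    }

    /** Builds a chain for payloads A, B, C and completes it; returns what the callbacks saw. */
    public static List<String> run(boolean fixed) {
        List<String> out = new ArrayList<>();
        String[] payloads = {"A", "B", "C"};
        Link head = null;
        Link tail = null;
        for (int i = 0; i < payloads.length; i++) {
            final String p = payloads[i];
            Link l = new Link(new Msg(p, i), id -> out.add(p + ": " + id));
            if (head == null) {
                head = l;
            } else {
                tail.next = l;
            }
            tail = l;
        }
        if (fixed) {
            completeFixed(head, 1286, 0);
        } else {
            completeBuggy(head, 1286, 0);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println("buggy: " + run(false));
        System.out.println("fixed: " + run(true));
    }
}
```

In the buggy variant all three callbacks receive an ID ending in batch index 0, as in the report; in the fixed variant they receive 0, 1, and 2.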
sijie added a commit that referenced this issue Jul 6, 2018
…in a batch (#2099)
