Skip to content

Java client Reader readNext and hasMessageAvailable not working when attempting to find last message in topic #4912

@kimcs

Description

@kimcs

Describe the bug
Test3: When configuring Reader with startMessageIdInclusive and positioning stream at MessageId.latest, then the next call to Reader.readNext(timeout) should have returned the last message of the topic, but it times out and returns null (without a configured timeout it blocks forever).
Test4: When configuring Reader without startMessageIdInclusive and positioning stream at MessageId.latest, then the next call to Reader.hasMessageAvailable should have returned false, but it returns true. Reader.readNext(timeout) however cannot find a message (as expected but inconsistent with the hasMessageAvailable result).

To Reproduce

  1. Start a local standalone pulsar and make sure the broker port in the test code below match
  2. Run the following java code to reproduce bug:
package sample;

import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

import static org.testng.Assert.assertEquals;

public class MessageIdInclusiveTest {

    static final String topic = "public/default/test-start-message-id-inclusive-magic-m6km32j";

    void deleteTopic(String topic) throws PulsarClientException {
        ClientConfigurationData config = new ClientConfigurationData();
        config.setAuthentication(new AuthenticationDisabled());
        config.setServiceUrl("http://localhost:8080");
        try (PulsarAdmin admin = new PulsarAdmin("http://localhost:8080", config)) {
            List<String> topics = admin.namespaces().getTopics("public/default");
            if (topics.contains(topic)) {
                admin.topics().delete(topic);
            }
        } catch (PulsarAdminException e) {
            throw new RuntimeException(e);
        }
    }

    Reader<byte[]> newReader(PulsarClient client, boolean startMessageIdInclusive, MessageId startMessageId) throws PulsarClientException {
        ReaderBuilder<byte[]> builder = client.newReader(JSONSchema.of(byte[].class)).topic(MessageIdInclusiveTest.topic).startMessageId(startMessageId);
        if (startMessageIdInclusive) {
            builder.startMessageIdInclusive();
        }
        return builder.create();
    }

    int readMessage(PulsarClient client, int timeoutInSeconds, boolean startMessageIdInclusive, MessageId startMessageId) {
        try (Reader<byte[]> reader = newReader(client, startMessageIdInclusive, startMessageId)) {

            if (reader.hasMessageAvailable() == false) {
                return -1;
            }

            Message<byte[]> message = reader.readNext(timeoutInSeconds, TimeUnit.SECONDS);
            if (message == null) {
                System.out.println("ERROR! hasMessageAvailable returned true, but no message was read after " + timeoutInSeconds + " seconds");
                return -2;
            }

            return message.getValue()[0];
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    void produceMessages(PulsarClient client, int n) throws PulsarClientException {
        try (Producer<byte[]> producer = client.newProducer(JSONSchema.of(byte[].class)).topic(topic).create()) {
            for (int i = 0; i < n; i++) {
                producer.send(new byte[]{(byte) (1 + i)});
            }
        }
    }

    @Test
    public void test1() throws PulsarClientException {
        deleteTopic(topic);

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
            produceMessages(client, 3);

            // expect to find the very first message on the topic, which has a value of 1
            assertEquals(readMessage(client, 5, true, MessageId.earliest), 1);
        }
    }

    @Test
    public void test2() throws PulsarClientException {
        deleteTopic(topic);

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
            produceMessages(client, 3);

            // expect to find the very first message on the topic, which has a value of 1
            assertEquals(readMessage(client, 5, false, MessageId.earliest), 1);
        }
    }

    @Test
    public void test3() throws PulsarClientException {
        deleteTopic(topic);

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
            produceMessages(client, 3);

            // expect to find the very last message on the topic, which has a value of 3
            assertEquals(readMessage(client, 5, true, MessageId.latest), 3);
        }
    }

    @Test
    public void test4() throws PulsarClientException {
        deleteTopic(topic);

        try (PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build()) {
            produceMessages(client, 3);

            // expect to not find any message, which is signified by a value of -1
            assertEquals(readMessage(client, 5, false, MessageId.latest), -1);
        }
    }
}

Expected behavior
test3 expected to return the last message in topic, test4 hasMessageAvailable expected to be false.

Desktop (please complete the following information):

  • OS: macOS Mojave 10.14.6

Additional context
The bug was found when trying to use the Reader interface to read the last message in the topic. It could be that the bug is also present if attempting to find a specific message in the middle of the topic.

Metadata

Metadata

Assignees

No one assigned

    Labels

    area/clienttype/bugThe PR fixed a bug or issue reported a bug

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions