Closed
Labels
type/bug: The PR fixed a bug or issue reported a bug
Description
When using a reader to read a single message from a partitioned topic, the error "The partitioned topic startMessageId is illegal" appears:
Exception in thread "main" org.apache.pulsar.client.api.PulsarClientException: java.util.concurrent.ExecutionException: org.apache.pulsar.client.api.PulsarClientException: The partitioned topic startMessageId is illegal
at org.apache.pulsar.client.api.PulsarClientException.unwrap(PulsarClientException.java:976)
at org.apache.pulsar.client.impl.ReaderBuilderImpl.create(ReaderBuilderImpl.java:74)
I implemented the code from https://pulsar.apache.org/docs/en/client-libraries-java/#reader, which works for non-partitioned topics.
I consumed a message, saved its message ID bytes to a file, and then loaded that file as the start message ID for the reader. Is an additional parameter needed to read from partitioned topics?
Consumer
Consumer<byte[]> consumer = client.newConsumer()
        .topic("apache/pulsar/test-topic")
        .subscriptionName("my-sub")
        .subscribe();
// Wait for a message
Message<byte[]> msg = consumer.receive();
try {
    // Serialize the message ID and save it so the reader can load it later
    byte[] by = msg.getMessageId().toByteArray();
    FileUtils.writeByteArrayToFile(new File(".\\msg.bytes"), by);
} catch (Exception e) {
    System.out.println("ERROR");
    e.printStackTrace();
}
Reader
// Load the message ID bytes that the consumer saved earlier
byte[] msgIdBytes = FileUtils.readFileToByteArray(new File(".\\msg.bytes"));
System.out.println(msgIdBytes.length);
MessageId id = MessageId.fromByteArray(msgIdBytes);
System.out.println(id);
Reader<byte[]> reader = client.newReader()
        .topic("apache/pulsar/test-topic")
        .startMessageId(id)
        .create();
Message<byte[]> message = reader.readNext();
// Process message
System.out.println(message.getMessageId());
client.close();
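A possible workaround, offered only as an untested sketch and not confirmed in this report: point the reader at the single partition that holds the saved message rather than at the partitioned topic as a whole. Pulsar names the internal topic behind partition N by appending "-partition-N" to the topic name. The helper `partitionTopicName` and the class `PartitionTopicSketch` below are hypothetical names introduced for illustration, and the partition index is assumed to be known (for example from the printed message ID, which is assumed here to end in the partition index).

```java
public class PartitionTopicSketch {
    // Hypothetical helper: builds the name of the internal topic for one partition,
    // following Pulsar's "<topic>-partition-<index>" naming convention.
    static String partitionTopicName(String topic, int partitionIndex) {
        if (partitionIndex < 0) {
            throw new IllegalArgumentException(
                    "partition index must be >= 0, got " + partitionIndex);
        }
        return topic + "-partition-" + partitionIndex;
    }

    public static void main(String[] args) {
        // Assumption: the saved message came from partition 0.
        int partitionIndex = 0;
        String topic = partitionTopicName("apache/pulsar/test-topic", partitionIndex);
        // prints "apache/pulsar/test-topic-partition-0"
        System.out.println(topic);
        // The reader from the report would then use .topic(topic) together with
        // .startMessageId(id); whether this avoids the error is an assumption.
    }
}
```

With this approach the reader only sees messages from that one partition, so it suits the single-message lookup described above but not reading across all partitions.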