Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
Given a reader and the following code:
    final var msgIds = new ArrayList<MessageId>();
    final var timestamps = new ArrayList<Long>();
    for (int i = 0; i < 3; i++) {
        final var msg = reader.readNext();
        msgIds.add(msg.getMessageId());
        timestamps.add(msg.getPublishTime());
    }
    // 1. seek to the 2nd message's message id
    reader.seek(msgIds.get(1));
    log.info("Case 1: {}", reader.readNext().getValue());
    // 2. seek to the 2nd message's timestamp
    reader.seek(timestamps.get(1));
    log.info("Case 2: {}", reader.readNext().getValue());
Intuitively, case 1 and case 2 should print the same value, since both seek to the 2nd message.
However, if the reader has configured startMessageIdInclusive, the results are different:
2025-03-04T12:07:05,365 - INFO - [main:SimpleProducerConsumerTest] - Case 1: msg-2
2025-03-04T12:07:05,483 - INFO - [main:SimpleProducerConsumerTest] - Case 2: msg-1
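The two log lines above are consistent with seek-by-message-id positioning the reader just after the target message while seek-by-timestamp positions it at the target. The following is a toy in-memory model of that asymmetry, not Pulsar code: ToyReader, Msg, idSeekInclusive, and the sample ids and timestamps are all hypothetical names and values made up for this sketch, and it assumes the three messages have distinct publish times.

```java
import java.util.List;

/** Toy model only; every name here is hypothetical and none of it is Pulsar's API. */
final class ToyReader {
    static final class Msg {
        final long id;
        final long publishTime;
        final String value;

        Msg(long id, long publishTime, String value) {
            this.id = id;
            this.publishTime = publishTime;
            this.value = value;
        }
    }

    private final List<Msg> messages;
    private final boolean idSeekInclusive; // hypothetical flag for this sketch
    private int next = 0;

    ToyReader(List<Msg> messages, boolean idSeekInclusive) {
        this.messages = messages;
        this.idSeekInclusive = idSeekInclusive;
    }

    /** Seek by id: stop at the target when inclusive, just after it otherwise. */
    void seekById(long id) {
        next = 0;
        while (next < messages.size()
                && (idSeekInclusive ? messages.get(next).id < id : messages.get(next).id <= id)) {
            next++;
        }
    }

    /** Seek by timestamp: stop at the first message published at or after ts. */
    void seekByTime(long ts) {
        next = 0;
        while (next < messages.size() && messages.get(next).publishTime < ts) {
            next++;
        }
    }

    String readNext() {
        return messages.get(next++).value;
    }

    /** Three made-up messages with distinct publish times. */
    static List<Msg> sample() {
        return List.of(
                new Msg(0, 1000L, "msg-0"),
                new Msg(1, 1001L, "msg-1"),
                new Msg(2, 1002L, "msg-2"));
    }
}
```

In this model, a reader built with idSeekInclusive = false returns msg-2 after seekById(1) and msg-1 after seekByTime(1001L), matching the Case 1 and Case 2 lines; with idSeekInclusive = true both return msg-1. The sketch only illustrates why the flag needs to cover both seek paths and makes no claim about how the broker or client implements either one.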
See the full test code:
    @DataProvider
    public static Object[][] startMessageIdInclusive() {
        return new Object[][] { { true }, { false } };
    }

    @Test(dataProvider = "startMessageIdInclusive")
    public void test(boolean startMessageIdInclusive) throws Exception {
        final var topic = "test";
        @Cleanup final var producer = pulsarClient.newProducer(Schema.STRING).topic(topic).create();
        for (int i = 0; i < 3; i++) {
            producer.send("msg-" + i);
        }
        final var readerBuilder = pulsarClient.newReader(Schema.STRING).topic(topic)
                .startMessageId(MessageId.earliest);
        if (startMessageIdInclusive) {
            readerBuilder.startMessageIdInclusive();
        }
        @Cleanup final var reader = readerBuilder.create();
        final var msgIds = new ArrayList<MessageId>();
        final var timestamps = new ArrayList<Long>();
        for (int i = 0; i < 3; i++) {
            final var msg = reader.readNext();
            msgIds.add(msg.getMessageId());
            timestamps.add(msg.getPublishTime());
        }
        // 1. seek to the 2nd message's message id
        reader.seek(msgIds.get(1));
        log.info("Case 1: {}", reader.readNext().getValue());
        // 2. seek to the 2nd message's timestamp
        reader.seek(timestamps.get(1));
        log.info("Case 2: {}", reader.readNext().getValue());
    }
Solution
The root cause is that when #4331 introduced this config, it did not consider seeking by timestamp.
Changing the existing behavior is risky because many applications might rely on it. It would be better to add new client-side configs and mark the existing startMessageIdInclusive as deprecated.
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!