Skip to content

Commit

Permalink
[fix][client] Fix the startMessageId can't be respected as the ChunkM…
Browse files Browse the repository at this point in the history
…essageID (apache#16154)

### Motivation

This is the same problem as when the consumer inclusive seeks the chunked message.

See more detail in [PIP-107](apache#12402)

### Modifications

* Use the first chunk message id as the startMessageId when creating the consumer/reader.

(cherry picked from commit 33cf2d0)
(cherry picked from commit d7f996f)
  • Loading branch information
RobertIndie authored and nicoloboschi committed Jul 4, 2022
1 parent 208b0a8 commit fbfaa51
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,15 @@ public void testSeekChunkMessages() throws PulsarClientException {
assertEquals(msgIds.get(i), msgAfterSeek.getMessageId());
}

Reader<byte[]> reader = pulsarClient.newReader()
.topic(topicName)
.startMessageIdInclusive()
.startMessageId(msgIds.get(1))
.create();

Message<byte[]> readMsg = reader.readNext(5, TimeUnit.SECONDS);
assertEquals(msgIds.get(1), readMsg.getMessageId());

consumer1.close();
consumer2.close();
producer.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,14 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat
interceptors);
this.consumerId = client.newConsumerId();
this.subscriptionMode = conf.getSubscriptionMode();
this.startMessageId = startMessageId != null ? new BatchMessageIdImpl((MessageIdImpl) startMessageId) : null;
if (startMessageId != null) {
if (startMessageId instanceof ChunkMessageIdImpl) {
this.startMessageId = new BatchMessageIdImpl(
((ChunkMessageIdImpl) startMessageId).getFirstChunkMessageId());
} else {
this.startMessageId = new BatchMessageIdImpl((MessageIdImpl) startMessageId);
}
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
Expand Down

0 comments on commit fbfaa51

Please sign in to comment.