Skip to content

Commit

Permalink
[Issue 11814] fix pulsar admin method:getMessageById. (#11852)
Browse files Browse the repository at this point in the history
Fix #11814 , if we use another topic to find the message, it will return the message, but we may contaminate the ledgers cache in the topic.


**changes**
Add check in the method 'internalGetMessageById' in PersistentTopicsBase, if the ledgerId not belong to this topic, throw a exception.
  • Loading branch information
zhanghaou committed Sep 2, 2021
1 parent 44becbf commit 9bfb3db
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2300,6 +2300,13 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId
}
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
if (null == ledger.getLedgerInfo(ledgerId).get()) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
+ "the ledgerId does not belong to this topic.",
clientAppId(), ledgerId, entryId, topicName);
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Message not found, the ledgerId does not belong to this topic"));
}
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,37 @@ public void testSetReplicatedSubscriptionStatus() {
Assert.assertEquals(responseCaptor.getValue().getStatus(), Response.Status.NO_CONTENT.getStatusCode());
}

public void testGetMessageById() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
admin.topics().createNonPartitionedTopic(topicName1);
admin.topics().createNonPartitionedTopic(topicName2);
ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
.enableBatching(false).create();
String data1 = "test1";
MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());

ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
.enableBatching(false).create();
String data2 = "test2";
MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes());

Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
Assert.assertEquals(message1.getData(), data1.getBytes());

Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
Assert.assertEquals(message2.getData(), data2.getBytes());

Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
Assert.assertNull(message3);

Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
Assert.assertNull(message4);
}

@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
Expand Down

0 comments on commit 9bfb3db

Please sign in to comment.