Skip to content

Commit

Permalink
[Issue 11814] fix pulsar admin method:getMessageById. (apache#11852)
Browse files Browse the repository at this point in the history
Fix apache#11814 , if we use another topic to find the message, it will return the message, but we may contaminate the ledgers cache in the topic.

**changes**
Add check in the method 'internalGetMessageById' in PersistentTopicsBase, if the ledgerId not belong to this topic, throw a exception.

(cherry picked from commit 9bfb3db)
  • Loading branch information
zhanghaou authored and eolivelli committed Sep 2, 2021
1 parent 2d00447 commit e6bb267
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1563,6 +1563,13 @@ public CompletableFuture<String> getLedgerMetadata(long ledgerId) {
return getLedgerHandle(ledgerId).thenApply(rh -> rh.getLedgerMetadata().toSafeString());
}

public CompletableFuture<LedgerInfo> getLedgerInfo(long ledgerId) {
CompletableFuture<LedgerInfo> result = new CompletableFuture<>();
final LedgerInfo ledgerInfo = ledgers.get(ledgerId);
result.complete(ledgerInfo);
return result;
}

CompletableFuture<ReadHandle> getLedgerHandle(long ledgerId) {
CompletableFuture<ReadHandle> ledgerHandle = ledgerCache.get(ledgerId);
if (ledgerHandle != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2170,6 +2170,14 @@ protected void internalGetMessageById(AsyncResponse asyncResponse, long ledgerId
validateReadOperationOnTopic(authoritative);
PersistentTopic topic = (PersistentTopic) getTopicReference(topicName);
ManagedLedgerImpl ledger = (ManagedLedgerImpl) topic.getManagedLedger();
if (null == ledger.getLedgerInfo(ledgerId).get()) {
log.error("[{}] Failed to get message with ledgerId {} entryId {} from {}, "
+ "the ledgerId does not belong to this topic.",
clientAppId(), ledgerId, entryId, topicName);
asyncResponse.resume(new RestException(Status.NOT_FOUND,
"Message not found, the ledgerId does not belong to this topic"));
return;
}
ledger.asyncReadEntry(new PositionImpl(ledgerId, entryId), new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,4 +716,168 @@ public void testOffloadWithNullMessageId() {
Assert.assertEquals(e.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode());
}
}

@Test
public void testGetMessageById() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
final String topicName1 = "persistent://tenant-xyz/ns-abc/testGetMessageById1";
final String topicName2 = "persistent://tenant-xyz/ns-abc/testGetMessageById2";
admin.topics().createNonPartitionedTopic(topicName1);
admin.topics().createNonPartitionedTopic(topicName2);
ProducerBase<byte[]> producer1 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName1)
.enableBatching(false).create();
String data1 = "test1";
MessageIdImpl id1 = (MessageIdImpl) producer1.send(data1.getBytes());

ProducerBase<byte[]> producer2 = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName2)
.enableBatching(false).create();
String data2 = "test2";
MessageIdImpl id2 = (MessageIdImpl) producer2.send(data2.getBytes());

Message<byte[]> message1 = admin.topics().getMessageById(topicName1, id1.getLedgerId(), id1.getEntryId());
Assert.assertEquals(message1.getData(), data1.getBytes());

Message<byte[]> message2 = admin.topics().getMessageById(topicName2, id2.getLedgerId(), id2.getEntryId());
Assert.assertEquals(message2.getData(), data2.getBytes());

Message<byte[]> message3 = admin.topics().getMessageById(topicName2, id1.getLedgerId(), id1.getEntryId());
Assert.assertNull(message3);

Message<byte[]> message4 = admin.topics().getMessageById(topicName1, id2.getLedgerId(), id2.getEntryId());
Assert.assertNull(message4);
}

@Test
public void testGetMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestamp";
admin.topics().createNonPartitionedTopic(topicName);

AtomicLong publishTime = new AtomicLong(0);
ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(false)
.intercept(new ProducerInterceptor() {
@Override
public void close() {

}

@Override
public boolean eligible(Message message) {
return true;
}

@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
publishTime.set(message.getPublishTime());
}
})
.create();

MessageId id1 = producer.send("test1".getBytes());
long publish1 = publishTime.get();

Thread.sleep(10);
MessageId id2 = producer.send("test2".getBytes());
long publish2 = publishTime.get();

Assert.assertTrue(publish1 < publish2);

Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}

@Test
public void testGetBatchMessageIdByTimestamp() throws Exception {
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
admin.tenants().createTenant("tenant-xyz", tenantInfo);
admin.namespaces().createNamespace("tenant-xyz/ns-abc", Sets.newHashSet("test"));
final String topicName = "persistent://tenant-xyz/ns-abc/testGetBatchMessageIdByTimestamp";
admin.topics().createNonPartitionedTopic(topicName);

Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();

ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
.enableBatching(true)
.batchingMaxPublishDelay(1, TimeUnit.MINUTES)
.batchingMaxMessages(2)
.intercept(new ProducerInterceptor() {
@Override
public void close() {

}

@Override
public boolean eligible(Message message) {
return true;
}

@Override
public Message beforeSend(Producer producer, Message message) {
return message;
}

@Override
public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
Throwable exception) {
log.info("onSendAcknowledgement, message={}, msgId={},publish_time={},exception={}",
message, msgId, message.getPublishTime(), exception);
publishTimeMap.put(msgId, message.getPublishTime());

}
})
.create();

List<CompletableFuture<MessageId>> idFutureList = new ArrayList<>();
for (int i = 0; i < 4; i++) {
idFutureList.add(producer.sendAsync(new byte[]{(byte) i}));
Thread.sleep(5);
}

List<MessageIdImpl> ids = new ArrayList<>();
for (CompletableFuture<MessageId> future : idFutureList) {
MessageId id = future.get();
ids.add((MessageIdImpl) id);
}

for (MessageIdImpl messageId : ids) {
Assert.assertTrue(publishTimeMap.containsKey(messageId));
log.info("MessageId={},PublishTime={}", messageId, publishTimeMap.get(messageId));
}

//message 0, 1 are in the same batch, as batchingMaxMessages is set to 2.
Assert.assertEquals(ids.get(0).getLedgerId(), ids.get(1).getLedgerId());
MessageIdImpl id1 =
new MessageIdImpl(ids.get(0).getLedgerId(), ids.get(0).getEntryId(), ids.get(0).getPartitionIndex());
long publish1 = publishTimeMap.get(ids.get(0));

Assert.assertEquals(ids.get(2).getLedgerId(), ids.get(3).getLedgerId());
MessageIdImpl id2 =
new MessageIdImpl(ids.get(2).getLedgerId(), ids.get(2).getEntryId(), ids.get(2).getPartitionIndex());
long publish2 = publishTimeMap.get(ids.get(2));


Assert.assertTrue(publish1 < publish2);

Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
.compareTo(id2) > 0);
}
}

0 comments on commit e6bb267

Please sign in to comment.