diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 7dfb13104a767..b7dc13cbeefe8 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -2031,6 +2031,15 @@ public static PulsarApi.BrokerEntryMetadata parseBrokerEntryMetadataIfExist( } } + public static PulsarApi.BrokerEntryMetadata peekBrokerEntryMetadataIfExist( + ByteBuf headerAndPayloadWithBrokerEntryMetadata) { + final int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex(); + PulsarApi.BrokerEntryMetadata entryMetadata = + parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata); + headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex); + return entryMetadata; + } + public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType, MessageMetadata msgMetadata, ByteBuf payload) { // / Wire format diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java index 2bd4df09d0516..c7fcc716f5f15 100644 --- a/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java +++ b/pulsar-common/src/test/java/org/apache/pulsar/common/protocol/CommandUtilsTests.java @@ -202,6 +202,38 @@ public void testParseBrokerEntryMetadata() throws Exception { assertEquals(new String(content, StandardCharsets.UTF_8), data); } + @Test + public void testPeekBrokerEntryMetadata() throws Exception { + int MOCK_BATCH_SIZE = 10; + String data = "test-message"; + ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length()); + byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8)); + ByteBuf dataWithBrokerEntryMetadata = + Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE); + int bytesBeforePeek = dataWithBrokerEntryMetadata.readableBytes(); + PulsarApi.BrokerEntryMetadata brokerMetadata = + Commands.peekBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); + + assertTrue(brokerMetadata.getBrokerTimestamp() <= System.currentTimeMillis()); + assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1); + + int bytesAfterPeek = dataWithBrokerEntryMetadata.readableBytes(); + assertEquals(bytesBeforePeek, bytesAfterPeek); + + // test parse logic after peek + + PulsarApi.BrokerEntryMetadata brokerMetadata1 = + Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata); + assertTrue(brokerMetadata1.getBrokerTimestamp() <= System.currentTimeMillis()); + + assertEquals(brokerMetadata1.getIndex(), MOCK_BATCH_SIZE - 1); + assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes()); + + byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()]; + dataWithBrokerEntryMetadata.readBytes(content); + assertEquals(new String(content, StandardCharsets.UTF_8), data); + } + public Set getBrokerEntryMetadataInterceptors() { Set interceptorNames = new HashSet<>(); interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");