Skip to content

Commit

Permalink
support peek broker entry metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang committed Dec 27, 2020
1 parent 8574e58 commit d3012e9
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2031,6 +2031,15 @@ public static PulsarApi.BrokerEntryMetadata parseBrokerEntryMetadataIfExist(
}
}

public static PulsarApi.BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
final int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex();
PulsarApi.BrokerEntryMetadata entryMetadata =
parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
return entryMetadata;
}

public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType,
MessageMetadata msgMetadata, ByteBuf payload) {
// / Wire format
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,38 @@ public void testParseBrokerEntryMetadata() throws Exception {
assertEquals(new String(content, StandardCharsets.UTF_8), data);
}

@Test
public void testPeekBrokerEntryMetadata() throws Exception {
int MOCK_BATCH_SIZE = 10;
String data = "test-message";
ByteBuf byteBuf = PulsarByteBufAllocator.DEFAULT.buffer(data.length(), data.length());
byteBuf.writeBytes(data.getBytes(StandardCharsets.UTF_8));
ByteBuf dataWithBrokerEntryMetadata =
Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), MOCK_BATCH_SIZE);
int bytesBeforePeek = dataWithBrokerEntryMetadata.readableBytes();
PulsarApi.BrokerEntryMetadata brokerMetadata =
Commands.peekBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);

assertTrue(brokerMetadata.getBrokerTimestamp() <= System.currentTimeMillis());
assertEquals(brokerMetadata.getIndex(), MOCK_BATCH_SIZE - 1);

int bytesAfterPeek = dataWithBrokerEntryMetadata.readableBytes();
assertEquals(bytesBeforePeek, bytesAfterPeek);

// test parse logic after peek

PulsarApi.BrokerEntryMetadata brokerMetadata1 =
Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
assertTrue(brokerMetadata1.getBrokerTimestamp() <= System.currentTimeMillis());

assertEquals(brokerMetadata1.getIndex(), MOCK_BATCH_SIZE - 1);
assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes());

byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()];
dataWithBrokerEntryMetadata.readBytes(content);
assertEquals(new String(content, StandardCharsets.UTF_8), data);
}

public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
Set<String> interceptorNames = new HashSet<>();
interceptorNames.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
Expand Down

0 comments on commit d3012e9

Please sign in to comment.