Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
aloyszhang committed Dec 27, 2020
1 parent 3602c57 commit c9428ec
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2034,25 +2034,9 @@ public static PulsarApi.BrokerEntryMetadata parseBrokerEntryMetadataIfExist(
public static PulsarApi.BrokerEntryMetadata peekBrokerEntryMetadataIfExist(
ByteBuf headerAndPayloadWithBrokerEntryMetadata) {
final int readerIndex = headerAndPayloadWithBrokerEntryMetadata.readerIndex();
if (headerAndPayloadWithBrokerEntryMetadata.readShort() == magicBrokerEntryMetadata) {
final int brokerEntryMetadataSize = headerAndPayloadWithBrokerEntryMetadata.readInt();
final int writerIndex = headerAndPayloadWithBrokerEntryMetadata.writerIndex();
headerAndPayloadWithBrokerEntryMetadata.writerIndex(headerAndPayloadWithBrokerEntryMetadata.readerIndex()
+ brokerEntryMetadataSize);
final ByteBufCodedInputStream brokerEntryMetadataInputStream =
ByteBufCodedInputStream.get(headerAndPayloadWithBrokerEntryMetadata);
final PulsarApi.BrokerEntryMetadata.Builder builder = PulsarApi.BrokerEntryMetadata.newBuilder();
try {
builder.mergeFrom(brokerEntryMetadataInputStream, null).build();
} catch (IOException e) {
throw new RuntimeException(e);
}
headerAndPayloadWithBrokerEntryMetadata.writerIndex(writerIndex);
headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
brokerEntryMetadataInputStream.recycle();
return builder.build();
}
return null;
PulsarApi.BrokerEntryMetadata entryMetadata = parseBrokerEntryMetadataIfExist(headerAndPayloadWithBrokerEntryMetadata);
headerAndPayloadWithBrokerEntryMetadata.readerIndex(readerIndex);
return entryMetadata;
}

public static ByteBuf serializeMetadataAndPayload(ChecksumType checksumType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,19 @@ public void testPeekBrokerEntryMetadata() throws Exception {

int bytesAfterPeek = dataWithBrokerEntryMetadata.readableBytes();
assertEquals(bytesBeforePeek, bytesAfterPeek);

// test parse logic after peek

PulsarApi.BrokerEntryMetadata brokerMetadata1 =
Commands.parseBrokerEntryMetadataIfExist(dataWithBrokerEntryMetadata);
assertTrue(brokerMetadata1.getBrokerTimestamp() <= System.currentTimeMillis());

assertEquals(brokerMetadata1.getIndex(), MOCK_BATCH_SIZE - 1);
assertEquals(data.length(), dataWithBrokerEntryMetadata.readableBytes());

byte [] content = new byte[dataWithBrokerEntryMetadata.readableBytes()];
dataWithBrokerEntryMetadata.readBytes(content);
assertEquals(new String(content, StandardCharsets.UTF_8), data);
}

public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
Expand Down

0 comments on commit c9428ec

Please sign in to comment.