Skip to content

Commit

Permalink
plog-client: write header every time
Browse files Browse the repository at this point in the history
From the stress tester, turns out copying a reference isn't particularly efficient...
  • Loading branch information
Pierre Carrier committed May 7, 2014
1 parent 034b666 commit 8b81245
Showing 1 changed file with 16 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,21 @@ public ByteBuf[] fragment(ByteBufAllocator alloc, ByteBuf payload, Collection<St

final ByteBuf[] fragments = new ByteBuf[fragmentCount];

final ByteBuf reference = alloc.buffer(HEADER_SIZE + maxFragmentSizeExcludingHeader,
HEADER_SIZE + maxFragmentSizeExcludingHeader).order(ByteOrder.BIG_ENDIAN);
reference.writeBytes(UDP_V0_FRAGMENT_PREFIX);
reference.writeShort(fragmentCount);
reference.writeZero(2);
reference.writeShort(maxFragmentSizeExcludingHeader);
reference.writeInt(messageIndex);
reference.writeInt(length);
reference.writeInt(hash);
reference.writeZero(2); // tags buffer length
reference.writeZero(2);

// All packets but the last are easy
int contentIdx, fragmentIdx;
for (contentIdx = 0, fragmentIdx = 0; fragmentIdx < fragmentCount - 1;
fragmentIdx++, contentIdx += maxFragmentSizeExcludingHeader) {
final ByteBuf fragment = makeBuffer(reference, fragmentIdx, HEADER_SIZE + maxFragmentSizeExcludingHeader);
final ByteBuf fragment = alloc.buffer(HEADER_SIZE + maxFragmentSizeExcludingHeader,
HEADER_SIZE + maxFragmentSizeExcludingHeader).order(ByteOrder.BIG_ENDIAN);
writeHeader(messageIndex, maxFragmentSizeExcludingHeader, 0, length, hash, fragmentCount, fragmentIdx, fragment);
fragment.writeBytes(payload, contentIdx, maxFragmentSizeExcludingHeader);
fragment.resetReaderIndex();
fragments[fragmentIdx] = fragment;
}

final int lastPayloadLength = length - (maxFragmentSizeExcludingHeader * (fragmentCount - 1));
final ByteBuf finalFragment = makeBuffer(reference, fragmentIdx, HEADER_SIZE + tagsBufferLength + lastPayloadLength);
final ByteBuf finalFragment = alloc.buffer(HEADER_SIZE + tagsBufferLength + lastPayloadLength,
HEADER_SIZE + tagsBufferLength + lastPayloadLength).order(ByteOrder.BIG_ENDIAN);
writeHeader(messageIndex, maxFragmentSizeExcludingHeader, tagsBufferLength, length, hash, fragmentCount, fragmentIdx, finalFragment);

if (tagsCount > 0) {
finalFragment.setShort(20, tagsBufferLength); // tags buffer length
Expand All @@ -109,16 +100,18 @@ public ByteBuf[] fragment(ByteBufAllocator alloc, ByteBuf payload, Collection<St
finalFragment.writeBytes(payload, contentIdx, lastPayloadLength);
fragments[fragmentCount - 1] = finalFragment;

reference.release();

return fragments;
}

private ByteBuf makeBuffer(ByteBuf reference, int fragmentIdx, int length) {
final ByteBuf fragment = reference.copy(0, length);
fragment.readerIndex(0);
fragment.setShort(4, fragmentIdx);
fragment.writerIndex(HEADER_SIZE);
return fragment;
private static void writeHeader(int messageIndex, int fragmentLength, int tagsBufferLength, int messageLength, int hash, int fragmentCount, int fragmentIdx, ByteBuf fragment) {
fragment.writeBytes(UDP_V0_FRAGMENT_PREFIX);
fragment.writeShort(fragmentCount);
fragment.writeShort(fragmentIdx);
fragment.writeShort(fragmentLength);
fragment.writeInt(messageIndex);
fragment.writeInt(messageLength);
fragment.writeInt(hash);
fragment.writeShort(tagsBufferLength);
fragment.writeZero(2);
}
}

0 comments on commit 8b81245

Please sign in to comment.