Skip to content

Commit

Permalink
ZOOKEEPER-3274: Use CompositeByteBuf to queue data in NettyServerCnxn
Browse files Browse the repository at this point in the history
This avoids unnecessary buffer copies and resizes.

Author: Ilya Maykov <ilyam@fb.com>

Reviewers: andor@apache.org

Closes apache#810 from ivmaykov/ZOOKEEPER-3274
  • Loading branch information
ivmaykov authored and anmolnar committed Feb 18, 2019
1 parent 226af6e commit 5375c25
Showing 1 changed file with 27 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
Expand All @@ -53,7 +54,7 @@
public class NettyServerCnxn extends ServerCnxn {
private static final Logger LOG = LoggerFactory.getLogger(NettyServerCnxn.class);
private final Channel channel;
private ByteBuf queuedBuffer;
private CompositeByteBuf queuedBuffer;
private final AtomicBoolean throttled = new AtomicBoolean(false);
private ByteBuffer bb;
private final ByteBuffer bbLen = ByteBuffer.allocate(4);
Expand Down Expand Up @@ -291,6 +292,24 @@ private void checkIsInEventLoop(String callerMethodName) {
}
}

/**
* Appends <code>buf</code> to <code>queuedBuffer</code>. Does not duplicate <code>buf</code>
* or call any flavor of {@link ByteBuf#retain()}. Caller must ensure that <code>buf</code>
* is not owned by anyone else, as this call transfers ownership of <code>buf</code> to the
* <code>queuedBuffer</code>.
*
* This method should only be called from the event loop thread.
* @param buf the buffer to append to the queue.
*/
private void appendToQueuedBuffer(ByteBuf buf) {
checkIsInEventLoop("appendToQueuedBuffer");
if (queuedBuffer.numComponents() == queuedBuffer.maxNumComponents()) {
// queuedBuffer has reached its component limit, so combine the existing components.
queuedBuffer.consolidate();
}
queuedBuffer.addComponent(true, buf);
}

/**
* Process incoming message. This should only be called from the event
* loop thread.
Expand Down Expand Up @@ -318,9 +337,9 @@ void processMessage(ByteBuf buf) {
// we are throttled, so we need to queue
if (queuedBuffer == null) {
LOG.debug("allocating queue");
queuedBuffer = channel.alloc().buffer(buf.readableBytes());
queuedBuffer = channel.alloc().compositeBuffer();
}
queuedBuffer.writeBytes(buf);
appendToQueuedBuffer(buf.retainedDuplicate());
if (LOG.isTraceEnabled()) {
LOG.trace("0x{} queuedBuffer {}",
Long.toHexString(sessionId),
Expand All @@ -329,7 +348,7 @@ void processMessage(ByteBuf buf) {
} else {
LOG.debug("not throttled");
if (queuedBuffer != null) {
queuedBuffer.writeBytes(buf);
appendToQueuedBuffer(buf.retainedDuplicate());
processQueuedBuffer();
} else {
receiveMessage(buf);
Expand All @@ -340,9 +359,9 @@ void processMessage(ByteBuf buf) {
LOG.trace("Before copy {}", buf);
}
if (queuedBuffer == null) {
queuedBuffer = channel.alloc().buffer(buf.readableBytes());
queuedBuffer = channel.alloc().compositeBuffer();
}
queuedBuffer.writeBytes(buf);
appendToQueuedBuffer(buf.retainedSlice(buf.readerIndex(), buf.readableBytes()));
if (LOG.isTraceEnabled()) {
LOG.trace("Copy is {}", queuedBuffer);
LOG.trace("0x{} queuedBuffer {}",
Expand Down Expand Up @@ -375,9 +394,9 @@ void processQueuedBuffer() {
releaseQueuedBuffer();
} else {
LOG.debug("Processed queue - bytes remaining");
// Possibly reduce memory consumption by freeing up buffer space
// Try to reduce memory consumption by freeing up buffer space
// which is no longer needed.
queuedBuffer.discardSomeReadBytes();
queuedBuffer.discardReadComponents();
}
} else {
LOG.debug("queue empty");
Expand Down

0 comments on commit 5375c25

Please sign in to comment.