From 59e0ab867ecef5602cb8e05109a3ffd019f94536 Mon Sep 17 00:00:00 2001 From: uce Date: Tue, 17 Jun 2014 16:18:02 +0200 Subject: [PATCH] [FLINK-971] Configure PooledByteBufAllocator in NettyConnectionManager instead of using the default allocator Configuration: - 0 heap arenas, - n direct arenas (where n = num incoming + num outgoing network IO threads), and - bufferSize << 1 bytes page size. Additionally, OutboundEnvelopeEncoder directly implements ChannelOutboundHandlerAdapter instead of the MessageToByteEncoder wrapper to have tighter control of memory allocations. --- .../network/netty/NettyConnectionManager.java | 22 +++++++++++-- .../netty/OutboundConnectionQueue.java | 2 +- .../netty/OutboundEnvelopeEncoder.java | 32 +++++++++++++++---- 3 files changed, 45 insertions(+), 11 deletions(-) diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java index 73afcbc297178..4b546418d1a51 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/NettyConnectionManager.java @@ -92,6 +92,22 @@ public void start(ChannelManager channelManager) throws IOException { final BufferProviderBroker bufferProviderBroker = channelManager; final EnvelopeDispatcher envelopeDispatcher = channelManager; + int numHeapArenas = 0; + int numDirectArenas = numInThreads + numOutThreads; + int pageSize = bufferSize << 1; + int chunkSize = 16 * 1 << 20; // 16 MB + + // shift pageSize maxOrder times to get to chunkSize + int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2)); + + PooledByteBufAllocator pooledByteBufAllocator = + new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder); + + String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " + + "page size (bytes): %d, chunk size (bytes): %d.", + numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder)); + LOG.info(msg); + // -------------------------------------------------------------------- // server bootstrap (incoming connections) // -------------------------------------------------------------------- @@ -107,8 +123,8 @@ public void initChannel(SocketChannel channel) throws Exception { .addLast(new InboundEnvelopeDispatcher(envelopeDispatcher)); } }) - .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(bufferSize)) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + .option(ChannelOption.RCVBUF_ALLOCATOR, new FixedRecvByteBufAllocator(pageSize)) + .option(ChannelOption.ALLOCATOR, pooledByteBufAllocator); // -------------------------------------------------------------------- // client bootstrap (outgoing connections) @@ -125,7 +141,7 @@ public void initChannel(SocketChannel channel) throws Exception { }) .option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, lowWaterMark) .option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, highWaterMark) - .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) + .option(ChannelOption.ALLOCATOR, pooledByteBufAllocator) .option(ChannelOption.TCP_NODELAY, false) .option(ChannelOption.SO_KEEPALIVE, true); diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java index 8fef3c1951d08..ff6c694e50cd9 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundConnectionQueue.java @@ -42,7 +42,7 @@ public OutboundConnectionQueue(Channel channel) { } /** - * Enqueues an envelope so be sent later. + * Enqueues an envelope to be sent later. *

* This method is always invoked by the task thread that wants the envelope sent. * diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java index 424f2c052db3b..dad690ccb5117 100644 --- a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java +++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/netty/OutboundEnvelopeEncoder.java @@ -16,25 +16,43 @@ import eu.stratosphere.runtime.io.Buffer; import eu.stratosphere.runtime.io.network.Envelope; import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.MessageToByteEncoder; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; @ChannelHandler.Sharable -public class OutboundEnvelopeEncoder extends MessageToByteEncoder { +public class OutboundEnvelopeEncoder extends ChannelOutboundHandlerAdapter { public static final int HEADER_SIZE = 48; public static final int MAGIC_NUMBER = 0xBADC0FFE; @Override - protected void encode(ChannelHandlerContext ctx, Envelope env, ByteBuf out) throws Exception { + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { + Envelope env = (Envelope) msg; + + ByteBuf buf = ctx.alloc().directBuffer(); + + encode(env, buf); + + if (buf.isReadable()) { + ctx.write(buf, promise); + } + else { + buf.release(); + ctx.write(Unpooled.EMPTY_BUFFER, promise); + } + } + + private void encode(Envelope env, ByteBuf out) { // -------------------------------------------------------------------- // (1) header (48 bytes) // -------------------------------------------------------------------- out.writeInt(MAGIC_NUMBER); // 4 bytes - if (out.getInt(out.writerIndex()-4) != MAGIC_NUMBER) { + if (out.getInt(out.writerIndex() - 4) != MAGIC_NUMBER) { throw new RuntimeException(); } @@ -54,12 +72,12 @@ protected void encode(ChannelHandlerContext ctx, Envelope env, ByteBuf out) thro // (3) buffer (var length) // -------------------------------------------------------------------- if (env.getBuffer() != null) { - Buffer buffer = env.getBuffer(); - out.writeBytes(buffer.getMemorySegment().wrap(0, buffer.size())); + Buffer envBuffer = env.getBuffer(); + out.writeBytes(envBuffer.getMemorySegment().wrap(0, envBuffer.size())); // Recycle the buffer from OUR buffer pool after everything has been // copied to Nettys buffer space. - buffer.recycleBuffer(); + envBuffer.recycleBuffer(); } } }