From 613ed9cce07d36e7b229e444dad3996db1bdb8c6 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Wed, 3 Feb 2016 16:05:37 +0100 Subject: [PATCH] [FLINK-3120] [runtime] Manually configure Netty's ByteBufAllocator tl;dr Change default Netty configuration to be relative to number of slots, i.e. configure one memory arena (in PooledByteBufAllocator) per slot and use one event loop thread per slot. Behaviour can still be manually overwritten. With this change, we can expect 16 MB of direct memory allocated per task slot by Netty. Problem: We were using Netty's default PooledByteBufAllocator instance, which is subject to changing behaviour between Netty versions (happened between versions 4.0.27.Final and 4.0.28.Final resulting in increased memory consumption) and whose default memory consumption depends on the number of available cores in the system. This can be problematic for example in YARN setups where users run one slot per task manager on machines with many cores, resulting in a relatively high number of allocated memory. Solution: We instantiate a PooledByteBufAllocator instance manually and wrap it as a NettyBufferPool. Our instance configures one arena per task slot as default. It's desirable to have the number of arenas match the number of event loop threads to minimize lock contention (Netty's default tried to ensure this as well), hence the number of threads is changed as well to match the number of slots as default. Both number of threads and arenas can still be manually configured. --- .../io/network/netty/NettyBufferPool.java | 320 ++++++++++++++++++ .../runtime/io/network/netty/NettyClient.java | 9 +- .../runtime/io/network/netty/NettyConfig.java | 25 +- .../network/netty/NettyConnectionManager.java | 19 +- .../runtime/io/network/netty/NettyServer.java | 11 +- .../runtime/taskmanager/TaskManager.scala | 1 + .../io/network/NetworkEnvironmentTest.java | 2 +- .../io/network/netty/NettyBufferPoolTest.java | 101 ++++++ .../netty/NettyConnectionManagerTest.java | 174 ++++++++++ .../io/network/netty/NettyTestUtil.java | 15 +- .../PartitionRequestClientFactoryTest.java | 8 +- 11 files changed, 662 insertions(+), 23 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java new file mode 100644 index 0000000000000..a3e973f5ef648 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyBufferPool.java @@ -0,0 +1,320 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import io.netty.util.internal.PlatformDependent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.Option; + +import java.lang.reflect.Field; + +import static com.google.common.base.Preconditions.checkArgument; + +/** + * Wrapper around Netty's {@link PooledByteBufAllocator} with strict control + * over the number of created arenas. + */ +public class NettyBufferPool implements ByteBufAllocator { + + private static final Logger LOG = LoggerFactory.getLogger(NettyBufferPool.class); + + /** The wrapped buffer allocator. */ + private final PooledByteBufAllocator alloc; + + /** PoolArena[] via Reflection. */ + private final Object[] directArenas; + + /** Configured number of arenas. */ + private final int numberOfArenas; + + /** Configured chunk size for the arenas. */ + private final int chunkSize; + + /** + * Creates Netty's buffer pool with the specified number of direct arenas. + * + * @param numberOfArenas Number of arenas (recommended: 2 * number of task + * slots) + */ + NettyBufferPool(int numberOfArenas) { + checkArgument(numberOfArenas >= 1, "Number of arenas"); + this.numberOfArenas = numberOfArenas; + + if (!PlatformDependent.hasUnsafe()) { + LOG.warn("Using direct buffers, but sun.misc.Unsafe not available."); + } + + // We strictly prefer direct buffers and disallow heap allocations. + boolean preferDirect = true; + + // Arenas allocate chunks of pageSize << maxOrder bytes. With these + // defaults, this results in chunks of 16 MB. + int pageSize = 8192; + int maxOrder = 11; + + this.chunkSize = pageSize << maxOrder; + + // Number of direct arenas. Each arena allocates a chunk of 16 MB, i.e. + // we allocate numDirectArenas * 16 MB of direct memory. This can grow + // to multiple chunks per arena during runtime, but this should only + // happen with a large amount of connections per task manager. We + // control the memory allocations with low/high watermarks when writing + // to the TCP channels. Chunks are allocated lazily. + int numDirectArenas = numberOfArenas; + + // No heap arenas, please. + int numHeapArenas = 0; + + this.alloc = new PooledByteBufAllocator( + preferDirect, + numHeapArenas, + numDirectArenas, + pageSize, + maxOrder); + + Object[] allocDirectArenas = null; + try { + Field directArenasField = alloc.getClass() + .getDeclaredField("directArenas"); + directArenasField.setAccessible(true); + + allocDirectArenas = (Object[]) directArenasField.get(alloc); + } catch (Exception ignored) { + LOG.warn("Memory statistics not available"); + } finally { + this.directArenas = allocDirectArenas; + } + } + + /** + * Returns the number of arenas. + * + * @return Number of arenas. + */ + int getNumberOfArenas() { + return numberOfArenas; + } + + /** + * Returns the chunk size. + * + * @return Chunk size. + */ + int getChunkSize() { + return chunkSize; + } + + // ------------------------------------------------------------------------ + // Direct pool arena stats via Reflection. This is not safe when upgrading + // Netty versions, but we are currently bound to the version we have (see + // commit d92e422). In newer Netty versions these statistics are exposed. + // ------------------------------------------------------------------------ + + /** + * Returns the number of currently allocated bytes. + * + *

The stats are gathered via Reflection and are mostly relevant for + * debugging purposes. + * + * @return Number of currently allocated bytes. + * + * @throws NoSuchFieldException Error getting the statistics (should not + * happen when the Netty version stays the + * same). + * @throws IllegalAccessException Error getting the statistics (should not + * happen when the Netty version stays the + * same). + */ + public Option getNumberOfAllocatedBytes() + throws NoSuchFieldException, IllegalAccessException { + + if (directArenas != null) { + int numChunks = 0; + for (Object arena : directArenas) { + numChunks += getNumberOfAllocatedChunks(arena, "qInit"); + numChunks += getNumberOfAllocatedChunks(arena, "q000"); + numChunks += getNumberOfAllocatedChunks(arena, "q025"); + numChunks += getNumberOfAllocatedChunks(arena, "q050"); + numChunks += getNumberOfAllocatedChunks(arena, "q075"); + numChunks += getNumberOfAllocatedChunks(arena, "q100"); + } + + long allocatedBytes = numChunks * chunkSize; + return Option.apply(allocatedBytes); + } else { + return Option.empty(); + } + } + + /** + * Returns the number of allocated bytes of the given arena and chunk list. + * + * @param arena Arena to gather statistics about. + * @param chunkListFieldName Chunk list to check. + * + * @return Number of total allocated bytes by this arena. + * + * @throws NoSuchFieldException Error getting the statistics (should not + * happen when the Netty version stays the + * same). + * @throws IllegalAccessException Error getting the statistics (should not + * happen when the Netty version stays the + * same). + */ + private long getNumberOfAllocatedChunks(Object arena, String chunkListFieldName) + throws NoSuchFieldException, IllegalAccessException { + + // Each PoolArena stores its allocated PoolChunk + // instances grouped by usage (field qInit, q000, q025, etc.) in + // PoolChunkList lists. Each list has zero or more + // PoolChunk instances. + + // Chunk list of arena + Field chunkListField = arena.getClass().getSuperclass() + .getDeclaredField(chunkListFieldName); + chunkListField.setAccessible(true); + Object chunkList = chunkListField.get(arena); + + // Count the chunks in the list + Field headChunkField = chunkList.getClass().getDeclaredField("head"); + headChunkField.setAccessible(true); + Object headChunk = headChunkField.get(chunkList); + + if (headChunk == null) { + return 0; + } else { + int numChunks = 0; + + Object current = headChunk; + + while (current != null) { + Field nextChunkField = headChunk.getClass().getDeclaredField("next"); + nextChunkField.setAccessible(true); + current = nextChunkField.get(current); + numChunks++; + } + + return numChunks; + } + } + + // ------------------------------------------------------------------------ + // Delegate calls to the allocated and prohibit heap buffer allocations + // ------------------------------------------------------------------------ + + @Override + public ByteBuf buffer() { + return alloc.buffer(); + } + + @Override + public ByteBuf buffer(int initialCapacity) { + return alloc.buffer(initialCapacity); + } + + @Override + public ByteBuf buffer(int initialCapacity, int maxCapacity) { + return alloc.buffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf ioBuffer() { + return alloc.ioBuffer(); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity) { + return alloc.ioBuffer(initialCapacity); + } + + @Override + public ByteBuf ioBuffer(int initialCapacity, int maxCapacity) { + return alloc.ioBuffer(initialCapacity, maxCapacity); + } + + @Override + public ByteBuf heapBuffer() { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity) { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public ByteBuf heapBuffer(int initialCapacity, int maxCapacity) { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public ByteBuf directBuffer() { + return alloc.directBuffer(); + } + + @Override + public ByteBuf directBuffer(int initialCapacity) { + return alloc.directBuffer(initialCapacity); + } + + @Override + public ByteBuf directBuffer(int initialCapacity, int maxCapacity) { + return alloc.directBuffer(initialCapacity, maxCapacity); + } + + @Override + public CompositeByteBuf compositeBuffer() { + return alloc.compositeBuffer(); + } + + @Override + public CompositeByteBuf compositeBuffer(int maxNumComponents) { + return alloc.compositeBuffer(maxNumComponents); + } + + @Override + public CompositeByteBuf compositeHeapBuffer() { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public CompositeByteBuf compositeHeapBuffer(int maxNumComponents) { + throw new UnsupportedOperationException("Heap buffer"); + } + + @Override + public CompositeByteBuf compositeDirectBuffer() { + return alloc.compositeDirectBuffer(); + } + + @Override + public CompositeByteBuf compositeDirectBuffer(int maxNumComponents) { + return alloc.compositeDirectBuffer(maxNumComponents); + } + + @Override + public boolean isDirectBufferPooled() { + return alloc.isDirectBufferPooled(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java index 6cab15dcbbca6..5fd04de5bb78d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.io.network.netty; import io.netty.bootstrap.Bootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -49,7 +48,7 @@ class NettyClient { this.config = config; } - void init(final NettyProtocol protocol) throws IOException { + void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException { checkState(bootstrap == null, "Netty client has already been initialized."); long start = System.currentTimeMillis(); @@ -91,7 +90,7 @@ void init(final NettyProtocol protocol) throws IOException { bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000); // Pooled allocator for Netty's ByteBuf instances - bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool); // Receive and send buffer size int receiveAndSendBufferSize = config.getSendAndReceiveBufferSize(); @@ -119,6 +118,10 @@ NettyConfig getConfig() { return config; } + Bootstrap getBootstrap() { + return bootstrap; + } + void shutdown() { long start = System.currentTimeMillis(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index 9d4f0785427ae..0c52e02e873ea 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -33,6 +33,8 @@ public class NettyConfig { // - Config keys ---------------------------------------------------------- + public static final String NUM_ARENAS = "taskmanager.net.num-arenas"; + public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads"; public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads"; @@ -61,12 +63,15 @@ enum TransportType { private final int memorySegmentSize; + private final int numberOfSlots; + private final Configuration config; // optional configuration public NettyConfig( InetAddress serverAddress, int serverPort, int memorySegmentSize, + int numberOfSlots, Configuration config) { this.serverAddress = checkNotNull(serverAddress); @@ -77,6 +82,9 @@ public NettyConfig( checkArgument(memorySegmentSize > 0, "Invalid memory segment size."); this.memorySegmentSize = memorySegmentSize; + checkArgument(numberOfSlots > 0, "Number of slots"); + this.numberOfSlots = numberOfSlots; + this.config = checkNotNull(config); LOG.info(this.toString()); @@ -94,6 +102,10 @@ int getMemorySegmentSize() { return memorySegmentSize; } + public int getNumberOfSlots() { + return numberOfSlots; + } + // ------------------------------------------------------------------------ // Setters // ------------------------------------------------------------------------ @@ -153,14 +165,19 @@ public int getServerConnectBacklog() { return config.getInteger(CONNECT_BACKLOG, 0); } + public int getNumberOfArenas() { + // default: number of slots + return config.getInteger(NUM_ARENAS, numberOfSlots); + } + public int getServerNumThreads() { - // default: 0 => Netty's default: 2 * #cores - return config.getInteger(NUM_THREADS_SERVER, 0); + // default: number of task slots + return config.getInteger(NUM_THREADS_SERVER, numberOfSlots); } public int getClientNumThreads() { - // default: 0 => Netty's default: 2 * #cores - return config.getInteger(NUM_THREADS_CLIENT, 0); + // default: number of task slots + return config.getInteger(NUM_THREADS_CLIENT, numberOfSlots); } public int getClientConnectTimeoutSeconds() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java index 0b66b2c8150c5..d278b3c7effe1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java @@ -32,11 +32,14 @@ public class NettyConnectionManager implements ConnectionManager { private final NettyClient client; + private final NettyBufferPool bufferPool; + private final PartitionRequestClientFactory partitionRequestClientFactory; public NettyConnectionManager(NettyConfig nettyConfig) { this.server = new NettyServer(nettyConfig); this.client = new NettyClient(nettyConfig); + this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas()); this.partitionRequestClientFactory = new PartitionRequestClientFactory(client); } @@ -47,8 +50,8 @@ public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher PartitionRequestProtocol partitionRequestProtocol = new PartitionRequestProtocol(partitionProvider, taskEventDispatcher, networkbufferPool); - client.init(partitionRequestProtocol); - server.init(partitionRequestProtocol); + client.init(partitionRequestProtocol, bufferPool); + server.init(partitionRequestProtocol, bufferPool); } @Override @@ -72,4 +75,16 @@ public void shutdown() { client.shutdown(); server.shutdown(); } + + NettyClient getClient() { + return client; + } + + NettyServer getServer() { + return server; + } + + NettyBufferPool getBufferPool() { + return bufferPool; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java index 00fec8768f58f..4436cafb33c43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java @@ -20,7 +20,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; import io.netty.bootstrap.ServerBootstrap; -import io.netty.buffer.PooledByteBufAllocator; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; @@ -55,7 +54,7 @@ class NettyServer { this.config = checkNotNull(config); } - void init(final NettyProtocol protocol) throws IOException { + void init(final NettyProtocol protocol, NettyBufferPool nettyBufferPool) throws IOException { checkState(bootstrap == null, "Netty server has already been initialized."); long start = System.currentTimeMillis(); @@ -94,8 +93,8 @@ void init(final NettyProtocol protocol) throws IOException { bootstrap.localAddress(config.getServerAddress(), config.getServerPort()); // Pooled allocators for Netty's ByteBuf instances - bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); - bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); + bootstrap.option(ChannelOption.ALLOCATOR, nettyBufferPool); + bootstrap.childOption(ChannelOption.ALLOCATOR, nettyBufferPool); if (config.getServerConnectBacklog() > 0) { bootstrap.option(ChannelOption.SO_BACKLOG, config.getServerConnectBacklog()); @@ -137,6 +136,10 @@ NettyConfig getConfig() { return config; } + ServerBootstrap getBootstrap() { + return bootstrap; + } + void shutdown() { long start = System.currentTimeMillis(); if (bindFuture != null) { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index afceaf986d847..5ad9dad69069c 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -1813,6 +1813,7 @@ object TaskManager { connectionInfo.address(), connectionInfo.dataPort(), pageSize, + slots, configuration) ) } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java index 84f9f14439e12..fca3cebdb0885 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NetworkEnvironmentTest.java @@ -76,7 +76,7 @@ public void testAssociateDisassociate() { } try { - NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, new Configuration()); + NettyConfig nettyConf = new NettyConfig(InetAddress.getLocalHost(), port, BUFFER_SIZE, 1, new Configuration()); NetworkEnvironmentConfiguration config = new NetworkEnvironmentConfiguration( NUM_BUFFERS, BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, new Some<>(nettyConf), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java new file mode 100644 index 0000000000000..956d54cf350df --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyBufferPoolTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * Tests for the {@link io.netty.buffer.PooledByteBufAllocator} wrapper. + */ +public class NettyBufferPoolTest { + + @Test + public void testNoHeapAllocations() throws Exception { + NettyBufferPool nettyBufferPool = new NettyBufferPool(1); + + // Buffers should prefer to be direct + assertTrue(nettyBufferPool.buffer().isDirect()); + assertTrue(nettyBufferPool.buffer(128).isDirect()); + assertTrue(nettyBufferPool.buffer(128, 256).isDirect()); + + // IO buffers should prefer to be direct + assertTrue(nettyBufferPool.ioBuffer().isDirect()); + assertTrue(nettyBufferPool.ioBuffer(128).isDirect()); + assertTrue(nettyBufferPool.ioBuffer(128, 256).isDirect()); + + // Disallow heap buffers + try { + nettyBufferPool.heapBuffer(); + fail("Unexpected heap buffer operation"); + } catch (UnsupportedOperationException ignored) { + } + + try { + nettyBufferPool.heapBuffer(128); + fail("Unexpected heap buffer operation"); + } catch (UnsupportedOperationException ignored) { + } + + try { + nettyBufferPool.heapBuffer(128, 256); + fail("Unexpected heap buffer operation"); + } catch (UnsupportedOperationException ignored) { + } + + // Disallow composite heap buffers + try { + nettyBufferPool.compositeHeapBuffer(); + fail("Unexpected heap buffer operation"); + } catch (UnsupportedOperationException ignored) { + } + + try { + nettyBufferPool.compositeHeapBuffer(2); + fail("Unexpected heap buffer operation"); + } catch (UnsupportedOperationException ignored) { + } + + // Is direct buffer pooled! + assertTrue(nettyBufferPool.isDirectBufferPooled()); + } + + @Test + public void testAllocationsStatistics() throws Exception { + NettyBufferPool nettyBufferPool = new NettyBufferPool(1); + int chunkSize = nettyBufferPool.getChunkSize(); + + { + // Single large buffer allocates one chunk + nettyBufferPool.directBuffer(chunkSize - 64); + long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); + assertEquals(chunkSize, allocated); + } + + { + // Allocate a little more (one more chunk required) + nettyBufferPool.directBuffer(128); + long allocated = nettyBufferPool.getNumberOfAllocatedBytes().get(); + assertEquals(2 * chunkSize, allocated); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java new file mode 100644 index 0000000000000..8ab572eabe175 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.EventLoopGroup; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.io.network.TaskEventDispatcher; +import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider; +import org.apache.flink.util.NetUtils; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.net.InetAddress; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; + +/** + * Simple netty connection manager test. + */ +public class NettyConnectionManagerTest { + + /** + * Tests that the number of arenas and number of threads of the client and + * server are set to the same number, that is the number of configured + * task slots. + */ + @Test + public void testMatchingNumberOfArenasAndThreadsAsDefault() throws Exception { + // Expected number of arenas and threads + int numberOfSlots = 2; + + NettyConfig config = new NettyConfig( + InetAddress.getLocalHost(), + NetUtils.getAvailablePort(), + 1024, + numberOfSlots, + new Configuration()); + + NettyConnectionManager connectionManager = new NettyConnectionManager(config); + + connectionManager.start( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class), + mock(NetworkBufferPool.class)); + + assertEquals(numberOfSlots, connectionManager.getBufferPool().getNumberOfArenas()); + + { + // Client event loop group + Bootstrap boostrap = connectionManager.getClient().getBootstrap(); + EventLoopGroup group = boostrap.group(); + + Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + f.setAccessible(true); + Object[] eventExecutors = (Object[]) f.get(group); + + assertEquals(numberOfSlots, eventExecutors.length); + } + + { + // Server event loop group + ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap(); + EventLoopGroup group = bootstrap.group(); + + Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + f.setAccessible(true); + Object[] eventExecutors = (Object[]) f.get(group); + + assertEquals(numberOfSlots, eventExecutors.length); + } + + { + // Server child event loop group + ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap(); + EventLoopGroup group = bootstrap.childGroup(); + + Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + f.setAccessible(true); + Object[] eventExecutors = (Object[]) f.get(group); + + assertEquals(numberOfSlots, eventExecutors.length); + } + } + + /** + * Tests that the number of arenas and threads can be configured manually. + */ + @Test + public void testManualConfiguration() throws Exception { + // Expected numbers + int numberOfArenas = 1; + int numberOfClientThreads = 3; + int numberOfServerThreads = 4; + + // Expected number of threads + Configuration flinkConfig = new Configuration(); + flinkConfig.setInteger(NettyConfig.NUM_ARENAS, numberOfArenas); + flinkConfig.setInteger(NettyConfig.NUM_THREADS_CLIENT, 3); + flinkConfig.setInteger(NettyConfig.NUM_THREADS_SERVER, 4); + + NettyConfig config = new NettyConfig( + InetAddress.getLocalHost(), + NetUtils.getAvailablePort(), + 1024, + 1337, + flinkConfig); + + NettyConnectionManager connectionManager = new NettyConnectionManager(config); + + connectionManager.start( + mock(ResultPartitionProvider.class), + mock(TaskEventDispatcher.class), + mock(NetworkBufferPool.class)); + + assertEquals(numberOfArenas, connectionManager.getBufferPool().getNumberOfArenas()); + + { + // Client event loop group + Bootstrap boostrap = connectionManager.getClient().getBootstrap(); + EventLoopGroup group = boostrap.group(); + + Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + f.setAccessible(true); + Object[] eventExecutors = (Object[]) f.get(group); + + assertEquals(numberOfClientThreads, eventExecutors.length); + } + + { + // Server event loop group + ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap(); + EventLoopGroup group = bootstrap.group(); + + Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + f.setAccessible(true); + Object[] eventExecutors = (Object[]) f.get(group); + + assertEquals(numberOfServerThreads, eventExecutors.length); + } + + { + // Server child event loop group + ServerBootstrap bootstrap = connectionManager.getServer().getBootstrap(); + EventLoopGroup group = bootstrap.childGroup(); + + Field f = group.getClass().getSuperclass().getSuperclass().getDeclaredField("children"); + f.setAccessible(true); + Object[] eventExecutors = (Object[]) f.get(group); + + assertEquals(numberOfServerThreads, eventExecutors.length); + } + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java index bf014221b1590..00e8ace1caaf7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyTestUtil.java @@ -43,11 +43,11 @@ public class NettyTestUtil { // NettyServer and NettyClient // --------------------------------------------------------------------------------------------- - static NettyServer initServer(NettyConfig config, NettyProtocol protocol) throws Exception { + static NettyServer initServer(NettyConfig config, NettyProtocol protocol, NettyBufferPool bufferPool) throws Exception { final NettyServer server = new NettyServer(config); try { - server.init(protocol); + server.init(protocol, bufferPool); } catch (Exception e) { server.shutdown(); @@ -57,11 +57,11 @@ static NettyServer initServer(NettyConfig config, NettyProtocol protocol) throws return server; } - static NettyClient initClient(NettyConfig config, NettyProtocol protocol) throws Exception { + static NettyClient initClient(NettyConfig config, NettyProtocol protocol, NettyBufferPool bufferPool) throws Exception { final NettyClient client = new NettyClient(config); try { - client.init(protocol); + client.init(protocol, bufferPool); } catch (Exception e) { client.shutdown(); @@ -78,8 +78,10 @@ static NettyServerAndClient initServerAndClient(NettyProtocol protocol) throws E static NettyServerAndClient initServerAndClient(NettyProtocol protocol, NettyConfig config) throws Exception { - final NettyClient client = initClient(config, protocol); - final NettyServer server = initServer(config, protocol); + NettyBufferPool bufferPool = new NettyBufferPool(1); + + final NettyClient client = initClient(config, protocol, bufferPool); + final NettyServer server = initServer(config, protocol, bufferPool); return new NettyServerAndClient(server, client); } @@ -140,6 +142,7 @@ static NettyConfig createConfig(int segmentSize, Configuration config) throws Ex InetAddress.getLocalHost(), NetUtils.getAvailablePort(), segmentSize, + 1, config); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java index ca8810b4eb9f6..cabca2356a169 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java @@ -158,7 +158,7 @@ private List getErrors() { // ------------------------------------------------------------------------ private static Tuple2 createNettyServerAndClient(NettyProtocol protocol) throws IOException { - final NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, new Configuration()); + final NettyConfig config = new NettyConfig(InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, 1, new Configuration()); final NettyServer server = new NettyServer(config); final NettyClient client = new NettyClient(config); @@ -166,8 +166,10 @@ private static Tuple2 createNettyServerAndClient(Netty boolean success = false; try { - server.init(protocol); - client.init(protocol); + NettyBufferPool bufferPool = new NettyBufferPool(1); + + server.init(protocol, bufferPool); + client.init(protocol, bufferPool); success = true; }