From 256b2ee3e530eea80d73ae1dd013648de26af5b5 Mon Sep 17 00:00:00 2001 From: Ufuk Celebi Date: Mon, 2 Mar 2015 18:41:15 +0100 Subject: [PATCH] [distributed runtime] [tests] Add helper setters for NettyConfig --- .../runtime/io/network/netty/NettyClient.java | 2 +- .../runtime/io/network/netty/NettyConfig.java | 76 +++++++++++++++++-- 2 files changed, 71 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java index 35bccf4aa1249..86e84ae2a1519 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java @@ -88,7 +88,7 @@ void init(final NettyProtocol protocol) throws IOException { bootstrap.option(ChannelOption.SO_KEEPALIVE, true); // Timeout for new connections - bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutMs() * 1000); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000); // Pooled allocator for Netty's ByteBuf instances bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java index a3e01fee7afc3..45ffb1547e941 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConfig.java @@ -31,10 +31,22 @@ public class NettyConfig { private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class); + // - Config keys ---------------------------------------------------------- + public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads"; public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads"; + public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog"; + + public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec"; + + public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize"; + + public static final String TRANSPORT_TYPE = "taskmanager.net.transport"; + + // ------------------------------------------------------------------------ + static enum TransportType { NIO, EPOLL, AUTO } @@ -77,11 +89,63 @@ int getMemorySegmentSize() { return memorySegmentSize; } + // ------------------------------------------------------------------------ + // Setters + // ------------------------------------------------------------------------ + + NettyConfig setServerConnectBacklog(int connectBacklog) { + checkArgument(connectBacklog >= 0); + config.setInteger(CONNECT_BACKLOG, connectBacklog); + + return this; + } + + NettyConfig setServerNumThreads(int numThreads) { + checkArgument(numThreads >= 0); + config.setInteger(NUM_THREADS_SERVER, numThreads); + + return this; + } + + NettyConfig setClientNumThreads(int numThreads) { + checkArgument(numThreads >= 0); + config.setInteger(NUM_THREADS_CLIENT, numThreads); + + return this; + } + + NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) { + checkArgument(connectTimeoutSeconds >= 0); + config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds); + + return this; + } + + NettyConfig setSendAndReceiveBufferSize(int bufferSize) { + checkArgument(bufferSize >= 0); + config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize); + + return this; + } + + NettyConfig setTransportType(String transport) { + if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) { + config.setString(TRANSPORT_TYPE, transport); + } + else { + throw new IllegalArgumentException("Unknown transport type."); + } + + return this; + } + + // ------------------------------------------------------------------------ + // Getters // ------------------------------------------------------------------------ int getServerConnectBacklog() { // default: 0 => Netty's default - return config.getInteger("taskmanager.net.server.backlog", 0); + return config.getInteger(CONNECT_BACKLOG, 0); } int getServerNumThreads() { @@ -94,18 +158,18 @@ int getClientNumThreads() { return config.getInteger(NUM_THREADS_CLIENT, 0); } - int getClientConnectTimeoutMs() { + int getClientConnectTimeoutSeconds() { // default: 120s = 2min - return config.getInteger("taskmanager.net.client.connectTimeoutSec", 120); + return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120); } int getSendAndReceiveBufferSize() { // default: 0 => Netty's default - return config.getInteger("taskmanager.net.sendReceiveBufferSize", 0); + return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0); } TransportType getTransportType() { - String transport = config.getString("taskmanager.net.transport", "nio"); + String transport = config.getString(TRANSPORT_TYPE, "nio"); if (transport.equals("nio")) { return TransportType.NIO; @@ -138,7 +202,7 @@ public String toString() { getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man, getClientNumThreads(), getClientNumThreads() == 0 ? def : man, getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man, - getClientConnectTimeoutMs(), getSendAndReceiveBufferSize(), + getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(), getSendAndReceiveBufferSize() == 0 ? def : man); } }