Skip to content

Commit

Permalink
[distributed runtime] [tests] Add helper setters for NettyConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
uce committed Mar 3, 2015
1 parent 2a52871 commit 256b2ee
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 7 deletions.
Expand Up @@ -88,7 +88,7 @@ void init(final NettyProtocol protocol) throws IOException {
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);

// Timeout for new connections
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutMs() * 1000);
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, config.getClientConnectTimeoutSeconds() * 1000);

// Pooled allocator for Netty's ByteBuf instances
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
Expand Down
Expand Up @@ -31,10 +31,22 @@ public class NettyConfig {

private static final Logger LOG = LoggerFactory.getLogger(NettyConfig.class);

// - Config keys ----------------------------------------------------------

public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads";

public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";

public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog";

public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec";

public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize";

public static final String TRANSPORT_TYPE = "taskmanager.net.transport";

// ------------------------------------------------------------------------

static enum TransportType {
NIO, EPOLL, AUTO
}
Expand Down Expand Up @@ -77,11 +89,63 @@ int getMemorySegmentSize() {
return memorySegmentSize;
}

// ------------------------------------------------------------------------
// Setters
// ------------------------------------------------------------------------

NettyConfig setServerConnectBacklog(int connectBacklog) {
checkArgument(connectBacklog >= 0);
config.setInteger(CONNECT_BACKLOG, connectBacklog);

return this;
}

NettyConfig setServerNumThreads(int numThreads) {
checkArgument(numThreads >= 0);
config.setInteger(NUM_THREADS_SERVER, numThreads);

return this;
}

NettyConfig setClientNumThreads(int numThreads) {
checkArgument(numThreads >= 0);
config.setInteger(NUM_THREADS_CLIENT, numThreads);

return this;
}

NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
checkArgument(connectTimeoutSeconds >= 0);
config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);

return this;
}

NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
checkArgument(bufferSize >= 0);
config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);

return this;
}

NettyConfig setTransportType(String transport) {
if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
config.setString(TRANSPORT_TYPE, transport);
}
else {
throw new IllegalArgumentException("Unknown transport type.");
}

return this;
}

// ------------------------------------------------------------------------
// Getters
// ------------------------------------------------------------------------

int getServerConnectBacklog() {
// default: 0 => Netty's default
return config.getInteger("taskmanager.net.server.backlog", 0);
return config.getInteger(CONNECT_BACKLOG, 0);
}

int getServerNumThreads() {
Expand All @@ -94,18 +158,18 @@ int getClientNumThreads() {
return config.getInteger(NUM_THREADS_CLIENT, 0);
}

int getClientConnectTimeoutMs() {
int getClientConnectTimeoutSeconds() {
// default: 120s = 2min
return config.getInteger("taskmanager.net.client.connectTimeoutSec", 120);
return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
}

int getSendAndReceiveBufferSize() {
// default: 0 => Netty's default
return config.getInteger("taskmanager.net.sendReceiveBufferSize", 0);
return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
}

TransportType getTransportType() {
String transport = config.getString("taskmanager.net.transport", "nio");
String transport = config.getString(TRANSPORT_TYPE, "nio");

if (transport.equals("nio")) {
return TransportType.NIO;
Expand Down Expand Up @@ -138,7 +202,7 @@ public String toString() {
getTransportType(), getServerNumThreads(), getServerNumThreads() == 0 ? def : man,
getClientNumThreads(), getClientNumThreads() == 0 ? def : man,
getServerConnectBacklog(), getServerConnectBacklog() == 0 ? def : man,
getClientConnectTimeoutMs(), getSendAndReceiveBufferSize(),
getClientConnectTimeoutSeconds(), getSendAndReceiveBufferSize(),
getSendAndReceiveBufferSize() == 0 ? def : man);
}
}

0 comments on commit 256b2ee

Please sign in to comment.