Skip to content

Commit

Permalink
[hotfix] [config] Harmonize configuration keys for TaskManager networ…
Browse files Browse the repository at this point in the history
…k settings.

This preserves old config keys as deprecated keys where the key was already present
in an earlier release.

This also re-arranges config options to form logical sections in the file
and harmonized JavaDoc formatting style.
  • Loading branch information
StephanEwen committed May 6, 2017
1 parent 710c08b commit aed3b80
Show file tree
Hide file tree
Showing 2 changed files with 116 additions and 120 deletions.
Expand Up @@ -29,22 +29,41 @@
public class TaskManagerOptions { public class TaskManagerOptions {


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// TaskManager Options // General TaskManager Options
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


// @TODO Migrate 'taskmanager.*' config options from ConfigConstants // @TODO Migrate 'taskmanager.*' config options from ConfigConstants

/** Whether to kill the TaskManager when the task thread throws an OutOfMemoryError */
public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
key("taskmanager.jvm-exit-on-oom")
.defaultValue(false);


/** JVM heap size (in megabytes) for the TaskManagers */ /**
* JVM heap size (in megabytes) for the TaskManagers
*/
public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY = public static final ConfigOption<Integer> TASK_MANAGER_HEAP_MEMORY =
key("taskmanager.heap.mb") key("taskmanager.heap.mb")
.defaultValue(1024); .defaultValue(1024);


/** Size of memory buffers used by the network stack and the memory manager (in bytes). */ /**
* Whether to kill the TaskManager when the task thread throws an OutOfMemoryError
*/
public static final ConfigOption<Boolean> KILL_ON_OUT_OF_MEMORY =
key("taskmanager.jvm-exit-on-oom")
.defaultValue(false);

/**
* Whether the quarantine monitor for task managers shall be started. The quarantine monitor
* shuts down the actor system if it detects that it has quarantined another actor system
* or if it has been quarantined by another actor system.
*/
public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
key("taskmanager.exit-on-fatal-akka-error")
.defaultValue(false);

// ------------------------------------------------------------------------
// Managed Memory Options
// ------------------------------------------------------------------------

/**
* Size of memory buffers used by the network stack and the memory manager (in bytes).
*/
public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE = public static final ConfigOption<Integer> MEMORY_SEGMENT_SIZE =
key("taskmanager.memory.segment-size") key("taskmanager.memory.segment-size")
.defaultValue(32768); .defaultValue(32768);
Expand Down Expand Up @@ -73,7 +92,9 @@ public class TaskManagerOptions {
key("taskmanager.memory.off-heap") key("taskmanager.memory.off-heap")
.defaultValue(false); .defaultValue(false);


/** Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting. */ /**
* Whether TaskManager managed memory should be pre-allocated when the TaskManager is starting.
*/
public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE = public static final ConfigOption<Boolean> MANAGED_MEMORY_PRE_ALLOCATE =
key("taskmanager.memory.preallocate") key("taskmanager.memory.preallocate")
.defaultValue(false); .defaultValue(false);
Expand All @@ -94,53 +115,65 @@ public class TaskManagerOptions {
key("taskmanager.network.numberOfBuffers") key("taskmanager.network.numberOfBuffers")
.defaultValue(2048); .defaultValue(2048);


/** Fraction of JVM memory to use for network buffers. */ /**
* Fraction of JVM memory to use for network buffers.
*/
public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION = public static final ConfigOption<Float> NETWORK_BUFFERS_MEMORY_FRACTION =
key("taskmanager.network.memory.fraction") key("taskmanager.network.memory.fraction")
.defaultValue(0.1f); .defaultValue(0.1f);


/** Minimum memory size for network buffers (in bytes) */ /**
* Minimum memory size for network buffers (in bytes)
*/
public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN = public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MIN =
key("taskmanager.network.memory.min") key("taskmanager.network.memory.min")
.defaultValue(64L << 20); // 64 MB .defaultValue(64L << 20); // 64 MB


/** Maximum memory size for network buffers (in bytes) */ /**
* Maximum memory size for network buffers (in bytes)
*/
public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX = public static final ConfigOption<Long> NETWORK_BUFFERS_MEMORY_MAX =
key("taskmanager.network.memory.max") key("taskmanager.network.memory.max")
.defaultValue(1024L << 20); // 1 GB .defaultValue(1024L << 20); // 1 GB



/** Minimum backoff for partition requests of input channels. */
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.net.request-backoff.initial")
.defaultValue(100);

/** Maximum backoff for partition requests of input channels. */
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.net.request-backoff.max")
.defaultValue(10000);


/** /**
* Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel). * Number of network buffers to use for each outgoing/ingoing channel (subpartition/input channel).
* *
* Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization * Reasoning: 1 buffer for in-flight data in the subpartition + 1 buffer for parallel serialization
*/ */
public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL = public static final ConfigOption<Integer> NETWORK_BUFFERS_PER_CHANNEL =
key("taskmanager.net.memory.buffers-per-channel") key("taskmanager.network.memory.buffers-per-channel")
.defaultValue(2); .defaultValue(2);


/** Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate). */ /**
* Number of extra network buffers to use for each outgoing/ingoing gate (result partition/input gate).
*/
public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE = public static final ConfigOption<Integer> NETWORK_EXTRA_BUFFERS_PER_GATE =
key("taskmanager.net.memory.extra-buffers-per-gate") key("taskmanager.network.memory.floating-buffers-per-gate")
.defaultValue(8); .defaultValue(8);


/**
* Minimum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_INITIAL =
key("taskmanager.network.request-backoff.initial")
.defaultValue(100)
.withDeprecatedKeys("taskmanager.net.request-backoff.initial");

/**
* Maximum backoff for partition requests of input channels.
*/
public static final ConfigOption<Integer> NETWORK_REQUEST_BACKOFF_MAX =
key("taskmanager.network.request-backoff.max")
.defaultValue(10000)
.withDeprecatedKeys("taskmanager.net.request-backoff.max");

/** /**
* Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
* lengths. * lengths.
*/ */
public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS = public static final ConfigOption<Boolean> NETWORK_DETAILED_METRICS =
key("taskmanager.net.detailed-metrics") key("taskmanager.network.detailed-metrics")
.defaultValue(false); .defaultValue(false);


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
Expand Down Expand Up @@ -176,15 +209,6 @@ public class TaskManagerOptions {
key("task.checkpoint.alignment.max-size") key("task.checkpoint.alignment.max-size")
.defaultValue(-1L); .defaultValue(-1L);


/**
* Whether the quarantine monitor for task managers shall be started. The quarantine monitor
* shuts down the actor system if it detects that it has quarantined another actor system
* or if it has been quarantined by another actor system.
*/
public static final ConfigOption<Boolean> EXIT_ON_FATAL_AKKA_ERROR =
key("taskmanager.exit-on-fatal-akka-error")
.defaultValue(false);

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


/** Not intended to be instantiated */ /** Not intended to be instantiated */
Expand Down
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.runtime.io.network.netty; package org.apache.flink.runtime.io.network.netty;


import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.net.SSLUtils; import org.apache.flink.runtime.net.SSLUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
Expand All @@ -38,19 +40,40 @@ public class NettyConfig {


// - Config keys ---------------------------------------------------------- // - Config keys ----------------------------------------------------------


public static final String NUM_ARENAS = "taskmanager.net.num-arenas"; public static final ConfigOption<Integer> NUM_ARENAS = ConfigOptions

.key("taskmanager.network.netty.num-arenas")
public static final String NUM_THREADS_SERVER = "taskmanager.net.server.numThreads"; .defaultValue(-1)

.withDeprecatedKeys("taskmanager.net.num-arenas");
public static final String NUM_THREADS_CLIENT = "taskmanager.net.client.numThreads";

public static final ConfigOption<Integer> NUM_THREADS_SERVER = ConfigOptions
public static final String CONNECT_BACKLOG = "taskmanager.net.server.backlog"; .key("taskmanager.network.netty.server.numThreads")

.defaultValue(-1)
public static final String CLIENT_CONNECT_TIMEOUT_SECONDS = "taskmanager.net.client.connectTimeoutSec"; .withDeprecatedKeys("taskmanager.net.server.numThreads");


public static final String SEND_RECEIVE_BUFFER_SIZE = "taskmanager.net.sendReceiveBufferSize"; public static final ConfigOption<Integer> NUM_THREADS_CLIENT = ConfigOptions

.key("taskmanager.network.netty.client.numThreads")
public static final String TRANSPORT_TYPE = "taskmanager.net.transport"; .defaultValue(-1)
.withDeprecatedKeys("taskmanager.net.client.numThreads");

public static final ConfigOption<Integer> CONNECT_BACKLOG = ConfigOptions
.key("taskmanager.network.netty.server.backlog")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.server.backlog");

public static final ConfigOption<Integer> CLIENT_CONNECT_TIMEOUT_SECONDS = ConfigOptions
.key("taskmanager.network.netty.client.connectTimeoutSec")
.defaultValue(120) // default: 120s = 2min
.withDeprecatedKeys("taskmanager.net.client.connectTimeoutSec");

public static final ConfigOption<Integer> SEND_RECEIVE_BUFFER_SIZE = ConfigOptions
.key("taskmanager.network.netty.sendReceiveBufferSize")
.defaultValue(0) // default: 0 => Netty's default
.withDeprecatedKeys("taskmanager.net.sendReceiveBufferSize");

public static final ConfigOption<String> TRANSPORT_TYPE = ConfigOptions
.key("taskmanager.network.netty.transport")
.defaultValue("nio")
.withDeprecatedKeys("taskmanager.net.transport");


// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


Expand Down Expand Up @@ -111,101 +134,50 @@ public int getNumberOfSlots() {
return numberOfSlots; return numberOfSlots;
} }


// ------------------------------------------------------------------------
// Setters
// ------------------------------------------------------------------------

public NettyConfig setServerConnectBacklog(int connectBacklog) {
checkArgument(connectBacklog >= 0);
config.setInteger(CONNECT_BACKLOG, connectBacklog);

return this;
}

public NettyConfig setServerNumThreads(int numThreads) {
checkArgument(numThreads >= 0);
config.setInteger(NUM_THREADS_SERVER, numThreads);

return this;
}

public NettyConfig setClientNumThreads(int numThreads) {
checkArgument(numThreads >= 0);
config.setInteger(NUM_THREADS_CLIENT, numThreads);

return this;
}

public NettyConfig setClientConnectTimeoutSeconds(int connectTimeoutSeconds) {
checkArgument(connectTimeoutSeconds >= 0);
config.setInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, connectTimeoutSeconds);

return this;
}

public NettyConfig setSendAndReceiveBufferSize(int bufferSize) {
checkArgument(bufferSize >= 0);
config.setInteger(SEND_RECEIVE_BUFFER_SIZE, bufferSize);

return this;
}

public NettyConfig setTransportType(String transport) {
if (transport.equals("nio") || transport.equals("epoll") || transport.equals("auto")) {
config.setString(TRANSPORT_TYPE, transport);
}
else {
throw new IllegalArgumentException("Unknown transport type.");
}

return this;
}

// ------------------------------------------------------------------------ // ------------------------------------------------------------------------
// Getters // Getters
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


public int getServerConnectBacklog() { public int getServerConnectBacklog() {
// default: 0 => Netty's default return config.getInteger(CONNECT_BACKLOG);
return config.getInteger(CONNECT_BACKLOG, 0);
} }


public int getNumberOfArenas() { public int getNumberOfArenas() {
// default: number of slots // default: number of slots
return config.getInteger(NUM_ARENAS, numberOfSlots); final int configValue = config.getInteger(NUM_ARENAS);
return configValue == -1 ? numberOfSlots : configValue;
} }


public int getServerNumThreads() { public int getServerNumThreads() {
// default: number of task slots // default: number of task slots
return config.getInteger(NUM_THREADS_SERVER, numberOfSlots); final int configValue = config.getInteger(NUM_THREADS_SERVER);
return configValue == -1 ? numberOfSlots : configValue;
} }


public int getClientNumThreads() { public int getClientNumThreads() {
// default: number of task slots // default: number of task slots
return config.getInteger(NUM_THREADS_CLIENT, numberOfSlots); final int configValue = config.getInteger(NUM_THREADS_CLIENT);
return configValue == -1 ? numberOfSlots : configValue;
} }


public int getClientConnectTimeoutSeconds() { public int getClientConnectTimeoutSeconds() {
// default: 120s = 2min return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS);
return config.getInteger(CLIENT_CONNECT_TIMEOUT_SECONDS, 120);
} }


public int getSendAndReceiveBufferSize() { public int getSendAndReceiveBufferSize() {
// default: 0 => Netty's default return config.getInteger(SEND_RECEIVE_BUFFER_SIZE);
return config.getInteger(SEND_RECEIVE_BUFFER_SIZE, 0);
} }


public TransportType getTransportType() { public TransportType getTransportType() {
String transport = config.getString(TRANSPORT_TYPE, "nio"); String transport = config.getString(TRANSPORT_TYPE);


if (transport.equals("nio")) { switch (transport) {
return TransportType.NIO; case "nio":
} return TransportType.NIO;
else if (transport.equals("epoll")) { case "epoll":
return TransportType.EPOLL; return TransportType.EPOLL;
} default:
else { return TransportType.AUTO;
return TransportType.AUTO;
} }
} }


Expand Down

0 comments on commit aed3b80

Please sign in to comment.