Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,26 @@ public final class ConfigConstants {
public static final String TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY = "taskmanager.network.bufferSizeInBytes";

/**
* The number of incoming connection threads used in NettyConnectionManager for the ServerBootstrap.
* The number of incoming network IO threads (e.g. incoming connection threads used in NettyConnectionManager
* for the ServerBootstrap.)
*/
public static final String TASK_MANAGER_NETTY_NUM_IN_THREADS_KEY = "taskmanager.netty.numInThreads";
public static final String TASK_MANAGER_NET_NUM_IN_THREADS_KEY = "taskmanager.net.numInThreads";

/**
* The number of outgoing connection threads used in NettyConnectionManager for the Bootstrap.
* The number of outgoing network IO threads (e.g. outgoing connection threads used in NettyConnectionManager for
* the Bootstrap.)
*/
public static final String TASK_MANAGER_NETTY_NUM_OUT_THREADS_KEY = "taskmanager.netty.numOutThreads";
public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = "taskmanager.net.numOutThreads";

/**
* The low water mark used in NettyConnectionManager for the Bootstrap.
*/
public static final String TASK_MANAGER_NETTY_LOW_WATER_MARK = "taskmanager.netty.lowWaterMark";
public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = "taskmanager.net.nettyLowWaterMark";

/**
* The high water mark used in NettyConnectionManager for the Bootstrap.
*/
public static final String TASK_MANAGER_NETTY_HIGH_WATER_MARK = "taskmanager.netty.highWaterMark";
public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark";

/**
* Parameter for the interval in which the RaskManager sends the periodic heart beat messages
Expand Down Expand Up @@ -333,28 +335,30 @@ public final class ConfigConstants {
public static final int DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE = 32768;

/**
* Default number of incoming connection threads used in NettyConnectionManager for the ServerBootstrap. If set
* to -1, NettyConnectionManager will pick a reasonable default depending on the number of cores of the machine.
* Default number of incoming network IO threads (e.g. number of incoming connection threads used in
* NettyConnectionManager for the ServerBootstrap). If set to -1, a reasonable default depending on the number of
* cores will be picked.
*/
public static final int DEFAULT_TASK_MANAGER_NETTY_NUM_IN_THREADS = -1;
public static final int DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS = -1;

/**
* Default number of outgoing connection threads used in NettyConnectionManager for the Bootstrap. If set
* to -1, NettyConnectionManager will pick a reasonable default depending on the number of cores of the machine.
* Default number of outgoing network IO threads (e.g. number of outgoing connection threads used in
* NettyConnectionManager for the Bootstrap). If set to -1, a reasonable default depending on the number of cores
* will be picked.
*/
public static final int DEFAULT_TASK_MANAGER_NETTY_NUM_OUT_THREADS = -1;
public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1;

/**
* Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
* will use half of the network buffer size as the low water mark.
*/
public static final int DEFAULT_TASK_MANAGER_NETTY_LOW_WATER_MARK = -1;
public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = -1;

/**
* Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
* will use the network buffer size as the high water mark.
*/
public static final int DEFAULT_TASK_MANAGER_NETTY_HIGH_WATER_MARK = -1;
public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;

/**
* The default interval for TaskManager heart beats (2000 msecs).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,6 @@ public TaskManager() throws Exception {
final int pageSize = GlobalConfiguration.getInteger(ConfigConstants.TASK_MANAGER_NETWORK_BUFFER_SIZE_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);

// Initialize network buffer pool
int numBuffers = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETWORK_NUM_BUFFERS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_NUM_BUFFERS);
Expand All @@ -287,20 +286,20 @@ public TaskManager() throws Exception {
ConfigConstants.DEFAULT_TASK_MANAGER_NETWORK_BUFFER_SIZE);

int numInThreads = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETTY_NUM_IN_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_NUM_IN_THREADS);
ConfigConstants.TASK_MANAGER_NET_NUM_IN_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_IN_THREADS);

int numOutThreads = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETTY_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_NUM_OUT_THREADS);
ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);

int lowWaterMark = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETTY_LOW_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_LOW_WATER_MARK);
ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);

int highWaterMark = GlobalConfiguration.getInteger(
ConfigConstants.TASK_MANAGER_NETTY_HIGH_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NETTY_HIGH_WATER_MARK);
ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);

// Initialize the channel manager
try {
Expand All @@ -309,7 +308,7 @@ public TaskManager() throws Exception {
numBuffers, bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
} catch (IOException ioe) {
LOG.error(StringUtils.stringifyException(ioe));
throw new Exception("Failed to instantiate Byte-buffered channel manager. " + ioe.getMessage(), ioe);
throw new Exception("Failed to instantiate channel manager. " + ioe.getMessage(), ioe);
}

{
Expand Down