diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 1ea44df5032d6..52ee75985b770 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -62,18 +62,24 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Responses larger than this will be logged */
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
"ipc.server.max.response.size";
+ public static final String SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
+ "server.max.response.size";
/** Default value for IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY */
public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT =
1024*1024;
/** Number of threads in RPC server reading from the socket */
public static final String IPC_SERVER_RPC_READ_THREADS_KEY =
"ipc.server.read.threadpool.size";
+ public static final String SERVER_RPC_READ_THREADS_KEY =
+ "server.read.threadpool.size";
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
/** Number of pending connections that may be queued per socket reader */
public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
"ipc.server.read.connection-queue.size";
+ public static final String SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
+ "server.read.connection-queue.size";
/** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
100;
@@ -94,6 +100,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** How many calls per handler are allowed in the queue. */
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
"ipc.server.handler.queue.size";
+ public static final String SERVER_HANDLER_QUEUE_SIZE_KEY =
+ "server.handler.queue.size";
/** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */
public static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index 922c6e937fb9b..2181a3d0cda36 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -830,9 +830,13 @@ static Class extends RpcScheduler> getSchedulerClass(
public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue
String prefix = getQueueClassPrefix();
- this.maxQueueSize = handlerCount * conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
- CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ this.maxQueueSize = conf.getInt(prefix + "." +
+ CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 0);
+ if (this.maxQueueSize < 1) {
+ this.maxQueueSize = handlerCount * conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ }
callQueue.swapQueue(
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
@@ -3192,23 +3196,41 @@ protected Server(String bindAddress, int port,
if (queueSizePerHandler != -1) {
this.maxQueueSize = handlerCount * queueSizePerHandler;
} else {
- this.maxQueueSize = handlerCount * conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
- CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ this.maxQueueSize = conf.getInt(getQueueClassPrefix() + "." +
+ CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 0);
+ if (this.maxQueueSize < 1) {
+ this.maxQueueSize = handlerCount * conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
+ }
+ }
+ int tmpMaxRespSize = conf.getInt(getQueueClassPrefix() + "." +
+ CommonConfigurationKeys.SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 0);
+ if (tmpMaxRespSize < 1) {
+ this.maxRespSize = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
+ } else {
+ this.maxRespSize = tmpMaxRespSize;
}
- this.maxRespSize = conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
- CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
if (numReaders != -1) {
this.readThreads = numReaders;
} else {
- this.readThreads = conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
- CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+ this.readThreads = conf.getInt(getQueueClassPrefix() + "." +
+ CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 0);
+ if (this.readThreads < 1) {
+ this.readThreads = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
+ }
+ }
+ this.readerPendingConnectionQueue = conf.getInt(getQueueClassPrefix() + "." +
+ CommonConfigurationKeys.SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, 0);
+ if (this.readerPendingConnectionQueue < 1) {
+ this.readerPendingConnectionQueue = conf.getInt(
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
+ CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
}
- this.readerPendingConnectionQueue = conf.getInt(
- CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
- CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
@@ -3691,6 +3713,16 @@ public int getMaxQueueSize() {
return maxQueueSize;
}
+ @VisibleForTesting
+ public int getMaxRespSize() {
+ return maxRespSize;
+ }
+
+ @VisibleForTesting
+ public int getReaderPendingConnectionQueue() {
+ return readerPendingConnectionQueue;
+ }
+
/**
* The number of reader threads for this server.
* @return The number of reader threads.
diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index 4d289a71b5b31..8364c980b8da2 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2501,6 +2501,47 @@
+
+ ipc.[port_number].server.max.response.size
+
+ The maximum response size of the server.
+ [port_number] is the port used by the IPC server to be configured.
+ For example, ipc.8020.server.max.response.size will adjust the maximum
+ response size implementation for the IPC server running at port 8020.
+
+
+
+
+ ipc.[port_number].server.read.threadpool.size
+
+ The maximum thread pool size for the server to read request data.
+ [port_number] is the port used by the IPC server to be configured.
+ For example, ipc.8020.server.read.threadpool.size will adjust the maximum
+ thread pool size implementation for the IPC server running at port 8020.
+
+
+
+
+ ipc.[port_number].server.read.connection-queue.size
+
+ The size of the queue for the server to read request
+ connection data. [port_number] is the port used by the IPC server
+ to be configured. For example, ipc.8020.server.read.connection-queue.size
+ will adjust the connection queue size implementation for the IPC server
+ running at port 8020.
+
+
+
+
+ ipc.[port_number].server.handler.queue.size
+
+ The queue size of server handler.[port_number] is the port
+ used by the IPC server to be configured. For example,
+ ipc.8020.server.handler.queue.size will adjust the handler queue size
+ implementation for the IPC server running at port 8020.
+
+
+
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
index 9fcf4a5eb55a2..c24b6db1e1531 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java
@@ -184,6 +184,10 @@ public void initializeMemberVariables() {
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.handler");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockfree");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.response");
+ xmlPropsToSkipCompare.add("ipc.[port_number].server.max.response.size");
+ xmlPropsToSkipCompare.add("ipc.[port_number].server.read.threadpool.size");
+ xmlPropsToSkipCompare.add("ipc.[port_number].server.read.connection-queue.size");
+ xmlPropsToSkipCompare.add("ipc.[port_number].server.handler.queue.size");
// Deprecated properties. These should eventually be removed from the
// class.
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
index b78900b609e54..9a5bf1356cbf6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java
@@ -371,6 +371,31 @@ public void testConfRpc() throws IOException {
assertEquals(2 * 10, server.getMaxQueueSize());
}
+ @Test
+ public void testConfRpc2() throws IOException {
+ Configuration tmpConf = new Configuration();
+ tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
+ CommonConfigurationKeys.SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 3);
+ tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
+ CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 3);
+ tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
+ CommonConfigurationKeys.SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, 3);
+ tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
+ CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 3);
+ Server server = newServerBuilder(tmpConf)
+ .setNumHandlers(1).setVerbose(false).setPort(PORT).build();
+ assertEquals(3, server.getMaxRespSize());
+ assertEquals(3, server.getNumReaders());
+ assertEquals(3, server.getReaderPendingConnectionQueue());
+ assertEquals(3, server.getMaxQueueSize());
+
+ tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." +
+ server.getPort() + "." +
+ CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 5);
+ server.refreshCallQueue(tmpConf);
+ assertEquals(5, server.getMaxQueueSize());
+ }
+
@Test
public void testProxyAddress() throws Exception {
Server server = null;