From c7f6f3a3ad69b1cc52e3e679458e5ac3324af382 Mon Sep 17 00:00:00 2001 From: zhujianghua Date: Fri, 17 Sep 2021 13:16:45 +0800 Subject: [PATCH] HDFS-16107.Split RPC configuration to isolate RPC. --- .../hadoop/fs/CommonConfigurationKeys.java | 8 +++ .../java/org/apache/hadoop/ipc/Server.java | 62 ++++++++++++++----- .../src/main/resources/core-default.xml | 41 ++++++++++++ .../conf/TestCommonConfigurationFields.java | 4 ++ .../java/org/apache/hadoop/ipc/TestRPC.java | 25 ++++++++ 5 files changed, 125 insertions(+), 15 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 1ea44df5032d6..52ee75985b770 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -62,18 +62,24 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** Responses larger than this will be logged */ public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY = "ipc.server.max.response.size"; + public static final String SERVER_RPC_MAX_RESPONSE_SIZE_KEY = + "server.max.response.size"; /** Default value for IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY */ public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT = 1024*1024; /** Number of threads in RPC server reading from the socket */ public static final String IPC_SERVER_RPC_READ_THREADS_KEY = "ipc.server.read.threadpool.size"; + public static final String SERVER_RPC_READ_THREADS_KEY = + "server.read.threadpool.size"; /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */ public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; /** Number of pending connections that may be queued per socket reader */ public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY = "ipc.server.read.connection-queue.size"; + public static final String SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY = + "server.read.connection-queue.size"; /** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */ public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT = 100; @@ -94,6 +100,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { /** How many calls per handler are allowed in the queue. */ public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = "ipc.server.handler.queue.size"; + public static final String SERVER_HANDLER_QUEUE_SIZE_KEY = + "server.handler.queue.size"; /** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */ public static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 922c6e937fb9b..2181a3d0cda36 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -830,9 +830,13 @@ static Class getSchedulerClass( public synchronized void refreshCallQueue(Configuration conf) { // Create the next queue String prefix = getQueueClassPrefix(); - this.maxQueueSize = handlerCount * conf.getInt( - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + this.maxQueueSize = conf.getInt(prefix + "." + + CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 0); + if (this.maxQueueSize < 1) { + this.maxQueueSize = handlerCount * conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + } callQueue.swapQueue( getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf), @@ -3192,23 +3196,41 @@ protected Server(String bindAddress, int port, if (queueSizePerHandler != -1) { this.maxQueueSize = handlerCount * queueSizePerHandler; } else { - this.maxQueueSize = handlerCount * conf.getInt( - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + this.maxQueueSize = conf.getInt(getQueueClassPrefix() + "." + + CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 0); + if (this.maxQueueSize < 1) { + this.maxQueueSize = handlerCount * conf.getInt( + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT); + } + } + int tmpMaxRespSize = conf.getInt(getQueueClassPrefix() + "." + + CommonConfigurationKeys.SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 0); + if (tmpMaxRespSize < 1) { + this.maxRespSize = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); + } else { + this.maxRespSize = tmpMaxRespSize; } - this.maxRespSize = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT); if (numReaders != -1) { this.readThreads = numReaders; } else { - this.readThreads = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + this.readThreads = conf.getInt(getQueueClassPrefix() + "." + + CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 0); + if (this.readThreads < 1) { + this.readThreads = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); + } + } + this.readerPendingConnectionQueue = conf.getInt(getQueueClassPrefix() + "." + + CommonConfigurationKeys.SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, 0); + if (this.readerPendingConnectionQueue < 1) { + this.readerPendingConnectionQueue = conf.getInt( + CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, + CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); } - this.readerPendingConnectionQueue = conf.getInt( - CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, - CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); @@ -3691,6 +3713,16 @@ public int getMaxQueueSize() { return maxQueueSize; } + @VisibleForTesting + public int getMaxRespSize() { + return maxRespSize; + } + + @VisibleForTesting + public int getReaderPendingConnectionQueue() { + return readerPendingConnectionQueue; + } + /** * The number of reader threads for this server. * @return The number of reader threads. diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4d289a71b5b31..8364c980b8da2 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -2501,6 +2501,47 @@ + + ipc.[port_number].server.max.response.size + + The maximum response size of the server. + [port_number] is the port used by the IPC server to be configured. + For example, ipc.8020.server.max.response.size will adjust the maximum + response size implementation for the IPC server running at port 8020. + + + + + ipc.[port_number].server.read.threadpool.size + + The maximum thread pool size for the server to read request data. + [port_number] is the port used by the IPC server to be configured. + For example, ipc.8020.server.read.threadpool.size will adjust the maximum + thread pool size implementation for the IPC server running at port 8020. + + + + + ipc.[port_number].server.read.connection-queue.size + + The size of the queue for the server to read request + connection data. [port_number] is the port used by the IPC server + to be configured. For example, ipc.8020.server.read.connection-queue.size + will adjust the connection queue size implementation for the IPC server + running at port 8020. + + + + + ipc.[port_number].server.handler.queue.size + + The queue size of server handler.[port_number] is the port + used by the IPC server to be configured. For example, + ipc.8020.server.handler.queue.size will adjust the handler queue size + implementation for the IPC server running at port 8020. + + + diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java index 9fcf4a5eb55a2..c24b6db1e1531 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/conf/TestCommonConfigurationFields.java @@ -184,6 +184,10 @@ public void initializeMemberVariables() { xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.handler"); xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockfree"); xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.response"); + xmlPropsToSkipCompare.add("ipc.[port_number].server.max.response.size"); + xmlPropsToSkipCompare.add("ipc.[port_number].server.read.threadpool.size"); + xmlPropsToSkipCompare.add("ipc.[port_number].server.read.connection-queue.size"); + xmlPropsToSkipCompare.add("ipc.[port_number].server.handler.queue.size"); // Deprecated properties. These should eventually be removed from the // class. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index b78900b609e54..9a5bf1356cbf6 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -371,6 +371,31 @@ public void testConfRpc() throws IOException { assertEquals(2 * 10, server.getMaxQueueSize()); } + @Test + public void testConfRpc2() throws IOException { + Configuration tmpConf = new Configuration(); + tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." + + CommonConfigurationKeys.SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 3); + tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." + + CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 3); + tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." + + CommonConfigurationKeys.SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, 3); + tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." + + CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 3); + Server server = newServerBuilder(tmpConf) + .setNumHandlers(1).setVerbose(false).setPort(PORT).build(); + assertEquals(3, server.getMaxRespSize()); + assertEquals(3, server.getNumReaders()); + assertEquals(3, server.getReaderPendingConnectionQueue()); + assertEquals(3, server.getMaxQueueSize()); + + tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + + server.getPort() + "." + + CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 5); + server.refreshCallQueue(tmpConf); + assertEquals(5, server.getMaxQueueSize()); + } + @Test public void testProxyAddress() throws Exception { Server server = null;