Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,24 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Responses larger than this will be logged */
public static final String IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
"ipc.server.max.response.size";
public static final String SERVER_RPC_MAX_RESPONSE_SIZE_KEY =
"server.max.response.size";
/** Default value for IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY */
public static final int IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT =
1024*1024;
/** Number of threads in RPC server reading from the socket */
public static final String IPC_SERVER_RPC_READ_THREADS_KEY =
"ipc.server.read.threadpool.size";
public static final String SERVER_RPC_READ_THREADS_KEY =
"server.read.threadpool.size";
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;

/** Number of pending connections that may be queued per socket reader */
public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
"ipc.server.read.connection-queue.size";
public static final String SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
"server.read.connection-queue.size";
/** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
100;
Expand All @@ -94,6 +100,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** How many calls per handler are allowed in the queue. */
public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY =
"ipc.server.handler.queue.size";
public static final String SERVER_HANDLER_QUEUE_SIZE_KEY =
"server.handler.queue.size";
/** Default value for IPC_SERVER_HANDLER_QUEUE_SIZE_KEY */
public static final int IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT = 100;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -830,9 +830,13 @@ static Class<? extends RpcScheduler> getSchedulerClass(
public synchronized void refreshCallQueue(Configuration conf) {
// Create the next queue
String prefix = getQueueClassPrefix();
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
this.maxQueueSize = conf.getInt(prefix + "." +
CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 0);
if (this.maxQueueSize < 1) {
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
}
callQueue.swapQueue(
getSchedulerClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
getQueueClass(CommonConfigurationKeys.IPC_NAMESPACE, port, conf),
Expand Down Expand Up @@ -3192,23 +3196,41 @@ protected Server(String bindAddress, int port,
if (queueSizePerHandler != -1) {
this.maxQueueSize = handlerCount * queueSizePerHandler;
} else {
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
this.maxQueueSize = conf.getInt(getQueueClassPrefix() + "." +
CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 0);
if (this.maxQueueSize < 1) {
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
}
}
int tmpMaxRespSize = conf.getInt(getQueueClassPrefix() + "." +
CommonConfigurationKeys.SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 0);
if (tmpMaxRespSize < 1) {
this.maxRespSize = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
} else {
this.maxRespSize = tmpMaxRespSize;
}
this.maxRespSize = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
if (numReaders != -1) {
this.readThreads = numReaders;
} else {
this.readThreads = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
this.readThreads = conf.getInt(getQueueClassPrefix() + "." +
CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @jianghuazhu , if we share default value with IPC_SERVER_RPC_READ_THREADS_KEY, maybe we could change

this.readThreads = conf.getInt(getQueueClassPrefix() + "." +
          CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 0);
if (this.readThreads < 1) {
        this.readThreads = conf.getInt(
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
            CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
}

to
this.readThreads = conf.getInt(prefix + "." + CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);.

What do you think?

Copy link
Contributor Author

@jianghuazhu jianghuazhu Sep 18, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @tomscut for the comment.
If there is an RPC service whose port is 8020, we hope that the number of read thread pools allocated for this RPC (ipc.8020.server.read.threadpool.size) is different from other RPCs.
The logic here is:

  1. First judge whether ipc.8020.server.read.threadpool.size has been allocated. If it is not allocated, the value we get should be 0, indicating that we want to use the public unified configuration (ipc.server.read.threadpool .size).
  2. If it is not allocated, we should use the public unified configuration ipc.server.read.threadpool.size.
    At present, CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT is set to 1. If we use this attribute, we can still get a valid value when ipc.8020.server.read.threadpool.size is not set, which may cause a Ambiguity.

This is my idea, welcome to continue to communicate. @tomscut

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @jianghuazhu for your comments.

@jojochuang @virajjasani Could you please review this PR. Thanks.

if (this.readThreads < 1) {
this.readThreads = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
}
}
this.readerPendingConnectionQueue = conf.getInt(getQueueClassPrefix() + "." +
CommonConfigurationKeys.SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, 0);
if (this.readerPendingConnectionQueue < 1) {
this.readerPendingConnectionQueue = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
}
this.readerPendingConnectionQueue = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
Expand Down Expand Up @@ -3691,6 +3713,16 @@ public int getMaxQueueSize() {
return maxQueueSize;
}

@VisibleForTesting
public int getMaxRespSize() {
return maxRespSize;
}

@VisibleForTesting
public int getReaderPendingConnectionQueue() {
return readerPendingConnectionQueue;
}

/**
* The number of reader threads for this server.
* @return The number of reader threads.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2501,6 +2501,47 @@
</description>
</property>

<property>
<name>ipc.[port_number].server.max.response.size</name>
<value></value>
<description>The maximum response size of the server.
[port_number] is the port used by the IPC server to be configured.
For example, ipc.8020.server.max.response.size will adjust the maximum
response size implementation for the IPC server running at port 8020.
</description>
</property>

<property>
<name>ipc.[port_number].server.read.threadpool.size</name>
<value></value>
<description>The maximum thread pool size for the server to read request data.
[port_number] is the port used by the IPC server to be configured.
For example, ipc.8020.server.read.threadpool.size will adjust the maximum
thread pool size implementation for the IPC server running at port 8020.
</description>
</property>

<property>
<name>ipc.[port_number].server.read.connection-queue.size</name>
<value></value>
<description>The size of the queue for the server to read request
connection data. [port_number] is the port used by the IPC server
to be configured. For example, ipc.8020.server.read.connection-queue.size
will adjust the connection queue size implementation for the IPC server
running at port 8020.
</description>
</property>

<property>
<name>ipc.[port_number].server.handler.queue.size</name>
<value></value>
<description>The queue size of server handler.[port_number] is the port
used by the IPC server to be configured. For example,
ipc.8020.server.handler.queue.size will adjust the handler queue size
implementation for the IPC server running at port 8020.
</description>
</property>

<!-- FairCallQueue properties -->
<!-- See FairCallQueue documentation for a table of all properties -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,10 @@ public void initializeMemberVariables() {
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.handler");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.lockfree");
xmlPropsToSkipCompare.add("ipc.[port_number].weighted-cost.response");
xmlPropsToSkipCompare.add("ipc.[port_number].server.max.response.size");
xmlPropsToSkipCompare.add("ipc.[port_number].server.read.threadpool.size");
xmlPropsToSkipCompare.add("ipc.[port_number].server.read.connection-queue.size");
xmlPropsToSkipCompare.add("ipc.[port_number].server.handler.queue.size");

// Deprecated properties. These should eventually be removed from the
// class.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,31 @@ public void testConfRpc() throws IOException {
assertEquals(2 * 10, server.getMaxQueueSize());
}

@Test
public void testConfRpc2() throws IOException {
Configuration tmpConf = new Configuration();
tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
CommonConfigurationKeys.SERVER_RPC_MAX_RESPONSE_SIZE_KEY, 3);
tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
CommonConfigurationKeys.SERVER_RPC_READ_THREADS_KEY, 3);
tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
CommonConfigurationKeys.SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, 3);
tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." + PORT + "." +
CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 3);
Server server = newServerBuilder(tmpConf)
.setNumHandlers(1).setVerbose(false).setPort(PORT).build();
assertEquals(3, server.getMaxRespSize());
assertEquals(3, server.getNumReaders());
assertEquals(3, server.getReaderPendingConnectionQueue());
assertEquals(3, server.getMaxQueueSize());

tmpConf.setInt(CommonConfigurationKeys.IPC_NAMESPACE + "." +
server.getPort() + "." +
CommonConfigurationKeys.SERVER_HANDLER_QUEUE_SIZE_KEY, 5);
server.refreshCallQueue(tmpConf);
assertEquals(5, server.getMaxQueueSize());
}

@Test
public void testProxyAddress() throws Exception {
Server server = null;
Expand Down