From b821ab5e91bd2393f0d16990c7f57d129cd4b2a4 Mon Sep 17 00:00:00 2001 From: zhaoguangbao Date: Thu, 24 Aug 2023 16:03:31 +0800 Subject: [PATCH] HDDS-9208. Add queue limit in ReplicationServer. --- .../replication/ReplicationServer.java | 31 +++++++++++++++++-- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java index 657a4dbe6d46..a2e0209f6437 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationServer.java @@ -78,13 +78,17 @@ public ReplicationServer(ContainerController controller, int replicationServerWorkers = replicationConfig.getReplicationMaxStreams(); - LOG.info("Initializing replication server with thread count = {}", - replicationConfig.getReplicationMaxStreams()); + int replicationQueueLimit = + replicationConfig.getReplicationQueueLimit(); + LOG.info("Initializing replication server with thread count = {}" + + " queue length = {}", + replicationConfig.getReplicationMaxStreams(), + replicationConfig.getReplicationQueueLimit()); this.executor = new ThreadPoolExecutor(replicationServerWorkers, replicationServerWorkers, 60, TimeUnit.SECONDS, - new LinkedBlockingQueue<>(), + new LinkedBlockingQueue<>(replicationQueueLimit), new ThreadFactoryBuilder().setDaemon(true) .setNameFormat("ReplicationContainerReader-%d") .build()); @@ -153,6 +157,7 @@ public static final class ReplicationConfig { public static final String PREFIX = "hdds.datanode.replication"; public static final String STREAMS_LIMIT_KEY = "streams.limit"; + public static final String QUEUE_LIMIT = "queue.limit"; public static final String REPLICATION_STREAMS_LIMIT_KEY = PREFIX + "." + STREAMS_LIMIT_KEY; @@ -180,6 +185,18 @@ public static final class ReplicationConfig { ) private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT; + /** + * The maximum of replication request queue length. + */ + @Config(key = QUEUE_LIMIT, + type = ConfigType.INT, + defaultValue = "4096", + tags = {DATANODE}, + description = "The maximum number of queued requests for container " + + "replication" + ) + private int replicationQueueLimit = 4096; + @Config(key = "port", defaultValue = "9886", description = "Port used for the server2server replication server", tags = {DATANODE, MANAGEMENT}) @@ -221,6 +238,14 @@ public void setReplicationMaxStreams(int replicationMaxStreams) { this.replicationMaxStreams = replicationMaxStreams; } + public int getReplicationQueueLimit() { + return replicationQueueLimit; + } + + public void setReplicationQueueLimit(int limit) { + this.replicationQueueLimit = limit; + } + @PostConstruct public void validate() { if (replicationMaxStreams < 1) {