Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,17 @@ public ReplicationServer(ContainerController controller,

int replicationServerWorkers =
replicationConfig.getReplicationMaxStreams();
LOG.info("Initializing replication server with thread count = {}",
replicationConfig.getReplicationMaxStreams());
int replicationQueueLimit =
replicationConfig.getReplicationQueueLimit();
LOG.info("Initializing replication server with thread count = {}"
+ " queue length = {}",
replicationConfig.getReplicationMaxStreams(),
replicationConfig.getReplicationQueueLimit());
this.executor =
new ThreadPoolExecutor(replicationServerWorkers,
replicationServerWorkers,
60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new LinkedBlockingQueue<>(replicationQueueLimit),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ReplicationContainerReader-%d")
.build());
Expand Down Expand Up @@ -153,6 +157,7 @@ public static final class ReplicationConfig {

public static final String PREFIX = "hdds.datanode.replication";
public static final String STREAMS_LIMIT_KEY = "streams.limit";
public static final String QUEUE_LIMIT = "queue.limit";

public static final String REPLICATION_STREAMS_LIMIT_KEY =
PREFIX + "." + STREAMS_LIMIT_KEY;
Expand Down Expand Up @@ -180,6 +185,18 @@ public static final class ReplicationConfig {
)
private int replicationMaxStreams = REPLICATION_MAX_STREAMS_DEFAULT;

/**
* The maximum of replication request queue length.
*/
@Config(key = QUEUE_LIMIT,
type = ConfigType.INT,
defaultValue = "4096",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How this default value 4096 is derived for queue limit?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sumitagrawl, Do you think this has something to do with the failure of ci

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@z-bb not related to CI failure, I mean what should be appropriate default value for nettyServer handling upload/download container ... as this is given to NettyServer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess 4096 this value is picked for the moment based on experience, @z-bb , right?
And Guangbao, do you remember the the biggest queue length you have ever seen before without the queue limitation imposed?

Copy link
Contributor Author

@z-bb z-bb Sep 12, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess 4096 this value is picked for the moment based on experience, @z-bb , right? And Guangbao, do you remember the the biggest queue length you have ever seen before without the queue limitation imposed?

@ChenSammi Yes, based on experience, we plan to add a monitor for the queue length. I will confirm this value later.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@z-bb , I see. We can revisit the default value of this "queue.limit" property once you got more info from the monitor history.

tags = {DATANODE},
description = "The maximum number of queued requests for container " +
"replication"
)
private int replicationQueueLimit = 4096;

@Config(key = "port", defaultValue = "9886",
description = "Port used for the server2server replication server",
tags = {DATANODE, MANAGEMENT})
Expand Down Expand Up @@ -221,6 +238,14 @@ public void setReplicationMaxStreams(int replicationMaxStreams) {
this.replicationMaxStreams = replicationMaxStreams;
}

public int getReplicationQueueLimit() {
return replicationQueueLimit;
}

public void setReplicationQueueLimit(int limit) {
this.replicationQueueLimit = limit;
}

@PostConstruct
public void validate() {
if (replicationMaxStreams < 1) {
Expand Down