Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#927] Improvement: improve the control of server heartbeat #928

Merged
merged 1 commit into from
Jun 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ rss-xxx.tgz will be generated for deployment
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Thread newThread(Runnable r) {
}

int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
// retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff

int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);

// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
// retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<

int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);

// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
// retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
Expand Down
1 change: 0 additions & 1 deletion deploy/kubernetes/operator/examples/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ data:
rss.server.hdfs.base.path hdfs://${your-hdfs-path}
rss.server.health.check.enable false
rss.server.heartbeat.interval 10000
rss.server.heartbeat.timeout 60000
rss.server.memory.shuffle.highWaterMark.percentage 70.0
rss.server.memory.shuffle.lowWaterMark.percentage 10.0
rss.server.pending.event.timeoutSec 600
Expand Down
2 changes: 0 additions & 2 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ This document will introduce how to deploy Uniffle shuffle servers.
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
Expand Down Expand Up @@ -143,7 +142,6 @@ rss.storage.basePath /data1/rssdata,/data2/rssdata....
rss.server.flush.thread.alive 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ public void testRetryAssgin() throws Throwable {
RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
ShuffleAssignmentsInfo response = null;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
int heartbeatTimeout = shuffleServerConf.getInteger("rss.server.heartbeat.timeout", 65000);
int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
Thread.sleep(heartbeatInterval * 2);
shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
Expand Down Expand Up @@ -410,7 +409,7 @@ public void testRetryAssgin() throws Throwable {
);
});
return shuffleAssignments;
}, heartbeatTimeout, maxTryTime);
}, heartbeatInterval, maxTryTime);

assertNotNull(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,11 @@ public class RegisterHeartBeat {
private final ScheduledExecutorService service =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("startHeartBeat");
private final ExecutorService heartBeatExecutorService;
private long heartBeatTimeout;

public RegisterHeartBeat(ShuffleServer shuffleServer) {
ShuffleServerConf conf = shuffleServer.getShuffleServerConf();
this.heartBeatInitialDelay = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
this.heartBeatInterval = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
this.heartBeatTimeout = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT);
this.coordinatorQuorum = conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
CoordinatorClientFactory factory =
new CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
Expand Down Expand Up @@ -107,6 +105,7 @@ boolean sendHeartBeat(
Map<String, StorageInfo> localStorageInfo,
int nettyPort) {
boolean sendSuccessfully = false;
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
id,
ip,
Expand All @@ -115,7 +114,7 @@ boolean sendHeartBeat(
preAllocatedMemory,
availableMemory,
eventNumInFlush,
heartBeatTimeout,
heartBeatInterval,
tags,
isHealthy,
serverStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10 * 1000L)
.withDescription("Heartbeat interval to Coordinator (ms)");

public static final ConfigOption<Long> SERVER_HEARTBEAT_TIMEOUT = ConfigOptions
.key("rss.server.heartbeat.timeout")
.longType()
.defaultValue(60 * 1000L)
.withDescription("rss heartbeat interval ms");

public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
.key("rss.server.flush.threadPool.size")
.intType()
Expand Down
1 change: 0 additions & 1 deletion server/src/test/resources/server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ rss.server.buffer.spill.threshold 130
rss.server.partition.buffer.size 128
rss.jetty.http.port 12345
rss.jetty.corePool.size 64
rss.server.heartbeat.timeout 1
rss.server.write.timeout 2000
rss.server.shuffleBufferManager.trigger.flush.interval 500