Skip to content

Commit

Permalink
Eleminate rss.server.heartbeat.timeout replaced with rss.server.heart…
Browse files Browse the repository at this point in the history
…beat.interval
  • Loading branch information
summaryzb committed Jun 5, 2023
1 parent d09cdc2 commit 52a457c
Show file tree
Hide file tree
Showing 10 changed files with 6 additions and 19 deletions.
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@ rss-xxx.tgz will be generated for deployment
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ public Thread newThread(Runnable r) {
}

int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf);
// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
// retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL,
RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE);
int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff

int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);

// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
// retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public <K, V, C> ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency<

int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf);

// retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result
// retryInterval must bigger than `rss.server.heartbeat.interval`, or maybe it will return the same result
long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL);
int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES);
int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf);
Expand Down
1 change: 0 additions & 1 deletion deploy/kubernetes/operator/examples/configuration.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ data:
rss.server.hdfs.base.path hdfs://${your-hdfs-path}
rss.server.health.check.enable false
rss.server.heartbeat.interval 10000
rss.server.heartbeat.timeout 60000
rss.server.memory.shuffle.highWaterMark.percentage 70.0
rss.server.memory.shuffle.lowWaterMark.percentage 10.0
rss.server.pending.event.timeoutSec 600
Expand Down
2 changes: 0 additions & 2 deletions docs/server_guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ This document will introduce how to deploy Uniffle shuffle servers.
rss.server.flush.threadPool.size 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
Expand Down Expand Up @@ -143,7 +142,6 @@ rss.storage.basePath /data1/rssdata,/data2/rssdata....
rss.server.flush.thread.alive 10
rss.server.buffer.capacity 40g
rss.server.read.buffer.capacity 20g
rss.server.heartbeat.timeout 60000
rss.server.heartbeat.interval 10000
rss.rpc.message.max.size 1073741824
rss.server.preAllocation.expired 120000
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,7 +375,6 @@ public void testRetryAssgin() throws Throwable {
RemoteStorageInfo remoteStorage = new RemoteStorageInfo("");
ShuffleAssignmentsInfo response = null;
ShuffleServerConf shuffleServerConf = getShuffleServerConf();
int heartbeatTimeout = shuffleServerConf.getInteger("rss.server.heartbeat.timeout", 65000);
int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000);
Thread.sleep(heartbeatInterval * 2);
shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM);
Expand Down Expand Up @@ -410,7 +409,7 @@ public void testRetryAssgin() throws Throwable {
);
});
return shuffleAssignments;
}, heartbeatTimeout, maxTryTime);
}, heartbeatInterval, maxTryTime);

assertNotNull(response);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,11 @@ public class RegisterHeartBeat {
private final ScheduledExecutorService service =
ThreadUtils.getDaemonSingleThreadScheduledExecutor("startHeartBeat");
private final ExecutorService heartBeatExecutorService;
private long heartBeatTimeout;

public RegisterHeartBeat(ShuffleServer shuffleServer) {
ShuffleServerConf conf = shuffleServer.getShuffleServerConf();
this.heartBeatInitialDelay = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY);
this.heartBeatInterval = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL);
this.heartBeatTimeout = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT);
this.coordinatorQuorum = conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM);
CoordinatorClientFactory factory =
new CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE));
Expand Down Expand Up @@ -107,6 +105,7 @@ boolean sendHeartBeat(
Map<String, StorageInfo> localStorageInfo,
int nettyPort) {
boolean sendSuccessfully = false;
// use `rss.server.heartbeat.interval` as the timeout option
RssSendHeartBeatRequest request = new RssSendHeartBeatRequest(
id,
ip,
Expand All @@ -115,7 +114,7 @@ boolean sendHeartBeat(
preAllocatedMemory,
availableMemory,
eventNumInFlush,
heartBeatTimeout,
heartBeatInterval,
tags,
isHealthy,
serverStatus,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,6 @@ public class ShuffleServerConf extends RssBaseConf {
.defaultValue(10 * 1000L)
.withDescription("Heartbeat interval to Coordinator (ms)");

public static final ConfigOption<Long> SERVER_HEARTBEAT_TIMEOUT = ConfigOptions
.key("rss.server.heartbeat.timeout")
.longType()
.defaultValue(60 * 1000L)
.withDescription("rss heartbeat interval ms");

public static final ConfigOption<Integer> SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions
.key("rss.server.flush.threadPool.size")
.intType()
Expand Down
1 change: 0 additions & 1 deletion server/src/test/resources/server.conf
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,5 @@ rss.server.buffer.spill.threshold 130
rss.server.partition.buffer.size 128
rss.jetty.http.port 12345
rss.jetty.corePool.size 64
rss.server.heartbeat.timeout 1
rss.server.write.timeout 2000
rss.server.shuffleBufferManager.trigger.flush.interval 500

0 comments on commit 52a457c

Please sign in to comment.