diff --git a/README.md b/README.md index 2bbe5129df..a4246fb548 100644 --- a/README.md +++ b/README.md @@ -187,7 +187,6 @@ rss-xxx.tgz will be generated for deployment rss.server.flush.threadPool.size 10 rss.server.buffer.capacity 40g rss.server.read.buffer.capacity 20g - rss.server.heartbeat.timeout 60000 rss.server.heartbeat.interval 10000 rss.rpc.message.max.size 1073741824 rss.server.preAllocation.expired 120000 diff --git a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java index c79dda6297..61c2531d9c 100644 --- a/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java +++ b/client-mr/core/src/main/java/org/apache/hadoop/mapreduce/v2/app/RssMRAppMaster.java @@ -195,7 +195,7 @@ public Thread newThread(Runnable r) { } int requiredAssignmentShuffleServersNum = RssMRUtils.getRequiredShuffleServerNumber(conf); - // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result + // retryInterval must be bigger than `rss.server.heartbeat.interval`, or it may return the same result long retryInterval = conf.getLong(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL, RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL_DEFAULT_VALUE); int retryTimes = conf.getInt(RssMRConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES, diff --git a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index f4740cf028..27c2226ea0 100644 --- a/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark2/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -256,7 +256,7 @@ public ShuffleHandle registerShuffle(int shuffleId, int numMaps, Shuff int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf); - // 
retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result + // retryInterval must be bigger than `rss.server.heartbeat.interval`, or it may return the same result long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL); int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES); diff --git a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java index d5fec8cbb7..d0892ce66e 100644 --- a/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java +++ b/client-spark/spark3/src/main/java/org/apache/spark/shuffle/RssShuffleManager.java @@ -335,7 +335,7 @@ public ShuffleHandle registerShuffle(int shuffleId, ShuffleDependency< int requiredShuffleServerNumber = RssSparkShuffleUtils.getRequiredShuffleServerNumber(sparkConf); - // retryInterval must bigger than `rss.server.heartbeat.timeout`, or maybe it will return the same result + // retryInterval must be bigger than `rss.server.heartbeat.interval`, or it may return the same result long retryInterval = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_INTERVAL); int retryTimes = sparkConf.get(RssSparkConfig.RSS_CLIENT_ASSIGNMENT_RETRY_TIMES); int estimateTaskConcurrency = RssSparkShuffleUtils.estimateTaskConcurrency(sparkConf); diff --git a/deploy/kubernetes/operator/examples/configuration.yaml b/deploy/kubernetes/operator/examples/configuration.yaml index 16b551a951..5fcb70b8fc 100644 --- a/deploy/kubernetes/operator/examples/configuration.yaml +++ b/deploy/kubernetes/operator/examples/configuration.yaml @@ -64,7 +64,6 @@ data: rss.server.hdfs.base.path hdfs://${your-hdfs-path} rss.server.health.check.enable false rss.server.heartbeat.interval 10000 - rss.server.heartbeat.timeout 60000 rss.server.memory.shuffle.highWaterMark.percentage 70.0 
rss.server.memory.shuffle.lowWaterMark.percentage 10.0 rss.server.pending.event.timeoutSec 600 diff --git a/docs/server_guide.md b/docs/server_guide.md index 4f8011431f..ce83b93f88 100644 --- a/docs/server_guide.md +++ b/docs/server_guide.md @@ -47,7 +47,6 @@ This document will introduce how to deploy Uniffle shuffle servers. rss.server.flush.threadPool.size 10 rss.server.buffer.capacity 40g rss.server.read.buffer.capacity 20g - rss.server.heartbeat.timeout 60000 rss.server.heartbeat.interval 10000 rss.rpc.message.max.size 1073741824 rss.server.preAllocation.expired 120000 @@ -143,7 +142,6 @@ rss.storage.basePath /data1/rssdata,/data2/rssdata.... rss.server.flush.thread.alive 10 rss.server.buffer.capacity 40g rss.server.read.buffer.capacity 20g -rss.server.heartbeat.timeout 60000 rss.server.heartbeat.interval 10000 rss.rpc.message.max.size 1073741824 rss.server.preAllocation.expired 120000 diff --git a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java index 9a450ca57f..566d99e9d7 100644 --- a/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java +++ b/integration-test/common/src/test/java/org/apache/uniffle/test/ShuffleWithRssClientTest.java @@ -375,7 +375,6 @@ public void testRetryAssgin() throws Throwable { RemoteStorageInfo remoteStorage = new RemoteStorageInfo(""); ShuffleAssignmentsInfo response = null; ShuffleServerConf shuffleServerConf = getShuffleServerConf(); - int heartbeatTimeout = shuffleServerConf.getInteger("rss.server.heartbeat.timeout", 65000); int heartbeatInterval = shuffleServerConf.getInteger("rss.server.heartbeat.interval", 1000); Thread.sleep(heartbeatInterval * 2); shuffleWriteClientImpl.registerCoordinators(COORDINATOR_QUORUM); @@ -410,7 +409,7 @@ public void testRetryAssgin() throws Throwable { ); }); return shuffleAssignments; - }, heartbeatTimeout, maxTryTime); + }, 
heartbeatInterval, maxTryTime); assertNotNull(response); } diff --git a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java index ddb7d53a83..893b9c556f 100644 --- a/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java +++ b/server/src/main/java/org/apache/uniffle/server/RegisterHeartBeat.java @@ -51,13 +51,11 @@ public class RegisterHeartBeat { private final ScheduledExecutorService service = ThreadUtils.getDaemonSingleThreadScheduledExecutor("startHeartBeat"); private final ExecutorService heartBeatExecutorService; - private long heartBeatTimeout; public RegisterHeartBeat(ShuffleServer shuffleServer) { ShuffleServerConf conf = shuffleServer.getShuffleServerConf(); this.heartBeatInitialDelay = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_DELAY); this.heartBeatInterval = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_INTERVAL); - this.heartBeatTimeout = conf.getLong(ShuffleServerConf.SERVER_HEARTBEAT_TIMEOUT); this.coordinatorQuorum = conf.getString(ShuffleServerConf.RSS_COORDINATOR_QUORUM); CoordinatorClientFactory factory = new CoordinatorClientFactory(conf.get(ShuffleServerConf.RSS_CLIENT_TYPE)); @@ -107,6 +105,7 @@ boolean sendHeartBeat( Map localStorageInfo, int nettyPort) { boolean sendSuccessfully = false; + // use `rss.server.heartbeat.interval` as the timeout option RssSendHeartBeatRequest request = new RssSendHeartBeatRequest( id, ip, @@ -115,7 +114,7 @@ boolean sendHeartBeat( preAllocatedMemory, availableMemory, eventNumInFlush, - heartBeatTimeout, + heartBeatInterval, tags, isHealthy, serverStatus, diff --git a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java index a34c0138f2..ecd116091e 100644 --- a/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java +++ b/server/src/main/java/org/apache/uniffle/server/ShuffleServerConf.java @@ -75,12 
+75,6 @@ public class ShuffleServerConf extends RssBaseConf { .defaultValue(10 * 1000L) .withDescription("Heartbeat interval to Coordinator (ms)"); - public static final ConfigOption SERVER_HEARTBEAT_TIMEOUT = ConfigOptions - .key("rss.server.heartbeat.timeout") - .longType() - .defaultValue(60 * 1000L) - .withDescription("rss heartbeat interval ms"); - public static final ConfigOption SERVER_FLUSH_THREAD_POOL_SIZE = ConfigOptions .key("rss.server.flush.threadPool.size") .intType() diff --git a/server/src/test/resources/server.conf b/server/src/test/resources/server.conf index ca7a47bfaa..585da76039 100644 --- a/server/src/test/resources/server.conf +++ b/server/src/test/resources/server.conf @@ -24,6 +24,5 @@ rss.server.buffer.spill.threshold 130 rss.server.partition.buffer.size 128 rss.jetty.http.port 12345 rss.jetty.corePool.size 64 -rss.server.heartbeat.timeout 1 rss.server.write.timeout 2000 rss.server.shuffleBufferManager.trigger.flush.interval 500