From d034debf5c4610c44c0ea0e7a4ec7eced025feca Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Tue, 9 Aug 2022 14:55:50 +0800 Subject: [PATCH 1/4] [Log Improvement] Output the registering/lost/exclude nodes in log --- .../coordinator/SimpleClusterManager.java | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java index 83b29c8915..330377adf3 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -59,6 +60,7 @@ public class SimpleClusterManager implements ClusterManager { private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService checkNodesExecutorService; private FileSystem hadoopFileSystem; + private long outputAliveServerCount = 0; public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws IOException { this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX); @@ -99,6 +101,13 @@ void nodesCheck() { } } } + if (!deleteIds.isEmpty() || outputAliveServerCount % 30 == 0) { + LOG.info("Alive servers number: {}, ids: {}", + servers.size(), + servers.keySet().stream().collect(Collectors.toList()) + ); + } + outputAliveServerCount++; CoordinatorMetrics.gaugeTotalServerNum.set(servers.size()); } catch (Exception e) { @@ -107,6 +116,7 @@ void nodesCheck() { } private void updateExcludeNodes(String path) { + int originalExcludeNodesNumber = excludeNodes.size(); try { Path hadoopPath = new Path(path); FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath); @@ -118,12 +128,16 @@ private void updateExcludeNodes(String path) { } else { excludeNodes = Sets.newConcurrentHashSet(); } - CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size()); } catch (FileNotFoundException fileNotFoundException) { excludeNodes = Sets.newConcurrentHashSet(); } catch (Exception e) { LOG.warn("Error when updating exclude nodes, the exclude nodes file path: " + path, e); } + int newlyExcludeNodesNumber = excludeNodes.size(); + if (newlyExcludeNodesNumber != originalExcludeNodesNumber) { + LOG.info("Exclude nodes number: {}, nodes list: {}", newlyExcludeNodesNumber, excludeNodes); + } + CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size()); } private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws IOException { @@ -143,6 +157,9 @@ private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws IOE @Override public void add(ServerNode node) { + if (!servers.containsKey(node.getId())) { + LOG.info("Newly registering node: {}", node.getId()); + } servers.put(node.getId(), node); Set tags = node.getTags(); // remove node with all tags to deal with the situation of tag change From 00024915eb1a7cf01a6fc0ab6768004ef4be776e Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 10 Aug 2022 10:33:33 +0800 Subject: [PATCH 2/4] config --- .../org/apache/uniffle/coordinator/CoordinatorConf.java | 6 ++++++ .../apache/uniffle/coordinator/SimpleClusterManager.java | 6 +++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index ebb50ceff7..ab81b964a3 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -47,6 +47,12 @@ public class CoordinatorConf extends RssBaseConf { .longType() .defaultValue(30 * 1000L) .withDescription("timeout if can't get heartbeat from shuffle server"); + public static final ConfigOption COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES = ConfigOptions + .key("rss.coordinator.nodes.periodic.output.interval.times") + .longType() + .defaultValue(30L) + .withDescription("The periodic interval times of output alive nodes. The interval sec can be calculated by (" + + COORDINATOR_HEARTBEAT_TIMEOUT.key() + "/3 * rss.coordinator.nodes.periodic.output.interval.times)"); public static final ConfigOption COORDINATOR_ASSIGNMENT_STRATEGY = ConfigOptions .key("rss.coordinator.assignment.strategy") .stringType() diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java index 330377adf3..efba646c43 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java @@ -60,7 +60,9 @@ public class SimpleClusterManager implements ClusterManager { private ScheduledExecutorService scheduledExecutorService; private ScheduledExecutorService checkNodesExecutorService; private FileSystem hadoopFileSystem; + private long outputAliveServerCount = 0; + private final long periodicOutputIntervalTimes; public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws IOException { this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX); @@ -68,6 +70,8 @@ public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) thro // the thread for checking if shuffle server report heartbeat in time scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ThreadUtils.getThreadFactory("SimpleClusterManager-%d")); + + periodicOutputIntervalTimes = conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES); scheduledExecutorService.scheduleAtFixedRate( () -> nodesCheck(), heartbeatTimeout / 3, heartbeatTimeout / 3, TimeUnit.MILLISECONDS); @@ -101,7 +105,7 @@ void nodesCheck() { } } } - if (!deleteIds.isEmpty() || outputAliveServerCount % 30 == 0) { + if (!deleteIds.isEmpty() || outputAliveServerCount % periodicOutputIntervalTimes == 0) { LOG.info("Alive servers number: {}, ids: {}", servers.size(), servers.keySet().stream().collect(Collectors.toList()) From 730fc5092db034abaa6afde941873dfc87b8c6b4 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 10 Aug 2022 10:34:43 +0800 Subject: [PATCH 3/4] fix --- .../java/org/apache/uniffle/coordinator/CoordinatorConf.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index ab81b964a3..4ba56dacba 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -48,11 +48,11 @@ public class CoordinatorConf extends RssBaseConf { .defaultValue(30 * 1000L) .withDescription("timeout if can't get heartbeat from shuffle server"); public static final ConfigOption COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES = ConfigOptions - .key("rss.coordinator.nodes.periodic.output.interval.times") + .key("rss.coordinator.server.periodic.output.interval.times") .longType() .defaultValue(30L) .withDescription("The periodic interval times of output alive nodes. The interval sec can be calculated by (" - + COORDINATOR_HEARTBEAT_TIMEOUT.key() + "/3 * rss.coordinator.nodes.periodic.output.interval.times)"); + + COORDINATOR_HEARTBEAT_TIMEOUT.key() + "/3 * rss.coordinator.server.periodic.output.interval.times)"); public static final ConfigOption COORDINATOR_ASSIGNMENT_STRATEGY = ConfigOptions .key("rss.coordinator.assignment.strategy") .stringType() From 7da7c6532061143725121f69db49e8f271a4f352 Mon Sep 17 00:00:00 2001 From: Junfan Zhang Date: Wed, 10 Aug 2022 11:16:29 +0800 Subject: [PATCH 4/4] check value --- .../java/org/apache/uniffle/coordinator/CoordinatorConf.java | 1 + 1 file changed, 1 insertion(+) diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index 4ba56dacba..18bd66148b 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -50,6 +50,7 @@ public class CoordinatorConf extends RssBaseConf { public static final ConfigOption COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES = ConfigOptions .key("rss.coordinator.server.periodic.output.interval.times") .longType() + .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "output server list interval times must be positive") .defaultValue(30L) .withDescription("The periodic interval times of output alive nodes. The interval sec can be calculated by (" + COORDINATOR_HEARTBEAT_TIMEOUT.key() + "/3 * rss.coordinator.server.periodic.output.interval.times)");