diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java index ebb50ceff7..18bd66148b 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/CoordinatorConf.java @@ -47,6 +47,13 @@ public class CoordinatorConf extends RssBaseConf { .longType() .defaultValue(30 * 1000L) .withDescription("timeout if can't get heartbeat from shuffle server"); + public static final ConfigOption COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES = ConfigOptions + .key("rss.coordinator.server.periodic.output.interval.times") + .longType() + .checkValue(ConfigUtils.POSITIVE_LONG_VALIDATOR, "output server list interval times must be positive") + .defaultValue(30L) + .withDescription("The periodic interval times of output alive nodes. The interval in milliseconds can be calculated by (" + + COORDINATOR_HEARTBEAT_TIMEOUT.key() + "/3 * rss.coordinator.server.periodic.output.interval.times)"); public static final ConfigOption COORDINATOR_ASSIGNMENT_STRATEGY = ConfigOptions .key("rss.coordinator.assignment.strategy") .stringType() diff --git a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java index 83b29c8915..efba646c43 100644 --- a/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java +++ b/coordinator/src/main/java/org/apache/uniffle/coordinator/SimpleClusterManager.java @@ -30,6 +30,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; @@ -60,12 +61,17 @@ public class SimpleClusterManager implements ClusterManager { private 
ScheduledExecutorService checkNodesExecutorService; private FileSystem hadoopFileSystem; + private long outputAliveServerCount = 0; + private final long periodicOutputIntervalTimes; + public SimpleClusterManager(CoordinatorConf conf, Configuration hadoopConf) throws IOException { this.shuffleNodesMax = conf.getInteger(CoordinatorConf.COORDINATOR_SHUFFLE_NODES_MAX); this.heartbeatTimeout = conf.getLong(CoordinatorConf.COORDINATOR_HEARTBEAT_TIMEOUT); // the thread for checking if shuffle server report heartbeat in time scheduledExecutorService = Executors.newSingleThreadScheduledExecutor( ThreadUtils.getThreadFactory("SimpleClusterManager-%d")); + + periodicOutputIntervalTimes = conf.get(CoordinatorConf.COORDINATOR_NODES_PERIODIC_OUTPUT_INTERVAL_TIMES); scheduledExecutorService.scheduleAtFixedRate( () -> nodesCheck(), heartbeatTimeout / 3, heartbeatTimeout / 3, TimeUnit.MILLISECONDS); @@ -99,6 +105,13 @@ void nodesCheck() { } } } + if (!deleteIds.isEmpty() || outputAliveServerCount % periodicOutputIntervalTimes == 0) { + LOG.info("Alive servers number: {}, ids: {}", + servers.size(), + servers.keySet().stream().collect(Collectors.toList()) + ); + } + outputAliveServerCount++; CoordinatorMetrics.gaugeTotalServerNum.set(servers.size()); } catch (Exception e) { @@ -107,6 +120,7 @@ void nodesCheck() { } private void updateExcludeNodes(String path) { + int originalExcludeNodesNumber = excludeNodes.size(); try { Path hadoopPath = new Path(path); FileStatus fileStatus = hadoopFileSystem.getFileStatus(hadoopPath); @@ -118,12 +132,16 @@ private void updateExcludeNodes(String path) { } else { excludeNodes = Sets.newConcurrentHashSet(); } - CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size()); } catch (FileNotFoundException fileNotFoundException) { excludeNodes = Sets.newConcurrentHashSet(); } catch (Exception e) { LOG.warn("Error when updating exclude nodes, the exclude nodes file path: " + path, e); } + int newlyExcludeNodesNumber = excludeNodes.size(); + if 
(newlyExcludeNodesNumber != originalExcludeNodesNumber) { + LOG.info("Exclude nodes number: {}, nodes list: {}", newlyExcludeNodesNumber, excludeNodes); + } + CoordinatorMetrics.gaugeExcludeServerNum.set(excludeNodes.size()); } private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws IOException { @@ -143,6 +161,9 @@ private void parseExcludeNodesFile(DataInputStream fsDataInputStream) throws IOE @Override public void add(ServerNode node) { + if (!servers.containsKey(node.getId())) { + LOG.info("Newly registering node: {}", node.getId()); + } servers.put(node.getId(), node); Set tags = node.getTags(); // remove node with all tags to deal with the situation of tag change