diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index 438a39b0973e1..22e84063068e4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -93,8 +93,12 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + public class NodeManager extends CompositeService implements EventHandler, NodeManagerMXBean { @@ -149,6 +153,8 @@ public int getExitCode() { private NMLogAggregationStatusTracker nmLogAggregationStatusTracker; + private ScheduledThreadPoolExecutor eventQueueMetricExecutor; + /** * Default Container State transition listener. */ @@ -498,6 +504,16 @@ protected void serviceInit(Configuration conf) throws Exception { registerMXBean(); context.getContainerExecutor().start(); + + eventQueueMetricExecutor = new ScheduledThreadPoolExecutor(1, + new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("EventQueueSizeMetricThread").build()); + eventQueueMetricExecutor.scheduleAtFixedRate(() -> { + metrics.setNmDispatcherEventQueueSize(dispatcher.getEventQueueSize()); + metrics.setSchedulerEventQueueSize( + containerManager.getDispatcher().getEventQueueSize()); + }, 1, 1, TimeUnit.SECONDS); + super.serviceInit(conf); // TODO add local dirs to del } @@ -525,6 +541,9 @@ protected void serviceStop() throws Exception { // release of NMLevelDBStore. stopRecoveryStore(); } + if (eventQueueMetricExecutor != null) { + eventQueueMetricExecutor.shutdownNow(); + } } public String getName() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java index 775196f582887..d8ea817a790d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/metrics/NodeManagerMetrics.java @@ -118,6 +118,11 @@ public class NodeManagerMetrics { @Metric("Container localization time in milliseconds") MutableRate localizationDurationMillis; + @Metric("# of nm dispatcher event queue size") + MutableGaugeInt nmDispatcherEventQueueSize; + @Metric("# of scheduler dispatcher event queue size") + MutableGaugeInt schedulerDispatcherEventQueueSize; + // CHECKSTYLE:ON:VisibilityModifier private JvmMetrics jvmMetrics = null; @@ -481,4 +486,20 @@ public void localizationCacheHitMiss(long size) { public void localizationComplete(long downloadMillis) { localizationDurationMillis.add(downloadMillis); } + + public int getNmDispatcherEventQueueSize() { + return nmDispatcherEventQueueSize.value(); + } + + public void setNmDispatcherEventQueueSize(int nmDispatcherEventQueueSize) { + this.nmDispatcherEventQueueSize.set(nmDispatcherEventQueueSize); + } + + public int getSchedulerEventQueueSize() { + return schedulerDispatcherEventQueueSize.value(); + } + + public void setSchedulerEventQueueSize(int schedulerEventQueueSize) { + this.schedulerDispatcherEventQueueSize.set(schedulerEventQueueSize); + } }