diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d42562cf6140a..bcbba9830ae15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -2983,6 +2983,29 @@ public static boolean isAclEnabled(Configuration conf) {
   public static final int
       DEFAULT_YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN = 60;
 
+  /**
+   * How long (in milliseconds) the event at the head of the dispatcher queue
+   * may stay unchanged before the dispatcher monitor treats the dispatcher
+   * as stuck.
+   */
+  public static final String YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC =
+      "yarn.dispatcher.monitor.event.timeout.msec";
+  public static final long DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC =
+      600000L;
+  /**
+   * How long (in milliseconds) the dispatcher monitor sleeps between two
+   * checks of the dispatcher queue.
+   */
+  public static final String YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC =
+      "yarn.dispatcher.monitor.event.interval.msec";
+  public static final long DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC =
+      10000L;
+  /** Whether the ResourceManager enables the monitor on its dispatcher. */
+  public static final String YARN_RM_DISPATCHER_MONITOR_ENABLE =
+      "yarn.rm.dispatcher.monitor.enable";
+  public static final boolean DEFAULT_YARN_RM_DISPATCHER_MONITOR_ENABLE =
+      false;
+
   /**
    * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
    * entries
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
index 1c4ed24b47d78..a51210d14001a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java
@@ -30,6 +30,8 @@
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.metrics.EventTypeMetrics;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.MonotonicClock;
@@ -104,6 +106,18 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
    */
   private String dispatcherThreadName = "AsyncDispatcher event handler";
 
+  /** Whether serviceStart() launches the monitor thread; off by default. */
+  private boolean monitorEnable = false;
+  /** Sleep between two monitor checks, in milliseconds. */
+  private long monitorInterval;
+  /** Time the queue head may stay unchanged before it is reported, in ms. */
+  private long monitorTimeout;
+  /** Queue head seen by the previous monitor check. */
+  private Event lastMonitorEvent = null;
+  /** Monotonic timestamp at which lastMonitorEvent was recorded. */
+  private long lastMonitorEventTime;
+  private Thread monitorThread;
+
   public AsyncDispatcher() {
     this(new LinkedBlockingQueue());
   }
@@ -203,6 +217,21 @@ protected void serviceStart() throws Exception {
     eventHandlingThread = new Thread(createThread());
     eventHandlingThread.setName(dispatcherThreadName);
     eventHandlingThread.start();
+    if (monitorEnable) {
+      monitorInterval = getConfig().getLong(
+          YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC,
+          YarnConfiguration.DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC);
+      monitorTimeout = getConfig().getLong(
+          YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC,
+          YarnConfiguration.DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC);
+      lastMonitorEvent = null;
+      // Monotonic clock: the timeout must not be affected by wall-clock jumps.
+      lastMonitorEventTime = Time.monotonicNow();
+      monitorThread = new Thread(createMonitorThread());
+      // Name the monitor thread itself, not the event handling thread.
+      monitorThread.setName(dispatcherThreadName + " monitor");
+      monitorThread.start();
+    }
   }
 
   public void setDrainEventsOnStop() {
@@ -413,4 +442,68 @@ public void addMetrics(EventTypeMetrics metrics,
   public int getEventQueueSize() {
     return eventQueue.size();
   }
+
+  /**
+   * Enables or disables the monitor thread. The flag is only read by
+   * serviceStart(), so it has to be set before the dispatcher is started.
+   *
+   * @param monitorEnable true to start the monitor thread with the dispatcher
+   */
+  public void setMonitorEnable(boolean monitorEnable) {
+    this.monitorEnable = monitorEnable;
+  }
+
+  /**
+   * Builds the task run by the monitor thread. Every monitorInterval
+   * milliseconds it peeks at the head of the event queue. If the same event
+   * has been at the head for longer than monitorTimeout milliseconds, thread
+   * information is logged, the shutdown thread is started when
+   * exitOnDispatchException is set, and the monitor ends.
+   *
+   * @return the monitor task
+   */
+  Runnable createMonitorThread() {
+    return new Runnable() {
+      @Override
+      public void run() {
+        try {
+          while (!stopped && !Thread.currentThread().isInterrupted()) {
+            Event currentEvent = eventQueue.peek();
+            if (currentEvent != null && currentEvent == lastMonitorEvent) {
+              if (Time.monotonicNow() - lastMonitorEventTime > monitorTimeout) {
+                LOG.error(
+                    "Event update timeout, maybe AsyncDispatcher have already stuck, so exit...");
+                ReflectionUtils.logThreadInfo(LOG, "AsyncDispatcher stuck", 0);
+                if (exitOnDispatchException) {
+                  Thread shutDownThread = new Thread(createShutDownThread());
+                  shutDownThread.setName("AsyncDispatcher ShutDown handler");
+                  shutDownThread.start();
+                }
+                return;
+              }
+            } else {
+              // Head changed (or queue is empty): restart the timeout window.
+              lastMonitorEvent = currentEvent;
+              lastMonitorEventTime = Time.monotonicNow();
+            }
+            Thread.sleep(monitorInterval);
+          }
+        } catch (InterruptedException e) {
+          // Interrupted while sleeping: keep the interrupt status and end.
+          Thread.currentThread().interrupt();
+        } catch (Throwable e) {
+          LOG.error("AsyncDispatcher Monitor Thread failed, caused by", e);
+        }
+      }
+    };
+  }
+
+  /**
+   * @return the monitor thread, or null if it was never started
+   */
+  @VisibleForTesting
+  public Thread getMonitorThread() {
+    return monitorThread;
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
index 8b2dfa08b0dd0..988af7447dab4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java
@@ -43,6 +43,7 @@
 
 import static org.apache.hadoop.metrics2.lib.Interns.info;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.mockito.Mockito.*;
 
 public class TestAsyncDispatcher {
@@ -407,4 +408,47 @@ public void testDispatcherMetricsHistogram() throws Exception {
     }
 
   }
+
+  /**
+   * Starts a dispatcher with the monitor enabled and a 1 second event
+   * timeout, queues ten events, and expects the monitor thread to end
+   * before the queue drains.
+   */
+  @Test(timeout = 60000)
+  public void testDispatcherHandleTimeout() throws Exception {
+    YarnConfiguration conf = new YarnConfiguration();
+    conf.setLong(YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC,
+        1000L);
+    conf.setLong(YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC,
+        10L);
+
+    AsyncDispatcher dispatcher = new AsyncDispatcher();
+    dispatcher.setMonitorEnable(true);
+    dispatcher.register(TestEnum.class, new TestHandler(5000));
+    dispatcher.disableExitOnDispatchException();
+    dispatcher.init(conf);
+    dispatcher.start();
+
+    for (int i = 0; i < 10; ++i) {
+      Event event = mock(Event.class);
+      when(event.getType()).thenReturn(TestEnum.TestEventType);
+      dispatcher.getEventHandler().handle(event);
+    }
+
+    // wait dispatcher drain or exit
+    boolean drained;
+    boolean alive;
+    while (true) {
+      drained = dispatcher.isDrained();
+      alive = dispatcher.getMonitorThread().isAlive();
+      if (drained || !alive) {
+        break;
+      }
+      Thread.sleep(1000);
+    }
+    dispatcher.stop();
+
+    assertFalse("Dispatcher should not drain", drained);
+    assertFalse("Monitor thread should not be alive", alive);
+  }
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index 3bd6a0fe43a8c..55252b88f3760 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -512,6 +512,11 @@ protected Dispatcher createDispatcher() {
         rmAppAttemptEventTypeMetrics
             .getEnumClass());
 
+    if (conf.getBoolean(YarnConfiguration.YARN_RM_DISPATCHER_MONITOR_ENABLE,
+        YarnConfiguration.DEFAULT_YARN_RM_DISPATCHER_MONITOR_ENABLE)) {
+      dispatcher.setMonitorEnable(true);
+    }
+
     return dispatcher;
   }
 