Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2983,6 +2983,17 @@ public static boolean isAclEnabled(Configuration conf) {
public static final int
DEFAULT_YARN_DISPATCHER_CPU_MONITOR_SAMPLES_PER_MIN = 60;

/**
 * Configuration key: how long, in milliseconds, the event at the head of the
 * dispatcher queue may stay unchanged before the dispatcher monitor treats
 * the dispatcher as stuck.
 */
public static final String YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC =
    "yarn.dispatcher.monitor.event.timeout.msec";
/** Default stuck-event timeout: 600000 ms (10 minutes). */
public static final long DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC =
    600000L;
/**
 * Configuration key: how long, in milliseconds, the dispatcher monitor
 * sleeps between two inspections of the event queue.
 */
public static final String YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC =
    "yarn.dispatcher.monitor.event.interval.msec";
/** Default monitor polling interval: 10000 ms (10 seconds). */
public static final long DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC =
    10000L;
/**
 * Configuration key: boolean switch that turns the dispatcher monitor on.
 * Callers are expected to supply their own default when reading it; no
 * default constant is declared here.
 */
public static final String YARN_RM_DISPATCHER_MONITOR_ENABLE =
    "yarn.rm.dispatcher.monitor.enable";

/**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.metrics.EventTypeMetrics;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
Expand Down Expand Up @@ -104,6 +106,13 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
*/
private String dispatcherThreadName = "AsyncDispatcher event handler";

// Whether serviceStart() should launch the monitor thread; off by default.
private boolean monitorEnable = false;
// Milliseconds the monitor sleeps between two looks at the event queue.
private long monitorInterval;
// Milliseconds the queue head may stay the same before the monitor reacts.
private long monitorTimeout;
// Queue head seen by the monitor on its most recent bookkeeping update.
private Event lastMonitorEvent = null;
// Timestamp (ms) recorded together with lastMonitorEvent.
private long lastMonitorEventTime;
// Thread running the monitor; only assigned when the monitor is enabled.
private Thread monitorThread;

/**
 * Creates a dispatcher backed by a new, default-capacity
 * {@link LinkedBlockingQueue} by delegating to the queue-accepting
 * constructor.
 */
public AsyncDispatcher() {
this(new LinkedBlockingQueue<Event>());
}
Expand Down Expand Up @@ -203,6 +212,19 @@ protected void serviceStart() throws Exception {
eventHandlingThread = new Thread(createThread());
eventHandlingThread.setName(dispatcherThreadName);
eventHandlingThread.start();
// Optionally start a watchdog thread that watches the event queue; it is
// only launched when setMonitorEnable(true) was called before start.
if (monitorEnable) {
  monitorInterval = getConfig().getLong(
      YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC,
      YarnConfiguration.DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC);
  monitorTimeout = getConfig().getLong(
      YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC,
      YarnConfiguration.DEFAULT_YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC);
  lastMonitorEvent = null;
  lastMonitorEventTime = Time.now();
  monitorThread = new Thread(createMonitorThread());
  // Name the monitor thread itself. Renaming eventHandlingThread here would
  // mislabel the event handler in thread dumps and leave the monitor with a
  // generic "Thread-N" name.
  monitorThread.setName(dispatcherThreadName + " monitor");
  monitorThread.start();
}
}

public void setDrainEventsOnStop() {
Expand Down Expand Up @@ -413,4 +435,46 @@ public void addMetrics(EventTypeMetrics metrics,
/**
 * Reports how many events are currently waiting in the dispatcher queue.
 *
 * @return the current size of {@code eventQueue}
 */
public int getEventQueueSize() {
return eventQueue.size();
}

/**
 * Switches the dispatcher monitor on or off. The flag is consulted when the
 * service starts, so it has to be set before {@code start()} to take effect.
 *
 * @param monitorEnable {@code true} to launch the monitor thread on start
 */
public void setMonitorEnable(boolean monitorEnable) {
this.monitorEnable = monitorEnable;
}

/**
 * Builds the task run by the dispatcher monitor thread.
 *
 * <p>The task repeatedly peeks at the head of {@code eventQueue}. If the same
 * event object stays at the head for longer than {@code monitorTimeout}
 * milliseconds, it logs the condition together with a thread dump and, when
 * {@code exitOnDispatchException} is set, starts the thread built by
 * {@code createShutDownThread()}. The task then ends. Between checks it
 * sleeps for {@code monitorInterval} milliseconds. The loop also ends once
 * the dispatcher is stopped or the monitor thread is interrupted.
 *
 * @return the runnable to hand to the monitor thread
 */
Runnable createMonitorThread() {
  return new Runnable() {
    @Override
    public void run() {
      try {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          Event currentEvent = eventQueue.peek();
          // Elapsed time must come from the monotonic clock: a wall-clock
          // step (e.g. an NTP adjustment) must not look like a stuck
          // dispatcher. lastMonitorEvent starts out null, so the first pass
          // always takes the else-branch and records a monotonic baseline
          // before any comparison is made.
          long now = Time.monotonicNow();
          if (currentEvent != null && currentEvent == lastMonitorEvent) {
            if (now - lastMonitorEventTime > monitorTimeout) {
              LOG.error(
                  "Event update timeout, maybe AsyncDispatcher have already stuck, so exit...");
              ReflectionUtils.logThreadInfo(LOG, "AsyncDispatcher stuck", 0);
              if (exitOnDispatchException) {
                Thread shutDownThread = new Thread(createShutDownThread());
                shutDownThread.setName("AsyncDispatcher ShutDown handler");
                shutDownThread.start();
              }
              return;
            }
          } else {
            // Head changed (or queue is empty): restart the stopwatch.
            lastMonitorEvent = currentEvent;
            lastMonitorEventTime = now;
          }
          Thread.sleep(monitorInterval);
        }
      } catch (InterruptedException e) {
        // Interruption is a request to stop, not a failure; keep the flag
        // set so code further up the stack can still observe it.
        Thread.currentThread().interrupt();
        LOG.info("AsyncDispatcher Monitor Thread interrupted, exiting");
      } catch (Throwable e) {
        LOG.error("AsyncDispatcher Monitor Thread failed, caused by", e);
      }
    }
  };
}

/**
 * Exposes the monitor thread so tests can observe whether it is still
 * running.
 *
 * @return the monitor thread, or {@code null} if none has been created
 */
@VisibleForTesting
public Thread getMonitorThread() {
return monitorThread;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

import static org.apache.hadoop.metrics2.lib.Interns.info;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.mockito.Mockito.*;

public class TestAsyncDispatcher {
Expand Down Expand Up @@ -407,4 +408,41 @@ public void testDispatcherMetricsHistogram() throws Exception {
}

}

/**
 * Checks that the dispatcher monitor terminates when queued events are not
 * consumed within the configured timeout: with a 1 s timeout and a handler
 * constructed as {@code TestHandler(5000)} (presumably slow to handle each
 * event; defined elsewhere in this class), the monitor thread must have
 * exited while the queue is still not drained.
 */
@Test(timeout = 60000)
public void testDispatcherHandleTimeout() throws Exception {
  YarnConfiguration conf = new YarnConfiguration();
  conf.setLong(YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_TIMEOUT_MSEC,
      1000L);
  conf.setLong(YarnConfiguration.YARN_DISPATCHER_MONITOR_EVENT_INTERVAL_MSEC,
      10L);

  AsyncDispatcher dispatcher = new AsyncDispatcher();
  dispatcher.setMonitorEnable(true);
  dispatcher.register(TestEnum.class, new TestHandler(5000));
  // Keep the JVM alive when the monitor fires; only its exit is asserted.
  dispatcher.disableExitOnDispatchException();
  dispatcher.init(conf);
  dispatcher.start();

  boolean drained;
  boolean alive;
  try {
    for (int i = 0; i < 10; ++i) {
      Event event = mock(Event.class);
      when(event.getType()).thenReturn(TestEnum.TestEventType);
      dispatcher.getEventHandler().handle(event);
    }

    // wait dispatcher drain or exit
    while (true) {
      drained = dispatcher.isDrained();
      alive = dispatcher.getMonitorThread().isAlive();
      if (drained || !alive) {
        break;
      }
      Thread.sleep(1000);
    }
  } finally {
    // Always stop the dispatcher so its threads do not leak into other
    // tests when the wait loop is interrupted or throws.
    dispatcher.stop();
  }

  assertFalse("Dispatcher should not drain", drained);
  assertFalse("Monitor thread should not be alive", alive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,11 @@ protected Dispatcher createDispatcher() {
rmAppAttemptEventTypeMetrics
.getEnumClass());

// The dispatcher monitor is opt-in: it is only switched on when the
// RM-specific flag is explicitly set to true (the lookup defaults to false).
if (conf.getBoolean(YarnConfiguration.YARN_RM_DISPATCHER_MONITOR_ENABLE,
false)) {
dispatcher.setMonitorEnable(true);
}

return dispatcher;
}

Expand Down