Skip to content

Commit

Permalink
YARN-3999. RM hangs on draining events. Contributed by Jian He
Browse files Browse the repository at this point in the history
  • Loading branch information
xgong committed Aug 12, 2015
1 parent 7c796fd commit 3ae716f
Show file tree
Hide file tree
Showing 16 changed files with 104 additions and 93 deletions.
Expand Up @@ -83,11 +83,13 @@ public void start() {


public void stop() { public void stop() {
shouldRun = false; shouldRun = false;
monitorThread.interrupt(); if (monitorThread != null) {
try { monitorThread.interrupt();
monitorThread.join(); try {
} catch (InterruptedException e) { monitorThread.join();
Thread.currentThread().interrupt(); } catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} }
} }


Expand Down
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -771,6 +771,7 @@ Release 2.7.2 - UNRELEASED
YARN-3978. Configurably turn off the saving of container info in Generic AHS YARN-3978. Configurably turn off the saving of container info in Generic AHS
(Eric Payne via jeagles) (Eric Payne via jeagles)



OPTIMIZATIONS OPTIMIZATIONS


BUG FIXES BUG FIXES
Expand Down Expand Up @@ -801,6 +802,8 @@ Release 2.7.2 - UNRELEASED
YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when
Node is connected/disconnected (Bibin A Chundatt via jlowe) Node is connected/disconnected (Bibin A Chundatt via jlowe)


YARN-3999. RM hangs on draining events. (Jian He via xgong)

Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -1300,6 +1300,11 @@ private static void addDeprecatedKeys() {
public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS public static final long DEFAULT_RESOURCEMANAGER_CONNECT_RETRY_INTERVAL_MS
= 30 * 1000; = 30 * 1000;


/**
 * Configuration key for the timeout, in milliseconds, that a dispatcher
 * waits for its events to drain when it is being stopped.
 */
public static final String DISPATCHER_DRAIN_EVENTS_TIMEOUT =
YARN_PREFIX + "dispatcher.drain-events.timeout";

/**
 * Default drain-events timeout: 300000 ms (5 minutes).
 */
public static final long DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT = 300000;

/** /**
* CLASSPATH for YARN applications. A comma-separated list of CLASSPATH * CLASSPATH for YARN applications. A comma-separated list of CLASSPATH
* entries * entries
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownHookManager; import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -138,9 +139,14 @@ protected void serviceStop() throws Exception {
if (drainEventsOnStop) { if (drainEventsOnStop) {
blockNewEvents = true; blockNewEvents = true;
LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); LOG.info("AsyncDispatcher is draining to stop, igonring any new events.");
long endTime = System.currentTimeMillis() + getConfig()
.getLong(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT,
YarnConfiguration.DEFAULT_DISPATCHER_DRAIN_EVENTS_TIMEOUT);

synchronized (waitForDrained) { synchronized (waitForDrained) {
while (!drained && eventHandlingThread != null while (!drained && eventHandlingThread != null
&& eventHandlingThread.isAlive()) { && eventHandlingThread.isAlive()
&& System.currentTimeMillis() < endTime) {
waitForDrained.wait(1000); waitForDrained.wait(1000);
LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" + LOG.info("Waiting for AsyncDispatcher to drain. Thread state is :" +
eventHandlingThread.getState()); eventHandlingThread.getState());
Expand Down
Expand Up @@ -90,6 +90,15 @@
<value>10</value> <value>10</value>
</property> </property>


<property>
<description>Timeout in milliseconds when the YARN dispatcher tries to drain
its events. Typically, this happens when a service is stopping; for example,
the RM drains the ATS events dispatcher when stopping.
</description>
<name>yarn.dispatcher.drain-events.timeout</name>
<value>300000</value>
</property>

<property> <property>
<description>The expiry interval for application master reporting.</description> <description>The expiry interval for application master reporting.</description>
<name>yarn.am.liveness-monitor.expiry-interval-ms</name> <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
Expand Down
Expand Up @@ -18,18 +18,17 @@


package org.apache.hadoop.yarn.event; package org.apache.hadoop.yarn.event;


import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;


import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;


import static org.mockito.Mockito.*;

public class TestAsyncDispatcher { public class TestAsyncDispatcher {


/* This test checks whether dispatcher hangs on close if following two things /* This test checks whether dispatcher hangs on close if following two things
Expand Down Expand Up @@ -58,5 +57,23 @@ public void testDispatcherOnCloseIfQueueEmpty() throws Exception {
eventQueue.isEmpty()); eventQueue.isEmpty());
disp.close(); disp.close();
} }

// Test that stopping the dispatcher times out while draining events instead
// of blocking indefinitely. The configured drain timeout (2000 ms) is well
// below the JUnit timeout (10000 ms), so the test fails if close() does not
// return in time.
@Test(timeout=10000)
public void testDispatchStopOnTimeout() throws Exception {
BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<Event>();
// Wrap the real queue in a Mockito spy so isEmpty() can be stubbed below.
eventQueue = spy(eventQueue);
// Simulate a dispatcher that is never drained: the queue always reports
// itself as non-empty.
// NOTE(review): presumably the dispatcher derives its "drained" state from
// eventQueue.isEmpty() -- confirm against AsyncDispatcher/DrainDispatcher.
when(eventQueue.isEmpty()).thenReturn(false);

YarnConfiguration conf = new YarnConfiguration();
// Shorten the drain timeout to 2000 ms so the test stays fast.
conf.setInt(YarnConfiguration.DISPATCHER_DRAIN_EVENTS_TIMEOUT, 2000);
DrainDispatcher disp = new DrainDispatcher(eventQueue);
disp.init(conf);
// Ask the dispatcher to drain pending events when it is stopped.
disp.setDrainEventsOnStop();
disp.start();
disp.waitForEventThreadToWait();
// Expected to return once the drain timeout elapses; otherwise the JUnit
// timeout above fails the test.
disp.close();
}
} }


Expand Up @@ -92,8 +92,6 @@ public class RMActiveServiceContext {
private NodesListManager nodesListManager; private NodesListManager nodesListManager;
private ResourceTrackerService resourceTrackerService; private ResourceTrackerService resourceTrackerService;
private ApplicationMasterService applicationMasterService; private ApplicationMasterService applicationMasterService;
private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;
private RMNodeLabelsManager nodeLabelManager; private RMNodeLabelsManager nodeLabelManager;
private long epoch; private long epoch;
private Clock systemClock = new SystemClock(); private Clock systemClock = new SystemClock();
Expand All @@ -117,7 +115,6 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter,
ResourceScheduler scheduler) { ResourceScheduler scheduler) {
this(); this();
this.setContainerAllocationExpirer(containerAllocationExpirer); this.setContainerAllocationExpirer(containerAllocationExpirer);
Expand All @@ -128,7 +125,6 @@ public RMActiveServiceContext(Dispatcher rmDispatcher,
this.setContainerTokenSecretManager(containerTokenSecretManager); this.setContainerTokenSecretManager(containerTokenSecretManager);
this.setNMTokenSecretManager(nmTokenSecretManager); this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager); this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
this.setScheduler(scheduler); this.setScheduler(scheduler);


RMStateStore nullStore = new NullRMStateStore(); RMStateStore nullStore = new NullRMStateStore();
Expand Down Expand Up @@ -368,32 +364,6 @@ public boolean isWorkPreservingRecoveryEnabled() {
return this.isWorkPreservingRecoveryEnabled; return this.isWorkPreservingRecoveryEnabled;
} }


@Private
@Unstable
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return rmApplicationHistoryWriter;
}

@Private
@Unstable
public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) {
this.systemMetricsPublisher = systemMetricsPublisher;
}

@Private
@Unstable
public SystemMetricsPublisher getSystemMetricsPublisher() {
return systemMetricsPublisher;
}

@Private
@Unstable
public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
}

@Private @Private
@Unstable @Unstable
public long getEpoch() { public long getEpoch() {
Expand Down
Expand Up @@ -68,6 +68,9 @@ public class RMContextImpl implements RMContext {


private Configuration yarnConfiguration; private Configuration yarnConfiguration;


private RMApplicationHistoryWriter rmApplicationHistoryWriter;
private SystemMetricsPublisher systemMetricsPublisher;

/** /**
* Default constructor. To be used in conjunction with setter methods for * Default constructor. To be used in conjunction with setter methods for
* individual fields. * individual fields.
Expand All @@ -87,15 +90,14 @@ public RMContextImpl(Dispatcher rmDispatcher,
RMContainerTokenSecretManager containerTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
RMApplicationHistoryWriter rmApplicationHistoryWriter,
ResourceScheduler scheduler) { ResourceScheduler scheduler) {
this(); this();
this.setDispatcher(rmDispatcher); this.setDispatcher(rmDispatcher);
setActiveServiceContext(new RMActiveServiceContext(rmDispatcher, setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
delegationTokenRenewer, appTokenSecretManager, delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager, containerTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager, rmApplicationHistoryWriter, clientToAMTokenSecretManager,
scheduler)); scheduler));


ConfigurationProvider provider = new LocalConfigurationProvider(); ConfigurationProvider provider = new LocalConfigurationProvider();
Expand All @@ -112,8 +114,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
AMRMTokenSecretManager appTokenSecretManager, AMRMTokenSecretManager appTokenSecretManager,
RMContainerTokenSecretManager containerTokenSecretManager, RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager, NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager, ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager) {
RMApplicationHistoryWriter rmApplicationHistoryWriter) {
this( this(
rmDispatcher, rmDispatcher,
containerAllocationExpirer, containerAllocationExpirer,
Expand All @@ -123,9 +124,7 @@ public RMContextImpl(Dispatcher rmDispatcher,
appTokenSecretManager, appTokenSecretManager,
containerTokenSecretManager, containerTokenSecretManager,
nmTokenSecretManager, nmTokenSecretManager,
clientToAMTokenSecretManager, clientToAMTokenSecretManager, null);
rmApplicationHistoryWriter,
null);
} }


@Override @Override
Expand Down Expand Up @@ -351,25 +350,25 @@ public boolean isWorkPreservingRecoveryEnabled() {


@Override @Override
public RMApplicationHistoryWriter getRMApplicationHistoryWriter() { public RMApplicationHistoryWriter getRMApplicationHistoryWriter() {
return activeServiceContext.getRMApplicationHistoryWriter(); return this.rmApplicationHistoryWriter;
} }


@Override @Override
public void setSystemMetricsPublisher( public void setSystemMetricsPublisher(
SystemMetricsPublisher systemMetricsPublisher) { SystemMetricsPublisher systemMetricsPublisher) {
activeServiceContext.setSystemMetricsPublisher(systemMetricsPublisher); this.systemMetricsPublisher = systemMetricsPublisher;
} }


@Override @Override
public SystemMetricsPublisher getSystemMetricsPublisher() { public SystemMetricsPublisher getSystemMetricsPublisher() {
return activeServiceContext.getSystemMetricsPublisher(); return this.systemMetricsPublisher;
} }


@Override @Override
public void setRMApplicationHistoryWriter( public void setRMApplicationHistoryWriter(
RMApplicationHistoryWriter rmApplicationHistoryWriter) { RMApplicationHistoryWriter rmApplicationHistoryWriter) {
activeServiceContext this.rmApplicationHistoryWriter = rmApplicationHistoryWriter;
.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
} }


@Override @Override
Expand Down
Expand Up @@ -250,7 +250,7 @@ protected void serviceInit(Configuration conf) throws Exception {
adminService = createAdminService(); adminService = createAdminService();
addService(adminService); addService(adminService);
rmContext.setRMAdminService(adminService); rmContext.setRMAdminService(adminService);

rmContext.setYarnConfiguration(conf); rmContext.setYarnConfiguration(conf);


createAndInitActiveServices(); createAndInitActiveServices();
Expand All @@ -259,6 +259,15 @@ protected void serviceInit(Configuration conf) throws Exception {
YarnConfiguration.RM_BIND_HOST, YarnConfiguration.RM_BIND_HOST,
WebAppUtils.getRMWebAppURLWithoutScheme(this.conf)); WebAppUtils.getRMWebAppURLWithoutScheme(this.conf));


RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

super.serviceInit(this.conf); super.serviceInit(this.conf);
} }


Expand Down Expand Up @@ -411,7 +420,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setActiveServiceContext(activeServiceContext); rmContext.setActiveServiceContext(activeServiceContext);


conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true); conf.setBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, true);

rmSecretManagerService = createRMSecretManagerService(); rmSecretManagerService = createRMSecretManagerService();
addService(rmSecretManagerService); addService(rmSecretManagerService);


Expand Down Expand Up @@ -468,15 +476,6 @@ protected void serviceInit(Configuration configuration) throws Exception {
rmContext.setDelegationTokenRenewer(delegationTokenRenewer); rmContext.setDelegationTokenRenewer(delegationTokenRenewer);
} }


RMApplicationHistoryWriter rmApplicationHistoryWriter =
createRMApplicationHistoryWriter();
addService(rmApplicationHistoryWriter);
rmContext.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);

SystemMetricsPublisher systemMetricsPublisher = createSystemMetricsPublisher();
addService(systemMetricsPublisher);
rmContext.setSystemMetricsPublisher(systemMetricsPublisher);

// Register event handler for NodesListManager // Register event handler for NodesListManager
nodesListManager = new NodesListManager(rmContext); nodesListManager = new NodesListManager(rmContext);
rmDispatcher.register(NodesListManagerEventType.class, nodesListManager); rmDispatcher.register(NodesListManagerEventType.class, nodesListManager);
Expand Down Expand Up @@ -596,11 +595,13 @@ protected void serviceStart() throws Exception {
@Override @Override
protected void serviceStop() throws Exception { protected void serviceStop() throws Exception {


DefaultMetricsSystem.shutdown(); super.serviceStop();

if (pauseMonitor != null) { if (pauseMonitor != null) {
pauseMonitor.stop(); pauseMonitor.stop();
} }


DefaultMetricsSystem.shutdown();
if (rmContext != null) { if (rmContext != null) {
RMStateStore store = rmContext.getStateStore(); RMStateStore store = rmContext.getStateStore();
try { try {
Expand All @@ -610,7 +611,6 @@ protected void serviceStop() throws Exception {
} }
} }


super.serviceStop();
} }


protected void createPolicyMonitors() { protected void createPolicyMonitors() {
Expand Down Expand Up @@ -1033,12 +1033,12 @@ synchronized void transitionToStandby(boolean initialize)
} }


LOG.info("Transitioning to standby state"); LOG.info("Transitioning to standby state");
if (rmContext.getHAServiceState() == HAServiceState state = rmContext.getHAServiceState();
HAServiceProtocol.HAServiceState.ACTIVE) { rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
if (state == HAServiceProtocol.HAServiceState.ACTIVE) {
stopActiveServices(); stopActiveServices();
reinitialize(initialize); reinitialize(initialize);
} }
rmContext.setHAServiceState(HAServiceProtocol.HAServiceState.STANDBY);
LOG.info("Transitioned to standby state"); LOG.info("Transitioned to standby state");
} }


Expand Down
Expand Up @@ -120,15 +120,16 @@ public RMContext mockRMContext(int n, long time) {
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class); RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext context = new RMContextImpl(rmDispatcher, RMContext context = new RMContextImpl(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor, containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
null, null, null, null, null, writer) { null, null, null, null, null) {
@Override @Override
public ConcurrentMap<ApplicationId, RMApp> getRMApps() { public ConcurrentMap<ApplicationId, RMApp> getRMApps() {
return map; return map;
} }
}; };
((RMContextImpl)context).setStateStore(mock(RMStateStore.class)); ((RMContextImpl)context).setStateStore(mock(RMStateStore.class));
metricsPublisher = mock(SystemMetricsPublisher.class); metricsPublisher = mock(SystemMetricsPublisher.class);
((RMContextImpl)context).setSystemMetricsPublisher(metricsPublisher); context.setSystemMetricsPublisher(metricsPublisher);
context.setRMApplicationHistoryWriter(writer);
return context; return context;
} }


Expand Down
Expand Up @@ -66,6 +66,7 @@ public void setUp() throws Exception {


@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
resourceManager.stop();
} }


private org.apache.hadoop.yarn.server.resourcemanager.NodeManager private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
Expand Down
Expand Up @@ -87,9 +87,9 @@ public void setUp() throws Exception {


rmContext = rmContext =
new RMContextImpl(rmDispatcher, null, null, null, new RMContextImpl(rmDispatcher, null, null, null,
null, null, null, null, null, null, null, null, null, null);
new RMApplicationHistoryWriter());
rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher()); rmContext.setSystemMetricsPublisher(new SystemMetricsPublisher());
rmContext.setRMApplicationHistoryWriter(mock(RMApplicationHistoryWriter.class));


scheduler = mock(YarnScheduler.class); scheduler = mock(YarnScheduler.class);
doAnswer( doAnswer(
Expand Down

0 comments on commit 3ae716f

Please sign in to comment.