Skip to content

Commit

Permalink
YARN-5999. AMRMClientAsync will stop if any exceptions thrown on allocate call. Contributed by Jian He
Browse files Browse the repository at this point in the history
  • Loading branch information
xgong committed Dec 14, 2016
1 parent f5e0bd3 commit 64a2d5b
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 19 deletions.
Expand Up @@ -61,7 +61,7 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>
private final HeartbeatThread heartbeatThread; private final HeartbeatThread heartbeatThread;
private final CallbackHandlerThread handlerThread; private final CallbackHandlerThread handlerThread;


private final BlockingQueue<AllocateResponse> responseQueue; private final BlockingQueue<Object> responseQueue;


private final Object unregisterHeartbeatLock = new Object(); private final Object unregisterHeartbeatLock = new Object();


Expand All @@ -70,8 +70,6 @@ public class AMRMClientAsyncImpl<T extends ContainerRequest>


private volatile String collectorAddr; private volatile String collectorAddr;


private volatile Throwable savedException;

/** /**
* *
* @param intervalMs heartbeat interval in milliseconds between AM and RM * @param intervalMs heartbeat interval in milliseconds between AM and RM
Expand All @@ -90,7 +88,6 @@ public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
handlerThread = new CallbackHandlerThread(); handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<>(); responseQueue = new LinkedBlockingQueue<>();
keepRunning = true; keepRunning = true;
savedException = null;
} }


/** /**
Expand All @@ -111,9 +108,8 @@ public AMRMClientAsyncImpl(AMRMClient<T> client, int intervalMs,
super(client, intervalMs, callbackHandler); super(client, intervalMs, callbackHandler);
heartbeatThread = new HeartbeatThread(); heartbeatThread = new HeartbeatThread();
handlerThread = new CallbackHandlerThread(); handlerThread = new CallbackHandlerThread();
responseQueue = new LinkedBlockingQueue<AllocateResponse>(); responseQueue = new LinkedBlockingQueue<Object>();
keepRunning = true; keepRunning = true;
savedException = null;
} }


@Override @Override
Expand Down Expand Up @@ -265,7 +261,7 @@ public HeartbeatThread() {


public void run() { public void run() {
while (true) { while (true) {
AllocateResponse response = null; Object response = null;
// synchronization ensures we don't send heartbeats after unregistering // synchronization ensures we don't send heartbeats after unregistering
synchronized (unregisterHeartbeatLock) { synchronized (unregisterHeartbeatLock) {
if (!keepRunning) { if (!keepRunning) {
Expand All @@ -280,10 +276,7 @@ public void run() {
return; return;
} catch (Throwable ex) { } catch (Throwable ex) {
LOG.error("Exception on heartbeat", ex); LOG.error("Exception on heartbeat", ex);
savedException = ex; response = ex;
// interrupt handler thread in case it waiting on the queue
handlerThread.interrupt();
return;
} }
if (response != null) { if (response != null) {
while (true) { while (true) {
Expand Down Expand Up @@ -316,19 +309,20 @@ public void run() {
return; return;
} }
try { try {
AllocateResponse response; Object object;
if(savedException != null) {
LOG.error("Stopping callback due to: ", savedException);
handler.onError(savedException);
return;
}
try { try {
response = responseQueue.take(); object = responseQueue.take();
} catch (InterruptedException ex) { } catch (InterruptedException ex) {
LOG.info("Interrupted while waiting for queue", ex); LOG.info("Interrupted while waiting for queue", ex);
continue; continue;
} }
if (object instanceof Throwable) {
progress = handler.getProgress();
handler.onError((Throwable) object);
continue;
}


AllocateResponse response = (AllocateResponse) object;
String collectorAddress = response.getCollectorAddr(); String collectorAddress = response.getCollectorAddr();
TimelineClient timelineClient = client.getRegisteredTimelineClient(); TimelineClient timelineClient = client.getRegisteredTimelineClient();
if (timelineClient != null && collectorAddress != null if (timelineClient != null && collectorAddress != null
Expand Down
Expand Up @@ -213,7 +213,7 @@ private void runHeartBeatThrowOutException(Exception ex) throws Exception{


asyncClient.stop(); asyncClient.stop();
// stopping should have joined all threads and completed all callbacks // stopping should have joined all threads and completed all callbacks
Assert.assertTrue(callbackHandler.callbackCount == 0); Assert.assertTrue(callbackHandler.callbackCount > 0);
} }


@Test (timeout = 10000) @Test (timeout = 10000)
Expand Down

0 comments on commit 64a2d5b

Please sign in to comment.