Skip to content

Commit

Permalink
YARN-4041. Slow delegation token renewal can severely prolong RM reco…
Browse files Browse the repository at this point in the history
…very. Contributed by Sunil G
  • Loading branch information
jlowe committed Oct 23, 2015
1 parent 533a2be commit d3a34a4
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 24 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -1076,6 +1076,9 @@ Release 2.7.2 - UNRELEASED
YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called YARN-4209. RMStateStore FENCED state doesn’t work due to updateFencedState called
by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks) by stateMachine.doTransition. (Zhihai Xu via rohithsharmaks)


YARN-4041. Slow delegation token renewal can severely prolong RM recovery
(Sunil G via jlowe)

Release 2.7.1 - 2015-07-06 Release 2.7.1 - 2015-07-06


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -946,14 +946,16 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) {
} }


if (UserGroupInformation.isSecurityEnabled()) { if (UserGroupInformation.isSecurityEnabled()) {
// synchronously renew delegation token on recovery. // asynchronously renew delegation token on recovery.
try { try {
app.rmContext.getDelegationTokenRenewer().addApplicationSync( app.rmContext.getDelegationTokenRenewer()
app.getApplicationId(), app.parseCredentials(), .addApplicationAsyncDuringRecovery(app.getApplicationId(),
app.submissionContext.getCancelTokensWhenComplete(), app.getUser()); app.parseCredentials(),
app.submissionContext.getCancelTokensWhenComplete(),
app.getUser());
} catch (Exception e) { } catch (Exception e) {
String msg = "Failed to renew token for " + app.applicationId String msg = "Failed to fetch user credentials from application:"
+ " on recovery : " + e.getMessage(); + e.getMessage();
app.diagnostics.append(msg); app.diagnostics.append(msg);
LOG.error(msg, e); LOG.error(msg, e);
} }
Expand Down
Expand Up @@ -387,6 +387,25 @@ public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
applicationId, ts, shouldCancelAtEnd, user)); applicationId, ts, shouldCancelAtEnd, user));
} }


/**
* Asynchronously add application tokens for renewal.
*
* @param applicationId
* added application
* @param ts
* tokens
* @param shouldCancelAtEnd
* true if tokens should be canceled when the app is done else false.
* @param user
* user
*/
public void addApplicationAsyncDuringRecovery(ApplicationId applicationId,
Credentials ts, boolean shouldCancelAtEnd, String user) {
processDelegationTokenRenewerEvent(
new DelegationTokenRenewerAppRecoverEvent(applicationId, ts,
shouldCancelAtEnd, user));
}

/** /**
* Synchronously renew delegation tokens. * Synchronously renew delegation tokens.
* @param user user * @param user user
Expand All @@ -398,7 +417,7 @@ public void addApplicationSync(ApplicationId applicationId, Credentials ts,
applicationId, ts, shouldCancelAtEnd, user)); applicationId, ts, shouldCancelAtEnd, user));
} }


private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt) private void handleAppSubmitEvent(AbstractDelegationTokenRenewerAppEvent evt)
throws IOException, InterruptedException { throws IOException, InterruptedException {
ApplicationId applicationId = evt.getApplicationId(); ApplicationId applicationId = evt.getApplicationId();
Credentials ts = evt.getCredentials(); Credentials ts = evt.getCredentials();
Expand Down Expand Up @@ -842,6 +861,10 @@ public void run() {
DelegationTokenRenewerAppSubmitEvent appSubmitEvt = DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
(DelegationTokenRenewerAppSubmitEvent) evt; (DelegationTokenRenewerAppSubmitEvent) evt;
handleDTRenewerAppSubmitEvent(appSubmitEvt); handleDTRenewerAppSubmitEvent(appSubmitEvt);
} else if (evt instanceof DelegationTokenRenewerAppRecoverEvent) {
DelegationTokenRenewerAppRecoverEvent appRecoverEvt =
(DelegationTokenRenewerAppRecoverEvent) evt;
handleDTRenewerAppRecoverEvent(appRecoverEvt);
} else if (evt.getType().equals( } else if (evt.getType().equals(
DelegationTokenRenewerEventType.FINISH_APPLICATION)) { DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
DelegationTokenRenewer.this.handleAppFinishEvent(evt); DelegationTokenRenewer.this.handleAppFinishEvent(evt);
Expand Down Expand Up @@ -876,17 +899,50 @@ private void handleDTRenewerAppSubmitEvent(
} }
} }
} }


static class DelegationTokenRenewerAppSubmitEvent extends @SuppressWarnings("unchecked")
private void handleDTRenewerAppRecoverEvent(
DelegationTokenRenewerAppRecoverEvent event) {
try {
// Setup tokens for renewal during recovery
DelegationTokenRenewer.this.handleAppSubmitEvent(event);
} catch (Throwable t) {
LOG.warn(
"Unable to add the application to the delegation token renewer.", t);
}
}

static class DelegationTokenRenewerAppSubmitEvent
extends
AbstractDelegationTokenRenewerAppEvent {
public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
Credentials credentails, boolean shouldCancelAtEnd, String user) {
super(appId, credentails, shouldCancelAtEnd, user,
DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
}
}

static class DelegationTokenRenewerAppRecoverEvent
extends
AbstractDelegationTokenRenewerAppEvent {
public DelegationTokenRenewerAppRecoverEvent(ApplicationId appId,
Credentials credentails, boolean shouldCancelAtEnd, String user) {
super(appId, credentails, shouldCancelAtEnd, user,
DelegationTokenRenewerEventType.RECOVER_APPLICATION);
}
}

static class AbstractDelegationTokenRenewerAppEvent extends
DelegationTokenRenewerEvent { DelegationTokenRenewerEvent {


private Credentials credentials; private Credentials credentials;
private boolean shouldCancelAtEnd; private boolean shouldCancelAtEnd;
private String user; private String user;


public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId, public AbstractDelegationTokenRenewerAppEvent(ApplicationId appId,
Credentials credentails, boolean shouldCancelAtEnd, String user) { Credentials credentails, boolean shouldCancelAtEnd, String user,
super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION); DelegationTokenRenewerEventType type) {
super(appId, type);
this.credentials = credentails; this.credentials = credentails;
this.shouldCancelAtEnd = shouldCancelAtEnd; this.shouldCancelAtEnd = shouldCancelAtEnd;
this.user = user; this.user = user;
Expand All @@ -907,6 +963,7 @@ public String getUser() {


enum DelegationTokenRenewerEventType { enum DelegationTokenRenewerEventType {
VERIFY_AND_START_APPLICATION, VERIFY_AND_START_APPLICATION,
RECOVER_APPLICATION,
FINISH_APPLICATION FINISH_APPLICATION
} }


Expand Down
Expand Up @@ -1179,24 +1179,24 @@ public void testDelegationTokenRestoredInDelegationTokenRenewer()


// Need to wait for a while as now token renewal happens on another thread // Need to wait for a while as now token renewal happens on another thread
// and is asynchronous in nature. // and is asynchronous in nature.
waitForTokensToBeRenewed(rm2); waitForTokensToBeRenewed(rm2, tokenSet);


// verify tokens are properly populated back to rm2 DelegationTokenRenewer // verify tokens are properly populated back to rm2 DelegationTokenRenewer
Assert.assertEquals(tokenSet, rm2.getRMContext() Assert.assertEquals(tokenSet, rm2.getRMContext()
.getDelegationTokenRenewer().getDelegationTokens()); .getDelegationTokenRenewer().getDelegationTokens());
} }


private void waitForTokensToBeRenewed(MockRM rm2) throws Exception { private void waitForTokensToBeRenewed(MockRM rm2,
int waitCnt = 20; HashSet<Token<RMDelegationTokenIdentifier>> tokenSet) throws Exception {
boolean atleastOneAppInNEWState = true; // Max wait time to get the token renewal can be kept as 1sec (100 * 10ms)
while (waitCnt-- > 0 && atleastOneAppInNEWState) { int waitCnt = 100;
atleastOneAppInNEWState = false; while (waitCnt-- > 0) {
for (RMApp rmApp : rm2.getRMContext().getRMApps().values()) { if (tokenSet.equals(rm2.getRMContext().getDelegationTokenRenewer()
if (rmApp.getState() == RMAppState.NEW) { .getDelegationTokens())) {
Thread.sleep(1000); // Stop waiting as tokens are populated to DelegationTokenRenewer.
atleastOneAppInNEWState = true; break;
break; } else {
} Thread.sleep(10);
} }
} }
} }
Expand Down

0 comments on commit d3a34a4

Please sign in to comment.