Skip to content

Commit

Permalink
YARN-4000. RM crashes with NPE if leaf queue becomes parent queue dur…
Browse files Browse the repository at this point in the history
…ing restart. Contributed by Varun Saxena
  • Loading branch information
jian-he committed Oct 16, 2015
1 parent a121fa1 commit cf23f2c
Show file tree
Hide file tree
Showing 25 changed files with 397 additions and 380 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -945,6 +945,9 @@ Release 2.8.0 - UNRELEASED


YARN-4250. NPE in AppSchedulingInfo#isRequestLabelChanged. (Brahma Reddy Battula via rohithsharmaks) YARN-4250. NPE in AppSchedulingInfo#isRequestLabelChanged. (Brahma Reddy Battula via rohithsharmaks)


YARN-4000. RM crashes with NPE if leaf queue becomes parent queue during restart.
(Varun Saxena via jianhe)

Release 2.7.2 - UNRELEASED Release 2.7.2 - UNRELEASED


INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -141,7 +141,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptFailedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeSignalContainerEvent;
Expand Down Expand Up @@ -676,11 +677,8 @@ public FailApplicationAttemptResponse failApplicationAttempt(
} }
} }


this.rmContext this.rmContext.getDispatcher().getEventHandler().handle(
.getDispatcher() new RMAppAttemptEvent(attemptId, RMAppAttemptEventType.FAIL,
.getEventHandler()
.handle(
new RMAppAttemptFailedEvent(attemptId,
"Attempt failed by user.")); "Attempt failed by user."));


RMAuditLogger.logSuccess(callerUGI.getShortUserName(), RMAuditLogger.logSuccess(callerUGI.getShortUserName(),
Expand Down Expand Up @@ -735,8 +733,9 @@ public KillApplicationResponse forceKillApplication(
return KillApplicationResponse.newInstance(true); return KillApplicationResponse.newInstance(true);
} }


this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler().handle(
.handle(new RMAppEvent(applicationId, RMAppEventType.KILL)); new RMAppEvent(applicationId, RMAppEventType.KILL,
"Application killed by user."));


// For UnmanagedAMs, return true so they don't retry // For UnmanagedAMs, return true so they don't retry
return KillApplicationResponse.newInstance( return KillApplicationResponse.newInstance(
Expand Down
Expand Up @@ -50,7 +50,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
Expand Down Expand Up @@ -304,7 +303,8 @@ protected void submitApplication(
// scheduler about the existence of the application // scheduler about the existence of the application
assert application.getState() == RMAppState.NEW; assert application.getState() == RMAppState.NEW;
this.rmContext.getDispatcher().getEventHandler() this.rmContext.getDispatcher().getEventHandler()
.handle(new RMAppRejectedEvent(applicationId, e.getMessage())); .handle(new RMAppEvent(applicationId,
RMAppEventType.APP_REJECTED, e.getMessage()));
throw RPCUtil.getRemoteException(e); throw RPCUtil.getRemoteException(e);
} }
} }
Expand Down
Expand Up @@ -60,7 +60,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ConverterUtils;


import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -257,8 +256,8 @@ public void run() {
String message = "Error launching " + application.getAppAttemptId() String message = "Error launching " + application.getAppAttemptId()
+ ". Got exception: " + StringUtils.stringifyException(ie); + ". Got exception: " + StringUtils.stringifyException(ie);
LOG.info(message); LOG.info(message);
handler.handle(new RMAppAttemptLaunchFailedEvent(application handler.handle(new RMAppAttemptEvent(application
.getAppAttemptId(), message)); .getAppAttemptId(), RMAppAttemptEventType.LAUNCH_FAILED, message));
} }
break; break;
case CLEANUP: case CLEANUP:
Expand Down
Expand Up @@ -24,13 +24,24 @@
public class RMAppEvent extends AbstractEvent<RMAppEventType>{ public class RMAppEvent extends AbstractEvent<RMAppEventType>{


private final ApplicationId appId; private final ApplicationId appId;
private final String diagnosticMsg;


public RMAppEvent(ApplicationId appId, RMAppEventType type) { public RMAppEvent(ApplicationId appId, RMAppEventType type) {
this(appId, type, "");
}

public RMAppEvent(ApplicationId appId, RMAppEventType type,
String diagnostic) {
super(type); super(type);
this.appId = appId; this.appId = appId;
this.diagnosticMsg = diagnostic;
} }


public ApplicationId getApplicationId() { public ApplicationId getApplicationId() {
return this.appId; return this.appId;
} }

public String getDiagnosticMsg() {
return this.diagnosticMsg;
}
} }
Expand Up @@ -22,20 +22,14 @@


public class RMAppFailedAttemptEvent extends RMAppEvent { public class RMAppFailedAttemptEvent extends RMAppEvent {


private final String diagnostics;
private final boolean transferStateFromPreviousAttempt; private final boolean transferStateFromPreviousAttempt;


public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event, public RMAppFailedAttemptEvent(ApplicationId appId, RMAppEventType event,
String diagnostics, boolean transferStateFromPreviousAttempt) { String diagnostics, boolean transferStateFromPreviousAttempt) {
super(appId, event); super(appId, event, diagnostics);
this.diagnostics = diagnostics;
this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt; this.transferStateFromPreviousAttempt = transferStateFromPreviousAttempt;
} }


public String getDiagnostics() {
return this.diagnostics;
}

public boolean getTransferStateFromPreviousAttempt() { public boolean getTransferStateFromPreviousAttempt() {
return transferStateFromPreviousAttempt; return transferStateFromPreviousAttempt;
} }
Expand Down

This file was deleted.

Expand Up @@ -1046,7 +1046,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
if (this.submissionContext.getUnmanagedAM()) { if (this.submissionContext.getUnmanagedAM()) {
// RM does not manage the AM. Do not retry // RM does not manage the AM. Do not retry
msg = "Unmanaged application " + this.getApplicationId() msg = "Unmanaged application " + this.getApplicationId()
+ " failed due to " + failedEvent.getDiagnostics() + " failed due to " + failedEvent.getDiagnosticMsg()
+ ". Failing the application."; + ". Failing the application.";
} else if (this.isNumAttemptsBeyondThreshold) { } else if (this.isNumAttemptsBeyondThreshold) {
int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, int globalLimit = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
Expand All @@ -1061,7 +1061,7 @@ private String getAppAttemptFailedDiagnostics(RMAppEvent event) {
(globalLimit == maxAppAttempts) ? "" (globalLimit == maxAppAttempts) ? ""
: (" (global limit =" + globalLimit : (" (global limit =" + globalLimit
+ "; local limit is =" + maxAppAttempts + ")"), + "; local limit is =" + maxAppAttempts + ")"),
failedEvent.getDiagnostics()); failedEvent.getDiagnosticMsg());
} }
return msg; return msg;
} }
Expand Down Expand Up @@ -1102,21 +1102,14 @@ private void rememberTargetTransitionsAndStoreState(RMAppEvent event,
String diags = null; String diags = null;
switch (event.getType()) { switch (event.getType()) {
case APP_REJECTED: case APP_REJECTED:
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent) event;
diags = rejectedEvent.getMessage();
break;
case ATTEMPT_FINISHED: case ATTEMPT_FINISHED:
RMAppFinishedAttemptEvent finishedEvent = case ATTEMPT_KILLED:
(RMAppFinishedAttemptEvent) event; diags = event.getDiagnosticMsg();
diags = finishedEvent.getDiagnostics();
break; break;
case ATTEMPT_FAILED: case ATTEMPT_FAILED:
RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event; RMAppFailedAttemptEvent failedEvent = (RMAppFailedAttemptEvent) event;
diags = getAppAttemptFailedDiagnostics(failedEvent); diags = getAppAttemptFailedDiagnostics(failedEvent);
break; break;
case ATTEMPT_KILLED:
diags = getAppKilledDiagnostics();
break;
default: default:
break; break;
} }
Expand Down Expand Up @@ -1164,9 +1157,7 @@ public AppFinishedTransition() {
} }


public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppFinishedAttemptEvent finishedEvent = app.diagnostics.append(event.getDiagnosticMsg());
(RMAppFinishedAttemptEvent)event;
app.diagnostics.append(finishedEvent.getDiagnostics());
super.transition(app, event); super.transition(app, event);
}; };
} }
Expand Down Expand Up @@ -1212,21 +1203,21 @@ public AppKilledTransition() {


@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.diagnostics.append(getAppKilledDiagnostics()); app.diagnostics.append(event.getDiagnosticMsg());
super.transition(app, event); super.transition(app, event);
}; };
} }


private static String getAppKilledDiagnostics() {
return "Application killed by user.";
}

private static class KillAttemptTransition extends RMAppTransition { private static class KillAttemptTransition extends RMAppTransition {
@Override @Override
public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
app.stateBeforeKilling = app.getState(); app.stateBeforeKilling = app.getState();
app.handler.handle(new RMAppAttemptEvent(app.currentAttempt // Forward app kill diagnostics in the event to kill app attempt.
.getAppAttemptId(), RMAppAttemptEventType.KILL)); // These diagnostics will be returned back in ATTEMPT_KILLED event sent by
// RMAppAttemptImpl.
app.handler.handle(
new RMAppAttemptEvent(app.currentAttempt.getAppAttemptId(),
RMAppAttemptEventType.KILL, event.getDiagnosticMsg()));
} }
} }


Expand All @@ -1237,8 +1228,7 @@ public AppRejectedTransition() {
} }


public void transition(RMAppImpl app, RMAppEvent event) { public void transition(RMAppImpl app, RMAppEvent event) {
RMAppRejectedEvent rejectedEvent = (RMAppRejectedEvent)event; app.diagnostics.append(event.getDiagnosticMsg());
app.diagnostics.append(rejectedEvent.getMessage());
super.transition(app, event); super.transition(app, event);
}; };
} }
Expand Down

This file was deleted.

Expand Up @@ -24,14 +24,25 @@
public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> { public class RMAppAttemptEvent extends AbstractEvent<RMAppAttemptEventType> {


private final ApplicationAttemptId appAttemptId; private final ApplicationAttemptId appAttemptId;
private final String diagnosticMsg;


public RMAppAttemptEvent(ApplicationAttemptId appAttemptId, public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
RMAppAttemptEventType type) { RMAppAttemptEventType type) {
this(appAttemptId, type, "");
}

public RMAppAttemptEvent(ApplicationAttemptId appAttemptId,
RMAppAttemptEventType type, String diagnostics) {
super(type); super(type);
this.appAttemptId = appAttemptId; this.appAttemptId = appAttemptId;
this.diagnosticMsg = diagnostics;
} }


public ApplicationAttemptId getApplicationAttemptId() { public ApplicationAttemptId getApplicationAttemptId() {
return this.appAttemptId; return this.appAttemptId;
} }

public String getDiagnosticMsg() {
return diagnosticMsg;
}
} }

0 comments on commit cf23f2c

Please sign in to comment.