Skip to content

Commit

Permalink
YARN-3493. RM fails to come up with error "Failed to load/recover sta…
Browse files Browse the repository at this point in the history
…te" when mem settings are changed. (Jian He via wangda)
  • Loading branch information
wangdatan committed Apr 18, 2015
1 parent 6779467 commit f65eeb4
Show file tree
Hide file tree
Showing 8 changed files with 119 additions and 72 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -224,6 +224,9 @@ Release 2.8.0 - UNRELEASED
YARN-3021. YARN's delegation-token handling disallows certain trust setups
to operate properly over DistCp. (Yongjun Zhang via jianhe)

YARN-3493. RM fails to come up with error "Failed to load/recover state"
when mem settings are changed. (Jian He via wangda)

Release 2.7.1 - UNRELEASED

INCOMPATIBLE CHANGES
Expand Down
Expand Up @@ -499,7 +499,7 @@ public AllocateResponse allocate(AllocateRequest request)

// sanity check
try {
RMServerUtils.validateResourceRequests(ask,
RMServerUtils.normalizeAndValidateRequests(ask,
rScheduler.getMaximumResourceCapability(), app.getQueue(),
rScheduler);
} catch (InvalidResourceRequestException e) {
Expand Down
Expand Up @@ -279,7 +279,7 @@ protected void submitApplication(
ApplicationId applicationId = submissionContext.getApplicationId();

RMAppImpl application =
createAndPopulateNewRMApp(submissionContext, submitTime, user);
createAndPopulateNewRMApp(submissionContext, submitTime, user, false);
ApplicationId appId = submissionContext.getApplicationId();

if (UserGroupInformation.isSecurityEnabled()) {
Expand Down Expand Up @@ -316,16 +316,18 @@ protected void recoverApplication(ApplicationStateData appState,
// create and recover app.
RMAppImpl application =
createAndPopulateNewRMApp(appContext, appState.getSubmitTime(),
appState.getUser());
appState.getUser(), true);

application.handle(new RMAppRecoverEvent(appId, rmState));
}

private RMAppImpl createAndPopulateNewRMApp(
ApplicationSubmissionContext submissionContext,
long submitTime, String user)
throws YarnException {
ApplicationSubmissionContext submissionContext, long submitTime,
String user, boolean isRecovery) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
ResourceRequest amReq =
validateAndCreateResourceRequest(submissionContext, isRecovery);

// Create RMApp
RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
Expand All @@ -343,7 +345,7 @@ private RMAppImpl createAndPopulateNewRMApp(
String message = "Application with id " + applicationId
+ " is already present! Cannot add a duplicate!";
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
throw new YarnException(message);
}
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
Expand All @@ -356,7 +358,7 @@ private RMAppImpl createAndPopulateNewRMApp(
}

private ResourceRequest validateAndCreateResourceRequest(
ApplicationSubmissionContext submissionContext)
ApplicationSubmissionContext submissionContext, boolean isRecovery)
throws InvalidResourceRequestException {
// Validation of the ApplicationSubmissionContext needs to be completed
// here. Only those fields that are dependent on RM's configuration are
Expand All @@ -365,31 +367,29 @@ private ResourceRequest validateAndCreateResourceRequest(

// Check whether AM resource requirements are within required limits
if (!submissionContext.getUnmanagedAM()) {
ResourceRequest amReq;
if (submissionContext.getAMContainerResourceRequest() != null) {
amReq = submissionContext.getAMContainerResourceRequest();
} else {
amReq =
BuilderUtils.newResourceRequest(
RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
submissionContext.getResource(), 1);
ResourceRequest amReq = submissionContext.getAMContainerResourceRequest();
if (amReq == null) {
amReq = BuilderUtils
.newResourceRequest(RMAppAttemptImpl.AM_CONTAINER_PRIORITY,
ResourceRequest.ANY, submissionContext.getResource(), 1);
}

// set label expression for AM container
if (null == amReq.getNodeLabelExpression()) {
amReq.setNodeLabelExpression(submissionContext
.getNodeLabelExpression());
}

try {
SchedulerUtils.validateResourceRequest(amReq,
SchedulerUtils.normalizeAndValidateRequest(amReq,
scheduler.getMaximumResourceCapability(),
submissionContext.getQueue(), scheduler);
submissionContext.getQueue(), scheduler, isRecovery);
} catch (InvalidResourceRequestException e) {
LOG.warn("RM app submission failed in validating AM resource request"
+ " for application " + submissionContext.getApplicationId(), e);
throw e;
}

SchedulerUtils.normalizeRequest(amReq, scheduler.getResourceCalculator(),
scheduler.getClusterResource(),
scheduler.getMinimumResourceCapability(),
Expand Down
Expand Up @@ -90,11 +90,11 @@ public static List<RMNode> queryRMNodes(RMContext context,
* Utility method to validate a list resource requests, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*/
public static void validateResourceRequests(List<ResourceRequest> ask,
public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
Resource maximumResource, String queueName, YarnScheduler scheduler)
throws InvalidResourceRequestException {
for (ResourceRequest resReq : ask) {
SchedulerUtils.validateResourceRequest(resReq, maximumResource,
SchedulerUtils.normalizeAndvalidateRequest(resReq, maximumResource,
queueName, scheduler);
}
}
Expand Down
Expand Up @@ -940,7 +940,6 @@ public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
appAttempt.amReq.setResourceName(ResourceRequest.ANY);
appAttempt.amReq.setRelaxLocality(true);

// SchedulerUtils.validateResourceRequests is not necessary because
// AM resource has been checked when submission
Allocation amContainerAllocation =
appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
Expand Down
Expand Up @@ -189,14 +189,58 @@ public static void normalizeRequest(
ask.setCapability(normalized);
}

private static void normalizeNodeLabelExpressionInRequest(
ResourceRequest resReq, QueueInfo queueInfo) {

String labelExp = resReq.getNodeLabelExpression();

// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null && ResourceRequest.ANY
.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
}

// If labelExp still equals to null, set it to be NO_LABEL
if (labelExp == null) {
labelExp = RMNodeLabelsManager.NO_LABEL;
}
resReq.setNodeLabelExpression(labelExp);
}

public static void normalizeAndValidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler,
boolean isRecovery)
throws InvalidResourceRequestException {

QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}
SchedulerUtils.normalizeNodeLabelExpressionInRequest(resReq, queueInfo);
if (!isRecovery) {
validateResourceRequest(resReq, maximumResource, queueInfo);
}
}

public static void normalizeAndvalidateRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler)
throws InvalidResourceRequestException {
normalizeAndValidateRequest(resReq, maximumResource, queueName, scheduler,
false);
}

/**
* Utility method to validate a resource request, by insuring that the
* requested memory/vcore is non-negative and not greater than max
*
* @throws InvalidResourceRequestException when there is invalid request
*/
public static void validateResourceRequest(ResourceRequest resReq,
Resource maximumResource, String queueName, YarnScheduler scheduler)
Resource maximumResource, QueueInfo queueInfo)
throws InvalidResourceRequestException {
if (resReq.getCapability().getMemory() < 0 ||
resReq.getCapability().getMemory() > maximumResource.getMemory()) {
Expand All @@ -216,31 +260,7 @@ public static void validateResourceRequest(ResourceRequest resReq,
+ resReq.getCapability().getVirtualCores()
+ ", maxVirtualCores=" + maximumResource.getVirtualCores());
}

// Get queue from scheduler
QueueInfo queueInfo = null;
try {
queueInfo = scheduler.getQueueInfo(queueName, false, false);
} catch (IOException e) {
// it is possible queue cannot get when queue mapping is set, just ignore
// the queueInfo here, and move forward
}

// check labels in the resource request.
String labelExp = resReq.getNodeLabelExpression();

// if queue has default label expression, and RR doesn't have, use the
// default label expression of queue
if (labelExp == null && queueInfo != null
&& ResourceRequest.ANY.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
}

// If labelExp still equals to null, set it to be NO_LABEL
resReq
.setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL
: labelExp);

// we don't allow specify label expression other than resourceName=ANY now
if (!ResourceRequest.ANY.equals(resReq.getResourceName())
&& labelExp != null && !labelExp.trim().isEmpty()) {
Expand Down
Expand Up @@ -1011,4 +1011,30 @@ public void addApplicationSync(ApplicationId applicationId,
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
rm2.waitForState(app1.getApplicationId(), RMAppState.FAILED);
}

/**
* Test validateAndCreateResourceRequest fails on recovery, app should ignore
* this Exception and continue
*/
@Test (timeout = 30000)
public void testAppFailToValidateResourceRequestOnRecovery() throws Exception{
MemoryRMStateStore memStore = new MemoryRMStateStore();
memStore.init(conf);
rm1 = new MockRM(conf, memStore);
rm1.start();
MockNM nm1 =
new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService());
nm1.registerNode();
RMApp app1 = rm1.submitApp(200);
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

// Change the config so that validateAndCreateResourceRequest throws
// exception on recovery
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 50);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 100);

rm2 = new MockRM(conf, memStore);
nm1.setResourceTrackerService(rm2.getResourceTrackerService());
rm2.start();
}
}

0 comments on commit f65eeb4

Please sign in to comment.