Skip to content

Commit

Permalink
YARN-6050. AMs can't be scheduled on racks or nodes (rkanter)
Browse files Browse the repository at this point in the history
  • Loading branch information
rkanter committed Mar 28, 2017
1 parent 64ea62c commit 9bae672
Show file tree
Hide file tree
Showing 32 changed files with 1,212 additions and 140 deletions.
Expand Up @@ -22,6 +22,7 @@
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
Expand Down Expand Up @@ -589,7 +590,8 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
amResourceRequest.setCapability(capability); amResourceRequest.setCapability(capability);
amResourceRequest.setNumContainers(1); amResourceRequest.setNumContainers(1);
amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim()); amResourceRequest.setNodeLabelExpression(amNodelabelExpression.trim());
appContext.setAMContainerResourceRequest(amResourceRequest); appContext.setAMContainerResourceRequests(
Collections.singletonList(amResourceRequest));
} }
// set labels for the Job containers // set labels for the Job containers
appContext.setNodeLabelExpression(jobConf appContext.setNodeLabelExpression(jobConf
Expand Down
Expand Up @@ -571,7 +571,7 @@ public void testNodeLabelExp() throws Exception {
buildSubmitContext(yarnRunner, jobConf); buildSubmitContext(yarnRunner, jobConf);


assertEquals(appSubCtx.getNodeLabelExpression(), "GPU"); assertEquals(appSubCtx.getNodeLabelExpression(), "GPU");
assertEquals(appSubCtx.getAMContainerResourceRequest() assertEquals(appSubCtx.getAMContainerResourceRequests().get(0)
.getNodeLabelExpression(), "highMem"); .getNodeLabelExpression(), "highMem");
} }


Expand Down
Expand Up @@ -18,6 +18,8 @@


package org.apache.hadoop.yarn.api.records; package org.apache.hadoop.yarn.api.records;


import java.util.Collections;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;


Expand Down Expand Up @@ -100,7 +102,7 @@ public static ApplicationSubmissionContext newInstance(
amReq.setNumContainers(1); amReq.setNumContainers(1);
amReq.setRelaxLocality(true); amReq.setRelaxLocality(true);
amReq.setNodeLabelExpression(amContainerLabelExpression); amReq.setNodeLabelExpression(amContainerLabelExpression);
context.setAMContainerResourceRequest(amReq); context.setAMContainerResourceRequests(Collections.singletonList(amReq));
return context; return context;
} }


Expand Down Expand Up @@ -159,7 +161,8 @@ public static ApplicationSubmissionContext newInstance(
context.setApplicationType(applicationType); context.setApplicationType(applicationType);
context.setKeepContainersAcrossApplicationAttempts(keepContainers); context.setKeepContainersAcrossApplicationAttempts(keepContainers);
context.setNodeLabelExpression(appLabelExpression); context.setNodeLabelExpression(appLabelExpression);
context.setAMContainerResourceRequest(resourceRequest); context.setAMContainerResourceRequests(
Collections.singletonList(resourceRequest));
return context; return context;
} }


Expand Down Expand Up @@ -454,29 +457,61 @@ public abstract void setKeepContainersAcrossApplicationAttempts(
public abstract void setNodeLabelExpression(String nodeLabelExpression); public abstract void setNodeLabelExpression(String nodeLabelExpression);


/** /**
* Get ResourceRequest of AM container, if this is not null, scheduler will * Get the ResourceRequest of the AM container.
* use this to acquire resource for AM container. *
* * If this is not null, scheduler will use this to acquire resource for AM
* container.
*
* If this is null, scheduler will assemble a ResourceRequest by using * If this is null, scheduler will assemble a ResourceRequest by using
* <em>getResource</em> and <em>getPriority</em> of * <em>getResource</em> and <em>getPriority</em> of
* <em>ApplicationSubmissionContext</em>. * <em>ApplicationSubmissionContext</em>.
* *
* Number of containers and Priority will be ignore. * Number of containers and Priority will be ignored.
* *
* @return ResourceRequest of AM container * @return ResourceRequest of the AM container
* @deprecated See {@link #getAMContainerResourceRequests()}
*/ */
@Public @Public
@Evolving @Evolving
@Deprecated
public abstract ResourceRequest getAMContainerResourceRequest(); public abstract ResourceRequest getAMContainerResourceRequest();


/** /**
* Set ResourceRequest of AM container * Set ResourceRequest of the AM container
* @param request of AM container * @param request of the AM container
* @deprecated See {@link #setAMContainerResourceRequests(List)}
*/ */
@Public @Public
@Evolving @Evolving
@Deprecated
public abstract void setAMContainerResourceRequest(ResourceRequest request); public abstract void setAMContainerResourceRequest(ResourceRequest request);


/**
* Get the ResourceRequests of the AM container.
*
* If this is not null, scheduler will use this to acquire resource for AM
* container.
*
* If this is null, scheduler will use the ResourceRequest as determined by
* <em>getAMContainerResourceRequest</em> and its behavior.
*
* Number of containers and Priority will be ignored.
*
* @return List of ResourceRequests of the AM container
*/
@Public
@Evolving
public abstract List<ResourceRequest> getAMContainerResourceRequests();

/**
* Set ResourceRequests of the AM container.
* @param requests of the AM container
*/
@Public
@Evolving
public abstract void setAMContainerResourceRequests(
List<ResourceRequest> requests);

/** /**
* Get the attemptFailuresValidityInterval in milliseconds for the application * Get the attemptFailuresValidityInterval in milliseconds for the application
* *
Expand Down
Expand Up @@ -378,7 +378,7 @@ message ApplicationSubmissionContextProto {
optional LogAggregationContextProto log_aggregation_context = 14; optional LogAggregationContextProto log_aggregation_context = 14;
optional ReservationIdProto reservation_id = 15; optional ReservationIdProto reservation_id = 15;
optional string node_label_expression = 16; optional string node_label_expression = 16;
optional ResourceRequestProto am_container_resource_request = 17; repeated ResourceRequestProto am_container_resource_request = 17;
repeated ApplicationTimeoutMapProto application_timeouts = 18; repeated ApplicationTimeoutMapProto application_timeouts = 18;
} }


Expand Down
Expand Up @@ -18,6 +18,8 @@


package org.apache.hadoop.yarn.api.records.impl.pb; package org.apache.hadoop.yarn.api.records.impl.pb;


import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
Expand Down Expand Up @@ -66,7 +68,7 @@ public class ApplicationSubmissionContextPBImpl
private ContainerLaunchContext amContainer = null; private ContainerLaunchContext amContainer = null;
private Resource resource = null; private Resource resource = null;
private Set<String> applicationTags = null; private Set<String> applicationTags = null;
private ResourceRequest amResourceRequest = null; private List<ResourceRequest> amResourceRequests = null;
private LogAggregationContext logAggregationContext = null; private LogAggregationContext logAggregationContext = null;
private ReservationId reservationId = null; private ReservationId reservationId = null;
private Map<ApplicationTimeoutType, Long> applicationTimeouts = null; private Map<ApplicationTimeoutType, Long> applicationTimeouts = null;
Expand Down Expand Up @@ -127,9 +129,10 @@ private void mergeLocalToBuilder() {
builder.clearApplicationTags(); builder.clearApplicationTags();
builder.addAllApplicationTags(this.applicationTags); builder.addAllApplicationTags(this.applicationTags);
} }
if (this.amResourceRequest != null) { if (this.amResourceRequests != null) {
builder.setAmContainerResourceRequest( builder.clearAmContainerResourceRequest();
convertToProtoFormat(this.amResourceRequest)); builder.addAllAmContainerResourceRequest(
convertToProtoFormat(this.amResourceRequests));
} }
if (this.logAggregationContext != null) { if (this.logAggregationContext != null) {
builder.setLogAggregationContext( builder.setLogAggregationContext(
Expand Down Expand Up @@ -430,13 +433,23 @@ private PriorityPBImpl convertFromProtoFormat(PriorityProto p) {
private PriorityProto convertToProtoFormat(Priority t) { private PriorityProto convertToProtoFormat(Priority t) {
return ((PriorityPBImpl)t).getProto(); return ((PriorityPBImpl)t).getProto();
} }


private ResourceRequestPBImpl convertFromProtoFormat(ResourceRequestProto p) { private List<ResourceRequest> convertFromProtoFormat(
return new ResourceRequestPBImpl(p); List<ResourceRequestProto> ps) {
List<ResourceRequest> rs = new ArrayList<>();
for (ResourceRequestProto p : ps) {
rs.add(new ResourceRequestPBImpl(p));
}
return rs;
} }


private ResourceRequestProto convertToProtoFormat(ResourceRequest t) { private List<ResourceRequestProto> convertToProtoFormat(
return ((ResourceRequestPBImpl)t).getProto(); List<ResourceRequest> ts) {
List<ResourceRequestProto> rs = new ArrayList<>(ts.size());
for (ResourceRequest t : ts) {
rs.add(((ResourceRequestPBImpl)t).getProto());
}
return rs;
} }


private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) { private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
Expand Down Expand Up @@ -485,25 +498,46 @@ public void setNodeLabelExpression(String labelExpression) {
} }


@Override @Override
@Deprecated
public ResourceRequest getAMContainerResourceRequest() { public ResourceRequest getAMContainerResourceRequest() {
List<ResourceRequest> reqs = getAMContainerResourceRequests();
if (reqs == null || reqs.isEmpty()) {
return null;
}
return getAMContainerResourceRequests().get(0);
}

@Override
public List<ResourceRequest> getAMContainerResourceRequests() {
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
if (this.amResourceRequest != null) { if (this.amResourceRequests != null) {
return amResourceRequest; return amResourceRequests;
} // Else via proto } // Else via proto
if (!p.hasAmContainerResourceRequest()) { if (p.getAmContainerResourceRequestCount() == 0) {
return null; return null;
} }
amResourceRequest = convertFromProtoFormat(p.getAmContainerResourceRequest()); amResourceRequests =
return amResourceRequest; convertFromProtoFormat(p.getAmContainerResourceRequestList());
return amResourceRequests;
} }


@Override @Override
@Deprecated
public void setAMContainerResourceRequest(ResourceRequest request) { public void setAMContainerResourceRequest(ResourceRequest request) {
maybeInitBuilder(); maybeInitBuilder();
if (request == null) { if (request == null) {
builder.clearAmContainerResourceRequest(); builder.clearAmContainerResourceRequest();
} }
this.amResourceRequest = request; this.amResourceRequests = Collections.singletonList(request);
}

@Override
public void setAMContainerResourceRequests(List<ResourceRequest> requests) {
maybeInitBuilder();
if (requests == null) {
builder.clearAmContainerResourceRequest();
}
this.amResourceRequests = requests;
} }


@Override @Override
Expand Down
Expand Up @@ -801,6 +801,28 @@ private <T> Map<NodeId, Set<T>> generateNodeLabelsInfoPerNode(Class<T> type) {
} }
} }


/**
* Get nodes that have no labels.
*
* @return set of nodes with no labels
*/
public Set<NodeId> getNodesWithoutALabel() {
try {
readLock.lock();
Set<NodeId> nodes = new HashSet<>();
for (Host host : nodeCollections.values()) {
for (NodeId nodeId : host.nms.keySet()) {
if (getLabelsByNode(nodeId).isEmpty()) {
nodes.add(nodeId);
}
}
}
return Collections.unmodifiableSet(nodes);
} finally {
readLock.unlock();
}
}



/** /**
* Get mapping of labels to nodes for all the labels. * Get mapping of labels to nodes for all the labels.
Expand Down

0 comments on commit 9bae672

Please sign in to comment.