Skip to content

Commit

Permalink
YARN-3243. CapacityScheduler should pass headroom from parent to chil…
Browse files Browse the repository at this point in the history
…dren to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.
  • Loading branch information
jian-he committed Mar 17, 2015
1 parent a89b087 commit 487374b
Show file tree
Hide file tree
Showing 13 changed files with 561 additions and 517 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED


IMPROVEMENTS IMPROVEMENTS


YARN-3243. CapacityScheduler should pass headroom from parent to children
to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)

OPTIMIZATIONS OPTIMIZATIONS


YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Expand Up @@ -20,10 +20,13 @@


import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;


import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.authorize.AccessControlList;
Expand All @@ -34,6 +37,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
Expand All @@ -49,6 +53,7 @@
import com.google.common.collect.Sets; import com.google.common.collect.Sets;


public abstract class AbstractCSQueue implements CSQueue { public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);


CSQueue parent; CSQueue parent;
final String queueName; final String queueName;
Expand Down Expand Up @@ -406,21 +411,102 @@ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) {
parentQ.getPreemptionDisabled()); parentQ.getPreemptionDisabled());
} }


protected Resource getCurrentResourceLimit(Resource clusterResource, private Resource getCurrentLimitResource(String nodeLabel,
ResourceLimits currentResourceLimits) { Resource clusterResource, ResourceLimits currentResourceLimits) {
/* /*
* Queue's max available resource = min(my.max, my.limit) * Current limit resource: For labeled resource: limit = queue-max-resource
* my.limit is set by my parent, considered used resource of my siblings * (TODO, this part need update when we support labeled-limit) For
* non-labeled resource: limit = min(queue-max-resource,
* limit-set-by-parent)
*/ */
Resource queueMaxResource = Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource, Resources.multiplyAndNormalizeDown(resourceCalculator,
queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation); labelManager.getResourceByLabel(nodeLabel, clusterResource),
Resource queueCurrentResourceLimit = queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
Resources.min(resourceCalculator, clusterResource, queueMaxResource, if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
currentResourceLimits.getLimit()); return Resources.min(resourceCalculator, clusterResource,
queueCurrentResourceLimit = queueMaxResource, currentResourceLimits.getLimit());
Resources.roundDown(resourceCalculator, queueCurrentResourceLimit, }
minimumAllocation); return queueMaxResource;
return queueCurrentResourceLimit; }

synchronized boolean canAssignToThisQueue(Resource clusterResource,
Set<String> nodeLabels, ResourceLimits currentResourceLimits,
Resource nowRequired, Resource resourceCouldBeUnreserved) {
// Get label of this queue can access, it's (nodeLabel AND queueLabel)
Set<String> labelCanAccess;
if (null == nodeLabels || nodeLabels.isEmpty()) {
labelCanAccess = new HashSet<String>();
// Any queue can always access any node without label
labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
} else {
labelCanAccess = new HashSet<String>(
accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
: Sets.intersection(accessibleLabels, nodeLabels));
}

for (String label : labelCanAccess) {
// New total resource = used + required
Resource newTotalResource =
Resources.add(queueUsage.getUsed(label), nowRequired);

Resource currentLimitResource =
getCurrentLimitResource(label, clusterResource, currentResourceLimits);

// if reservation continous looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
// has reserved containers.
// TODO, now only consider reservation cases when the node has no label
if (this.reservationsContinueLooking
&& label.equals(RMNodeLabelsManager.NO_LABEL)
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
Resources.subtract(newTotalResource, resourceCouldBeUnreserved);

// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
if (Resources.lessThan(resourceCalculator, clusterResource,
newTotalWithoutReservedResource, currentLimitResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("try to use reserved: " + getQueueName()
+ " usedResources: " + queueUsage.getUsed()
+ ", clusterResources: " + clusterResource
+ ", reservedResources: " + resourceCouldBeUnreserved
+ ", capacity-without-reserved: "
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
return true;
}
}

// Otherwise, if any of the label of this node beyond queue limit, we
// cannot allocate on this node. Consider a small epsilon here.
if (Resources.greaterThan(resourceCalculator, clusterResource,
newTotalResource, currentLimitResource)) {
return false;
}

if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
+ "Check assign to queue, label=" + label
+ " usedResources: " + queueUsage.getUsed(label)
+ " clusterResources: " + clusterResource
+ " currentUsedCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
queueUsage.getUsed(label),
labelManager.getResourceByLabel(label, clusterResource))
+ " max-capacity: "
+ queueCapacities.getAbsoluteMaximumCapacity(label)
+ ")");
}
return true;
}

// Actually, this will not happen, since labelCanAccess will be always
// non-empty
return false;
} }
} }
Expand Up @@ -189,13 +189,11 @@ public void finishApplicationAttempt(FiCaSchedulerApp application,
* Assign containers to applications in the queue or it's children (if any). * Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster. * @param clusterResource the resource of the cluster.
* @param node node on which resources are available * @param node node on which resources are available
* @param needToUnreserve assign container only if it can unreserve one first
* @param resourceLimits how much overall resource of this queue can use. * @param resourceLimits how much overall resource of this queue can use.
* @return the assignment * @return the assignment
*/ */
public CSAssignment assignContainers(Resource clusterResource, public CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve, FiCaSchedulerNode node, ResourceLimits resourceLimits);
ResourceLimits resourceLimits);


/** /**
* A container assigned to the queue has completed. * A container assigned to the queue has completed.
Expand Down
Expand Up @@ -1061,9 +1061,14 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
node.getNodeID()); node.getNodeID());


LeafQueue queue = ((LeafQueue)reservedApplication.getQueue()); LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(clusterResource, node, CSAssignment assignment =
false, new ResourceLimits( queue.assignContainers(
clusterResource)); clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));


RMContainer excessReservation = assignment.getExcessReservation(); RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) { if (excessReservation != null) {
Expand All @@ -1087,8 +1092,13 @@ false, new ResourceLimits(
LOG.debug("Trying to schedule on node: " + node.getNodeName() + LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource()); ", available: " + node.getAvailableResource());
} }
root.assignContainers(clusterResource, node, false, new ResourceLimits( root.assignContainers(
clusterResource)); clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
RMNodeLabelsManager.NO_LABEL, clusterResource)));
} }
} else { } else {
LOG.info("Skipping scheduling since node " + node.getNodeID() + LOG.info("Skipping scheduling since node " + node.getNodeID() +
Expand Down Expand Up @@ -1209,6 +1219,13 @@ private synchronized void addNode(RMNode nodeManager) {
usePortForNodeName, nodeManager.getNodeLabels()); usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode); this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability()); Resources.addTo(clusterResource, nodeManager.getTotalCapability());

// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}

root.updateClusterResource(clusterResource, new ResourceLimits( root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource)); clusterResource));
int numNodes = numNodeManagers.incrementAndGet(); int numNodes = numNodeManagers.incrementAndGet();
Expand All @@ -1220,12 +1237,6 @@ private synchronized void addNode(RMNode nodeManager) {
if (scheduleAsynchronously && numNodes == 1) { if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule(); asyncSchedulerThread.beginSchedule();
} }

// update this node to node label manager
if (labelManager != null) {
labelManager.activateNode(nodeManager.getNodeID(),
nodeManager.getTotalCapability());
}
} }


private synchronized void removeNode(RMNode nodeInfo) { private synchronized void removeNode(RMNode nodeInfo) {
Expand Down

0 comments on commit 487374b

Please sign in to comment.