From e5003be907acef87c2770e3f2914953f62017b0e Mon Sep 17 00:00:00 2001
From: Jian He
Date: Wed, 12 Aug 2015 15:07:50 -0700
Subject: [PATCH] YARN-4026. Refactored ContainerAllocator to accept a list of
 priorities rather than a single priority. Contributed by Wangda Tan

---
 hadoop-yarn-project/CHANGES.txt               |   3 +
 .../scheduler/capacity/LeafQueue.java         |   7 +-
 .../allocator/ContainerAllocation.java        |  33 ++--
 .../allocator/ContainerAllocator.java         | 109 ++++++++-----
 .../allocator/RegularContainerAllocator.java  | 123 +++++++++++----
 .../common/fica/FiCaSchedulerApp.java         | 103 +------------
 .../scheduler/capacity/TestLeafQueue.java     | 144 ++++++++++++++----
 7 files changed, 315 insertions(+), 207 deletions(-)

diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 199a930eef626..4c70a8a6da495 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -393,6 +393,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via
     wangda)
 
+    YARN-4026. Refactored ContainerAllocator to accept a list of priorities
+    rather than a single priority. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5976f58c2b64e..ff1baff2ee13e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -763,8 +763,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerApp application =
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
-        CSAssignment assignment = application.assignReservedContainer(node, reservedContainer,
-            clusterResource, schedulingMode);
+        CSAssignment assignment =
+            application.assignContainers(clusterResource, node,
+                currentResourceLimits, schedulingMode, reservedContainer);
         handleExcessReservedContainer(clusterResource, assignment);
         return assignment;
       }
@@ -812,7 +813,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
         // Try to schedule
         CSAssignment assignment =
           application.assignContainers(clusterResource, node,
-              currentResourceLimits, schedulingMode);
+              currentResourceLimits, schedulingMode, null);
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("post-assignContainers for application "
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
index 00c1bb974fb5e..1df9410d1f869 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocation.java
@@ -25,18 +25,31 @@
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class ContainerAllocation {
+  /**
+   * Skip the locality (e.g. node-local, rack-local, any), and look at other
+   * localities of the same priority
+   */
+  public static final ContainerAllocation LOCALITY_SKIPPED =
+      new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
+
+  /**
+   * Skip the priority, and look at other priorities of the same application
+   */
   public static final ContainerAllocation PRIORITY_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);
-
+
+  /**
+   * Skip the application, and look at other applications of the same queue
+   */
   public static final ContainerAllocation APP_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);
 
+  /**
+   * Skip the leaf-queue, and look at other queues of the same parent queue
+   */
   public static final ContainerAllocation QUEUE_SKIPPED =
       new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);
-
-  public static final ContainerAllocation LOCALITY_SKIPPED =
-      new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);
-
+
   RMContainer containerToBeUnreserved;
   private Resource resourceToBeAllocated = Resources.none();
   AllocationState state;
@@ -50,26 +63,26 @@ public ContainerAllocation(RMContainer containerToBeUnreserved,
     this.resourceToBeAllocated = resourceToBeAllocated;
     this.state = state;
   }
-
+
   public RMContainer getContainerToBeUnreserved() {
     return containerToBeUnreserved;
   }
-
+
   public Resource getResourceToBeAllocated() {
     if (resourceToBeAllocated == null) {
       return Resources.none();
     }
     return resourceToBeAllocated;
   }
-
+
   public AllocationState getAllocationState() {
     return state;
  }
-
+
   public NodeType getContainerNodeType() {
     return containerNodeType;
   }
-
+
   public Container getUpdatedContainer() {
     return updatedContainer;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
index b4168dd28e033..6e296cdb83ba9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/ContainerAllocator.java
@@ -18,12 +18,15 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;
 
-import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -36,6 +39,8 @@
  * extensible.
  */
 public abstract class ContainerAllocator {
+  private static final Log LOG = LogFactory.getLog(ContainerAllocator.class);
+
   FiCaSchedulerApp application;
   final ResourceCalculator rc;
   final RMContext rmContext;
@@ -46,27 +51,8 @@ public ContainerAllocator(FiCaSchedulerApp application,
     this.rc = rc;
     this.rmContext = rmContext;
   }
-
-  /**
-   * preAllocation is to perform checks, etc. to see if we can/cannot allocate
-   * container. It will put necessary information to returned
-   * {@link ContainerAllocation}.
-   */
-  abstract ContainerAllocation preAllocation(
-      Resource clusterResource, FiCaSchedulerNode node,
-      SchedulingMode schedulingMode, ResourceLimits resourceLimits,
-      Priority priority, RMContainer reservedContainer);
-
-  /**
-   * doAllocation is to update application metrics, create containers, etc.
-   * According to allocating conclusion decided by preAllocation.
-   */
-  abstract ContainerAllocation doAllocation(
-      ContainerAllocation allocationResult, Resource clusterResource,
-      FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
-      RMContainer reservedContainer);
-
-  boolean checkHeadroom(Resource clusterResource,
+
+  protected boolean checkHeadroom(Resource clusterResource,
       ResourceLimits currentResourceLimits, Resource required,
       FiCaSchedulerNode node) {
     // If headroom + currentReservation < required, we cannot allocate this
@@ -83,6 +69,68 @@ boolean checkHeadroom(Resource clusterResource,
         currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
         required);
   }
+
+  protected CSAssignment getCSAssignmentFromAllocateResult(
+      Resource clusterResource, ContainerAllocation result,
+      RMContainer rmContainer) {
+    // Handle skipped
+    boolean skipped =
+        (result.getAllocationState() == AllocationState.APP_SKIPPED);
+    CSAssignment assignment = new CSAssignment(skipped);
+    assignment.setApplication(application);
+
+    // Handle excess reservation
+    assignment.setExcessReservation(result.getContainerToBeUnreserved());
+
+    // If we allocated something
+    if (Resources.greaterThan(rc, clusterResource,
+        result.getResourceToBeAllocated(), Resources.none())) {
+      Resource allocatedResource = result.getResourceToBeAllocated();
+      Container updatedContainer = result.getUpdatedContainer();
+
+      assignment.setResource(allocatedResource);
+      assignment.setType(result.getContainerNodeType());
+
+      if (result.getAllocationState() == AllocationState.RESERVED) {
+        // This is a reserved container
+        LOG.info("Reserved container " + " application="
+            + application.getApplicationId() + " resource=" + allocatedResource
+            + " queue=" + this.toString() + " cluster=" + clusterResource);
+        assignment.getAssignmentInformation().addReservationDetails(
+            updatedContainer.getId(),
+            application.getCSLeafQueue().getQueuePath());
+        assignment.getAssignmentInformation().incrReservations();
+        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
+            allocatedResource);
+      } else if (result.getAllocationState() == AllocationState.ALLOCATED) {
+        // This is a new container
+        // Inform the ordering policy
+        LOG.info("assignedContainer" + " application attempt="
+            + application.getApplicationAttemptId() + " container="
+            + updatedContainer.getId() + " queue=" + this + " clusterResource="
+            + clusterResource);
+
+        application
+            .getCSLeafQueue()
+            .getOrderingPolicy()
+            .containerAllocated(application,
+                application.getRMContainer(updatedContainer.getId()));
+
+        assignment.getAssignmentInformation().addAllocationDetails(
+            updatedContainer.getId(),
+            application.getCSLeafQueue().getQueuePath());
+        assignment.getAssignmentInformation().incrAllocations();
+        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
+            allocatedResource);
+
+        if (rmContainer != null) {
+          assignment.setFulfilledReservation(true);
+        }
+      }
+    }
+
+    return assignment;
+  }
 
   /**
   * allocate needs to handle the following:
   *
@@ -96,20 +144,7 @@ boolean checkHeadroom(Resource clusterResource,
   * container, this will also update metrics
   * */
-  public ContainerAllocation allocate(Resource clusterResource,
+  public abstract CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
-      ResourceLimits resourceLimits, Priority priority,
-      RMContainer reservedContainer) {
-    ContainerAllocation result =
-        preAllocation(clusterResource, node, schedulingMode,
-            resourceLimits, priority, reservedContainer);
-
-    if (AllocationState.ALLOCATED == result.state
-        || AllocationState.RESERVED == result.state) {
-      result = doAllocation(result, clusterResource, node,
-          schedulingMode, priority, reservedContainer);
-    }
-
-    return result;
-  }
+      ResourceLimits resourceLimits, RMContainer reservedContainer);
 }
\ No newline at end of file
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
index 6effcd3a64eb8..dcb99ed806313 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/allocator/RegularContainerAllocator.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -154,7 +155,6 @@ private ContainerAllocation preCheckForNewContainer(Resource clusterResource,
     return null;
   }
 
-  @Override
   ContainerAllocation preAllocation(Resource clusterResource,
       FiCaSchedulerNode node, SchedulingMode schedulingMode,
      ResourceLimits resourceLimits, Priority priority,
@@ -295,14 +295,14 @@ private ContainerAllocation assignOffSwitchContainers(
           schedulingMode, currentResoureLimits);
     }
 
-    return ContainerAllocation.QUEUE_SKIPPED;
+    return ContainerAllocation.APP_SKIPPED;
   }
 
   private ContainerAllocation assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, Priority priority, RMContainer reservedContainer,
       SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
-    ContainerAllocation assigned;
+    ContainerAllocation allocation;
 
     NodeType requestType = null;
     // Data-local
     ResourceRequest nodeLocalResourceRequest =
         application.getResourceRequest(priority, node.getNodeName());
     if (nodeLocalResourceRequest != null) {
       requestType = NodeType.NODE_LOCAL;
-      assigned =
+      allocation =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
               node, priority, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
-          assigned.getResourceToBeAllocated(), Resources.none())) {
-        assigned.requestNodeType = requestType;
-        return assigned;
+          allocation.getResourceToBeAllocated(), Resources.none())) {
+        allocation.requestNodeType = requestType;
+        return allocation;
       }
     }
 
@@ -333,14 +333,14 @@
         requestType = NodeType.RACK_LOCAL;
       }
 
-      assigned =
+      allocation =
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
               node, priority, reservedContainer, schedulingMode,
               currentResoureLimits);
       if (Resources.greaterThan(rc, clusterResource,
-          assigned.getResourceToBeAllocated(), Resources.none())) {
-        assigned.requestNodeType = requestType;
-        return assigned;
+          allocation.getResourceToBeAllocated(), Resources.none())) {
+        allocation.requestNodeType = requestType;
+        return allocation;
       }
     }
 
@@ -356,13 +356,19 @@
         requestType = NodeType.OFF_SWITCH;
       }
 
-      assigned =
+      allocation =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
               node, priority, reservedContainer, schedulingMode,
               currentResoureLimits);
-      assigned.requestNodeType = requestType;
+      allocation.requestNodeType = requestType;
+
+      // When the returned allocation is LOCALITY_SKIPPED, since we are already
+      // at the off-switch request, we will skip this app w.r.t. priorities
+      if (allocation.state == AllocationState.LOCALITY_SKIPPED) {
+        allocation.state = AllocationState.APP_SKIPPED;
+      }
 
-      return assigned;
+      return allocation;
     }
 
     return ContainerAllocation.PRIORITY_SKIPPED;
@@ -388,7 +394,7 @@ private ContainerAllocation assignContainer(Resource clusterResource,
       // to label not match. This can be caused by node label changed
       // We should un-reserve this container.
       return new ContainerAllocation(rmContainer, null,
-          AllocationState.QUEUE_SKIPPED);
+          AllocationState.LOCALITY_SKIPPED);
     }
 
     Resource capability = request.getCapability();
@@ -400,7 +406,8 @@
       LOG.warn("Node : " + node.getNodeID()
           + " does not have sufficient resource for request : " + request
           + " node total capability : " + node.getTotalResource());
-      return ContainerAllocation.QUEUE_SKIPPED;
+      // Skip this locality request
+      return ContainerAllocation.LOCALITY_SKIPPED;
     }
 
     assert Resources.greaterThan(
@@ -457,7 +464,8 @@ private ContainerAllocation assignContainer(Resource clusterResource,
      // continue)). If we failed to unreserve some resource, we can't
      // continue.
      if (null == unreservedContainer) {
-        return ContainerAllocation.QUEUE_SKIPPED;
+        // Skip the locality request
+        return ContainerAllocation.LOCALITY_SKIPPED;
       }
     }
@@ -468,19 +476,20 @@
       result.containerNodeType = type;
       return result;
     } else {
-      // if we are allowed to allocate but this node doesn't have space, reserve it or
-      // if this was an already a reserved container, reserve it again
+      // if we are allowed to allocate but this node doesn't have space, reserve
+      // it, or if this was already a reserved container, reserve it again
       if (shouldAllocOrReserveNewContainer || rmContainer != null) {
         if (reservationsContinueLooking && rmContainer == null) {
          // we could possibly be ignoring queue capacity or user limits when
-          // reservationsContinueLooking is set. Make sure we didn't need to unreserve
-          // one.
+          // reservationsContinueLooking is set. Make sure we didn't need to
+          // unreserve one.
           if (needToUnreserve) {
             if (LOG.isDebugEnabled()) {
               LOG.debug("we needed to unreserve to be able to allocate");
             }
-            return ContainerAllocation.QUEUE_SKIPPED;
+            // Skip the locality request
+            return ContainerAllocation.LOCALITY_SKIPPED;
           }
         }
@@ -490,7 +499,8 @@
         result.containerNodeType = type;
         return result;
       }
-      return ContainerAllocation.QUEUE_SKIPPED;
+      // Skip the locality request
+      return ContainerAllocation.LOCALITY_SKIPPED;
     }
   }
@@ -563,8 +573,7 @@ private ContainerAllocation handleNewContainerAllocation(
       // Skip this app if we failed to allocate.
       ContainerAllocation ret =
           new ContainerAllocation(allocationResult.containerToBeUnreserved,
-              null, AllocationState.QUEUE_SKIPPED);
-      ret.state = AllocationState.APP_SKIPPED;
+              null, AllocationState.APP_SKIPPED);
       return ret;
     }
@@ -578,7 +587,6 @@ private ContainerAllocation handleNewContainerAllocation(
     return allocationResult;
   }
 
-  @Override
   ContainerAllocation doAllocation(ContainerAllocation allocationResult,
       Resource clusterResource, FiCaSchedulerNode node,
       SchedulingMode schedulingMode, Priority priority,
@@ -591,7 +599,7 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
     // something went wrong getting/creating the container
     if (container == null) {
       LOG.warn("Couldn't get container for allocation!");
-      return ContainerAllocation.QUEUE_SKIPPED;
+      return ContainerAllocation.APP_SKIPPED;
     }
 
     if (allocationResult.getAllocationState() == AllocationState.ALLOCATED) {
@@ -626,4 +634,65 @@ ContainerAllocation doAllocation(ContainerAllocation allocationResult,
 
     return allocationResult;
   }
+
+  private ContainerAllocation allocate(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits, Priority priority,
+      RMContainer reservedContainer) {
+    ContainerAllocation result =
+        preAllocation(clusterResource, node, schedulingMode, resourceLimits,
+            priority, reservedContainer);
+
+    if (AllocationState.ALLOCATED == result.state
+        || AllocationState.RESERVED == result.state) {
+      result =
+          doAllocation(result, clusterResource, node, schedulingMode, priority,
+              reservedContainer);
+    }
+
+    return result;
+  }
+
+  @Override
+  public CSAssignment assignContainers(Resource clusterResource,
+      FiCaSchedulerNode node, SchedulingMode schedulingMode,
+      ResourceLimits resourceLimits,
+      RMContainer reservedContainer) {
+    if (reservedContainer == null) {
+      // Check if application needs more resource, skip if it doesn't need more.
+      if (!application.hasPendingResourceRequest(rc,
+          node.getPartition(), clusterResource, schedulingMode)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-label=" + node.getPartition());
+        }
+        return CSAssignment.SKIP_ASSIGNMENT;
+      }
+
+      // Schedule in priority order
+      for (Priority priority : application.getPriorities()) {
+        ContainerAllocation result =
+            allocate(clusterResource, node, schedulingMode, resourceLimits,
+                priority, null);
+
+        AllocationState allocationState = result.getAllocationState();
+        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
+          continue;
+        }
+        return getCSAssignmentFromAllocateResult(clusterResource, result,
+            null);
+      }
+
+      // We will reach here if we skipped all priorities of the app, so we will
+      // skip the app.
+      return CSAssignment.SKIP_ASSIGNMENT;
+    } else {
+      ContainerAllocation result =
+          allocate(clusterResource, node, schedulingMode, resourceLimits,
+              reservedContainer.getReservedPriority(), reservedContainer);
+      return getCSAssignmentFromAllocateResult(clusterResource, result,
+          reservedContainer);
+    }
+  }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index f9a6bc25186dd..74d77f59b1883 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -57,10 +57,8 @@
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AllocationState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.RegularContainerAllocator;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocation;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -280,7 +278,7 @@ public synchronized Resource getTotalPendingRequests() {
     return ret;
   }
 
-  public synchronized void addPreemptContainer(ContainerId cont){
+  public synchronized void addPreemptContainer(ContainerId cont) {
     // ignore already completed containers
     if (liveContainers.containsKey(cont)) {
       containersToPreempt.add(cont);
@@ -430,112 +428,19 @@ public RMContainer findNodeToUnreserve(Resource clusterResource,
   public LeafQueue getCSLeafQueue() {
     return (LeafQueue)queue;
   }
-
-  private CSAssignment getCSAssignmentFromAllocateResult(
-      Resource clusterResource, ContainerAllocation result) {
-    // Handle skipped
-    boolean skipped =
-        (result.getAllocationState() == AllocationState.APP_SKIPPED);
-    CSAssignment assignment = new CSAssignment(skipped);
-    assignment.setApplication(this);
-
-    // Handle excess reservation
-    assignment.setExcessReservation(result.getContainerToBeUnreserved());
-
-    // If we allocated something
-    if (Resources.greaterThan(rc, clusterResource,
-        result.getResourceToBeAllocated(), Resources.none())) {
-      Resource allocatedResource = result.getResourceToBeAllocated();
-      Container updatedContainer = result.getUpdatedContainer();
-
-      assignment.setResource(allocatedResource);
-      assignment.setType(result.getContainerNodeType());
-
-      if (result.getAllocationState() == AllocationState.RESERVED) {
-        // This is a reserved container
-        LOG.info("Reserved container " + " application=" + getApplicationId()
-            + " resource=" + allocatedResource + " queue="
-            + this.toString() + " cluster=" + clusterResource);
-        assignment.getAssignmentInformation().addReservationDetails(
-            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
-        assignment.getAssignmentInformation().incrReservations();
-        Resources.addTo(assignment.getAssignmentInformation().getReserved(),
-            allocatedResource);
-        assignment.setFulfilledReservation(true);
-      } else {
-        // This is a new container
-        // Inform the ordering policy
-        LOG.info("assignedContainer" + " application attempt="
-            + getApplicationAttemptId() + " container="
-            + updatedContainer.getId() + " queue=" + this + " clusterResource="
-            + clusterResource);
-
-        getCSLeafQueue().getOrderingPolicy().containerAllocated(this,
-            getRMContainer(updatedContainer.getId()));
-
-        assignment.getAssignmentInformation().addAllocationDetails(
-            updatedContainer.getId(), getCSLeafQueue().getQueuePath());
-        assignment.getAssignmentInformation().incrAllocations();
-        Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-            allocatedResource);
-      }
-    }
-
-    return assignment;
-  }
 
   public CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
-      SchedulingMode schedulingMode) {
+      SchedulingMode schedulingMode, RMContainer reservedContainer) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("pre-assignContainers for application "
           + getApplicationId());
       showRequests();
     }
 
-    // Check if application needs more resource, skip if it doesn't need more.
-    if (!hasPendingResourceRequest(rc,
-        node.getPartition(), clusterResource, schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip app_attempt=" + getApplicationAttemptId()
-            + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-label=" + node.getPartition());
-      }
-      return CSAssignment.SKIP_ASSIGNMENT;
-    }
-
     synchronized (this) {
-      // Schedule in priority order
-      for (Priority priority : getPriorities()) {
-        ContainerAllocation allocationResult =
-            containerAllocator.allocate(clusterResource, node,
-                schedulingMode, currentResourceLimits, priority, null);
-
-        // If it's a skipped allocation
-        AllocationState allocationState = allocationResult.getAllocationState();
-
-        if (allocationState == AllocationState.PRIORITY_SKIPPED) {
-          continue;
-        }
-        return getCSAssignmentFromAllocateResult(clusterResource,
-            allocationResult);
-      }
+      return containerAllocator.assignContainers(clusterResource, node,
+          schedulingMode, currentResourceLimits, reservedContainer);
     }
-
-    // We will reach here if we skipped all priorities of the app, so we will
-    // skip the app.
-    return CSAssignment.SKIP_ASSIGNMENT;
-  }
-
-
-  public synchronized CSAssignment assignReservedContainer(
-      FiCaSchedulerNode node, RMContainer rmContainer,
-      Resource clusterResource, SchedulingMode schedulingMode) {
-    ContainerAllocation result =
-        containerAllocator.allocate(clusterResource, node,
-            schedulingMode, new ResourceLimits(Resources.none()),
-            rmContainer.getReservedPriority(), rmContainer);
-
-    return getCSAssignmentFromAllocateResult(clusterResource, result);
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 0efadc14d0ba5..fe8be06f35412 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -622,16 +622,9 @@ public void testUserLimits() throws Exception {
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
-        new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
-    a.submitApplicationAttempt(app_1, user_0); // same user
-
-    final ApplicationAttemptId appAttemptId_2 =
-        TestUtils.getMockApplicationAttemptId(2, 0);
-    FiCaSchedulerApp app_2 =
-        new FiCaSchedulerApp(appAttemptId_2, user_1, a,
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
             a.getActiveUsersManager(), spyRMContext);
-    a.submitApplicationAttempt(app_2, user_1);
+    a.submitApplicationAttempt(app_1, user_1); // different user
 
     // Setup some nodes
     String host_0 = "127.0.0.1";
@@ -647,7 +640,7 @@ public void testUserLimits() throws Exception {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
                 priority, recordFactory)));
 
     app_1.updateResourceRequests(Collections.singletonList(
@@ -662,39 +655,38 @@ public void testUserLimits() throws Exception {
     a.setUserLimit(50);
     a.setUserLimitFactor(2);
 
-    // Now, only user_0 should be active since he is the only one with
-    // outstanding requests
-    assertEquals("There should only be 1 active user!",
-        1, a.getActiveUsersManager().getNumActiveUsers());
-
-    // This commented code is key to test 'activeUsers'.
-    // It should fail the test if uncommented since
-    // it would increase 'activeUsers' to 2 and stop user_2
-    // Pre MAPREDUCE-3732 this test should fail without this block too
-//    app_2.updateResourceRequests(Collections.singletonList(
-//        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
-//            recordFactory)));
+    // There are two active users
+    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
     a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    assertEquals(2*GB, a.getUsedResources().getMemory());
-    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
-    // Again one to user_0 since he hasn't exceeded user limit yet
+    // Allocate one container to app_1. Even though app_0 was submitted
+    // earlier, it cannot get this container assigned since user_0 has
+    // already exceeded its user-limit.
     a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    assertEquals(3*GB, a.getUsedResources().getMemory());
-    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(4*GB, a.getUsedResources().getMemory());
+    assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
-    // One more to user_0 since he is the only active user
+    // Allocate one container to app_0. Before allocating this container,
+    // user-limit = ceil((4 + 1) / 2) = 3G, and app_0's used resource (3G) <=
+    // user-limit.
     a.assignContainers(clusterResource, node_1,
        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    assertEquals(4*GB, a.getUsedResources().getMemory());
-    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
-    assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(7*GB, a.getUsedResources().getMemory());
+    assertEquals(6*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+
+    // app_0 has no outstanding resource requests; there's only one active user.
+ assertEquals("There should only be 1 active user!", + 1, a.getActiveUsersManager().getNumActiveUsers()); + } @Test @@ -2569,6 +2561,96 @@ public void testFairAssignment() throws Exception { Assert.assertEquals(3*GB, app_0.getCurrentConsumption().getMemory()); } + + @Test + public void testLocalityDelaySkipsApplication() throws Exception { + + // Manipulate queue 'a' + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + + // User + String user_0 = "user_0"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); + a.submitApplicationAttempt(app_0, user_0); + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_0, a, + mock(ActiveUsersManager.class), spyRMContext); + a.submitApplicationAttempt(app_1, user_0); + + // Setup some nodes and racks + String host_0 = "127.0.0.1"; + String rack_0 = "rack_0"; + FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 8*GB); + + String host_1 = "127.0.0.2"; + String rack_1 = "rack_1"; + FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, rack_1, 0, 8*GB); + + String host_2 = "127.0.0.3"; + String rack_2 = "rack_2"; + FiCaSchedulerNode node_2 = TestUtils.getMockNode(host_2, rack_2, 0, 8*GB); + + final int numNodes = 3; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + + // Setup resource-requests and submit + // App0 has node local request for host_0/host_1, and app1 has node local + // request for host2. + Priority priority = TestUtils.createMockPriority(1); + List app_0_requests_0 = new ArrayList(); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_0, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_0, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(host_1, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(rack_1, 1*GB, 1, + true, priority, recordFactory)); + app_0_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra + true, priority, recordFactory)); + app_0.updateResourceRequests(app_0_requests_0); + + List app_1_requests_0 = new ArrayList(); + app_1_requests_0.add( + TestUtils.createResourceRequest(host_2, 1*GB, 1, + true, priority, recordFactory)); + app_1_requests_0.add( + TestUtils.createResourceRequest(rack_2, 1*GB, 1, + true, priority, recordFactory)); + app_1_requests_0.add( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // one extra + true, priority, recordFactory)); + app_1.updateResourceRequests(app_1_requests_0); + + // Start testing... 
+    // When doing allocation, even though app_0 was submitted earlier than
+    // app_1, app_1 can still get allocated because app_0 is waiting for node-locality-delay
+    CSAssignment assignment = null;
+
+    // Check that app_0's scheduling opportunities increased and app_1 got allocated
+    assignment = a.assignContainers(clusterResource, node_2,
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    verifyContainerAllocated(assignment, NodeType.NODE_LOCAL);
+    assertEquals(1, app_0.getSchedulingOpportunities(priority));
+    assertEquals(3, app_0.getTotalRequiredResources(priority));
+    assertEquals(0, app_0.getLiveContainers().size());
+    assertEquals(1, app_1.getLiveContainers().size());
+  }
 
   private List<FiCaSchedulerApp> createListOfApps(int noOfApps, String user,
       LeafQueue defaultQueue) {