Skip to content

Commit

Permalink
YARN-4026. Refactored ContainerAllocator to accept a list of priorite…
Browse files Browse the repository at this point in the history
…s rather than a single priority. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Aug 12, 2015
1 parent 1c12adb commit e5003be
Show file tree
Hide file tree
Showing 7 changed files with 315 additions and 207 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -393,6 +393,9 @@ Release 2.8.0 - UNRELEASED

YARN-3966. Fix excessive loggings in CapacityScheduler. (Jian He via wangda)

YARN-4026. Refactored ContainerAllocator to accept a list of priorities
rather than a single priority. (Wangda Tan via jianhe)

OPTIMIZATIONS

YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Expand Up @@ -763,8 +763,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
synchronized (application) {
CSAssignment assignment = application.assignReservedContainer(node, reservedContainer,
clusterResource, schedulingMode);
CSAssignment assignment =
application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode, reservedContainer);
handleExcessReservedContainer(clusterResource, assignment);
return assignment;
}
Expand Down Expand Up @@ -812,7 +813,7 @@ public synchronized CSAssignment assignContainers(Resource clusterResource,
// Try to schedule
CSAssignment assignment =
application.assignContainers(clusterResource, node,
currentResourceLimits, schedulingMode);
currentResourceLimits, schedulingMode, null);

if (LOG.isDebugEnabled()) {
LOG.debug("post-assignContainers for application "
Expand Down
Expand Up @@ -25,18 +25,31 @@
import org.apache.hadoop.yarn.util.resource.Resources;

public class ContainerAllocation {
/**
* Skip the locality (e.g. node-local, rack-local, any), and look at other
* localities of the same priority
*/
public static final ContainerAllocation LOCALITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);

/**
* Skip the priority, and look at other priorities of the same application
*/
public static final ContainerAllocation PRIORITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.PRIORITY_SKIPPED);


/**
* Skip the application, and look at other applications of the same queue
*/
public static final ContainerAllocation APP_SKIPPED =
new ContainerAllocation(null, null, AllocationState.APP_SKIPPED);

/**
* Skip the leaf-queue, and look at other queues of the same parent queue
*/
public static final ContainerAllocation QUEUE_SKIPPED =
new ContainerAllocation(null, null, AllocationState.QUEUE_SKIPPED);

public static final ContainerAllocation LOCALITY_SKIPPED =
new ContainerAllocation(null, null, AllocationState.LOCALITY_SKIPPED);


RMContainer containerToBeUnreserved;
private Resource resourceToBeAllocated = Resources.none();
AllocationState state;
Expand All @@ -50,26 +63,26 @@ public ContainerAllocation(RMContainer containerToBeUnreserved,
this.resourceToBeAllocated = resourceToBeAllocated;
this.state = state;
}

/**
 * Returns the container recorded on this allocation as the one to be
 * unreserved.
 *
 * @return the value of the {@code containerToBeUnreserved} field; may be
 *         {@code null} (the predefined skipped allocations are built with
 *         {@code null})
 */
public RMContainer getContainerToBeUnreserved() {
  return this.containerToBeUnreserved;
}

/**
 * Returns the resource this allocation intends to allocate.
 *
 * @return the stored resource, or {@link Resources#none()} when the stored
 *         value is {@code null}; never {@code null}
 */
public Resource getResourceToBeAllocated() {
  // Callers compare against Resources.none(), so a null field is
  // normalized to it rather than being returned as-is.
  return (resourceToBeAllocated != null)
      ? resourceToBeAllocated
      : Resources.none();
}

/**
 * Returns the state this allocation was created with (for example one of
 * the {@code *_SKIPPED} states, {@code ALLOCATED} or {@code RESERVED}).
 *
 * @return the value of the {@code state} field
 */
public AllocationState getAllocationState() {
  return this.state;
}

/**
 * Returns the node type recorded for this allocation.
 *
 * @return the value of the {@code containerNodeType} field
 */
public NodeType getContainerNodeType() {
  return this.containerNodeType;
}

/**
 * Returns the container recorded on this allocation as the updated
 * container.
 *
 * @return the value of the {@code updatedContainer} field
 */
public Container getUpdatedContainer() {
  return this.updatedContainer;
}
Expand Down
Expand Up @@ -18,12 +18,15 @@

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
Expand All @@ -36,6 +39,8 @@
* extensible.
*/
public abstract class ContainerAllocator {
private static final Log LOG = LogFactory.getLog(ContainerAllocator.class);

FiCaSchedulerApp application;
final ResourceCalculator rc;
final RMContext rmContext;
Expand All @@ -46,27 +51,8 @@ public ContainerAllocator(FiCaSchedulerApp application,
this.rc = rc;
this.rmContext = rmContext;
}

/**
* preAllocation is to perform checks, etc. to see if we can/cannot allocate
* container. It will put necessary information to returned
* {@link ContainerAllocation}.
*/
abstract ContainerAllocation preAllocation(
Resource clusterResource, FiCaSchedulerNode node,
SchedulingMode schedulingMode, ResourceLimits resourceLimits,
Priority priority, RMContainer reservedContainer);

/**
* doAllocation is to update application metrics, create containers, etc.
* According to allocating conclusion decided by preAllocation.
*/
abstract ContainerAllocation doAllocation(
ContainerAllocation allocationResult, Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode, Priority priority,
RMContainer reservedContainer);

boolean checkHeadroom(Resource clusterResource,

protected boolean checkHeadroom(Resource clusterResource,
ResourceLimits currentResourceLimits, Resource required,
FiCaSchedulerNode node) {
// If headroom + currentReservation < required, we cannot allocate this
Expand All @@ -83,6 +69,68 @@ boolean checkHeadroom(Resource clusterResource,
currentResourceLimits.getHeadroom(), resourceCouldBeUnReserved),
required);
}

/**
 * Translates the outcome of an allocation attempt into a
 * {@link CSAssignment}.
 *
 * <p>The assignment is marked as skipped only when the allocation state is
 * {@code APP_SKIPPED}. The container to be unreserved (possibly
 * {@code null}) is always copied onto the assignment. When the result
 * carries a resource greater than {@link Resources#none()}, the resource and
 * node type are recorded and, depending on the state:
 * <ul>
 * <li>{@code RESERVED}: reservation details and counters are added to the
 * assignment information;</li>
 * <li>{@code ALLOCATED}: the leaf queue's ordering policy is informed,
 * allocation details and counters are added, and the assignment is flagged
 * as fulfilling a reservation when {@code rmContainer} is non-null.</li>
 * </ul>
 *
 * @param clusterResource cluster resource used for the resource comparison
 *          and included in the log output
 * @param result allocation outcome to translate
 * @param rmContainer when non-null and the state is {@code ALLOCATED}, the
 *          assignment is marked as a fulfilled reservation (presumably this
 *          is the reserved container being fulfilled; confirm with callers)
 * @return the populated assignment
 */
protected CSAssignment getCSAssignmentFromAllocateResult(
    Resource clusterResource, ContainerAllocation result,
    RMContainer rmContainer) {
  // Handle skipped
  boolean skipped =
      (result.getAllocationState() == AllocationState.APP_SKIPPED);
  CSAssignment assignment = new CSAssignment(skipped);
  assignment.setApplication(application);

  // Handle excess reservation
  assignment.setExcessReservation(result.getContainerToBeUnreserved());

  // If we allocated something
  if (Resources.greaterThan(rc, clusterResource,
      result.getResourceToBeAllocated(), Resources.none())) {
    Resource allocatedResource = result.getResourceToBeAllocated();
    Container updatedContainer = result.getUpdatedContainer();

    assignment.setResource(allocatedResource);
    assignment.setType(result.getContainerNodeType());

    if (result.getAllocationState() == AllocationState.RESERVED) {
      // This is a reserved container
      String queuePath = application.getCSLeafQueue().getQueuePath();
      // Log the queue path: "this" is the allocator here, not the queue,
      // so concatenating it would print the allocator object instead.
      LOG.info("Reserved container " + " application="
          + application.getApplicationId() + " resource=" + allocatedResource
          + " queue=" + queuePath + " cluster=" + clusterResource);
      assignment.getAssignmentInformation().addReservationDetails(
          updatedContainer.getId(), queuePath);
      assignment.getAssignmentInformation().incrReservations();
      Resources.addTo(assignment.getAssignmentInformation().getReserved(),
          allocatedResource);
    } else if (result.getAllocationState() == AllocationState.ALLOCATED) {
      // This is a new container
      String queuePath = application.getCSLeafQueue().getQueuePath();
      // Same as above: identify the queue by its path rather than by the
      // allocator's toString().
      LOG.info("assignedContainer" + " application attempt="
          + application.getApplicationAttemptId() + " container="
          + updatedContainer.getId() + " queue=" + queuePath
          + " clusterResource=" + clusterResource);

      // Inform the ordering policy
      application
          .getCSLeafQueue()
          .getOrderingPolicy()
          .containerAllocated(application,
              application.getRMContainer(updatedContainer.getId()));

      assignment.getAssignmentInformation().addAllocationDetails(
          updatedContainer.getId(), queuePath);
      assignment.getAssignmentInformation().incrAllocations();
      Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
          allocatedResource);

      if (rmContainer != null) {
        assignment.setFulfilledReservation(true);
      }
    }
  }

  return assignment;
}

/**
* allocate needs to handle following stuffs:
Expand All @@ -96,20 +144,7 @@ boolean checkHeadroom(Resource clusterResource,
* container, this will also update metrics</li>
* </ul>
*/
public ContainerAllocation allocate(Resource clusterResource,
public abstract CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, SchedulingMode schedulingMode,
ResourceLimits resourceLimits, Priority priority,
RMContainer reservedContainer) {
ContainerAllocation result =
preAllocation(clusterResource, node, schedulingMode,
resourceLimits, priority, reservedContainer);

if (AllocationState.ALLOCATED == result.state
|| AllocationState.RESERVED == result.state) {
result = doAllocation(result, clusterResource, node,
schedulingMode, priority, reservedContainer);
}

return result;
}
ResourceLimits resourceLimits, RMContainer reservedContainer);
}

0 comments on commit e5003be

Please sign in to comment.