Skip to content

Commit

Permalink
YARN-3026. Move application-specific container allocation logic from …
Browse files Browse the repository at this point in the history
…LeafQueue to FiCaSchedulerApp. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Jul 24, 2015
1 parent fc42fa8 commit 83fe34a
Show file tree
Hide file tree
Showing 16 changed files with 1,048 additions and 1,046 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -345,6 +345,9 @@ Release 2.8.0 - UNRELEASED
YARN-3844. Make hadoop-yarn-project Native code -Wall-clean (Alan Burlison YARN-3844. Make hadoop-yarn-project Native code -Wall-clean (Alan Burlison
via Colin P. McCabe) via Colin P. McCabe)


YARN-3026. Move application-specific container allocation logic from
LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe)

OPTIMIZATIONS OPTIMIZATIONS


YARN-3339. TestDockerContainerExecutor should pull a single image and not YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -292,7 +292,8 @@ void setNMTokenSecretManager(NMTokenSecretManagerInRM nmTokenSecretManager) {
activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager); activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
} }


void setScheduler(ResourceScheduler scheduler) { @VisibleForTesting
public void setScheduler(ResourceScheduler scheduler) {
activeServiceContext.setScheduler(scheduler); activeServiceContext.setScheduler(scheduler);
} }


Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -26,27 +26,40 @@
* that, it's not "extra") resource you can get. * that, it's not "extra") resource you can get.
*/ */
public class ResourceLimits { public class ResourceLimits {
volatile Resource limit; private volatile Resource limit;


// This is a special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES // This is a special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
// config. This limit indicates how much we need to unreserve to allocate // config. This limit indicates how much we need to unreserve to allocate
// another container. // another container.
private volatile Resource amountNeededUnreserve; private volatile Resource amountNeededUnreserve;


// How much resource you can use for the next allocation. If this isn't enough
// for the next container allocation, you may need to consider unreserving some
// containers.
private volatile Resource headroom;

public ResourceLimits(Resource limit) { public ResourceLimits(Resource limit) {
this.amountNeededUnreserve = Resources.none(); this(limit, Resources.none());
this.limit = limit;
} }


public ResourceLimits(Resource limit, Resource amountNeededUnreserve) { public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
this.amountNeededUnreserve = amountNeededUnreserve; this.amountNeededUnreserve = amountNeededUnreserve;
this.headroom = limit;
this.limit = limit; this.limit = limit;
} }


public Resource getLimit() { public Resource getLimit() {
return limit; return limit;
} }


public Resource getHeadroom() {
return headroom;
}

public void setHeadroom(Resource headroom) {
this.headroom = headroom;
}

public Resource getAmountNeededUnreserve() { public Resource getAmountNeededUnreserve() {
return amountNeededUnreserve; return amountNeededUnreserve;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public abstract class AbstractCSQueue implements CSQueue {
volatile int numContainers; volatile int numContainers;


final Resource minimumAllocation; final Resource minimumAllocation;
Resource maximumAllocation; volatile Resource maximumAllocation;
QueueState state; QueueState state;
final CSQueueMetrics metrics; final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity; protected final PrivilegedEntity queueEntity;
Expand All @@ -77,7 +77,7 @@ public abstract class AbstractCSQueue implements CSQueue {


Map<AccessType, AccessControlList> acls = Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>(); new HashMap<AccessType, AccessControlList>();
boolean reservationsContinueLooking; volatile boolean reservationsContinueLooking;
private boolean preemptionDisabled; private boolean preemptionDisabled;


// Track resource usage-by-label like used-resource/pending-resource, etc. // Track resource usage-by-label like used-resource/pending-resource, etc.
Expand Down Expand Up @@ -333,7 +333,7 @@ public QueueStatistics getQueueStatistics() {
} }


@Private @Private
public synchronized Resource getMaximumAllocation() { public Resource getMaximumAllocation() {
return maximumAllocation; return maximumAllocation;
} }


Expand Down Expand Up @@ -448,13 +448,8 @@ private Resource getCurrentLimitResource(String nodePartition,
} }


synchronized boolean canAssignToThisQueue(Resource clusterResource, synchronized boolean canAssignToThisQueue(Resource clusterResource,
String nodePartition, ResourceLimits currentResourceLimits, String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved,
Resource nowRequired, Resource resourceCouldBeUnreserved,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode) {
// New total resource = used + required
Resource newTotalResource =
Resources.add(queueUsage.getUsed(nodePartition), nowRequired);

// Get current limited resource: // Get current limited resource:
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
// queues' max capacity. // queues' max capacity.
Expand All @@ -470,8 +465,14 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
getCurrentLimitResource(nodePartition, clusterResource, getCurrentLimitResource(nodePartition, clusterResource,
currentResourceLimits, schedulingMode); currentResourceLimits, schedulingMode);


if (Resources.greaterThan(resourceCalculator, clusterResource, Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
newTotalResource, currentLimitResource)) {
// Set headroom for currentResourceLimits
currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource,
nowTotalUsed));

if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
nowTotalUsed, currentLimitResource)) {


// if reservation continuous looking is enabled, check to see if we could // if reservation continuous looking is enabled, check to see if we could
// potentially use this node instead of a reserved node if the application // potentially use this node instead of a reserved node if the application
Expand All @@ -483,7 +484,7 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
resourceCouldBeUnreserved, Resources.none())) { resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved // resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource = Resource newTotalWithoutReservedResource =
Resources.subtract(newTotalResource, resourceCouldBeUnreserved); Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved);


// when total-used-without-reserved-resource < currentLimit, we still // when total-used-without-reserved-resource < currentLimit, we still
// have a chance to allocate on this node by unreserving some containers // have a chance to allocate on this node by unreserving some containers
Expand All @@ -498,8 +499,6 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
+ newTotalWithoutReservedResource + ", maxLimitCapacity: " + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource); + currentLimitResource);
} }
currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
currentLimitResource));
return true; return true;
} }
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class CSAssignment {


final private Resource resource; final private Resource resource;
private NodeType type; private NodeType type;
private final RMContainer excessReservation; private RMContainer excessReservation;
private final FiCaSchedulerApp application; private FiCaSchedulerApp application;
private final boolean skipped; private final boolean skipped;
private boolean fulfilledReservation; private boolean fulfilledReservation;
private final AssignmentInformation assignmentInformation; private final AssignmentInformation assignmentInformation;
Expand Down Expand Up @@ -80,10 +80,18 @@ public FiCaSchedulerApp getApplication() {
return application; return application;
} }


public void setApplication(FiCaSchedulerApp application) {
this.application = application;
}

public RMContainer getExcessReservation() { public RMContainer getExcessReservation() {
return excessReservation; return excessReservation;
} }


public void setExcessReservation(RMContainer rmContainer) {
excessReservation = rmContainer;
}

public boolean getSkipped() { public boolean getSkipped() {
return skipped; return skipped;
} }
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -25,22 +25,16 @@ public class CapacityHeadroomProvider {
LeafQueue.User user; LeafQueue.User user;
LeafQueue queue; LeafQueue queue;
FiCaSchedulerApp application; FiCaSchedulerApp application;
Resource required;
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo; LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;


public CapacityHeadroomProvider( public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
LeafQueue.User user, FiCaSchedulerApp application,
LeafQueue queue, LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
FiCaSchedulerApp application,
Resource required,
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {

this.user = user; this.user = user;
this.queue = queue; this.queue = queue;
this.application = application; this.application = application;
this.required = required;
this.queueResourceLimitsInfo = queueResourceLimitsInfo; this.queueResourceLimitsInfo = queueResourceLimitsInfo;

} }


public Resource getHeadroom() { public Resource getHeadroom() {
Expand All @@ -52,7 +46,7 @@ public Resource getHeadroom() {
clusterResource = queueResourceLimitsInfo.getClusterResource(); clusterResource = queueResourceLimitsInfo.getClusterResource();
} }
Resource headroom = queue.getHeadroom(user, queueCurrentLimit, Resource headroom = queue.getHeadroom(user, queueCurrentLimit,
clusterResource, application, required); clusterResource, application);


// Corner case to deal with applications being slightly over-limit // Corner case to deal with applications being slightly over-limit
if (headroom.getMemory() < 0) { if (headroom.getMemory() < 0) {
Expand Down
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -1178,16 +1178,6 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
updateSchedulerHealth(lastNodeUpdateTime, node, tmp); updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
schedulerHealth.updateSchedulerFulfilledReservationCounts(1); schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
} }

RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
Container container = excessReservation.getContainer();
queue.completedContainer(clusterResource, assignment.getApplication(),
node, excessReservation, SchedulerUtils
.createAbnormalContainerStatus(container.getId(),
SchedulerUtils.UNRESERVED_CONTAINER),
RMContainerEventType.RELEASED, null, true);
}
} }


// Try to schedule more if there are no reservations to fulfill // Try to schedule more if there are no reservations to fulfill
Expand Down Expand Up @@ -1241,10 +1231,6 @@ private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
RMNodeLabelsManager.NO_LABEL, clusterResource)), RMNodeLabelsManager.NO_LABEL, clusterResource)),
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY); SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
updateSchedulerHealth(lastNodeUpdateTime, node, assignment); updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
if (Resources.greaterThan(calculator, clusterResource,
assignment.getResource(), Resources.none())) {
return;
}
} }
} else { } else {
LOG.info("Skipping scheduling since node " LOG.info("Skipping scheduling since node "
Expand Down
Loading

0 comments on commit 83fe34a

Please sign in to comment.