Skip to content

Commit

Permalink
YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsa…
Browse files Browse the repository at this point in the history
…ge to track used-resources-by-label. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Mar 20, 2015
1 parent d81109e commit 586348e
Show file tree
Hide file tree
Showing 19 changed files with 509 additions and 129 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -65,6 +65,9 @@ Release 2.8.0 - UNRELEASED
YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks
via devaraj)

YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to
track used-resources-by-label. (Wangda Tan via jianhe)

OPTIMIZATIONS

YARN-3339. TestDockerContainerExecutor should pull a single image and not
Expand Down
Expand Up @@ -358,14 +358,15 @@ public synchronized void recoverContainersOnNode(
container));

// recover scheduler node
nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
schedulerNode.recoverContainer(rmContainer);

// recover queue: update headroom etc.
Queue queue = schedulerAttempt.getQueue();
queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);

// recover scheduler attempt
schedulerAttempt.recoverContainer(rmContainer);
schedulerAttempt.recoverContainer(schedulerNode, rmContainer);

// set master container for the current running AMContainer for this
// attempt.
Expand Down
Expand Up @@ -20,8 +20,6 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -191,6 +189,16 @@ synchronized public void updateResourceRequests(
request.getCapability());
metrics.decrPendingResources(user, lastRequestContainers,
lastRequestCapability);

// update queue:
queue.incPendingResource(
request.getNodeLabelExpression(),
Resources.multiply(request.getCapability(),
request.getNumContainers()));
if (lastRequest != null) {
queue.decPendingResource(lastRequest.getNodeLabelExpression(),
Resources.multiply(lastRequestCapability, lastRequestContainers));
}
}
}
}
Expand Down Expand Up @@ -376,6 +384,9 @@ synchronized private void decrementOutstanding(
if (numOffSwitchContainers == 0) {
checkForDeactivation();
}

queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
offSwitchRequest.getCapability());
}

synchronized private void checkForDeactivation() {
Expand Down Expand Up @@ -404,6 +415,12 @@ synchronized public void move(Queue newQueue) {
request.getCapability());
newMetrics.incrPendingResources(user, request.getNumContainers(),
request.getCapability());

Resource delta = Resources.multiply(request.getCapability(),
request.getNumContainers());
// Update Queue
queue.decPendingResource(request.getNodeLabelExpression(), delta);
newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
}
}
oldMetrics.moveAppFrom(this);
Expand All @@ -423,6 +440,12 @@ synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
request.getCapability());

// Update Queue
queue.decPendingResource(
request.getNodeLabelExpression(),
Resources.multiply(request.getCapability(),
request.getNumContainers()));
}
}
metrics.finishAppAttempt(applicationId, pending, user);
Expand Down
Expand Up @@ -90,4 +90,24 @@ public void recoverContainer(Resource clusterResource,
* @return default label expression
*/
public String getDefaultNodeLabelExpression();

/**
* When new outstanding resource is asked, calling this will increase pending
* resource in a queue.
*
* @param nodeLabel asked by application
* @param resourceToInc new resource asked
*/
public void incPendingResource(String nodeLabel, Resource resourceToInc);

/**
* When an outstanding resource is fulfilled or canceled, calling this will
* decrease pending resource in a queue.
*
* @param nodeLabel
* asked by application
* @param resourceToDec
* new resource asked
*/
public void decPendingResource(String nodeLabel, Resource resourceToDec);
}
Expand Up @@ -20,6 +20,7 @@

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
Expand Down Expand Up @@ -75,14 +76,17 @@ public UsageByLabel(String label) {
};
}

public Resource getUsed() {
return resArr[ResourceType.USED.idx];
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("{used=" + resArr[0] + "%, ");
sb.append("pending=" + resArr[1] + "%, ");
sb.append("am_used=" + resArr[2] + "%, ");
sb.append("reserved=" + resArr[3] + "%, ");
sb.append("headroom=" + resArr[4] + "%}");
sb.append("reserved=" + resArr[3] + "%}");
return sb.toString();
}
}
Expand Down Expand Up @@ -117,6 +121,17 @@ public void decUsed(String label, Resource res) {
public void setUsed(Resource res) {
setUsed(NL, res);
}

public void copyAllUsed(ResourceUsage other) {
try {
writeLock.lock();
for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
}
} finally {
writeLock.unlock();
}
}

public void setUsed(String label, Resource res) {
_set(label, ResourceType.USED, res);
Expand Down
Expand Up @@ -87,13 +87,12 @@ public class SchedulerApplicationAttempt {

private final Multiset<Priority> reReservations = HashMultiset.create();

protected final Resource currentReservation = Resource.newInstance(0, 0);
private Resource resourceLimit = Resource.newInstance(0, 0);
protected Resource currentConsumption = Resource.newInstance(0, 0);
private Resource amResource = Resources.none();
private boolean unmanagedAM = true;
private boolean amRunning = false;
private LogAggregationContext logAggregationContext;

protected ResourceUsage attemptResourceUsage = new ResourceUsage();

protected List<RMContainer> newlyAllocatedContainers =
new ArrayList<RMContainer>();
Expand Down Expand Up @@ -217,11 +216,11 @@ public String getQueueName() {
}

public Resource getAMResource() {
return amResource;
return attemptResourceUsage.getAMUsed();
}

public void setAMResource(Resource amResource) {
this.amResource = amResource;
attemptResourceUsage.setAMUsed(amResource);
}

public boolean isAmRunning() {
Expand Down Expand Up @@ -260,7 +259,7 @@ public synchronized int getReReservations(Priority priority) {
@Stable
@Private
public synchronized Resource getCurrentReservation() {
return currentReservation;
return attemptResourceUsage.getReserved();
}

public Queue getQueue() {
Expand Down Expand Up @@ -311,8 +310,8 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
rmContainer =
new RMContainerImpl(container, getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
Resources.addTo(currentReservation, container.getResource());
attemptResourceUsage.incReserved(node.getPartition(),
container.getResource());

// Reset the re-reservation count
resetReReservations(priority);
Expand All @@ -336,7 +335,7 @@ public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
+ " reserved container " + rmContainer + " on node " + node
+ ". This attempt currently has " + reservedContainers.size()
+ " reserved containers at priority " + priority
+ "; currentReservation " + currentReservation.getMemory());
+ "; currentReservation " + container.getResource());
}

return rmContainer;
Expand Down Expand Up @@ -402,9 +401,9 @@ public synchronized void showRequests() {
for (Priority priority : getPriorities()) {
Map<String, ResourceRequest> requests = getResourceRequests(priority);
if (requests != null) {
LOG.debug("showRequests:" + " application=" + getApplicationId() +
" headRoom=" + getHeadroom() +
" currentConsumption=" + currentConsumption.getMemory());
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " headRoom=" + getHeadroom() + " currentConsumption="
+ attemptResourceUsage.getUsed().getMemory());
for (ResourceRequest request : requests.values()) {
LOG.debug("showRequests:" + " application=" + getApplicationId()
+ " request=" + request);
Expand All @@ -415,7 +414,7 @@ public synchronized void showRequests() {
}

public Resource getCurrentConsumption() {
return currentConsumption;
return attemptResourceUsage.getUsed();
}

public static class ContainersAndNMTokensAllocation {
Expand Down Expand Up @@ -548,12 +547,17 @@ synchronized AggregateAppResourceUsage getRunningAggregateAppResourceUsage() {
}

public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
AggregateAppResourceUsage runningResourceUsage =
getRunningAggregateAppResourceUsage();
Resource usedResourceClone =
Resources.clone(attemptResourceUsage.getUsed());
Resource reservedResourceClone =
Resources.clone(attemptResourceUsage.getReserved());
return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
reservedContainers.size(), Resources.clone(currentConsumption),
Resources.clone(currentReservation),
Resources.add(currentConsumption, currentReservation),
resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
reservedContainers.size(), usedResourceClone, reservedResourceClone,
Resources.add(usedResourceClone, reservedResourceClone),
runningResourceUsage.getMemorySeconds(),
runningResourceUsage.getVcoreSeconds());
}

public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
Expand All @@ -572,7 +576,7 @@ public synchronized void transferStateFromPreviousAttempt(
SchedulerApplicationAttempt appAttempt) {
this.liveContainers = appAttempt.getLiveContainersMap();
// this.reReservations = appAttempt.reReservations;
this.currentConsumption = appAttempt.getCurrentConsumption();
this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
this.resourceLimit = appAttempt.getResourceLimit();
// this.currentReservation = appAttempt.currentReservation;
// this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
Expand Down Expand Up @@ -603,7 +607,8 @@ public synchronized void move(Queue newQueue) {
this.queue = newQueue;
}

public synchronized void recoverContainer(RMContainer rmContainer) {
public synchronized void recoverContainer(SchedulerNode node,
RMContainer rmContainer) {
// recover app scheduling info
appSchedulingInfo.recoverContainer(rmContainer);

Expand All @@ -613,8 +618,9 @@ public synchronized void recoverContainer(RMContainer rmContainer) {
LOG.info("SchedulerAttempt " + getApplicationAttemptId()
+ " is recovering container " + rmContainer.getContainerId());
liveContainers.put(rmContainer.getContainerId(), rmContainer);
Resources.addTo(currentConsumption, rmContainer.getContainer()
.getResource());
attemptResourceUsage.incUsed(node.getPartition(), rmContainer
.getContainer().getResource());

// resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
// is called.
// newlyAllocatedContainers.add(rmContainer);
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
Expand Down Expand Up @@ -294,4 +295,17 @@ public Set<String> getLabels() {
public void updateLabels(Set<String> labels) {
this.labels = labels;
}

/**
* Get partition of which the node belongs to, if node-labels of this node is
* empty or null, it belongs to NO_LABEL partition. And since we only support
* one partition for each node (YARN-2694), first label will be its partition.
*/
public String getPartition() {
if (this.labels == null || this.labels.isEmpty()) {
return RMNodeLabelsManager.NO_LABEL;
} else {
return this.labels.iterator().next();
}
}
}
Expand Up @@ -509,4 +509,28 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource,
// non-empty
return false;
}

@Override
public void incPendingResource(String nodeLabel, Resource resourceToInc) {
if (nodeLabel == null) {
nodeLabel = RMNodeLabelsManager.NO_LABEL;
}
// ResourceUsage has its own lock, no addition lock needs here.
queueUsage.incPending(nodeLabel, resourceToInc);
if (null != parent) {
parent.incPendingResource(nodeLabel, resourceToInc);
}
}

@Override
public void decPendingResource(String nodeLabel, Resource resourceToDec) {
if (nodeLabel == null) {
nodeLabel = RMNodeLabelsManager.NO_LABEL;
}
// ResourceUsage has its own lock, no addition lock needs here.
queueUsage.decPending(nodeLabel, resourceToDec);
if (null != parent) {
parent.decPendingResource(nodeLabel, resourceToDec);
}
}
}

0 comments on commit 586348e

Please sign in to comment.