Skip to content

Commit

Permalink
YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to t…
Browse files Browse the repository at this point in the history
…rack capacities-by-label. Contributed by Wangda Tan
  • Loading branch information
jian-he committed Feb 12, 2015
1 parent 11d8934 commit 18a5942
Show file tree
Hide file tree
Showing 17 changed files with 412 additions and 547 deletions.
3 changes: 3 additions & 0 deletions hadoop-yarn-project/CHANGES.txt
Expand Up @@ -281,6 +281,9 @@ Release 2.7.0 - UNRELEASED


YARN-3181. FairScheduler: Fix up outdated findbugs issues. (kasha) YARN-3181. FairScheduler: Fix up outdated findbugs issues. (kasha)


YARN-3124. Fixed CS LeafQueue/ParentQueue to use QueueCapacities to track
capacities-by-label. (Wangda Tan via jianhe)

OPTIMIZATIONS OPTIMIZATIONS


YARN-2990. FairScheduler's delay-scheduling always waits for node-local and YARN-2990. FairScheduler's delay-scheduling always waits for node-local and
Expand Down
Expand Up @@ -194,6 +194,8 @@
<Field name="absoluteNodeLabelCapacities" /> <Field name="absoluteNodeLabelCapacities" />
<Field name="reservationsContinueLooking" /> <Field name="reservationsContinueLooking" />
<Field name="absoluteCapacityByNodeLabels" /> <Field name="absoluteCapacityByNodeLabels" />
<Field name="authorizer" />
<Field name="parent" />
</Or> </Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" /> <Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match> </Match>
Expand Down
Expand Up @@ -44,23 +44,15 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator;


import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;


public abstract class AbstractCSQueue implements CSQueue { public abstract class AbstractCSQueue implements CSQueue {


CSQueue parent; CSQueue parent;
final String queueName; final String queueName;
float capacity;
float maximumCapacity;
float absoluteCapacity;
float absoluteMaxCapacity;
float absoluteUsedCapacity = 0.0f;

float usedCapacity = 0.0f;
volatile int numContainers; volatile int numContainers;


final Resource minimumAllocation; Resource minimumAllocation;
Resource maximumAllocation; Resource maximumAllocation;
QueueState state; QueueState state;
final QueueMetrics metrics; final QueueMetrics metrics;
Expand All @@ -70,10 +62,6 @@ public abstract class AbstractCSQueue implements CSQueue {
Set<String> accessibleLabels; Set<String> accessibleLabels;
RMNodeLabelsManager labelManager; RMNodeLabelsManager labelManager;
String defaultLabelExpression; String defaultLabelExpression;
Map<String, Float> absoluteCapacityByNodeLabels;
Map<String, Float> capacitiyByNodeLabels;
Map<String, Float> absoluteMaxCapacityByNodeLabels;
Map<String, Float> maxCapacityByNodeLabels;


Map<AccessType, AccessControlList> acls = Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>(); new HashMap<AccessType, AccessControlList>();
Expand All @@ -83,15 +71,17 @@ public abstract class AbstractCSQueue implements CSQueue {
// Track resource usage-by-label like used-resource/pending-resource, etc. // Track resource usage-by-label like used-resource/pending-resource, etc.
ResourceUsage queueUsage; ResourceUsage queueUsage;


// Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
// etc.
QueueCapacities queueCapacities;

private final RecordFactory recordFactory = private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null); RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext csContext; protected CapacitySchedulerContext csContext;
protected YarnAuthorizationProvider authorizer = null; protected YarnAuthorizationProvider authorizer = null;


public AbstractCSQueue(CapacitySchedulerContext cs, public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException { String queueName, CSQueue parent, CSQueue old) throws IOException {
this.minimumAllocation = cs.getMinimumResourceCapability();
this.maximumAllocation = cs.getMaximumResourceCapability();
this.labelManager = cs.getRMContext().getNodeLabelManager(); this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent; this.parent = parent;
this.queueName = queueName; this.queueName = queueName;
Expand All @@ -102,68 +92,55 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
QueueMetrics.forQueue(getQueuePath(), parent, QueueMetrics.forQueue(getQueuePath(), parent,
cs.getConfiguration().getEnableUserMetrics(), cs.getConfiguration().getEnableUserMetrics(),
cs.getConf()); cs.getConf());

// get labels
this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath());
this.defaultLabelExpression = cs.getConfiguration()
.getDefaultNodeLabelExpression(getQueuePath());


// inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
this.accessibleLabels = parent.getAccessibleNodeLabels();
}
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
this.accessibleLabels);

// inherit from parent if labels not set
if (this.defaultLabelExpression == null && parent != null
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
}

// set capacity by labels
capacitiyByNodeLabels =
cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels,
labelManager);

// set maximum capacity by labels
maxCapacityByNodeLabels =
cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
accessibleLabels, labelManager);
this.csContext = cs; this.csContext = cs;

// initialize ResourceUsage
queueUsage = new ResourceUsage(); queueUsage = new ResourceUsage();
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath()); queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
authorizer = YarnAuthorizationProvider.getInstance(cs.getConf());
// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);
}

protected void setupConfigurableCapacities() {
CSQueueUtils.loadUpdateAndCheckCapacities(
getQueuePath(),
accessibleLabels,
csContext.getConfiguration(),
queueCapacities,
parent == null ? null : parent.getQueueCapacities(),
csContext.getRMContext().getNodeLabelManager());
} }


@Override @Override
public synchronized float getCapacity() { public synchronized float getCapacity() {
return capacity; return queueCapacities.getCapacity();
} }


@Override @Override
public synchronized float getAbsoluteCapacity() { public synchronized float getAbsoluteCapacity() {
return absoluteCapacity; return queueCapacities.getAbsoluteCapacity();
} }


@Override @Override
public float getAbsoluteMaximumCapacity() { public float getAbsoluteMaximumCapacity() {
return absoluteMaxCapacity; return queueCapacities.getAbsoluteMaximumCapacity();
} }


@Override @Override
public synchronized float getAbsoluteUsedCapacity() { public synchronized float getAbsoluteUsedCapacity() {
return absoluteUsedCapacity; return queueCapacities.getAbsoluteUsedCapacity();
} }


@Override @Override
public float getMaximumCapacity() { public float getMaximumCapacity() {
return maximumCapacity; return queueCapacities.getMaximumCapacity();
} }


@Override @Override
public synchronized float getUsedCapacity() { public synchronized float getUsedCapacity() {
return usedCapacity; return queueCapacities.getUsedCapacity();
} }


@Override @Override
Expand Down Expand Up @@ -216,12 +193,12 @@ public boolean hasAccess(QueueACL acl, UserGroupInformation user) {


@Override @Override
public synchronized void setUsedCapacity(float usedCapacity) { public synchronized void setUsedCapacity(float usedCapacity) {
this.usedCapacity = usedCapacity; queueCapacities.setUsedCapacity(usedCapacity);
} }


@Override @Override
public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) { public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
this.absoluteUsedCapacity = absUsedCapacity; queueCapacities.setAbsoluteUsedCapacity(absUsedCapacity);
} }


/** /**
Expand All @@ -230,61 +207,56 @@ public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
*/ */
synchronized void setMaxCapacity(float maximumCapacity) { synchronized void setMaxCapacity(float maximumCapacity) {
// Sanity check // Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); CSQueueUtils.checkMaxCapacity(getQueueName(),
queueCapacities.getCapacity(), maximumCapacity);
float absMaxCapacity = float absMaxCapacity =
CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent); CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
queueCapacities.getAbsoluteCapacity(),
absMaxCapacity); absMaxCapacity);


this.maximumCapacity = maximumCapacity; queueCapacities.setMaximumCapacity(maximumCapacity);
this.absoluteMaxCapacity = absMaxCapacity; queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
}

@Override
public float getAbsActualCapacity() {
// for now, simply return actual capacity = guaranteed capacity for parent
// queue
return absoluteCapacity;
} }


@Override @Override
public String getDefaultNodeLabelExpression() { public String getDefaultNodeLabelExpression() {
return defaultLabelExpression; return defaultLabelExpression;
} }


synchronized void setupQueueConfigs(Resource clusterResource, float capacity, synchronized void setupQueueConfigs(Resource clusterResource)
float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<AccessType, AccessControlList> acls,
Set<String> labels, String defaultLabelExpression,
Map<String, Float> nodeLabelCapacities,
Map<String, Float> maximumNodeLabelCapacities,
boolean reservationContinueLooking, Resource maxAllocation)
throws IOException { throws IOException {
// Sanity check // get labels
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity); this.accessibleLabels =
CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity, csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
absoluteMaxCapacity); this.defaultLabelExpression = csContext.getConfiguration()

.getDefaultNodeLabelExpression(getQueuePath());
this.capacity = capacity;
this.absoluteCapacity = absoluteCapacity;

this.maximumCapacity = maximumCapacity;
this.absoluteMaxCapacity = absoluteMaxCapacity;


this.state = state; // inherit from parent if labels not set
if (this.accessibleLabels == null && parent != null) {
this.accessibleLabels = parent.getAccessibleNodeLabels();
}
SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
this.accessibleLabels);

// inherit from parent if labels not set
if (this.defaultLabelExpression == null && parent != null
&& this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
}


this.acls = acls; // After we setup labels, we can setup capacities
setupConfigurableCapacities();


// set labels this.minimumAllocation = csContext.getMinimumResourceCapability();
this.accessibleLabels = labels; this.maximumAllocation =
csContext.getConfiguration().getMaximumAllocationPerQueue(
getQueuePath());


// set label expression authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
this.defaultLabelExpression = defaultLabelExpression;


// copy node label capacity this.state = csContext.getConfiguration().getState(getQueuePath());
this.capacitiyByNodeLabels = new HashMap<String, Float>(nodeLabelCapacities); this.acls = csContext.getConfiguration().getAcls(getQueuePath());
this.maxCapacityByNodeLabels =
new HashMap<String, Float>(maximumNodeLabelCapacities);


// Update metrics // Update metrics
CSQueueUtils.updateQueueStatistics( CSQueueUtils.updateQueueStatistics(
Expand All @@ -311,34 +283,19 @@ synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
} }
} }
} }

// calculate absolute capacity by each node label
this.absoluteCapacityByNodeLabels =
CSQueueUtils.computeAbsoluteCapacityByNodeLabels(
this.capacitiyByNodeLabels, parent);

// calculate maximum capacity by each node label
this.absoluteMaxCapacityByNodeLabels =
CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels(
maximumNodeLabelCapacities, parent);

// check absoluteMaximumNodeLabelCapacities is valid
CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(),
absoluteCapacityByNodeLabels, absoluteCapacityByNodeLabels);

this.reservationsContinueLooking = reservationContinueLooking;


this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this); this.reservationsContinueLooking = csContext.getConfiguration()
.getReservationContinueLook();


this.maximumAllocation = maxAllocation; this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
} }


protected QueueInfo getQueueInfo() { protected QueueInfo getQueueInfo() {
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class); QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
queueInfo.setQueueName(queueName); queueInfo.setQueueName(queueName);
queueInfo.setAccessibleNodeLabels(accessibleLabels); queueInfo.setAccessibleNodeLabels(accessibleLabels);
queueInfo.setCapacity(capacity); queueInfo.setCapacity(queueCapacities.getCapacity());
queueInfo.setMaximumCapacity(maximumCapacity); queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
queueInfo.setQueueState(state); queueInfo.setQueueState(state);
queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression); queueInfo.setDefaultNodeLabelExpression(defaultLabelExpression);
queueInfo.setCurrentCapacity(getUsedCapacity()); queueInfo.setCurrentCapacity(getUsedCapacity());
Expand Down Expand Up @@ -388,51 +345,6 @@ protected synchronized void releaseResource(Resource clusterResource,
--numContainers; --numContainers;
} }


@Private
public float getCapacityByNodeLabel(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
if (null == parent) {
return 1f;
}
return getCapacity();
}

if (!capacitiyByNodeLabels.containsKey(label)) {
return 0f;
} else {
return capacitiyByNodeLabels.get(label);
}
}

@Private
public float getAbsoluteCapacityByNodeLabel(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
if (null == parent) {
return 1f;
}
return getAbsoluteCapacity();
}

if (!absoluteCapacityByNodeLabels.containsKey(label)) {
return 0f;
} else {
return absoluteCapacityByNodeLabels.get(label);
}
}

@Private
public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
return getAbsoluteMaximumCapacity();
}

if (!absoluteMaxCapacityByNodeLabels.containsKey(label)) {
return 0f;
} else {
return absoluteMaxCapacityByNodeLabels.get(label);
}
}

@Private @Private
public boolean getReservationContinueLooking() { public boolean getReservationContinueLooking() {
return reservationsContinueLooking; return reservationsContinueLooking;
Expand All @@ -442,20 +354,20 @@ public boolean getReservationContinueLooking() {
public Map<AccessType, AccessControlList> getACLs() { public Map<AccessType, AccessControlList> getACLs() {
return acls; return acls;
} }

@Private @Private
public Resource getUsedResourceByLabel(String nodeLabel) { public boolean getPreemptionDisabled() {
return queueUsage.getUsed(nodeLabel); return preemptionDisabled;
} }


@VisibleForTesting @Private
public ResourceUsage getResourceUsage() { public QueueCapacities getQueueCapacities() {
return queueUsage; return queueCapacities;
} }

@Private @Private
public boolean getPreemptionDisabled() { public ResourceUsage getQueueResourceUsage() {
return preemptionDisabled; return queueUsage;
} }


/** /**
Expand Down

0 comments on commit 18a5942

Please sign in to comment.