Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueMetrics;

@Metrics(context = "yarn")
public class PartitionQueueMetrics extends QueueMetrics {
public class PartitionQueueMetrics extends CSQueueMetrics {

private String partition;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,13 +219,13 @@ protected static StringBuilder sourceName(String queueName) {
return sb;
}

static StringBuilder pSourceName(String partition) {
protected static StringBuilder pSourceName(String partition) {
StringBuilder sb = new StringBuilder(P_RECORD_INFO.name());
sb.append(",partition").append('=').append(partition);
return sb;
}

static StringBuilder qSourceName(String queueName) {
protected static StringBuilder qSourceName(String queueName) {
StringBuilder sb = new StringBuilder();
int i = 0;
for (String node : Q_SPLITTER.split(queueName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1961,10 +1961,8 @@ public void updateClusterResource(Resource clusterResource,
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
RMNodeLabelsManager.NO_LABEL, this);
labelManager, clusterResource, this);

// queue metrics are updated, more resource may be available
// activate the pending applications if possible
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,8 @@ public class CSQueueMetrics extends QueueMetrics {

private CSQueueMetricsForCustomResources csQueueMetricsForCustomResources;

CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
protected CSQueueMetrics(MetricsSystem ms, String queueName, Queue parent,
boolean enableUserMetrics, Configuration conf) {
super(ms, queueName, parent, enableUserMetrics, conf);
}

Expand Down Expand Up @@ -120,50 +120,91 @@ public long getUsedAMResourceVCores() {
}

public void setAMResouceLimit(String partition, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
AMResourceLimitMB.set(res.getMemorySize());
AMResourceLimitVCores.set(res.getVirtualCores());
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.AMResourceLimitMB.set(res.getMemorySize());
partitionQueueMetrics.AMResourceLimitVCores.set(res.getVirtualCores());
}
}

public void setAMResouceLimitForUser(String partition,
String user, Resource res) {
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAMResouceLimit(partition, res);
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAMResouceLimit(partition, res);
}
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
CSQueueMetrics userMetrics = (CSQueueMetrics) partitionQueueMetrics.getUserMetrics(user);
if (userMetrics != null) {
userMetrics.setAMResouceLimit(partition, res);
}
}
}

public void incAMUsed(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
usedAMResourceMB.incr(res.getMemorySize());
usedAMResourceVCores.incr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.incAMUsed(partition, user, res);
}
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.usedAMResourceMB.incr(res.getMemorySize());
partitionQueueMetrics.usedAMResourceVCores.incr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) partitionQueueMetrics.getUserMetrics(user);
if (userMetrics != null) {
userMetrics.incAMUsed(partition, user, res);
}
}
}

public void decAMUsed(String partition, String user, Resource res) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
usedAMResourceMB.decr(res.getMemorySize());
usedAMResourceVCores.decr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user);
if (userMetrics != null) {
userMetrics.decAMUsed(partition, user, res);
}
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.usedAMResourceMB.decr(res.getMemorySize());
partitionQueueMetrics.usedAMResourceVCores.decr(res.getVirtualCores());
CSQueueMetrics userMetrics = (CSQueueMetrics) partitionQueueMetrics.getUserMetrics(user);
if (userMetrics != null) {
userMetrics.decAMUsed(partition, user, res);
}
}
}

public float getUsedCapacity() {
return usedCapacity.value();
}

public void setUsedCapacity(String partition, float usedCap) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
this.usedCapacity.set(usedCap);
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.usedCapacity.set(usedCap);
}
}

public float getAbsoluteUsedCapacity() {
Expand All @@ -172,9 +213,14 @@ public float getAbsoluteUsedCapacity() {

public void setAbsoluteUsedCapacity(String partition,
Float absoluteUsedCap) {
if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) {
this.absoluteUsedCapacity.set(absoluteUsedCap);
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.absoluteUsedCapacity.set(absoluteUsedCap);
}
}

public long getGuaranteedMB() {
Expand All @@ -196,6 +242,19 @@ public void setGuaranteedResources(String partition, Resource res) {
GUARANTEED_CAPACITY_METRIC_PREFIX, GUARANTEED_CAPACITY_METRIC_DESC);
}
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.guaranteedMB.set(res.getMemorySize());
partitionQueueMetrics.guaranteedVCores.set(res.getVirtualCores());
if (partitionQueueMetrics.csQueueMetricsForCustomResources != null) {
partitionQueueMetrics.csQueueMetricsForCustomResources.setGuaranteedCapacity(res);
partitionQueueMetrics.csQueueMetricsForCustomResources.registerCustomResources(
partitionQueueMetrics.csQueueMetricsForCustomResources.getGuaranteedCapacity(),
partitionQueueMetrics.registry, GUARANTEED_CAPACITY_METRIC_PREFIX,
GUARANTEED_CAPACITY_METRIC_DESC);
}
}
}

public long getMaxCapacityMB() {
Expand All @@ -217,6 +276,19 @@ public void setMaxCapacityResources(String partition, Resource res) {
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
}
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.maxCapacityMB.set(res.getMemorySize());
partitionQueueMetrics.maxCapacityVCores.set(res.getVirtualCores());
if (partitionQueueMetrics.csQueueMetricsForCustomResources != null) {
partitionQueueMetrics.csQueueMetricsForCustomResources.setMaxCapacity(res);
partitionQueueMetrics.csQueueMetricsForCustomResources.registerCustomResources(
partitionQueueMetrics.csQueueMetricsForCustomResources.getMaxCapacity(),
partitionQueueMetrics.registry,
MAX_CAPACITY_METRIC_PREFIX, MAX_CAPACITY_METRIC_DESC);
}
}
}

@Override
Expand Down Expand Up @@ -283,6 +355,12 @@ public void setGuaranteedCapacities(String partition, float capacity,
guaranteedCapacity.set(capacity);
guaranteedAbsoluteCapacity.set(absoluteCapacity);
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.guaranteedCapacity.set(capacity);
partitionQueueMetrics.guaranteedAbsoluteCapacity.set(absoluteCapacity);
}
}

public float getMaxCapacity() {
Expand All @@ -299,5 +377,11 @@ public void setMaxCapacities(String partition, float capacity,
maxCapacity.set(capacity);
maxAbsoluteCapacity.set(absoluteCapacity);
}

CSQueueMetrics partitionQueueMetrics = (CSQueueMetrics) getPartitionQueueMetrics(partition);
if (partitionQueueMetrics != null) {
partitionQueueMetrics.maxCapacity.set(capacity);
partitionQueueMetrics.maxAbsoluteCapacity.set(absoluteCapacity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -167,15 +167,10 @@ public static void updateUsedCapacity(final ResourceCalculator rc,
queueCapacities
.setAbsoluteReservedCapacity(nodePartition, absoluteReservedCapacity);

// QueueMetrics does not support per-label capacities,
// so we report values only for the default partition.

queueMetrics.setUsedCapacity(nodePartition,
queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL));
queueCapacities.getUsedCapacity(nodePartition));
queueMetrics.setAbsoluteUsedCapacity(nodePartition,
queueCapacities.getAbsoluteUsedCapacity(
RMNodeLabelsManager.NO_LABEL));

queueCapacities.getAbsoluteUsedCapacity(nodePartition));
}

private static Resource getMaxAvailableResourceToQueuePartition(
Expand Down Expand Up @@ -249,29 +244,6 @@ public static void updateQueueStatistics(
}
}

/**
* Updated configured capacity/max-capacity for queue.
* @param rc resource calculator
* @param partitionResource total cluster resources for this partition
* @param partition partition being updated
* @param queue queue
*/
public static void updateConfiguredCapacityMetrics(ResourceCalculator rc,
Resource partitionResource, String partition, AbstractCSQueue queue) {
queue.getMetrics().setGuaranteedResources(partition, rc.multiplyAndNormalizeDown(
partitionResource, queue.getQueueCapacities().getAbsoluteCapacity(partition),
queue.getMinimumAllocation()));
queue.getMetrics().setMaxCapacityResources(partition, rc.multiplyAndNormalizeDown(
partitionResource, queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition),
queue.getMinimumAllocation()));
queue.getMetrics().setGuaranteedCapacities(partition,
queue.getQueueCapacities().getCapacity(partition),
queue.getQueueCapacities().getAbsoluteCapacity(partition));
queue.getMetrics().setMaxCapacities(partition,
queue.getQueueCapacities().getMaximumCapacity(partition),
queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition));
}

public static void updateAbsoluteCapacitiesByNodeLabels(QueueCapacities queueCapacities,
QueueCapacities parentQueueCapacities,
Set<String> nodeLabels) {
Expand Down Expand Up @@ -301,4 +273,39 @@ public static void updateAbsoluteCapacitiesByNodeLabels(QueueCapacities queueCap
}
}
}

/**
* Updated configured capacity/max-capacity for queue.
* @param rc resource calculator
* @param labelManager label manager
* @param clusterResource the resource of cluster
* @param queue queue
*/
public static void updateConfiguredCapacityMetrics(ResourceCalculator rc,
RMNodeLabelsManager labelManager, Resource clusterResource,
AbstractCSQueue queue) {
Resource defaultPartitionResource = labelManager.getResourceByLabel(null, clusterResource);
updateConfiguredCapacityMetricsWithPartition(rc, defaultPartitionResource, RMNodeLabelsManager.NO_LABEL, queue);

for (String partition : queue.getNodeLabelsForQueue()) {
Resource partitionResource = labelManager.getResourceByLabel(partition, clusterResource);
updateConfiguredCapacityMetricsWithPartition(rc, partitionResource, partition, queue);
}
}

private static void updateConfiguredCapacityMetricsWithPartition(ResourceCalculator rc,
Resource partitionResource, String partition, AbstractCSQueue queue) {
queue.getMetrics().setGuaranteedResources(partition, rc.multiplyAndNormalizeDown(
partitionResource, queue.getQueueCapacities().getAbsoluteCapacity(partition),
queue.getMinimumAllocation()));
queue.getMetrics().setMaxCapacityResources(partition, rc.multiplyAndNormalizeDown(
partitionResource, queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition),
queue.getMinimumAllocation()));
queue.getMetrics().setGuaranteedCapacities(partition,
queue.getQueueCapacities().getCapacity(partition),
queue.getQueueCapacities().getAbsoluteCapacity(partition));
queue.getMetrics().setMaxCapacities(partition,
queue.getQueueCapacities().getMaximumCapacity(partition),
queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1289,8 +1289,7 @@ public void updateClusterResource(Resource clusterResource,
this, labelManager, null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
RMNodeLabelsManager.NO_LABEL, this);
labelManager, clusterResource, this);
} catch (IOException e) {
LOG.error("Error during updating cluster resource: ", e);
throw new YarnRuntimeException("Fatal issue during scheduling", e);
Expand Down