Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YARN-11684. Fix general contract violation in PriorityQueueComparator… #6754

Merged
merged 1 commit into from
Apr 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels
.RMNodeLabelsManager;
Expand All @@ -32,7 +33,6 @@
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
Expand All @@ -54,17 +54,7 @@
public class PriorityUtilizationQueueOrderingPolicy
implements QueueOrderingPolicy {
private List<CSQueue> queues;
private boolean respectPriority;

// This makes multiple threads can sort queues at the same time
// For different partitions.
private static ThreadLocal<String> partitionToLookAt =
ThreadLocal.withInitial(new Supplier<String>() {
@Override
public String get() {
return RMNodeLabelsManager.NO_LABEL;
}
});
private final boolean respectPriority;

/**
* Compare two queues with possibly different priority and assigned capacity,
Expand Down Expand Up @@ -101,15 +91,21 @@ public static int compare(double relativeAssigned1, double relativeAssigned2,
/**
* Comparator that both looks at priority and utilization
*/
private class PriorityQueueComparator
final private class PriorityQueueComparator
implements Comparator<PriorityQueueResourcesForSorting> {

final private String partition;

private PriorityQueueComparator(String partition) {
this.partition = partition;
}

@Override
public int compare(PriorityQueueResourcesForSorting q1Sort,
PriorityQueueResourcesForSorting q2Sort) {
String p = partitionToLookAt.get();

int rc = compareQueueAccessToPartition(q1Sort.queue, q2Sort.queue, p);
int rc = compareQueueAccessToPartition(
q1Sort.nodeLabelAccessible,
q2Sort.nodeLabelAccessible);
if (0 != rc) {
return rc;
}
Expand All @@ -133,17 +129,17 @@ public int compare(PriorityQueueResourcesForSorting q1Sort,
float used2 = q2Sort.absoluteUsedCapacity;

return compare(q1Sort, q2Sort, used1, used2,
q1Sort.queue.getPriority().
getPriority(), q2Sort.queue.getPriority().getPriority());
q1Sort.priority.
getPriority(), q2Sort.priority.getPriority());
} else{
// both q1 has positive abs capacity and q2 has positive abs
// capacity
float used1 = q1Sort.usedCapacity;
float used2 = q2Sort.usedCapacity;

return compare(q1Sort, q2Sort, used1, used2,
q1Sort.queue.getPriority().getPriority(),
q2Sort.queue.getPriority().getPriority());
q1Sort.priority.getPriority(),
q2Sort.priority.getPriority());
}
}

Expand Down Expand Up @@ -181,8 +177,7 @@ private int compare(PriorityQueueResourcesForSorting q1Sort,
return rc;
}

private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2,
String partition) {
private int compareQueueAccessToPartition(boolean q1Accessible, boolean q2Accessible) {
// Everybody has access to default partition
if (StringUtils.equals(partition, RMNodeLabelsManager.NO_LABEL)) {
return 0;
Expand All @@ -192,14 +187,6 @@ private int compareQueueAccessToPartition(CSQueue q1, CSQueue q2,
* Check accessible to given partition, if one queue accessible and
* the other not, accessible queue goes first.
*/
boolean q1Accessible =
q1.getAccessibleNodeLabels() != null && q1.getAccessibleNodeLabels()
.contains(partition) || q1.getAccessibleNodeLabels().contains(
RMNodeLabelsManager.ANY);
boolean q2Accessible =
q2.getAccessibleNodeLabels() != null && q2.getAccessibleNodeLabels()
.contains(partition) || q2.getAccessibleNodeLabels().contains(
RMNodeLabelsManager.ANY);
if (q1Accessible && !q2Accessible) {
return -1;
} else if (!q1Accessible && q2Accessible) {
Expand All @@ -218,22 +205,32 @@ public static class PriorityQueueResourcesForSorting {
private final float usedCapacity;
private final Resource configuredMinResource;
private final float absoluteCapacity;
private final Priority priority;
private final boolean nodeLabelAccessible;
private final CSQueue queue;

PriorityQueueResourcesForSorting(CSQueue queue) {
PriorityQueueResourcesForSorting(CSQueue queue, String partition) {
this.queue = queue;
this.absoluteUsedCapacity =
queue.getQueueCapacities().
getAbsoluteUsedCapacity(partitionToLookAt.get());
getAbsoluteUsedCapacity(partition);
this.usedCapacity =
queue.getQueueCapacities().
getUsedCapacity(partitionToLookAt.get());
getUsedCapacity(partition);
this.absoluteCapacity =
queue.getQueueCapacities().
getAbsoluteCapacity(partitionToLookAt.get());
getAbsoluteCapacity(partition);
this.configuredMinResource =
queue.getQueueResourceQuotas().
getConfiguredMinResource(partitionToLookAt.get());
getConfiguredMinResource(partition);
this.priority = queue.getPriority();
this.nodeLabelAccessible = queue.getAccessibleNodeLabels() != null &&
queue.getAccessibleNodeLabels().contains(partition) ||
queue.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY);
}

static PriorityQueueResourcesForSorting create(CSQueue queue, String partition) {
return new PriorityQueueResourcesForSorting(queue, partition);
}

public CSQueue getQueue() {
Expand All @@ -252,14 +249,13 @@ public void setQueues(List<CSQueue> queues) {

@Override
public Iterator<CSQueue> getAssignmentIterator(String partition) {
// partitionToLookAt is a thread local variable, therefore it is safe to mutate it.
PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);

// Copy (for thread safety) and sort the snapshot of the queues in order to avoid breaking
// the prerequisites of TimSort. See YARN-10178 for details.
return new ArrayList<>(queues).stream().map(PriorityQueueResourcesForSorting::new).sorted(
new PriorityQueueComparator()).map(PriorityQueueResourcesForSorting::getQueue).collect(
Collectors.toList()).iterator();
return new ArrayList<>(queues).stream()
.map(queue -> PriorityQueueResourcesForSorting.create(queue, partition))
.sorted(new PriorityQueueComparator(partition))
.map(PriorityQueueResourcesForSorting::getQueue)
.collect(Collectors.toList()).iterator();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,16 +21,21 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableTable;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.junit.Assert;
import org.junit.Test;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

Expand Down Expand Up @@ -250,4 +255,90 @@ public void testPriorityUtilizationOrdering() {
verifyOrder(policy, "x", new String[] { "e", "c", "d", "b", "a" });

}

@Test
public void testComparatorDoesNotValidateGeneralContract() {
final String[] nodeLabels = {"x", "y", "z"};
PriorityUtilizationQueueOrderingPolicy policy =
new PriorityUtilizationQueueOrderingPolicy(true);

final String partition = nodeLabels[randInt(0, nodeLabels.length - 1)];
List<CSQueue> list = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
CSQueue q = mock(CSQueue.class);
when(q.getQueuePath()).thenReturn(String.format("%d", i));

// simulating change in queueCapacities
when(q.getQueueCapacities())
.thenReturn(randomQueueCapacities(partition))
.thenReturn(randomQueueCapacities(partition))
.thenReturn(randomQueueCapacities(partition))
.thenReturn(randomQueueCapacities(partition))
.thenReturn(randomQueueCapacities(partition));

// simulating change in the priority
when(q.getPriority())
.thenReturn(Priority.newInstance(randInt(0, 10)))
.thenReturn(Priority.newInstance(randInt(0, 10)))
.thenReturn(Priority.newInstance(randInt(0, 10)))
.thenReturn(Priority.newInstance(randInt(0, 10)))
.thenReturn(Priority.newInstance(randInt(0, 10)));

if (randInt(0, nodeLabels.length) == 1) {
// simulating change in nodeLabels
when(q.getAccessibleNodeLabels())
.thenReturn(randomNodeLabels(nodeLabels))
.thenReturn(randomNodeLabels(nodeLabels))
.thenReturn(randomNodeLabels(nodeLabels))
.thenReturn(randomNodeLabels(nodeLabels))
.thenReturn(randomNodeLabels(nodeLabels));
}

// simulating change in configuredMinResource
when(q.getQueueResourceQuotas())
.thenReturn(randomResourceQuotas(partition))
.thenReturn(randomResourceQuotas(partition))
.thenReturn(randomResourceQuotas(partition))
.thenReturn(randomResourceQuotas(partition))
.thenReturn(randomResourceQuotas(partition));
list.add(q);
}

policy.setQueues(list);
// java.lang.IllegalArgumentException: Comparison method violates its general contract!
assertDoesNotThrow(() -> policy.getAssignmentIterator(partition));
}

private QueueCapacities randomQueueCapacities(String partition) {
QueueCapacities qc = new QueueCapacities(false);
qc.setAbsoluteCapacity(partition, (float) randFloat(0.0d, 100.0d));
qc.setUsedCapacity(partition, (float) randFloat(0.0d, 100.0d));
qc.setAbsoluteUsedCapacity(partition, (float) randFloat(0.0d, 100.0d));
return qc;
}

private Set<String> randomNodeLabels(String[] availableNodeLabels) {
Set<String> nodeLabels = new HashSet<>();
for (String label : availableNodeLabels) {
if (randInt(0, 1) == 1) {
nodeLabels.add(label);
}
}
return nodeLabels;
}

private QueueResourceQuotas randomResourceQuotas(String partition) {
QueueResourceQuotas qr = new QueueResourceQuotas();
qr.setConfiguredMinResource(partition,
Resource.newInstance(randInt(1, 10) * 1024, randInt(1, 10)));
return qr;
}

private static double randFloat(double min, double max) {
return min + ThreadLocalRandom.current().nextFloat() * (max - min);
}

private static int randInt(int min, int max) {
return ThreadLocalRandom.current().nextInt(min, max + 1);
}
}