Skip to content

Commit

Permalink
comments from 3/N
Browse files Browse the repository at this point in the history
  • Loading branch information
lihaosky committed Jul 26, 2023
1 parent e9358e4 commit e56f1fd
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 35 deletions.
Expand Up @@ -88,7 +88,7 @@ public synchronized boolean canEnableRackAwareAssignor() {
}

// Visible for testing. This method also checks if all TopicPartitions exist in cluster
public boolean populateTopicsToDiscribe(final Set<String> topicsToDescribe) {
public boolean populateTopicsToDescribe(final Set<String> topicsToDescribe) {
// Make sure rackId exists for all TopicPartitions needed
for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
for (final TopicPartition topicPartition : topicPartitions) {
Expand Down Expand Up @@ -118,7 +118,7 @@ public boolean populateTopicsToDiscribe(final Set<String> topicsToDescribe) {
private boolean validateTopicPartitionRack() {
// Make sure rackId exists for all TopicPartitions needed
final Set<String> topicsToDescribe = new HashSet<>();
if (!populateTopicsToDiscribe(topicsToDescribe)) {
if (!populateTopicsToDescribe(topicsToDescribe)) {
return false;
}

Expand Down Expand Up @@ -234,8 +234,17 @@ private static int getSinkID(final List<UUID> clientList, final List<TaskId> tas
return clientList.size() + taskIdList.size();
}

// For testing. canEnableRackAwareAssignor must be called first
long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final SortedSet<TaskId> activeTasks, final int trafficCost, final int nonOverlapCost) {
/**
* Compute the cost for the provided {@code activeTasks}. The passed-in active tasks must be contained in {@code clientStates}.
*/
long activeTasksCost(final SortedSet<TaskId> activeTasks,
final SortedMap<UUID, ClientState> clientStates,
final int trafficCost,
final int nonOverlapCost) {
if (activeTasks.isEmpty()) {
return 0;
}

final List<UUID> clientList = new ArrayList<>(clientStates.keySet());
final List<TaskId> taskIdList = new ArrayList<>(activeTasks);
final Graph<Integer> graph = constructActiveTaskGraph(clientList, taskIdList,
Expand All @@ -252,14 +261,14 @@ long activeTasksCost(final SortedMap<UUID, ClientState> clientStates, final Sort
* cross rack traffic can be higher. In the extreme case, if we set {@code nonOverlapCost} to 0 and {@code trafficCost}
* to a positive value, the computed assignment will be minimum for cross rack traffic. If we set {@code trafficCost} to 0,
* and {@code nonOverlapCost} to a positive value, the computed assignment should be the same as input
* @param clientStates Client states
* @param activeTasks Tasks to reassign if needed. They must be assigned already in clientStates
* @param clientStates Client states
* @param trafficCost Cost of cross rack traffic for each TopicPartition
* @param nonOverlapCost Cost of assigning a task to a different client
* @return Total cost after optimization
*/
public long optimizeActiveTasks(final SortedMap<UUID, ClientState> clientStates,
final SortedSet<TaskId> activeTasks,
public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks,
final SortedMap<UUID, ClientState> clientStates,
final int trafficCost,
final int nonOverlapCost) {
if (activeTasks.isEmpty()) {
Expand Down Expand Up @@ -321,13 +330,12 @@ private Graph<Integer> constructActiveTaskGraph(final List<UUID> clientList,
if (!taskClientMap.containsKey(taskId)) {
throw new IllegalArgumentException("Task " + taskId + " not assigned to any client");
}
}

final int sinkId = getSinkID(clientList, taskIdList);
for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) {
// Add edge from source to task
graph.addEdge(SOURCE_ID, taskNodeId, 1, 0, 1);
}

final int sinkId = getSinkID(clientList, taskIdList);
// It's possible that some clients have 0 tasks assigned. These clients will still have 0 tasks assigned
// even though that may result in a higher traffic cost. This is to maintain the original assigned task count
for (int i = 0; i < clientList.size(); i++) {
Expand Down
Expand Up @@ -71,11 +71,11 @@ public final class AssignmentTestUtils {
public static final UUID UUID_8 = uuidForInt(8);
public static final UUID UUID_9 = uuidForInt(9);

public static final String RACK_0 = "rock0";
public static final String RACK_1 = "rock1";
public static final String RACK_2 = "rock2";
public static final String RACK_3 = "rock3";
public static final String RACK_4 = "rock4";
public static final String RACK_0 = "rack0";
public static final String RACK_1 = "rack1";
public static final String RACK_2 = "rack2";
public static final String RACK_3 = "rack3";
public static final String RACK_4 = "rack4";

public static final Node NODE_0 = new Node(0, "node0", 1, RACK_0);
public static final Node NODE_1 = new Node(1, "node1", 1, RACK_1);
Expand Down
Expand Up @@ -176,7 +176,7 @@ public void shouldDisableActiveWhenMissingClusterInfo() {

// False since partitionWithoutInfo10 is missing in cluster metadata
assertFalse(assignor.canEnableRackAwareAssignor());
assertFalse(assignor.populateTopicsToDiscribe(new HashSet<>()));
assertFalse(assignor.populateTopicsToDescribe(new HashSet<>()));
assertTrue(assignor.validateClientRack());
}

Expand All @@ -192,7 +192,7 @@ public void shouldDisableActiveWhenRackMissingInNode() {
);

assertTrue(assignor.validateClientRack());
assertFalse(assignor.populateTopicsToDiscribe(new HashSet<>()));
assertFalse(assignor.populateTopicsToDescribe(new HashSet<>()));
// False since nodeMissingRack has one node which doesn't have rack
assertFalse(assignor.canEnableRackAwareAssignor());
}
Expand Down Expand Up @@ -290,7 +290,7 @@ public void shouldDisableRackAwareAssignorForActiveWithDescribingTopicsFailure()
);

assertFalse(assignor.canEnableRackAwareAssignor());
assertTrue(assignor.populateTopicsToDiscribe(new HashSet<>()));
assertTrue(assignor.populateTopicsToDescribe(new HashSet<>()));
}

@Test
Expand All @@ -314,10 +314,10 @@ public void shouldOptimizeEmptyActiveTasks() {
final SortedSet<TaskId> taskIds = mkSortedSet();

assertTrue(assignor.canEnableRackAwareAssignor());
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(0, originalCost);

final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(0, cost);

assertEquals(mkSet(TASK_0_1, TASK_1_1), clientState0.activeTasks());
Expand Down Expand Up @@ -356,11 +356,11 @@ public void shouldOptimizeActiveTasks() {

assertTrue(assignor.canEnableRackAwareAssignor());
int expected = stateful ? 40 : 4;
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(expected, originalCost);

expected = stateful ? 4 : 0;
final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(expected, cost);

assertEquals(mkSet(TASK_0_0, TASK_1_0), clientState0.activeTasks());
Expand Down Expand Up @@ -390,8 +390,8 @@ public void shouldOptimizeRandom() {
.collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().activeTasks().size()));

assertTrue(assignor.canEnableRackAwareAssignor());
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertThat(cost, lessThanOrEqualTo(originalCost));

for (final Entry<UUID, ClientState> entry : clientStateMap.entrySet()) {
Expand Down Expand Up @@ -425,10 +425,10 @@ public void shouldMaintainOriginalAssignment() {

// Because trafficCost is 0, original assignment should be maintained
assertTrue(assignor.canEnableRackAwareAssignor());
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, 0, 1);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, 0, 1);
assertEquals(0, originalCost);

final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, 0, 1);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, 0, 1);
assertEquals(0, cost);

// Make sure assignment doesn't change
Expand Down Expand Up @@ -468,12 +468,12 @@ public void shouldOptimizeActiveTasksWithMoreClients() {
final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_1_0);

assertTrue(assignor.canEnableRackAwareAssignor());
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
int expected = stateful ? 20 : 2;
assertEquals(expected, originalCost);

expected = stateful ? 2 : 0;
final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(expected, cost);

// UUID_1 remains empty
Expand Down Expand Up @@ -512,12 +512,12 @@ public void shouldOptimizeActiveTasksWithMoreClientsWithMoreThanOneTask() {
final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_0);

assertTrue(assignor.canEnableRackAwareAssignor());
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
int expected = stateful ? 20 : 2;
assertEquals(expected, originalCost);

expected = stateful ? 2 : 0;
final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(expected, cost);

// Because original assignment is not balanced (3 tasks but client 0 has no task), we maintain it
Expand Down Expand Up @@ -555,10 +555,10 @@ public void shouldBalanceAssignmentWithMoreCost() {

assertTrue(assignor.canEnableRackAwareAssignor());
final int expectedCost = stateful ? 10 : 1;
final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(expectedCost, originalCost);

final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost);
final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost);
assertEquals(expectedCost, cost);

// Even though assigning all tasks to UUID_2 will result in min cost, but it's not balanced
Expand Down Expand Up @@ -590,7 +590,7 @@ public void shouldThrowIfMissingCallcanEnableRackAwareAssignor() {
));
final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_1);
final Exception exception = assertThrows(IllegalStateException.class,
() -> assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost));
() -> assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost));
Assertions.assertEquals("Client 00000000-0000-0000-0000-000000000002 doesn't have rack configured. Maybe forgot to call "
+ "canEnableRackAwareAssignor first", exception.getMessage());
}
Expand Down Expand Up @@ -619,7 +619,7 @@ public void shouldThrowIfTaskInMultipleClients() {
final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_1);
assertTrue(assignor.canEnableRackAwareAssignor());
final Exception exception = assertThrows(IllegalArgumentException.class,
() -> assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost));
() -> assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost));
Assertions.assertEquals(
"Task 1_1 assigned to multiple clients 00000000-0000-0000-0000-000000000005, "
+ "00000000-0000-0000-0000-000000000002", exception.getMessage());
Expand Down Expand Up @@ -649,7 +649,7 @@ public void shouldThrowIfTaskMissingInClients() {
final SortedSet<TaskId> taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1);
assertTrue(assignor.canEnableRackAwareAssignor());
final Exception exception = assertThrows(IllegalArgumentException.class,
() -> assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost));
() -> assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost));
Assertions.assertEquals(
"Task 1_0 not assigned to any client", exception.getMessage());
}
Expand Down

0 comments on commit e56f1fd

Please sign in to comment.