From e56f1fdaa36e6361a9a8ccc5b2450c2f39e74d50 Mon Sep 17 00:00:00 2001 From: Hao Li <1127478+lihaosky@users.noreply.github.com> Date: Wed, 26 Jul 2023 16:33:55 -0700 Subject: [PATCH] comments from 3/N --- .../assignment/RackAwareTaskAssignor.java | 28 ++++++++----- .../assignment/AssignmentTestUtils.java | 10 ++--- .../assignment/RackAwareTaskAssignorTest.java | 40 +++++++++---------- 3 files changed, 43 insertions(+), 35 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java index 82a9a9d9c2e3..71e471491628 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignor.java @@ -88,7 +88,7 @@ public synchronized boolean canEnableRackAwareAssignor() { } // Visible for testing. This method also checks if all TopicPartitions exist in cluster - public boolean populateTopicsToDiscribe(final Set topicsToDescribe) { + public boolean populateTopicsToDescribe(final Set topicsToDescribe) { // Make sure rackId exist for all TopicPartitions needed for (final Set topicPartitions : partitionsForTask.values()) { for (final TopicPartition topicPartition : topicPartitions) { @@ -118,7 +118,7 @@ public boolean populateTopicsToDiscribe(final Set topicsToDescribe) { private boolean validateTopicPartitionRack() { // Make sure rackId exist for all TopicPartitions needed final Set topicsToDescribe = new HashSet<>(); - if (!populateTopicsToDiscribe(topicsToDescribe)) { + if (!populateTopicsToDescribe(topicsToDescribe)) { return false; } @@ -234,8 +234,17 @@ private static int getSinkID(final List clientList, final List tas return clientList.size() + taskIdList.size(); } - // For testing. 
canEnableRackAwareAssignor must be called first - long activeTasksCost(final SortedMap clientStates, final SortedSet activeTasks, final int trafficCost, final int nonOverlapCost) { + /** + * Compute the cost for the provided {@code activeTasks}. The passed in active tasks must be contained in {@code clientStates}. + */ + long activeTasksCost(final SortedSet activeTasks, + final SortedMap clientStates, + final int trafficCost, + final int nonOverlapCost) { + if (activeTasks.isEmpty()) { + return 0; + } + final List clientList = new ArrayList<>(clientStates.keySet()); final List taskIdList = new ArrayList<>(activeTasks); final Graph graph = constructActiveTaskGraph(clientList, taskIdList, @@ -252,14 +261,14 @@ long activeTasksCost(final SortedMap clientStates, final Sort * cross rack traffic can be higher. In extreme case, if we set {@code nonOverlapCost} to 0 and @{code trafficCost} * to a positive value, the computed assignment will be minimum for cross rack traffic. If we set {@code trafficCost} to 0, * and {@code nonOverlapCost} to a positive value, the computed assignment should be the same as input - * @param clientStates Client states * @param activeTasks Tasks to reassign if needed. 
They must be assigned already in clientStates + * @param clientStates Client states * @param trafficCost Cost of cross rack traffic for each TopicPartition * @param nonOverlapCost Cost of assign a task to a different client * @return Total cost after optimization */ - public long optimizeActiveTasks(final SortedMap clientStates, - final SortedSet activeTasks, + public long optimizeActiveTasks(final SortedSet activeTasks, + final SortedMap clientStates, final int trafficCost, final int nonOverlapCost) { if (activeTasks.isEmpty()) { @@ -321,13 +330,12 @@ private Graph constructActiveTaskGraph(final List clientList, if (!taskClientMap.containsKey(taskId)) { throw new IllegalArgumentException("Task " + taskId + " not assigned to any client"); } - } - final int sinkId = getSinkID(clientList, taskIdList); - for (int taskNodeId = 0; taskNodeId < taskIdList.size(); taskNodeId++) { + // Add edge from source to task graph.addEdge(SOURCE_ID, taskNodeId, 1, 0, 1); } + final int sinkId = getSinkID(clientList, taskIdList); // It's possible that some clients have 0 task assign. These clients will have 0 tasks assigned // even though it may have higher traffic cost. 
This is to maintain the original assigned task count for (int i = 0; i < clientList.size(); i++) { diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java index 5f53e028a7ab..5da4da5ce96f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java @@ -71,11 +71,11 @@ public final class AssignmentTestUtils { public static final UUID UUID_8 = uuidForInt(8); public static final UUID UUID_9 = uuidForInt(9); - public static final String RACK_0 = "rock0"; - public static final String RACK_1 = "rock1"; - public static final String RACK_2 = "rock2"; - public static final String RACK_3 = "rock3"; - public static final String RACK_4 = "rock4"; + public static final String RACK_0 = "rack0"; + public static final String RACK_1 = "rack1"; + public static final String RACK_2 = "rack2"; + public static final String RACK_3 = "rack3"; + public static final String RACK_4 = "rack4"; public static final Node NODE_0 = new Node(0, "node0", 1, RACK_0); public static final Node NODE_1 = new Node(1, "node1", 1, RACK_1); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java index 4a07b14f9516..1e98b7a47d70 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java @@ -176,7 +176,7 @@ public void shouldDisableActiveWhenMissingClusterInfo() { // False since partitionWithoutInfo10 is missing in cluster metadata 
assertFalse(assignor.canEnableRackAwareAssignor()); - assertFalse(assignor.populateTopicsToDiscribe(new HashSet<>())); + assertFalse(assignor.populateTopicsToDescribe(new HashSet<>())); assertTrue(assignor.validateClientRack()); } @@ -192,7 +192,7 @@ public void shouldDisableActiveWhenRackMissingInNode() { ); assertTrue(assignor.validateClientRack()); - assertFalse(assignor.populateTopicsToDiscribe(new HashSet<>())); + assertFalse(assignor.populateTopicsToDescribe(new HashSet<>())); // False since nodeMissingRack has one node which doesn't have rack assertFalse(assignor.canEnableRackAwareAssignor()); } @@ -290,7 +290,7 @@ public void shouldDisableRackAwareAssignorForActiveWithDescribingTopicsFailure() ); assertFalse(assignor.canEnableRackAwareAssignor()); - assertTrue(assignor.populateTopicsToDiscribe(new HashSet<>())); + assertTrue(assignor.populateTopicsToDescribe(new HashSet<>())); } @Test @@ -314,10 +314,10 @@ public void shouldOptimizeEmptyActiveTasks() { final SortedSet taskIds = mkSortedSet(); assertTrue(assignor.canEnableRackAwareAssignor()); - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(0, originalCost); - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(0, cost); assertEquals(mkSet(TASK_0_1, TASK_1_1), clientState0.activeTasks()); @@ -356,11 +356,11 @@ public void shouldOptimizeActiveTasks() { assertTrue(assignor.canEnableRackAwareAssignor()); int expected = stateful ? 
40 : 4; - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(expected, originalCost); expected = stateful ? 4 : 0; - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(expected, cost); assertEquals(mkSet(TASK_0_0, TASK_1_0), clientState0.activeTasks()); @@ -390,8 +390,8 @@ public void shouldOptimizeRandom() { .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().activeTasks().size())); assertTrue(assignor.canEnableRackAwareAssignor()); - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost); - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertThat(cost, lessThanOrEqualTo(originalCost)); for (final Entry entry : clientStateMap.entrySet()) { @@ -425,10 +425,10 @@ public void shouldMaintainOriginalAssignment() { // Because trafficCost is 0, original assignment should be maintained assertTrue(assignor.canEnableRackAwareAssignor()); - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, 0, 1); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, 0, 1); assertEquals(0, originalCost); - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, 0, 1); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, 0, 1); assertEquals(0, cost); // Make sure assignment doesn't change @@ -468,12 +468,12 @@ public void 
shouldOptimizeActiveTasksWithMoreClients() { final SortedSet taskIds = mkSortedSet(TASK_0_0, TASK_1_0); assertTrue(assignor.canEnableRackAwareAssignor()); - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost); int expected = stateful ? 20 : 2; assertEquals(expected, originalCost); expected = stateful ? 2 : 0; - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(expected, cost); // UUID_1 remains empty @@ -512,12 +512,12 @@ public void shouldOptimizeActiveTasksWithMoreClientsWithMoreThanOneTask() { final SortedSet taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_0); assertTrue(assignor.canEnableRackAwareAssignor()); - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost); int expected = stateful ? 20 : 2; assertEquals(expected, originalCost); expected = stateful ? 2 : 0; - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(expected, cost); // Because original assignment is not balanced (3 tasks but client 0 has no task), we maintain it @@ -555,10 +555,10 @@ public void shouldBalanceAssignmentWithMoreCost() { assertTrue(assignor.canEnableRackAwareAssignor()); final int expectedCost = stateful ? 
10 : 1; - final long originalCost = assignor.activeTasksCost(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long originalCost = assignor.activeTasksCost(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(expectedCost, originalCost); - final long cost = assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost); + final long cost = assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost); assertEquals(expectedCost, cost); // Even though assigning all tasks to UUID_2 will result in min cost, but it's not balanced @@ -590,7 +590,7 @@ public void shouldThrowIfMissingCallcanEnableRackAwareAssignor() { )); final SortedSet taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_1); final Exception exception = assertThrows(IllegalStateException.class, - () -> assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost)); + () -> assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost)); Assertions.assertEquals("Client 00000000-0000-0000-0000-000000000002 doesn't have rack configured. 
Maybe forgot to call " + "canEnableRackAwareAssignor first", exception.getMessage()); } @@ -619,7 +619,7 @@ public void shouldThrowIfTaskInMultipleClients() { final SortedSet taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_1); assertTrue(assignor.canEnableRackAwareAssignor()); final Exception exception = assertThrows(IllegalArgumentException.class, - () -> assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost)); + () -> assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost)); Assertions.assertEquals( "Task 1_1 assigned to multiple clients 00000000-0000-0000-0000-000000000005, " + "00000000-0000-0000-0000-000000000002", exception.getMessage()); @@ -649,7 +649,7 @@ public void shouldThrowIfTaskMissingInClients() { final SortedSet taskIds = mkSortedSet(TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1); assertTrue(assignor.canEnableRackAwareAssignor()); final Exception exception = assertThrows(IllegalArgumentException.class, - () -> assignor.optimizeActiveTasks(clientStateMap, taskIds, trafficCost, nonOverlapCost)); + () -> assignor.optimizeActiveTasks(taskIds, clientStateMap, trafficCost, nonOverlapCost)); Assertions.assertEquals( "Task 1_0 not assigned to any client", exception.getMessage()); }