From 64b845f05de624983e43eb81f288302c1f39150f Mon Sep 17 00:00:00 2001 From: Chris Egerton Date: Thu, 9 Nov 2023 10:48:43 -0500 Subject: [PATCH] KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled (#14647) Reviewers: Sagar Rao , Yash Mayya --- .../IncrementalCooperativeAssignor.java | 23 +++--- .../distributed/WorkerCoordinator.java | 6 ++ .../IncrementalCooperativeAssignorTest.java | 77 +++++++++++++++++++ 3 files changed, 96 insertions(+), 10 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java index d48589423dc7..0836bf2c4bae 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java @@ -48,6 +48,7 @@ import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember; import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment; +import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG; import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2; import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState; import static org.apache.kafka.connect.util.ConnectUtils.combineCollections; @@ -337,10 +338,8 @@ ClusterAssignment performTaskAssignment( // The complete set of connectors and tasks that should be newly-assigned during this round ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder() - .addConnectors(created.connectors()) - .addTasks(created.tasks()) - .addConnectors(lostAssignmentsToReassign.connectors()) - 
.addTasks(lostAssignmentsToReassign.tasks()) + .addAll(created) + .addAll(lostAssignmentsToReassign) .build(); assignConnectors(nextWorkerAssignment, toAssign.connectors()); @@ -460,8 +459,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, + "missing assignments that the leader is detecting are probably due to some " + "workers failing to receive the new assignments in the previous rebalance. " + "Will reassign missing tasks as new tasks"); - lostAssignmentsToReassign.addConnectors(lostAssignments.connectors()); - lostAssignmentsToReassign.addTasks(lostAssignments.tasks()); + lostAssignmentsToReassign.addAll(lostAssignments); + return; + } else if (maxDelay == 0) { + log.debug("Scheduled rebalance delays are disabled ({} = 0); " + + "reassigning all lost connectors and tasks immediately", + SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG + ); + lostAssignmentsToReassign.addAll(lostAssignments); return; } @@ -498,8 +503,7 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments, } } else { log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks"); - lostAssignmentsToReassign.addConnectors(lostAssignments.connectors()); - lostAssignmentsToReassign.addTasks(lostAssignments.tasks()); + lostAssignmentsToReassign.addAll(lostAssignments); } resetDelay(); // Resetting the flag as now we can permit successive revoking rebalances. 
@@ -840,8 +844,7 @@ private static List workerAssignment(Map private static void addAll(Map base, Map toAdd) { toAdd.forEach((worker, assignment) -> base .computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder()) - .addConnectors(assignment.connectors()) - .addTasks(assignment.tasks()) + .addAll(assignment) ); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java index 9ebe71e46921..fab9e0a65e70 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java @@ -480,6 +480,12 @@ public ConnectorsAndTasks.Builder addTasks(Collection tasks) { return this; } + public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) { + return this + .addConnectors(connectorsAndTasks.connectors()) + .addTasks(connectorsAndTasks.tasks()); + } + public ConnectorsAndTasks build() { return new ConnectorsAndTasks(withConnectors, withTasks); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java index a07391e11523..3edb4d52dc0b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java @@ -1037,6 +1037,83 @@ public void testLostAssignmentHandlingWhenWorkerBouncesBackButFinallyLeaves() { assertEquals(0, assignor.delay); } + @Test + public void testLostAssignmentHandlingWhenScheduledDelayIsDisabled() { + // Customize assignor for this test case + rebalanceDelay = 0; + time = new 
MockTime(); + initAssignor(); + + assertTrue(assignor.candidateWorkersForReassignment.isEmpty()); + assertEquals(0, assignor.scheduledRebalance); + assertEquals(0, assignor.delay); + + Map<String, WorkerLoad> configuredAssignment = new HashMap<>(); + configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4)); + configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4)); + configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4)); + + // No lost assignments + assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(), + new ConnectorsAndTasks.Builder(), + new ArrayList<>(configuredAssignment.values())); + + assertEquals("Wrong set of workers for reassignments", + Collections.emptySet(), + assignor.candidateWorkersForReassignment); + assertEquals(0, assignor.scheduledRebalance); + assertEquals(0, assignor.delay); + + assignor.previousMembers = new HashSet<>(configuredAssignment.keySet()); + + String veryFlakyWorker = "worker1"; + WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker); + ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder() + .with(lostLoad.connectors(), lostLoad.tasks()).build(); + + // Lost assignments detected - Immediately reassigned + ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder(); + assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign, + new ArrayList<>(configuredAssignment.values())); + + assertEquals("Wrong set of workers for reassignments", + Collections.emptySet(), + assignor.candidateWorkersForReassignment); + assertEquals(0, assignor.scheduledRebalance); + assertEquals(0, assignor.delay); + assertEquals("Wrong assignment of lost connectors", + lostAssignments.connectors(), lostAssignmentsToReassign.build().connectors()); + assertEquals("Wrong assignment of lost tasks", + lostAssignments.tasks(), lostAssignmentsToReassign.build().tasks()); + } + + @Test + public void testScheduledDelayIsDisabled() { + // Customize assignor 
for this test case + rebalanceDelay = 0; + time = new MockTime(); + initAssignor(); + + // First assignment with 2 workers and 2 connectors configured but not yet assigned + addNewEmptyWorkers("worker2"); + performStandardRebalance(); + assertDelay(0); + assertWorkers("worker1", "worker2"); + assertConnectorAllocations(1, 1); + assertTaskAllocations(4, 4); + assertBalancedAndCompleteAllocation(); + + // Second assignment with only one worker remaining in the group. The worker that left the + // group was a follower. Re-assignments take place immediately + removeWorkers("worker2"); + performStandardRebalance(); + assertDelay(rebalanceDelay); + assertWorkers("worker1"); + assertConnectorAllocations(2); + assertTaskAllocations(8); + assertBalancedAndCompleteAllocation(); + } + @Test public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() { // First assignment with 1 worker and 2 connectors configured but not yet assigned