Skip to content

Commit

Permalink
KAFKA-15693: Immediately reassign lost connectors and tasks when sche…
Browse files Browse the repository at this point in the history
…duled rebalance delay is disabled (#14647)

Reviewers: Sagar Rao <sagarmeansocean@gmail.com>, Yash Mayya <yash.mayya@gmail.com>
  • Loading branch information
C0urante committed Nov 9, 2023
1 parent 81cceed commit 39c6170
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 10 deletions.
Expand Up @@ -48,6 +48,7 @@

import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
Expand Down Expand Up @@ -337,10 +338,8 @@ ClusterAssignment performTaskAssignment(

// The complete set of connectors and tasks that should be newly-assigned during this round
ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
.addConnectors(created.connectors())
.addTasks(created.tasks())
.addConnectors(lostAssignmentsToReassign.connectors())
.addTasks(lostAssignmentsToReassign.tasks())
.addAll(created)
.addAll(lostAssignmentsToReassign)
.build();

assignConnectors(nextWorkerAssignment, toAssign.connectors());
Expand Down Expand Up @@ -460,8 +459,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
+ "missing assignments that the leader is detecting are probably due to some "
+ "workers failing to receive the new assignments in the previous rebalance. "
+ "Will reassign missing tasks as new tasks");
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
lostAssignmentsToReassign.addAll(lostAssignments);
return;
} else if (maxDelay == 0) {
log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+ "reassigning all lost connectors and tasks immediately",
SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
);
lostAssignmentsToReassign.addAll(lostAssignments);
return;
}

Expand Down Expand Up @@ -498,8 +503,7 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
}
} else {
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
lostAssignmentsToReassign.addAll(lostAssignments);
}
resetDelay();
// Resetting the flag as now we can permit successive revoking rebalances.
Expand Down Expand Up @@ -840,8 +844,7 @@ private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks>
private static void addAll(Map<String, ConnectorsAndTasks.Builder> base, Map<String, ConnectorsAndTasks> toAdd) {
toAdd.forEach((worker, assignment) -> base
.computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
.addConnectors(assignment.connectors())
.addTasks(assignment.tasks())
.addAll(assignment)
);
}

Expand Down
Expand Up @@ -480,6 +480,12 @@ public ConnectorsAndTasks.Builder addTasks(Collection<ConnectorTaskId> tasks) {
return this;
}

public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
return this
.addConnectors(connectorsAndTasks.connectors())
.addTasks(connectorsAndTasks.tasks());
}

public ConnectorsAndTasks build() {
return new ConnectorsAndTasks(withConnectors, withTasks);
}
Expand Down
Expand Up @@ -1037,6 +1037,83 @@ public void testLostAssignmentHandlingWhenWorkerBouncesBackButFinallyLeaves() {
assertEquals(0, assignor.delay);
}

@Test
public void testLostAssignmentHandlingWhenScheduledDelayIsDisabled() {
// Customize assignor for this test case
rebalanceDelay = 0;
time = new MockTime();
initAssignor();

assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);

Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));

// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));

assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);

assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

String veryFlakyWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.with(lostLoad.connectors(), lostLoad.tasks()).build();

// Lost assignments detected - Immediately reassigned
ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
new ArrayList<>(configuredAssignment.values()));

assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
assertEquals("Wrong assignment of lost connectors",
lostAssignments.connectors(), lostAssignmentsToReassign.build().connectors());
assertEquals("Wrong assignment of lost tasks",
lostAssignments.tasks(), lostAssignmentsToReassign.build().tasks());
}

@Test
public void testScheduledDelayIsDisabled() {
// Customize assignor for this test case
rebalanceDelay = 0;
time = new MockTime();
initAssignor();

// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1", "worker2");
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();

// Second assignment with only one worker remaining in the group. The worker that left the
// group was a follower. Re-assignments take place immediately
removeWorkers("worker2");
performStandardRebalance();
assertDelay(rebalanceDelay);
assertWorkers("worker1");
assertConnectorAllocations(2);
assertTaskAllocations(8);
assertBalancedAndCompleteAllocation();
}

@Test
public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
// First assignment with 1 worker and 2 connectors configured but not yet assigned
Expand Down

0 comments on commit 39c6170

Please sign in to comment.