Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-15693: Immediately reassign lost connectors and tasks when scheduled rebalance delay is disabled #14647

Merged
merged 2 commits into from Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -48,6 +48,7 @@

import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
Expand Down Expand Up @@ -337,10 +338,8 @@ ClusterAssignment performTaskAssignment(

// The complete set of connectors and tasks that should be newly-assigned during this round
ConnectorsAndTasks toAssign = new ConnectorsAndTasks.Builder()
.addConnectors(created.connectors())
.addTasks(created.tasks())
.addConnectors(lostAssignmentsToReassign.connectors())
.addTasks(lostAssignmentsToReassign.tasks())
.addAll(created)
.addAll(lostAssignmentsToReassign)
.build();

assignConnectors(nextWorkerAssignment, toAssign.connectors());
Expand Down Expand Up @@ -460,8 +459,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
+ "missing assignments that the leader is detecting are probably due to some "
+ "workers failing to receive the new assignments in the previous rebalance. "
+ "Will reassign missing tasks as new tasks");
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
lostAssignmentsToReassign.addAll(lostAssignments);
return;
} else if (maxDelay == 0) {
log.debug("Scheduled rebalance delays are disabled ({} = 0); "
+ "reassigning all lost connectors and tasks immediately",
SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG
);
lostAssignmentsToReassign.addAll(lostAssignments);
return;
}

Expand Down Expand Up @@ -498,8 +503,7 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
}
} else {
log.debug("No single candidate worker was found to assign lost tasks. Treating lost tasks as new tasks");
lostAssignmentsToReassign.addConnectors(lostAssignments.connectors());
lostAssignmentsToReassign.addTasks(lostAssignments.tasks());
lostAssignmentsToReassign.addAll(lostAssignments);
}
resetDelay();
// Resetting the flag as now we can permit successive revoking rebalances.
Expand Down Expand Up @@ -840,8 +844,7 @@ private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks>
private static void addAll(Map<String, ConnectorsAndTasks.Builder> base, Map<String, ConnectorsAndTasks> toAdd) {
toAdd.forEach((worker, assignment) -> base
.computeIfAbsent(worker, w -> new ConnectorsAndTasks.Builder())
.addConnectors(assignment.connectors())
.addTasks(assignment.tasks())
.addAll(assignment)
);
}

Expand Down
Expand Up @@ -480,6 +480,12 @@ public ConnectorsAndTasks.Builder addTasks(Collection<ConnectorTaskId> tasks) {
return this;
}

public ConnectorsAndTasks.Builder addAll(ConnectorsAndTasks connectorsAndTasks) {
yashmayya marked this conversation as resolved.
Show resolved Hide resolved
return this
.addConnectors(connectorsAndTasks.connectors())
.addTasks(connectorsAndTasks.tasks());
}

public ConnectorsAndTasks build() {
return new ConnectorsAndTasks(withConnectors, withTasks);
}
Expand Down
Expand Up @@ -1037,6 +1037,83 @@ public void testLostAssignmentHandlingWhenWorkerBouncesBackButFinallyLeaves() {
assertEquals(0, assignor.delay);
}

@Test
public void testLostAssignmentHandlingWhenScheduledDelayIsDisabled() {
// Customize assignor for this test case
rebalanceDelay = 0;
time = new MockTime();
initAssignor();

assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);

Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));

// No lost assignments
assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
new ConnectorsAndTasks.Builder(),
new ArrayList<>(configuredAssignment.values()));

assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);

assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());

String veryFlakyWorker = "worker1";
WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
.with(lostLoad.connectors(), lostLoad.tasks()).build();

// Lost assignments detected - Immediately reassigned
ConnectorsAndTasks.Builder lostAssignmentsToReassign = new ConnectorsAndTasks.Builder();
assignor.handleLostAssignments(lostAssignments, lostAssignmentsToReassign,
new ArrayList<>(configuredAssignment.values()));

assertEquals("Wrong set of workers for reassignments",
Collections.emptySet(),
assignor.candidateWorkersForReassignment);
assertEquals(0, assignor.scheduledRebalance);
assertEquals(0, assignor.delay);
assertEquals("Wrong assignment of lost connectors",
lostAssignments.connectors(), lostAssignmentsToReassign.build().connectors());
assertEquals("Wrong assignment of lost tasks",
lostAssignments.tasks(), lostAssignmentsToReassign.build().tasks());
}

@Test
public void testScheduledDelayIsDisabled() {
// Customize assignor for this test case
rebalanceDelay = 0;
time = new MockTime();
initAssignor();

// First assignment with 2 workers and 2 connectors configured but not yet assigned
addNewEmptyWorkers("worker2");
performStandardRebalance();
assertDelay(0);
assertWorkers("worker1", "worker2");
assertConnectorAllocations(1, 1);
assertTaskAllocations(4, 4);
assertBalancedAndCompleteAllocation();

// Second assignment with only one worker remaining in the group. The worker that left the
// group was a follower. Re-assignments take place immediately
removeWorkers("worker2");
performStandardRebalance();
assertDelay(rebalanceDelay);
assertWorkers("worker1");
assertConnectorAllocations(2);
assertTaskAllocations(8);
assertBalancedAndCompleteAllocation();
}

@Test
public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
// First assignment with 1 worker and 2 connectors configured but not yet assigned
Expand Down