KAFKA-12309 The revocation algorithm produces uneven distributions#10077
KAFKA-12309 The revocation algorithm produces uneven distributions#10077chia7712 wants to merge 11 commits intoapache:trunkfrom
Conversation
| tasks.values().size(), | ||
| tasks.values().stream().distinct().collect(Collectors.toList()).size()); | ||
| assertTrue("Connectors are imbalanced: " + formatAssignment(connectors), maxConnectors - minConnectors < 2); | ||
| if (minConnectors > 1) assertEquals("Some workers have no connectors", connectors.size(), connect.workers().size()); |
There was a problem hiding this comment.
make sure there is no idle worker
|
@rhauch Could you take a look? this uneven distributions can be reproduced easily so it would be nice to fix it asap. |
| for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) { | ||
| int currentSize = forTask ? existing.tasksSize() : existing.connectorsSize(); | ||
| int expectedSize; | ||
| if (existingWorkers.size() == 1 || currentSize == 1) expectedSize = ceil; |
There was a problem hiding this comment.
Why do we need this condition? And why under this case, we don't need to have numberOfBiggers--? After all, we assigned one ceiling to this worker, didn't we?
There was a problem hiding this comment.
That is a specify case as the node having single task is already in balance. We can move the job to another node but it does not balance anything but it brings extra down-time.
I will add comments for that case.
...c/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
Outdated
Show resolved
Hide resolved
kkonstantine
left a comment
There was a problem hiding this comment.
Thanks for the PR @chia7712
I'd like to return for a detailed review after we exclude stylistic changes that are not necessary. Also, it'd be good to enhance the description besides the example and call out the corner cases for which logic has been added.
...c/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
Outdated
Show resolved
Hide resolved
|
@kkonstantine thanks for your feedback.
Sorry that I did not follow the code style in connect module. I have reverted all incorrect style. Also, I have updated the description. |
kkonstantine
left a comment
There was a problem hiding this comment.
Thanks for the quick turnaround @chia7712 !
Much easier to review line my line now (at least for me).
I'll do my best to review by the end of the week.
issue: https://issues.apache.org/jira/browse/KAFKA-12309
issue description
When adding a new worker, the revocation algorithm tries to revoke some connectors/tasks from existent workers so as to move them to new worker to balance load. However, the algorithm can revoke incorrect number of connectors/tasks for following cases.
Assignments:
"worker_0" -> 8 connectors/tasks
"worker_1" -> 8 connectors/tasks
(New) "worker_2" -> 0 connectors/tasks
Result
"worker_0" -> 6 connectors/tasks
"worker_1" -> 6 connectors/tasks
"worker_2" -> 4 connectors/tasks
(expected) Result
"worker_0" -> 6 connectors/tasks
"worker_1" -> 5 connectors/tasks
"worker_2" -> 5 connectors/tasks
or
"worker_0" -> 5 connectors/tasks
"worker_1" -> 6 connectors/tasks
"worker_2" -> 5 connectors/tasks
root cause
the algorithm makes all existent workers keep
ceil(numToRevoke = existing.tasksSize() - ceilTasks;) connectors/tasks. That results in that there is no enough revoked connectors/tasks to be assigned to new worker.extra changes included by this PR
Committer Checklist (excluded from commit message)