
KAFKA-4677: [Follow Up] add optimization to StickyTaskAssignor for rolling bounce #2609

Closed
wants to merge 2 commits

Conversation

dguy
Contributor

@dguy dguy commented Feb 28, 2017

Detect when a rebalance has happened due to one or more existing nodes bouncing. Keep the assignment of previous active tasks the same and assign only the tasks that were not previously active to the new clients.

@dguy
Contributor Author

dguy commented Feb 28, 2017

@guozhangwang based on my investigation into increasing the session timeout on rebalance, there is an optimization that can be made to the StickyTaskAssignor so that active tasks won't move during bounces.

@asfbot

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1887/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1884/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1885/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1890/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1889/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Feb 28, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1892/
Test FAILed (JDK 8 and Scala 2.11).

@guozhangwang
Contributor

Is the Jenkins failure possibly related? I did not see this before:

org.apache.kafka.streams.integration.ResetIntegrationTest > testReprocessingFromScratchAfterResetWithIntermediateUserTopic FAILED
    java.lang.AssertionError: 
    Expected: <[KeyValue(2976621437475, 1), KeyValue(2976621437515, 1), KeyValue(2976621437495, 1), KeyValue(2976621437535, 1), KeyValue(2976621437515, 1), KeyValue(2976621437555, 1), KeyValue(2976621437495, 1), KeyValue(2976621437535, 1), KeyValue(2976621437515, 2), KeyValue(2976621437535, 2), KeyValue(2976621437575, 1), KeyValue(2976621437555, 1), KeyValue(2976621437535, 2), KeyValue(2976621437555, 2), KeyValue(2976621437575, 1), KeyValue(2976621437595, 1), KeyValue(2976621437555, 2), KeyValue(2976621437575, 2), KeyValue(2976621437615, 1), KeyValue(2976621437595, 1), KeyValue(2976621437575, 2), KeyValue(2976621437595, 2), KeyValue(2976621437635, 1), KeyValue(2976621437615, 1), KeyValue(2976621437575, 3), KeyValue(2976621437595, 2), KeyValue(2976621437615, 2), KeyValue(2976621437635, 1), KeyValue(2976621437575, 3), KeyValue(2976621437595, 3), KeyValue(2976621437615, 2), KeyValue(2976621437635, 2), KeyValue(2976621437575, 4), KeyValue(2976621437595, 3), KeyValue(2976621437615, 3), KeyValue(2976621437635, 2), KeyValue(2976621437575, 4), KeyValue(2976621437595, 4), KeyValue(2976621437615, 3), KeyValue(2976621437635, 3)]>
         but: was <[KeyValue(2976621437475, 1), KeyValue(2976621437515, 1), KeyValue(2976621437495, 1), KeyValue(2976621437535, 1), KeyValue(2976621437515, 1), KeyValue(2976621437555, 1), KeyValue(2976621437495, 1), KeyValue(2976621437535, 1), KeyValue(2976621437515, 2), KeyValue(2976621437535, 2), KeyValue(2976621437575, 1), KeyValue(2976621437555, 1), KeyValue(2976621437535, 2), KeyValue(2976621437555, 2), KeyValue(2976621437575, 1), KeyValue(2976621437595, 1), KeyValue(2976621437555, 2), KeyValue(2976621437575, 2), KeyValue(2976621437615, 1), KeyValue(2976621437595, 1), KeyValue(2976621437575, 2), KeyValue(2976621437595, 2), KeyValue(2976621437635, 1), KeyValue(2976621437615, 1), KeyValue(2976621437575, 3), KeyValue(2976621437595, 2), KeyValue(2976621437615, 2), KeyValue(2976621437635, 1), KeyValue(2976621437575, 3), KeyValue(2976621437595, 3), KeyValue(2976621437615, 2), KeyValue(2976621437635, 2), KeyValue(2976621437575, 4), KeyValue(2976621437595, 3), KeyValue(2976621437615, 3), KeyValue(2976621437635, 2), KeyValue(2976621437575, 4), KeyValue(2976621437595, 4)]>
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
        at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:8)
        at org.apache.kafka.streams.integration.ResetIntegrationTest.testReprocessingFromScratchAfterResetWithIntermediateUserTopic(ResetIntegrationTest.java:200)

Contributor

@guozhangwang guozhangwang left a comment


One meta-comment is whether we should have some additional checks before applying this short-cut optimization (a rough sketch of these guards follows the list). For example:

  1. Check that the resulting assignment is balanced.
  2. No new tasks were created: the sum of prevAssignedTasks should be equal to the new list of tasks.
  3. No "new" client is likely joining (we cannot tell for sure): each client in clientsWithoutPreviousActiveTasks should have some tasks in prevAssignedTasks, which suggests it is not a new client.
  4. No "old" client was likely dropped (again we cannot tell for sure, so just heuristics): the sum of the clientsWithoutPreviousActiveTasks' prevAssignedTasks should cover noPreviousActiveAssignment; otherwise it is likely that some client has been dropped from the group.
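A minimal sketch of what guards 2 and 3 might look like, assuming an accessor such as previousAssignedTasks() on ClientState (the accessor name and surrounding method are assumptions; only the identifiers quoted above come from the discussion):

// Hypothetical guard methods inside the assignor;
// ClientState#previousAssignedTasks() is an assumed accessor, not a confirmed API.

// Check 2: no new tasks were created, i.e. the union of every client's
// previously assigned tasks equals the new task list.
private boolean noNewTasksCreated(final Collection<ClientState<TaskId>> clients,
                                  final Set<TaskId> taskIds) {
    final Set<TaskId> previouslyAssigned = new HashSet<>();
    for (final ClientState<TaskId> client : clients) {
        previouslyAssigned.addAll(client.previousAssignedTasks());
    }
    return previouslyAssigned.equals(taskIds);
}

// Check 3: every client without previous *active* tasks still has some
// previous assignment (e.g. standbys), so it is unlikely to be brand new.
private boolean unlikelyToContainNewClients(final Collection<ClientState<TaskId>> clientsWithoutPreviousActiveTasks) {
    for (final ClientState<TaskId> client : clientsWithoutPreviousActiveTasks) {
        if (client.previousAssignedTasks().isEmpty()) {
            return false;
        }
    }
    return true;
}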

Some related commit:

#1543

@@ -35,13 +35,11 @@
private final Map<TaskId, ID> previousActiveTaskAssignment = new HashMap<>();
private final Map<TaskId, Set<ID>> previousStandbyTaskAssignment = new HashMap<>();
private final TaskPairs taskPairs;
private final int availableCapacity;
Contributor


sumCapacity can be removed as well.

@dguy
Contributor Author

dguy commented Mar 1, 2017

@guozhangwang I don't see how this change would have caused that failure.

  1. Check that the resulting assignment is balanced.

The assignment is not always balanced, so I'm not sure this is going to be very effective.

  2. No new tasks were created: the sum of prevAssignedTasks should be equal to the new list of tasks.

It is already checked that no new tasks are created. Line 79: if (previouslyAssignedTaskIds.equals(taskIds) ...

  3. No "new" client is likely joining (we cannot tell for sure): each client in clientsWithoutPreviousActiveTasks should have some tasks in prevAssignedTasks, which suggests it is not a new client.

Yes, we can add a check for this.

  4. No "old" client was likely dropped (again we cannot tell for sure, so just heuristics): the sum of the clientsWithoutPreviousActiveTasks' prevAssignedTasks should cover noPreviousActiveAssignment; otherwise it is likely that some client has been dropped from the group.

I'll add some tests and checks for this.

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1924/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1922/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1921/
Test FAILed (JDK 7 and Scala 2.10).

@dguy
Contributor Author

dguy commented Mar 1, 2017

@guozhangwang I've reworked it a bit. I removed the optimization as such, since I found a more general solution.
Now, first, we try to assign the previousActiveTasks to the clients that previously had them active.
If there are still unassigned tasks, we then try to assign them to the clients that had them as standby tasks (or have previously seen them).
Finally, we assign any remaining tasks across all clients. A rough sketch of these three phases is below.
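Roughly, the three phases could look like the following (a sketch only: previousActiveTaskAssignment, previousStandbyTaskAssignment, and assign(...) mirror the code in this PR, while the clients map and the method itself are illustrative):

// Illustrative three-phase active-task assignment; "clients" is assumed
// to be a Map<ID, ClientState<TaskId>> of the current group members.
private void assignActive(final Set<TaskId> taskIds) {
    final Set<TaskId> unassigned = new HashSet<>(taskIds);

    // Phase 1: hand each task back to the client that previously ran it
    // as an active task, if that client is still in the group.
    for (final Iterator<TaskId> it = unassigned.iterator(); it.hasNext(); ) {
        final TaskId taskId = it.next();
        final ID previousOwner = previousActiveTaskAssignment.get(taskId);
        if (previousOwner != null && clients.containsKey(previousOwner)) {
            assign(taskId, Collections.singleton(previousOwner), true);
            it.remove();
        }
    }

    // Phase 2: assign what is left to clients that previously held the
    // task as a standby (or have otherwise seen it before).
    for (final Iterator<TaskId> it = unassigned.iterator(); it.hasNext(); ) {
        final TaskId taskId = it.next();
        final Set<ID> previousStandbys = previousStandbyTaskAssignment.get(taskId);
        if (previousStandbys != null && !previousStandbys.isEmpty()) {
            assign(taskId, previousStandbys, true);
            it.remove();
        }
    }

    // Phase 3: spread any remaining tasks across all clients.
    for (final TaskId taskId : unassigned) {
        assign(taskId, clients.keySet(), true);
    }
}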

rework assignment such that it first assigns tasks to previous active clients
where possible.
@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1927/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1924/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1925/
Test FAILed (JDK 8 and Scala 2.12).

Contributor

@guozhangwang guozhangwang left a comment


Thanks @dguy, I had a nit comment; otherwise LGTM.

Some corner cases for this general solution: 1) with multiple threads on the same JVM, lots of clients would claim to "have seen this task" before, and hence we may end up with some shuffling; but since they are likely to be located on the same node, it is OK; 2) some nodes may not have cleaned up their state directory and hence claim to have seen a task before; in that case we may still have some shuffling, and different clients' local state stores may be either far from or close to the "changelog end offset", but this should be a rare case.

private void assign(final TaskId taskId, final Set<ID> clientsWithin, final boolean active) {
    final ClientState<TaskId> client = findClient(taskId, clientsWithin);
    taskPairs.addPairs(taskId, client.assignedTasks());
    client.assign(taskId, active);
}

private void assignTaskToClient(final Set<TaskId> assigned, final TaskId taskId, final ClientState<TaskId> client) {
Contributor


nit: maybe rename to assignTasksToClient and the previous assign function to allocateTaskWithClientCandidates?

Contributor Author

@dguy dguy Mar 1, 2017


I'll rename the assign method as suggested, but this one is actually just assignTaskToClient. The Set<TaskId> param is just to keep track of what has been assigned.

@dguy
Contributor Author

dguy commented Mar 1, 2017

@guozhangwang with respect to multiple threads on the same JVM: there is only one Client per JVM, i.e., if there were 4 threads then the capacity of the Client would be 4.

W.r.t. point 2: yes, I agree. I think we could add a further optimization whereby we send the checkpointed offsets for each of these stores and try to use the client with the most recent offset. A rough sketch of that idea is below.
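A rough illustration of that idea (entirely hypothetical: this PR does not send checkpointed offsets, and neither the method nor the offsets map below exists in the code):

// Hypothetical: among the candidate clients, pick the one whose
// checkpointed changelog offset for this task is highest, i.e. whose
// local state is closest to the changelog end.
private ID clientWithMostRecentCheckpoint(final TaskId taskId,
                                          final Set<ID> candidates,
                                          final Map<ID, Map<TaskId, Long>> checkpointedOffsets) {
    ID best = null;
    long bestOffset = -1L;
    for (final ID candidate : candidates) {
        final Map<TaskId, Long> offsets = checkpointedOffsets.get(candidate);
        final Long offset = offsets == null ? null : offsets.get(taskId);
        if (offset != null && offset > bestOffset) {
            bestOffset = offset;
            best = candidate;
        }
    }
    return best; // null if no candidate has a checkpoint for this task
}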

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1928/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1927/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Mar 1, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1930/
Test PASSed (JDK 8 and Scala 2.11).

Contributor

@guozhangwang guozhangwang left a comment


Merged to trunk.

@asfgit asfgit closed this in 0fba529 Mar 1, 2017
@dguy dguy deleted the kstreams-575 branch March 30, 2017 11:10