KAFKA-4677: Avoid unnecessary task movement across threads during rebalance #2429

Closed
wants to merge 9 commits into from

Conversation

dguy
Contributor

@dguy dguy commented Jan 24, 2017

Makes task assignment stickier by preferring to assign each task to a client that previously had it as an active task. If no client previously had the task active, it looks for a client that had it as a standby, and finally falls back to the least loaded client.
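The preference order described above can be sketched roughly as follows. This is only an illustration under simplifying assumptions, not the code in this PR: `StickySketch`, `Client`, and `pickClient` are hypothetical names, task ids are plain strings, and "load" is just the number of tasks assigned so far (capacity is ignored).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the preference order; not the PR's StickyTaskAssignor.
final class StickySketch {

    // Simplified stand-in for a client's state (all names are assumptions).
    static final class Client {
        final String id;
        final Set<String> prevActive = new HashSet<>();
        final Set<String> prevStandby = new HashSet<>();
        final List<String> assigned = new ArrayList<>();

        Client(final String id) {
            this.id = id;
        }
    }

    // 1) previous active owner, 2) previous standby owner, 3) least loaded client.
    static Client pickClient(final String task, final List<Client> clients) {
        for (final Client c : clients) {
            if (c.prevActive.contains(task)) {
                return c;
            }
        }
        for (final Client c : clients) {
            if (c.prevStandby.contains(task)) {
                return c;
            }
        }
        return clients.stream()
                .min(Comparator.comparingInt((Client c) -> c.assigned.size()))
                .orElseThrow(() -> new IllegalStateException("no clients"));
    }

    public static void main(final String[] args) {
        final Client p1 = new Client("p1");
        final Client p2 = new Client("p2");
        p1.prevActive.add("0_0");
        p2.prevStandby.add("0_1");
        final List<Client> clients = Arrays.asList(p1, p2);
        for (final String task : Arrays.asList("0_0", "0_1", "0_2")) {
            final Client chosen = pickClient(task, clients);
            chosen.assigned.add(task);
            System.out.println(task + " -> " + chosen.id);
        }
    }
}
```

In this toy run `0_0` stays on `p1` (previous active) and `0_1` goes to `p2` (previous standby); only the task with no history falls through to the least-loaded rule.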

@dguy
Contributor Author

dguy commented Jan 24, 2017

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1150/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1152/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1150/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 24, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1150/
Test PASSed (JDK 8 and Scala 2.12).

Member

@mjsax mjsax left a comment

Very nice PR! Code is super clean. Tests are amazing!

Still, some comments ;P

}

@Test
public void shouldNotMigrateActiveTaskToOtherProcess() throws Exception {
Member

Just wondering if this test is good. Seems like a "RoundRobinAssigner" might compute the exact same assignment. Some more "randomness" would be good IMHO. (Not a must though.)

Contributor Author

Thanks. Yes it needs to actually test the reverse previous assignment, too. Which indeed highlights a problem.


taskAssignor.assign(0);

assertThat(clients.get(p2).activeTasks(), hasItems(task01));
Member

hasItems(task01) -> equalTo(Collections.singleton(task01))

}

@Test
public void shouldKeepActiveTaskStickynessWhenMoreClientThanActiveTasks() {
Member

As above: add more "randomness"

@Test
public void shouldAssignToClientWithStandbyIfProcessWithPreviousReachedCapacity() throws Exception {
final ClientState<TaskId> client = createClientWithPreviousActiveTasks(p1, 1, task00, task01);
// give p1 assignment so already at capacity
Member

Comments unnecessary IMHO -- test is pretty well written and speaks for itself -- test method name nails it. (really starting to enjoy your way of writing code -- need to get closer to this for my own code)

Contributor Author

I'm happy you said that the comment is unnecessary! :-)

@Test
public void shouldAssignStandbyTasksToClientThatDontHaveSameActiveTask() throws Exception {
final TaskId task03 = new TaskId(0, 3);
createClientWithActiveTasks(p1, 1, task00);
Member

final Integer p4 = 4;
And use it below. (or add it as a class member together with task03 in the first place -- even if only used here, it reduces "noise" in reading the test).

But why do you need 4 clients/task here? Three would do, too, IMHO.

Contributor Author

Strictly speaking we don't need 4, but i wanted to mix it up a bit as 3 is generally used throughout. Good to try something different.

}

@Test
public void shouldAssignActiveAndStandbyTasks() throws Exception {
Member

Seems to be covered by shouldAssignMultipleReplicasOfStandbyTask() already

Contributor Author

Not quite - this is testing specifically that assign(numStandbyReplicas) assigns both active and standby tasks (when numStandbyReplicas is > 0). The other method is just testing StandbyTask assignment only


@Test
public void shouldAssignAtLeastOneTaskToEachClientIfPossible() throws Exception {
// add a process with 3 threads
Member

remove comments (as above)

@@ -17,40 +17,34 @@

package org.apache.kafka.streams.processor.internals.assignment;


import java.util.HashSet;
import java.util.Set;

public class ClientState<T> {
Member

Not part of this PR: but why does ClientState have a generic type? It's only used with TaskId anyway? Or did I miss anything?

Contributor

In unit tests we use Float as its type as well.

Contributor Author

@mjsax - i had the same thought and did start down the path of changing it. Soon realised it was a battle for another day as it would've muddied the PR

Member

@guozhangwang @dguy I did have a quick look at the tests using ClientState<Integer> -- I actually do not see any reason why those tests need to use Integer instead of TaskId. It might be a slight simplification for the test, but if we don't need the generic type in the actual code, a test does not justify having a generic type IMHO, and I have a strong opinion about removing it. This would be a nice "beginner" PR, so not much work for us. But I want your opinion before I create a JIRA for it.

Contributor Author

@mjsax - yes i agree. As i mentioned above, i did start doing that as part of the PR but didn't want to add too much noise to it.

return null;
}
final HashSet<ID> constrainTo = new HashSet<>(ids);
constrainTo.retainAll(clientsWithin);
Member

constrainTo is never used. I guess you can remove both lines. ids should not contain any invalid IDs anyway -- otherwise it's a bug and retainAll would only mask the bug, but not fix it.

Contributor Author

@dguy dguy Jan 25, 2017

oops - the constraint is needed, but obviously there wasn't a test case to prove it. Will have to add one
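For readers following along, this is roughly what the two lines in question do: `retainAll` narrows a copy of the candidate ids to those that are also in `clientsWithin`, leaving the original set untouched. A small standalone sketch with made-up integer ids (`ConstrainSketch` and `constrain` are hypothetical names, not part of the PR):

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of constraining a candidate set with retainAll.
final class ConstrainSketch {

    // Returns the ids present in both sets without mutating either input.
    static Set<Integer> constrain(final Set<Integer> ids, final Set<Integer> clientsWithin) {
        final Set<Integer> constrainTo = new HashSet<>(ids);
        constrainTo.retainAll(clientsWithin);
        return constrainTo;
    }

    public static void main(final String[] args) {
        final Set<Integer> ids = new HashSet<>(Arrays.asList(1, 2, 3));
        final Set<Integer> clientsWithin = new HashSet<>(Arrays.asList(2, 3, 4));
        // Only ids 2 and 3 are in both sets.
        System.out.println(constrain(ids, clientsWithin));
    }
}
```

The result only matters if it is actually used for the subsequent lookup, which is the gap the review comment points at.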

new TaskId(1, 2));

taskAssignor.assign(0);
assertTrue("expected client 2 to have more assigned tasks than client 1", clients.get(p2).assignedTaskCount() > clients.get(p1).assignedTaskCount());
Member

Nit: line too long

Member

Thinking about this, I guess we could improve StickyTaskAssignor. If I am not off, load balancing on stream basis is not optimal -- but I am also not sure if the effort to improve it is worth it...

If we extend this test to assign more tasks, let's say 12, client p2 will get 7 tasks assigned and p1 will get 5 tasks assigned (while it would be better to assign 8 tasks to p2 such that all 3 threads get 4 tasks each). The problem is that the capacity factors are not considered: p2 should get twice as many tasks assigned as p1 -- but the algorithm says only "more" -- and this "more" is determined by the diff of the capacities (i.e., in this case p2 will get at most 2 more tasks assigned than p1).

Or maybe my analysis is wrong (I did not run the code and step through it.)
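To make the arithmetic of the 12-task example concrete, here is a sketch only. It assumes p1 has capacity 1 and p2 has capacity 2, and it uses a hypothetical greedy rule (each task goes to the client whose tasks-per-thread ratio would be lowest after taking it) rather than the assignor's actual logic; `ProportionalSketch` and `distribute` are made-up names.

```java
import java.util.Arrays;

// Hypothetical capacity-weighted distribution; not the logic in this PR.
final class ProportionalSketch {

    // Hands out numTasks one by one, always picking the client whose load
    // (assigned tasks divided by capacity) would be lowest after the assignment.
    static int[] distribute(final int numTasks, final int[] capacities) {
        final int[] assigned = new int[capacities.length];
        for (int t = 0; t < numTasks; t++) {
            int best = 0;
            for (int c = 1; c < capacities.length; c++) {
                final double candidate = (assigned[c] + 1) / (double) capacities[c];
                final double current = (assigned[best] + 1) / (double) capacities[best];
                if (candidate < current) {
                    best = c;
                }
            }
            assigned[best]++;
        }
        return assigned;
    }

    public static void main(final String[] args) {
        // Capacities: p1 = 1 thread, p2 = 2 threads.
        System.out.println(Arrays.toString(distribute(12, new int[] {1, 2}))); // prints [4, 8]
    }
}
```

With these assumptions the split is 4 : 8, i.e. 4 tasks per thread, instead of the 5 : 7 described above.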

Member

Any thoughts about this @dguy @guozhangwang -- this is a different issue from the newly created JIRA (even if we might be able to subsume it with it). The issue described here would also be there if we only assigned stateful standby tasks.

@mjsax
Member

mjsax commented Jan 25, 2017

One more general comment:

This RS raised the thought, that our standby task configuration is rather "all or nothing". Do you think it would make sense to allow a more fine grained configuration? I.e., allow to specify the number of standby replicas individually for each stateful operator?

Furthermore: if I understand the PR correctly, we create standby tasks even for stateless tasks -- that seems rather odd (even if not necessarily harmful).

Contributor

@guozhangwang guozhangwang left a comment

There is a trade-off between load balancing and task stickiness. I think in many cases we should favor the latter over the former, but it seems we are only favoring that until the client has number of assigned tasks equal to the number of threads?

For example, say we have two clients with one thread (hence one capacity) each and four tasks, and each client is assigned two tasks (1, 2), (3, 4); say a new topic is discovered and a new rebalance is triggered with no tasks added and no members changed, is it possible that we will then get (1, 3) and (2, 4) since each client runs over capacity?

"]";
}

boolean reachedCapacity() {
return assignedTasks.size() >= capacity;
Contributor

Capacity is the number of consumers, i.e. stream threads on this client; and one thread should be able to handle multiple tasks. Should we use this criterion that the assigned tasks has exceeded the number of threads to determine if the client is "full"?

Contributor Author

We need something? What else are you suggesting? It is not like you can just say it is 4 * threads etc, as not all threads are equal. So it seems simplest just to stick with this.

Contributor

@dguy This is related to the general comment I had above: do we still try to favor stickiness after each thread has been assigned a task? In my general comment I asked for an example, and I'm thinking now about another slightly different one:

Say we have two clients with one thread (hence one capacity) each and four tasks, and each client is assigned two tasks (1, 2), (3, 4); both of them are "over-capacity" now. When adding a new client with one thread, one of the tasks will be migrated to that client, say task4. The question is: with this assignor, do we enforce the assignment to be (1, 2), (3), (4), or can task2 be randomly assigned to another client after task1 has been assigned to client1?

Contributor Author

Yes - we will enforce the assignment to be along these lines. i.e., only 1 task will move. So the assignment could be either (1, 2), (3), (4) or (1), (3, 4), (2)
I'll commit some updates soon

for (final TaskId taskId : taskIds) {
final Set<ID> ids = findClientsWithoutAssignedTask(taskId);
if (ids.isEmpty()) {
continue;
Contributor

+1

return previous;
}

private ClientState<TaskId> clientWithPreviousAssignment(final TaskId taskId, final Set<ID> clientsWithin) {
Contributor

nit: findClientsWithPreviousAssignedTask?

return findLeastLoadedClientWithStandby(taskId, clientsWithin);
}

private ClientState<TaskId> findLeastLoadedClientWithStandby(final TaskId taskId, final Set<ID> clientsWithin) {
Contributor

nit: findLeastLoadedClientWithPreviousStandByTask?

if (ids.isEmpty()) {
continue;
}
final ClientState<TaskId> client = findClient(taskId, ids);
Contributor

In the old approach, we use taskPairs trying to distribute the standby tasks to better tolerate failures; for example if we have four clients, four tasks, and replica.num = 1, we would rather have

client1: active: t1, standby: t2
client2: active: t2, standby: t3
client3: active: t3, standby: t4
client4: active: t4, standby: t1

Instead of

client1: active: t1, standby: t2
client2: active: t2, standby: t1
client3: active: t3, standby: t4
client4: active: t4, standby: t3

since otherwise if we lose client1 and client2, we lose t2 only in the first case but will lose both t1 and t2 in the second case.
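The failure scenario above can be checked mechanically. The sketch below is illustrative only (the class and method names are hypothetical): it encodes both layouts as task -> hosting clients (active plus standby) and reports which tasks have no surviving copy when a given set of clients fails.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical check of the two standby layouts from the comment above.
final class StandbyLossSketch {

    // A task is lost when every client hosting it (active or standby) has failed.
    static Set<String> lostTasks(final Map<String, Set<String>> hosts, final Set<String> failed) {
        final Set<String> lost = new TreeSet<>();
        for (final Map.Entry<String, Set<String>> e : hosts.entrySet()) {
            if (failed.containsAll(e.getValue())) {
                lost.add(e.getKey());
            }
        }
        return lost;
    }

    static Set<String> set(final String... values) {
        return new HashSet<>(Arrays.asList(values));
    }

    // First layout: each standby sits on the "next" client (rotated).
    static Map<String, Set<String>> rotated() {
        final Map<String, Set<String>> m = new LinkedHashMap<>();
        m.put("t1", set("client1", "client4"));
        m.put("t2", set("client2", "client1"));
        m.put("t3", set("client3", "client2"));
        m.put("t4", set("client4", "client3"));
        return m;
    }

    // Second layout: clients back each other up in fixed pairs.
    static Map<String, Set<String>> paired() {
        final Map<String, Set<String>> m = new LinkedHashMap<>();
        m.put("t1", set("client1", "client2"));
        m.put("t2", set("client2", "client1"));
        m.put("t3", set("client3", "client4"));
        m.put("t4", set("client4", "client3"));
        return m;
    }

    public static void main(final String[] args) {
        final Set<String> failed = set("client1", "client2");
        System.out.println(lostTasks(rotated(), failed)); // prints [t2]
        System.out.println(lostTasks(paired(), failed));  // prints [t1, t2]
    }
}
```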

Contributor Author

Thanks @guozhangwang. That makes sense, i'll add it back in.

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1184/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1182/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1182/
Test PASSed (JDK 7 and Scala 2.10).

@dguy
Contributor Author

dguy commented Jan 25, 2017

@guozhangwang, @mjsax - comments addressed
@guozhangwang regarding:

There is a trade-off between load balancing and task stickiness. I think in many cases we should favor the latter over the former, but it seems we are only favoring that until the client has number of assigned tasks equal to the number of threads?

For example, say we have two clients with one thread (hence one capacity) each and four tasks, and each client is assigned two tasks (1, 2), (3, 4); say a new topic is discovered and a new rebalance is triggered with no tasks added and no members changed, is it possible that we will then get (1, 3) and (2, 4) since each client runs over capacity?

I've changed it so that it will tend to favour stickiness more, so the scenario you have mentioned won't happen. In this case the previous assignments will stay the same, but 1 of the clients will get the new task.

@mjsax, regarding your general comment - as discussed offline i think this is a task for another JIRA. I raised https://issues.apache.org/jira/browse/KAFKA-4696.

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1191/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1195/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1193/
Test PASSed (JDK 8 and Scala 2.12).

@mjsax
Member

mjsax commented Jan 25, 2017

Any thought about this:

This RS raised the thought, that our standby task configuration is rather "all or nothing". Do you think it would make sense to allow a more fine grained configuration? I.e., allow to specify the number of standby replicas individually for each stateful operator?

@guozhangwang
Contributor

Any thought about this:

It is an interesting question. Today num.replicas is on tasks, which are abstracted away from end users; so as you mentioned, if we want it to be user-customizable at a finer granularity then it needs to be per-operator, which I feel would be quite tricky for users to understand / specify.

Any thoughts about this @dguy @guozhangwang -- this is a different issue from the newly created JIRA (even if we might be able to subsume it with it). The issue described here would also be there if we only assigned stateful standby tasks.

I think I agree. It seems that once all clients are over capacity then we just treat them equally when assigning (assuming no prev standby / active tasks considered). It seems better for the number of tasks assigned to over-capacity clients to also depend on their total capacity. For example, if client1 has cap. 1, client2 has cap. 2, and we have a total of 6 tasks, it would be better to assign them as 2 : 4 to these two clients than 3 : 3?

@asfbot

asfbot commented Jan 25, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1193/
Test FAILed (JDK 7 and Scala 2.10).

@mjsax
Member

mjsax commented Jan 25, 2017

About fine grained configuration:
Yes, users should not need to think about tasks. It should be on a per-operator basis. Thus, we would need to make standby tasks "smarter" and only populate some stores (as requested by the user) in the background. Would be a bigger change, but not impossible.

For example, if client1 has cap. 1, client2 has cap. 2, and we have a total of 6 tasks, it would be better to assign them as 2 : 4 to these two clients than 3 : 3?

Yes, that is exactly what I had in mind.

@guozhangwang
Contributor

@dguy
Contributor Author

dguy commented Jan 26, 2017

@guozhangwang - i believe the test failure is unrelated.

@dguy
Contributor Author

dguy commented Jan 26, 2017

@mjsax - regarding this:

Any thought about this:

This RS raised the thought, that our standby task configuration is rather "all or nothing". Do you think it would make sense to allow a more fine grained configuration? I.e., allow to specify the number of standby replicas individually for each stateful operator?

We probably need a wider discussion on this.

@dguy
Contributor Author

dguy commented Jan 26, 2017

@guozhangwang

I think I agree. It seems that once all clients are over capacity then we just treat them equally when assigning (assuming no prev standby / active tasks considered). It seems better for the number of tasks assigned to over-capacity clients to also depend on their total capacity. For example, if client1 has cap. 1, client2 has cap. 2, and we have a total of 6 tasks, it would be better to assign them as 2 : 4 to these two clients than 3 : 3?

This already happens. The test shouldAssignMoreTasksToClientWithMoreCapacity shows this.
@mjsax was referring to having more tasks. I'll expand the test and see what i can come up with

@guozhangwang
Contributor

This already happens. The test shouldAssignMoreTasksToClientWithMoreCapacity shows this.
@mjsax was referring to having more tasks. I'll expand the test and see what i can come up with

I think we are talking about the same thing: when all clients are already over capacity, should the number of tasks be proportional to each client's total capacity? From the tests, that seems to be the case.

Contributor

@guozhangwang guozhangwang left a comment

One more comment, otherwise LGTM. Thanks @dguy for the patch!!

Let's update the created JIRA for keeping track of the broader discussion of per-operator num.replica configuration.

}

boolean hasMoreAvailableCapacityThan(final ClientState<T> other) {
final int otherLoad = other.assignedTaskCount() / other.capacity;
Contributor

Should these two be floats rather than integers? Otherwise e.g. 5/3 and 4/3 would be the same.
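A quick standalone illustration of the concern (the class and method names here are hypothetical, not the PR's `ClientState`): with `int` operands the division truncates, so both loads collapse to 1, while dividing as `double` keeps them distinguishable.

```java
// Hypothetical demo of integer vs. floating-point load comparison.
final class LoadCompareSketch {

    // Integer division truncates toward zero: 5 / 3 == 1 and 4 / 3 == 1.
    static int intLoad(final int assignedTasks, final int capacity) {
        return assignedTasks / capacity;
    }

    // Casting one operand to double keeps the fractional part.
    static double doubleLoad(final int assignedTasks, final int capacity) {
        return (double) assignedTasks / capacity;
    }

    public static void main(final String[] args) {
        System.out.println(intLoad(5, 3) == intLoad(4, 3));      // prints true
        System.out.println(doubleLoad(5, 3) > doubleLoad(4, 3)); // prints true
    }
}
```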

@asfbot

asfbot commented Jan 26, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1269/
Test PASSed (JDK 7 and Scala 2.10).

@dguy
Contributor Author

dguy commented Jan 27, 2017

Thanks @guozhangwang. I've updated the new JIRA with the per-operator num.replica info. I've also re-opened the queryOnRebalance JIRA.
Updated the code to use double rather than integer in hasMoreAvailableCapacityThan - also added checks for capacity <= 0

@asfbot

asfbot commented Jan 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1285/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1287/
Test FAILed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1285/
Test FAILed (JDK 8 and Scala 2.12).

@guozhangwang
Contributor

retest this please

@asfbot

asfbot commented Jan 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1291/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1289/
Test PASSed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 27, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1289/
Test FAILed (JDK 8 and Scala 2.12).

}

public ClientState(double capacity) {
ClientState(int capacity) {
Member

add final

this(new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), new HashSet<T>(), capacity);
}

private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, double capacity) {
private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTasks, Set<T> prevActiveTasks, Set<T> prevAssignedTasks, int capacity) {
Member

add final to all

Member

Unify this with ClientState(int capacity)? (i.e., remove the private constructor with the long parameter list and just initialize members directly.)

Contributor Author

The private constructor is there for the copy method. I'm going to leave it as is.

@@ -59,28 +53,102 @@ private ClientState(Set<T> activeTasks, Set<T> standbyTasks, Set<T> assignedTask
}

public void assign(T taskId, boolean active) {
Member

add final to both

Contributor Author

well i can yes, but i don't believe i've changed this!

for (final TaskId taskId : taskIds) {
final Set<ID> ids = findClientsWithoutAssignedTask(taskId);
if (ids.isEmpty()) {
log.warn("Unable to assign replica for task [{}]", taskId);
Member

Don't like that we write this message multiple times.
What about cloning taskIds before the for loop, and removing a taskId from it when it cannot be assigned?
Also update the log message like
log.warn("Could only create {} stand-by tasks for task [{}] instead of {} as requested. Not enough task available. You need to increase the number of threads to maintain the requested number of standby tasks.", i, taskId, numStandbyReplicas);
Or some similar hint about what the issue is and how to fix it.

Contributor Author

updated so it only logs once. Also improved the log message. thanks


public class ClientStateTest {

private final ClientState<Integer> client = new ClientState<>(1);
Member

Should this be a new instance for each test method? I am wondering how the tests could pass otherwise.

Contributor Author

It is a new instance for each test method. JUnit creates a new instance of the class for each test method.

Member

I thought some annotation would be needed for that... What is the JUnit rule for this? What members get re-created for each single test method?

Contributor Author

All non-static members. You get a new instance of the Test class for each test method.

Member

Thanks! Good to know :)


@Test
public void shouldAssignTasksToClientWithPreviousStandbyTasks() throws Exception {
final ClientState<TaskId> client1 = createClientWithPreviousActiveTasks(p1, 1);
Member

Can't you use createClient? This call confused me... (same below)

Contributor Author

Yeah - i thought i'd replaced all of those

assertThat(clients.get(p2).standbyTasks(), not(hasItems(task01)));
assertThat(clients.get(p3).standbyTasks(), not(hasItems(task02)));
assertThat(clients.get(p4).standbyTasks(), not(hasItems(task03)));
assertThat(allStandbyTasks(), equalTo(Arrays.asList(task00, task01, task02, task03)));
Member

Maybe check that each task has one standby task assigned?

Contributor Author

It is not always possible (with the current algo) to guarantee that each task will get 1 task. It depends somewhat on the previous active tasks and the order that the tasks are iterated in. I can change the test to make sure that at least 3 tasks have standby and that no task has more than 2

Member

In general that might be correct. But for this specific test setup, we know the preconditions: the overall logic dictates that active tasks get reassigned to their previous client -- if this did not happen, there would be something wrong. Thus, the 4 active tasks are "pinned" to the 4 clients by precondition, and thus, in order to load balance the standby tasks, each client must get exactly one -- or do I miss something?

Contributor Author

Yeah you missed something. I can make the setup of the test such that this happens, but with the current params setup we get 1 client with 2 tasks, 2 clients with 1 task, and 1 client with 0 tasks. This happens because we would otherwise end up with the standby and active task for task03 being on the same node, i.e.,
active assignment:
task00 -> p1
task01 -> p2
task02 -> p3
task03 -> p4

standby assignment
task00 -> p2
task01 -> p3
task02 -> p1
task03 -> p1 (as it can't go to p4)

Member

Thanks for clarification. Does make sense now -- I did not consider the "task pairs" heuristic.

}

@Test
public void shouldNotAssignStandbyTaskReplicasWhenNoClientHasCapacityLeftOver() throws Exception {
Member

It's not about capacity, is it? It's about having no client that does not already have the task assigned (as active or standby). -> shouldNotAssignStandbyTaskReplicasWhenNoClientAvailableWithoutHavingTheTaskAssigned

&& client.reachedCapacity()
&& availableCapacity <= taskIds.size()
&& hasClientsWithZeroTasks();
}
Member

I don't get this condition:
I understand (1) !hasNewTasks (that is only true if we scale out and do not lose any existing client, right?) and I understand (2) client.reachedCapacity() (no reason to assign somewhere else if the client has spare capacity).

But the last two conditions puzzle me.

(3) Let's say I have 1 client with capacity 1 and overall 2 tasks, both assigned to it (i.e., taskIds.size() == 2). I add a new client with capacity 2. Now availableCapacity == 3 and taskIds.size() is still 2. Thus availableCapacity <= taskIds.size() (3 <= 2) is false, and no load rebalancing would happen. However, this would result in an overload of the original client (2 tasks assigned with capacity 1), while the new client is underutilized (0 tasks assigned with capacity 2). What do I miss?

(4) Let's say I have 1 client with capacity 1 and overall 3 tasks, all assigned to it (i.e., taskIds.size() == 3). I add a new client with capacity 2. Condition (3) would be met because (3 <= 3), so one task would be reassigned from its original client to the new client. But not the second task, because after the new client got a task assigned there is no client with zero tasks left. Thus the original client would still be overloaded (2 tasks assigned with capacity 1) while the new client is underutilized (1 task assigned with capacity 2). What do I miss?
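
The arithmetic in scenarios (3) and (4) can be replayed with a small sketch. It models only the last two conditions from the snippet above, using plain integers; the class, the method signatures, and the array-based representation are assumptions for illustration and not the PR's actual types.

```java
// Hypothetical model of conditions (3) and (4); not the PR's implementation.
final class RebalanceConditionSketch {

    // Stand-in for `availableCapacity <= taskIds.size()`.
    static boolean capacityCondition(int[] clientCapacities, int taskCount) {
        int availableCapacity = 0;
        for (int capacity : clientCapacities) {
            availableCapacity += capacity;
        }
        return availableCapacity <= taskCount;
    }

    // Stand-in for `hasClientsWithZeroTasks()`.
    static boolean hasClientsWithZeroTasks(int[] assignedTaskCounts) {
        for (int count : assignedTaskCounts) {
            if (count == 0) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] capacities = {1, 2}; // original client, newly added client

        // Scenario (3): 2 tasks, so 3 <= 2 is false and nothing moves.
        System.out.println(capacityCondition(capacities, 2)); // prints false

        // Scenario (4): 3 tasks, so 3 <= 3 is true and the first task may move.
        System.out.println(capacityCondition(capacities, 3)); // prints true
        System.out.println(hasClientsWithZeroTasks(new int[] {3, 0})); // prints true

        // After one task has moved, no client has zero tasks, so the second stays.
        System.out.println(hasClientsWithZeroTasks(new int[] {2, 1})); // prints false
    }
}
```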

Contributor Author

Thanks for pointing that out. It was an evolution of tests that led to it, but the third condition is not required and the 4th condition should be hasClientsWithMoreAvailableCapacity(client). I've added a test and changed it. Thanks!

this.clients = clients;
this.taskIds = taskIds;
this.availableCapacity = sumCapacity(clients.values());
taskPairs = new TaskPairs(taskIds.size() * (taskIds.size() - 1) * 2);
Member

/ 2, not * 2
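
As a quick check of the arithmetic: the number of distinct unordered pairs among n tasks is n * (n - 1) / 2. The sketch below compares that formula against a brute-force enumeration. The class and method names are hypothetical, and it assumes TaskPairs is sized by the number of unordered task pairs, which the snippet above suggests but does not state.

```java
// Hypothetical check of the pair-count formula; not part of the PR.
final class TaskPairCountSketch {

    // Closed form for the number of unordered pairs among n items.
    static int maxPairs(int n) {
        return n * (n - 1) / 2;
    }

    // Brute-force count of pairs (i, j) with i < j.
    static int countPairsByEnumeration(int n) {
        int pairs = 0;
        for (int i = 0; i < n; i++) {
            for (int j = i + 1; j < n; j++) {
                pairs++;
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        for (int n = 0; n <= 6; n++) {
            System.out.println(n + " tasks -> " + maxPairs(n) + " pairs, enumerated "
                    + countPairsByEnumeration(n));
        }
    }
}
```

For 4 tasks this gives 6 pairs, whereas the `* 2` variant would yield 24.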

Contributor Author

Oops - thanks!

@asfbot

asfbot commented Jan 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1348/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1352/
Test PASSed (JDK 8 and Scala 2.11).

@asfbot

asfbot commented Jan 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1348/
Test FAILed (JDK 8 and Scala 2.12).

@dguy
Contributor Author

dguy commented Jan 31, 2017

comments all addressed

@asfbot

asfbot commented Jan 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk7-scala2.10/1350/
Test FAILed (JDK 7 and Scala 2.10).

@asfbot

asfbot commented Jan 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/1350/
Test PASSed (JDK 8 and Scala 2.12).

@asfbot

asfbot commented Jan 31, 2017

Refer to this link for build results (access rights to CI server needed):
https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/1354/
Test PASSed (JDK 8 and Scala 2.11).

@mjsax
Member

mjsax commented Jan 31, 2017

@dguy One more minor comment.

Member

@mjsax mjsax left a comment

LGTM

@guozhangwang
Contributor

LGTM. Merged to trunk.

@asfgit asfgit closed this in 0b48ea1 Feb 1, 2017
@guozhangwang
Contributor

@mjsax Re "removing templates in ClientState": I agree. Feel free to file a newbie JIRA for it.

@mjsax
Member

mjsax commented Feb 6, 2017

soenkeliebau pushed a commit to soenkeliebau/kafka that referenced this pull request Feb 7, 2017
…alance

Makes task assignment more sticky by preferring to assign tasks to clients that had previously had the task as active task. If there are no clients with the task previously active, then search for a standby. Finally falling back to the least loaded client.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax, Guozhang Wang

Closes apache#2429 from dguy/kafka-4677
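
The preference order described in the commit message (previous active owner, then previous standby owner, then least loaded client) could be sketched roughly as follows. Everything here is hypothetical: the `Client` class, its fields, and `choose` are illustrative names, "least loaded" is simplified to the lowest assigned-task count without regard to capacity, and the sketch ignores the capacity and task-pair checks discussed in the review.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch of the sticky preference order; not the PR's code.
final class StickyAssignmentSketch {

    static final class Client {
        final String id;
        final Set<String> previousActive = new HashSet<>();
        final Set<String> previousStandby = new HashSet<>();
        int assignedCount = 0; // simplified load measure (assumption)

        Client(String id) {
            this.id = id;
        }
    }

    // Picks a client for taskId; `clients` must be non-empty.
    static Client choose(String taskId, List<Client> clients) {
        // 1. Prefer a client that previously had the task as active.
        for (Client c : clients) {
            if (c.previousActive.contains(taskId)) {
                return c;
            }
        }
        // 2. Otherwise prefer a client that previously had it as standby.
        for (Client c : clients) {
            if (c.previousStandby.contains(taskId)) {
                return c;
            }
        }
        // 3. Otherwise fall back to the least loaded client.
        Client least = clients.get(0);
        for (Client c : clients) {
            if (c.assignedCount < least.assignedCount) {
                least = c;
            }
        }
        return least;
    }
}
```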
@dguy dguy deleted the kafka-4677 branch February 17, 2017 00:58