KAFKA-2763: better stream task assignment #497
@@ -29,6 +29,7 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.UUID;
For easier troubleshooting and debugging, could we add the host name as a prefix to the UUID?
How does the consumer group protocol support such debugging? Identifying a problematic host is general enough to be considered by the KafkaConsumer implementation.
}

private double computeAdditionCost(T task, ClientState<T> state) {
    double cost = Math.floor((double) state.assignedTasks.size() / state.capacity);
What is the motivation for initializing the cost this way?
For balancing. This cost represents the min load per thread of the client.
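To make the answer above concrete, here is a minimal sketch of the initial cost expression from the diff. It assumes `capacity` is the number of stream threads on the client; the class name `AdditionCostSketch` and the helper `baseCost` are hypothetical and only mirror the one line shown in the diff, not the rest of `computeAdditionCost`.

```java
public class AdditionCostSketch {
    // Hypothetical helper mirroring the diff's expression:
    // Math.floor((double) state.assignedTasks.size() / state.capacity)
    // Assumption: capacity is the client's stream thread count.
    static double baseCost(int assignedTasks, int capacity) {
        return Math.floor((double) assignedTasks / capacity);
    }

    public static void main(String[] args) {
        // 3 tasks over 4 threads: at least one thread is idle, so the minimum load is 0.
        System.out.println(baseCost(3, 4)); // 0.0
        // 4 tasks over 4 threads: every thread has at least one task.
        System.out.println(baseCost(4, 4)); // 1.0
        // 5 tasks over 2 threads: the less loaded thread has 2 tasks.
        System.out.println(baseCost(5, 2)); // 2.0
    }
}
```

Because of the floor, a client whose threads are not yet evenly filled starts with the same cost as one holding fewer tasks, which favors topping up partially loaded clients before the cost steps up.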
LGTM except for a few comments. I feel the lower-level tasks => clients logic is a bit too complex, but I admit it is quite tricky to handle the various considerations:
Actually, for 3), I feel it may not always be optimal to do this; for example, there are some discussions about partition assignment (currently we follow the same principle to distribute the partition replicas):
guozhangwang

When the rebalance happens, each consumer reports the following information to the coordinator.
* Client UUID (a unique id assigned to an instance of KafkaStreaming)
* Task ids of previously running tasks
* Task ids of valid local states on the client's state directory

TaskAssignor does the following
* Assign a task to a client which was running it previously. If there is no such client, assign a task to a client which has its valid local state.
* Try to balance the load among stream threads.
* A client may have more than one stream thread. The assignor tries to assign tasks to a client proportionally to the number of threads.

Author: Yasuhiro Matsuda <yasuhiro@confluent.io>
Reviewers: Guozhang Wang
Closes #497 from ymatsuda/task_assignment
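The preference order described in the commit message can be sketched roughly as follows. This is a simplified illustration, not the PR's `TaskAssignor`: it picks greedily by tasks-per-thread instead of using the cost function from the diff, and the names `StickyAssignSketch`, `Client`, `leastLoaded`, and `assign` are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class StickyAssignSketch {
    // Hypothetical stand-in for what one client reports during a rebalance.
    static class Client {
        final String id;                 // client UUID
        final int threads;               // number of stream threads
        final Set<Integer> prevTasks;    // previously running task ids
        final Set<Integer> stateTasks;   // task ids with valid local state
        final List<Integer> assigned = new ArrayList<>();

        Client(String id, int threads, Set<Integer> prevTasks, Set<Integer> stateTasks) {
            this.id = id;
            this.threads = threads;
            this.prevTasks = prevTasks;
            this.stateTasks = stateTasks;
        }

        // Load is measured per thread, so clients with more threads receive more tasks.
        double load() {
            return (double) assigned.size() / threads;
        }
    }

    // Returns the candidate with the lowest per-thread load (first one wins ties).
    static Client leastLoaded(List<Client> candidates) {
        Client best = null;
        for (Client c : candidates) {
            if (best == null || c.load() < best.load()) {
                best = c;
            }
        }
        return best;
    }

    static void assign(List<Integer> tasks, List<Client> clients) {
        for (int task : tasks) {
            List<Client> previous = new ArrayList<>();
            List<Client> withState = new ArrayList<>();
            for (Client c : clients) {
                if (c.prevTasks.contains(task)) {
                    previous.add(c);
                } else if (c.stateTasks.contains(task)) {
                    withState.add(c);
                }
            }
            // Prefer previous owners, then clients holding local state, then anyone.
            List<Client> pool = !previous.isEmpty() ? previous
                    : (!withState.isEmpty() ? withState : clients);
            leastLoaded(pool).assigned.add(task);
        }
    }

    public static void main(String[] args) {
        Client a = new Client("a", 2,
                new HashSet<>(Arrays.asList(0)), new HashSet<>(Arrays.asList(0, 1)));
        Client b = new Client("b", 1,
                Collections.<Integer>emptySet(), Collections.<Integer>emptySet());
        assign(Arrays.asList(0, 1, 2, 3), Arrays.asList(a, b));
        System.out.println(a.id + "=" + a.assigned + " " + b.id + "=" + b.assigned);
    }
}
```

In this run, task 0 goes to its previous owner and task 1 to the client holding its local state, while the remaining tasks are spread by per-thread load. Note that the sketch always honors stickiness first, so it can leave the load unbalanced when one client owned most of the tasks before the rebalance.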