New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-15022: [9/N] use RackAwareTaskAssignor in StickyTaskAssignor #14178
Conversation
@@ -505,4 +505,15 @@ private void assertNotAssigned(final TaskId task) { | |||
throw new IllegalArgumentException("Tried to assign task " + task + ", but it is already assigned: " + this); | |||
} | |||
} | |||
|
|||
public ClientState copy() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we usually use a copy-constructor for such cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make sense. Let me add a copy constructor.
@@ -346,6 +346,7 @@ public long optimizeActiveTasks(final SortedSet<TaskId> activeTasks, | |||
log.info("Assignment before active task optimization is {}\n with cost {}", clientStates, | |||
activeTasksCost(activeTasks, clientStates, trafficCost, nonOverlapCost)); | |||
|
|||
final long startTime = System.currentTimeMillis(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should avoid calling System.currentTimeMillis()
directly, but use the central Time
object instead. Not sure how complex it would be to get a handle on it? (also below)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a time in StreamPartitionAssignor
, we can pass it into RackAwareTaskAssignor
constructor
private static final int DEFAULT_STATEFUL_TRAFFIC_COST = 1; | ||
private static final int DEFAULT_STATEFUL_NON_OVERLAP_COST = 10; | ||
private static final int STATELESS_TRAFFIC_COST = 1; | ||
private static final int STATELESS_NON_OVERLAP_COST = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Seems better to make the vars in the prod code accessible instead of duplicating them?
@@ -617,21 +652,23 @@ static Map<UUID, Map<String, Optional<String>>> getRandomProcessRacks(final int | |||
} | |||
Collections.shuffle(racks); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just realizing that we might want to hand in a Random
object as second parameter to allow us to reproduce a test run? Can you check the code for other places where we use shuffle
to allow us to improve it across the board?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks shuffle
can take random. I think we can use one static random in this class
cdd5567
to
d188977
Compare
…pache#14178) Part of KIP-925. Use rack aware assignor in StickyTaskAssignor. Reviewers: Matthias J. Sax <matthias@confluent.io>
…pache#14178) Part of KIP-925. Use rack aware assignor in StickyTaskAssignor. Reviewers: Matthias J. Sax <matthias@confluent.io>
…pache#14178) Part of KIP-925. Use rack aware assignor in StickyTaskAssignor. Reviewers: Matthias J. Sax <matthias@confluent.io>
…pache#14178) Part of KIP-925. Use rack aware assignor in StickyTaskAssignor. Reviewers: Matthias J. Sax <matthias@confluent.io>
Description
Use rack aware assignor in
StickyTaskAssignor
. This PR is on top of #14164.Test
Update existing unit test