KAFKA-15022: [1/N] initial implementation of rack aware assignor #13851
Spotbugs error in build:
- /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13851/streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java
- /home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-13851/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
"Could not create topics within %d milliseconds. " +
"This can happen if the Kafka cluster is temporarily not available.",
retryTimeoutMs);
log.error(timeoutError);
Why do we log here? Given that we are throwing the exception, it should get logged when the exception is caught, and thus it seems we double log here unnecessarily?
This is mimicking the logs in the makeReady function.
It seems makeReady is not clean in this respect -- should we instead try to clean up makeReady?
I feel this log is OK since it logs the internal retryTimeoutMs here, which makes the cause clear. Otherwise the caller would have to find that value out in order to log it.
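One way to reconcile the two positions in this thread is to carry retryTimeoutMs in the exception message and leave logging to whoever catches it. The following is only a minimal sketch under that assumption, not the PR's code: the class and method names are hypothetical, java.util.concurrent.TimeoutException stands in for whatever exception the real method throws, and only the message text is taken from the diff above.

```java
import java.util.concurrent.TimeoutException;

final class TopicTimeoutSketch {
    // Builds the message from the diff so the timeout value travels with the exception.
    static String timeoutMessage(final long retryTimeoutMs) {
        return String.format(
            "Could not create topics within %d milliseconds. "
                + "This can happen if the Kafka cluster is temporarily not available.",
            retryTimeoutMs);
    }

    // Throws without logging; the catch site is the single place that logs.
    static void failOnTimeout(final long retryTimeoutMs) throws TimeoutException {
        throw new TimeoutException(timeoutMessage(retryTimeoutMs));
    }

    public static void main(final String[] args) {
        try {
            failOnTimeout(5000L);
        } catch (final TimeoutException e) {
            // The message already names the internal timeout, so nothing is lost.
            System.err.println("ERROR " + e.getMessage());
        }
    }
}
```

With this shape the caller does not need to know retryTimeoutMs to produce a useful log line, at the cost of relying on every caller to log the exception.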
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java
private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
Maybe call this "partitionWithoutInfo" to make it clear what its purpose is?
.../java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java
);

// False since tp10 is missing in cluster metadata
assertFalse(assignor.canEnableForActive());
Can we verify that we return false because of tp10 but not because of a bug regarding tp00?
Ideally yes, but this might be hard to verify... Maybe make validateTopicPartitionRack public and verify that it returns false? Or we would have to extract
    final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
    if (partitionInfo == null) {
        log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
        return false;
    }
into a public function and verify that it returns false?
Not sure either -- I am slightly concerned about missing/masking a bug if we cannot verify it properly.
Made a few methods public to test this.
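The approach settled on in this thread, exposing the per-partition check so a test can pin down which partition causes the failure, can be sketched roughly as follows. This is a simplified illustration that assumes nothing about the real class beyond the two method names mentioned in the discussion: the signatures, the string-keyed map standing in for cluster metadata, and the class name are all hypothetical.

```java
import java.util.Map;
import java.util.Set;

final class PartitionCheckSketch {
    // Hypothetical stand-in for cluster metadata: partition name -> rack ids of its replicas.
    private final Map<String, Set<String>> racksByPartition;

    PartitionCheckSketch(final Map<String, Set<String>> racksByPartition) {
        this.racksByPartition = racksByPartition;
    }

    // Per-partition check, visible to tests: false when the partition is
    // absent from the metadata or has no rack information.
    boolean validateTopicPartitionRack(final String topicPartition) {
        final Set<String> racks = racksByPartition.get(topicPartition);
        if (racks == null) {
            System.err.println("TopicPartition " + topicPartition + " doesn't exist in cluster");
            return false;
        }
        return !racks.isEmpty();
    }

    // Aggregate check: every partition must pass for the feature to be enabled.
    boolean canEnableForActive(final Set<String> partitions) {
        for (final String tp : partitions) {
            if (!validateTopicPartitionRack(tp)) {
                return false;
            }
        }
        return true;
    }
}
```

A test can then assert that the check passes for the partition present in the metadata and fails for the missing one, rather than relying only on the aggregate result, which addresses the concern about masking a bug.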
Thanks for reviewing @mjsax
final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
metadata,
Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
This is just to simplify to one task
final Cluster metadata = new Cluster(
"cluster",
new HashSet<>(Arrays.asList(node0, node1, node2)),
new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
We don't need to. The test is mainly testing
processRacks.put(process0UUID, Collections.singletonMap("consumer1", Optional.empty()));
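For readers unfamiliar with the shape of processRacks, the case this test targets (a consumer that reports no rack) could be checked along the following lines. This is a hedged sketch only: the class name, the method name, and the rule that an empty Optional or an empty string disables the feature are assumptions for illustration, not the PR's actual validation logic.

```java
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

final class ClientRackCheckSketch {
    // Returns true only if every consumer of every process reports a non-empty rack id.
    static boolean allConsumersHaveRack(final Map<UUID, Map<String, Optional<String>>> processRacks) {
        for (final Map<String, Optional<String>> consumers : processRacks.values()) {
            for (final Optional<String> rack : consumers.values()) {
                if (!rack.isPresent() || rack.get().isEmpty()) {
                    return false;
                }
            }
        }
        return true;
    }
}
```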
This PR is on top of #13846
Description
Initial implementation to check whether rack-aware assignment can be enabled. Adds logic to get topic partition information in InternalTopicManager.
Test
Unit tests for the assignor and the topic manager.