New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14950: implement assign() and assignment() #13605
Conversation
updateMetadata(time.milliseconds()); | ||
} | ||
|
||
private void updateMetadata(long milliseconds) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nitpicky: can we fold updateMetadata
into assign
?
@@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) { | |||
|
|||
@Override | |||
public void unsubscribe() { | |||
throw new KafkaException("method not implemented"); | |||
// fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if we want to push the clearing of buffered data and SubscriptionState
mutation into the background thread so that it's executed together?
The coordinator's onLeavePrepare
needs to know what partitions are being removed. As it's written, it's possible that the set of partitions could be removed before the background thread gets a chance to run the onLeavePrepare
logic.
consumer = spy(newConsumer(time, new StringDeserializer(), new StringDeserializer())); | ||
consumer.assign(singleton(new TopicPartition("foo", 3))); | ||
assertTrue(consumer.subscription().isEmpty()); | ||
assertTrue(consumer.assignment().isEmpty()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't it be expected to have the foo-3 assignment here? (assigned from user in the subscriptionState as a result of the previous consumer.assign
)
closingi this in light of #13797 |
In this PR: I implemented assign() and assignment(). Ported the original tests from the KafkaConsumerTest.java
Different to the original Implementation:
Missing: