New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-14950: implement assign() and assignment() #13797
Conversation
unsubscribe add tests Update DefaultBackgroundThreadTest.java Bad test...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the PR. Left a few comments.
.../java/org/apache/kafka/clients/consumer/internals/events/MetadataUpdateApplicationEvent.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
Show resolved
Hide resolved
/* | ||
this.coordinator.onLeavePrepare(); | ||
this.coordinator.maybeLeaveGroup("the consumer unsubscribed from all topics"); | ||
*/ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we uncomment the code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed those lines as it's for unsubscribe which isn't baked enough for this PR.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
Outdated
Show resolved
Hide resolved
|
||
log.info("Assigned to partition(s): {}", Utils.join(partitions, ", ")); | ||
if (this.subscriptions.assignFromUser(new HashSet<>(partitions))) | ||
updateMetadata(time.milliseconds()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The existing consumer seems to only update the metadata for new topics. updateMetadata
doesn't make it clear that it's for new topics. Could we make it clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the latest rename and refactor address your concerns?
@@ -537,7 +568,9 @@ public void subscribe(Pattern pattern) { | |||
|
|||
@Override | |||
public void unsubscribe() { | |||
throw new KafkaException("method not implemented"); | |||
// fetcher.clearBufferedDataForUnassignedPartitions(Collections.emptySet()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Backed out the unsubscribe logic as it wasn't implemented in any meaningful way.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
Outdated
Show resolved
Hide resolved
@@ -166,7 +208,7 @@ private DefaultBackgroundThread mockBackgroundThread() { | |||
applicationEventsQueue, | |||
backgroundEventsQueue, | |||
this.errorEventHandler, | |||
processor, | |||
applicationEventProcessor, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed.
@philipnee Was the following comment addressed? Kirk mentioned to take a closer look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the updated PR. A couple of more comments.
clients/src/main/java/org/apache/kafka/clients/consumer/internals/PrototypeAsyncConsumer.java
Outdated
Show resolved
Hide resolved
.../main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java
Show resolved
Hide resolved
@junrao - thanks for the review. I've addressed the two issues you pointed out. Would you be able to take another look at it? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the updated PR. The PR LGTM. Are the 23 test failures related?
Thanks, @junrao -The failing tests should be fixed in the latest commit: The integration test failed because of missing subscription state dependency, which was added in one of the subsequent PR. The failing tests are irrelevant, and these are:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@philipnee : Thanks for the updated PR. LGTM
@philipnee : Could you update the description of the PR (to be included in the commit message) before I merge it? The description still has the following and I am not sure if it's still relevant.
|
Hey @junrao - Thanks again. I updated the description to reflect the updated code more accurately. Here I just quoted the paragraph I made the change to:
|
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata. Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Jun Rao <junrao@gmail.com>
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata. Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Jun Rao <junrao@gmail.com>
…resolve some conflicts (#32) * cherry-pick * KAFKA-14950: implement assign() and assignment() (apache#13797) We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata. Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Jun Rao <junrao@gmail.com> * cherry-pick conflict * Update CommitRequestManager.java Update ApplicationEventProcessor.java --------- Co-authored-by: Kirk True <kirk@kirktrue.pro>
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata. Co-authored-by: Kirk True <kirk@kirktrue.pro> Reviewers: Jun Rao <junrao@gmail.com>
In this PR: I implemented assign() and assignment(). Ported the original tests from the KafkaConsumerTest.java
Different from the original Implementation:
We will explicitly send an assignment change event to the background thread to invoke auto-commit if the group.id is configured. After updating the subscription state, a NewTopicsMetadataUpdateRequestEvent will also be sent to the background thread to update the metadata.