-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-10056; Ensure consumer metadata contains new topics on subscription change #8739
KAFKA-10056; Ensure consumer metadata contains new topics on subscription change #8739
Conversation
// When subscription changes `groupSubscription` may be outdated, ensure that | ||
// new subscription topics are returned. | ||
Set<String> topics = new HashSet<>(groupSubscription); | ||
topics.addAll(subscription); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems consistent with needsMetadata
method below.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this change seems to make sense. I'm trying to understand the edge case a little bit better. It seems the basic scenario is the following:
- user calls
subscribe
. subscription is updated to (A), while group subscription might be (B) - we call
requestUpdateForNewTopics
which bumps the request version - metadata update gets triggered and requests (B) with the bumped request version
At this point, no further metadata update will be sent, but the consumer should rebalance. The part that confuses me a little bit is that we don't request a metadata update following the rebalance.
I guess it is due to SubscriptionState.groupSubscribe
? Assuming that we remain the leader, if (A) is the only topic subscribed, then we will first change groupSubscription
to (A). Then we will not request a new metadata update because groupSubscription
matches subscription
.
Alternatively, if we are not the leader, we will call resetGroupSubscription
, which will set groupSubscription
to (), but will not request an update.
Do I have that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@hachikuji Thank you, yes, that is exactly right.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks @rajinisivaram, looks good. I left a question just to make sure I understand the problem.
// When subscription changes `groupSubscription` may be outdated, ensure that | ||
// new subscription topics are returned. | ||
Set<String> topics = new HashSet<>(groupSubscription); | ||
topics.addAll(subscription); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree this change seems to make sense. I'm trying to understand the edge case a little bit better. It seems the basic scenario is the following:
- user calls
subscribe
. subscription is updated to (A), while group subscription might be (B) - we call
requestUpdateForNewTopics
which bumps the request version - metadata update gets triggered and requests (B) with the bumped request version
At this point, no further metadata update will be sent, but the consumer should rebalance. The part that confuses me a little bit is that we don't request a metadata update following the rebalance.
I guess it is due to SubscriptionState.groupSubscribe
? Assuming that we remain the leader, if (A) is the only topic subscribed, then we will first change groupSubscription
to (A). Then we will not request a new metadata update because groupSubscription
matches subscription
.
Alternatively, if we are not the leader, we will call resetGroupSubscription
, which will set groupSubscription
to (), but will not request an update.
Do I have that right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, LGTM
@hachikuji Thanks for the review, merging to trunk and older branches where KAFKA-9181 was applied. |
…tion change (#8739) Reviewers: Jason Gustafson <jason@confluent.io>
…tion change (#8739) Reviewers: Jason Gustafson <jason@confluent.io>
…tion change (#8739) Reviewers: Jason Gustafson <jason@confluent.io>
…tion change (#8739) Reviewers: Jason Gustafson <jason@confluent.io>
* 'trunk' of github.com:apache/kafka: (36 commits) Remove redundant `containsKey` call in KafkaProducer (apache#8761) KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723) KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749) KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238) KAFKA-9501: convert between active and standby without closing stores (apache#8248) KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739) MINOR: Log the reason for coordinator discovery failure (apache#8747) KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705) MINOR: remove unnecessary timeout for admin request (apache#8738) MINOR: Relax Percentiles test (apache#8748) MINOR: regression test for task assignor config (apache#8743) MINOR: Update documentation.html to refer to 2.6 (apache#8745) MINOR: Update documentation.html to refer to 2.5 (apache#8744) KAFKA-9673: Filter and Conditional SMTs (apache#8699) KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720) KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735) MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728) KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736) KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731) KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589) ... # Conflicts: # core/src/main/scala/kafka/log/Log.scala
* apache-github/2.6: (32 commits) KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (apache#8786) KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (apache#8737) KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (apache#8695) KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (apache#8777) MINOR: Remove unused variable to fix spotBugs failure (apache#8779) MINOR: ChangelogReader should poll for duration 0 for standby restore (apache#8773) KAFKA-10030: Allow fetching a key from a single partition (apache#8706) Kafka-10064 Add documentation for KIP-571 (apache#8760) MINOR: Code cleanup and assertion message fixes in Connect integration tests (apache#8750) KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (apache#8668) KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (apache#7956) KAFKA-10074: Improve performance of `matchingAcls` (apache#8769) KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723) KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739) KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705) KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749) KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238) KAFKA-9501: convert between active and standby without closing stores (apache#8248) MINOR: Relax Percentiles test (apache#8748) MINOR: regression test for task assignor config (apache#8743) ...
Committer Checklist (excluded from commit message)