-
Notifications
You must be signed in to change notification settings - Fork 14k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-9130: KIP-518 Allow listing consumer groups per state #8238
Conversation
@mimaison Is it already ready to be reviewed? I will help reviewing it. |
@dajac Thanks for offering your help! Yes it's pretty much complete and can start do be reviewed. I'm just planning to add a few more tests, early this week (hopefully today!) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Overall, it looks pretty good. I have left a bunch of comments/questions.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListGroupsRequest.json
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListGroupsRequest.json
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Outdated
Show resolved
Hide resolved
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
Outdated
Show resolved
Hide resolved
core/src/test/scala/integration/kafka/api/PlaintextAdminIntegrationTest.scala
Outdated
Show resolved
Hide resolved
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison I have made another pass over it and I have left few small comments and one question.
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/admin/DescribeConsumerGroupTest.scala
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListGroupsRequest.json
Outdated
Show resolved
Hide resolved
core/src/test/scala/unit/kafka/server/ListGroupsRequestTest.scala
Outdated
Show resolved
Hide resolved
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
Show resolved
Hide resolved
@mimaison Let me know when I can do another pass on this PR. |
@dajac Thanks a lot for the feedback! |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison Thanks for the update. I just made a second pass over it and left few comments.
Co-authored-by: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
Thanks @dajac for the great feedback! I believe I've addressed all your comments now. @cmccabe @rajinisivaram As you voted on the KIP, could you take a look? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison Thanks for the update. LGTM!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left some comments.
private Optional<Set<ConsumerGroupState>> states = Optional.empty(); | ||
|
||
/** | ||
* Only groups in these states will be returned by listConsumerGroups() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably worth adding a comment about broker compatibility with this API.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you address this comment?
"flexibleVersions": "3+", | ||
"fields": [ | ||
{ "name": "States", "type": "[]string", "versions": "4+", "tag": 0, "taggedVersions": "4+", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I missed this from the discussion, but why are we bumping the version if we are only adding tagged fields? Is it so that we can detect whether the capability is supported? If so, then I wonder why we don't make this a regular field.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes the version bump is necessary to detect if this field was supported.
As we're bumping the version and as you said in #8238 (comment) the overhead of the extra field on this API is not a concern, it's probably simpler to use a regular field.
* All groups with their states will be returned by listConsumerGroups() | ||
*/ | ||
public ListConsumerGroupsOptions inAnyState() { | ||
this.states = Optional.of(EnumSet.allOf(ConsumerGroupState.class)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm.. We have an UNKNOWN
state in ConsumerGroupState
in case the group coordinator adds a new state that the client isn't aware of. Currently we're going to pass this through the request, which is a bit odd. Furthermore, if the coordinator does add new states, we will be unable to see them using this API. I think it might be better to use a null
list of states in the request to indicate that any state is needed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's a good point so I agree, it makes sense to return all states when null
(or an empty list) is used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can also be argued for the state value in the response. Currently ConsumerGroupDescription
stores the state as ConsumerGroupState
so states the client isn't aware of are mapped to UNKNOWN
so I'm doing the same in ConsumerGroupListing
.
val listedGroup = new ListGroupsResponseData.ListedGroup() | ||
.setGroupId(group.groupId) | ||
.setProtocolType(group.protocolType) | ||
if (!states.isEmpty) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't we always return the state? I don't think overhead is a huge concern for an api like this.
- Switch to use regular fields instead of tagged fields - Empty/null state filter now means all states
Thanks @hachikuji for the feedback. You brought some interesting points that overall simplify the KIP/logic a bit. I've pushed an update following your suggestions to use a regular field and default to see all groups. I think it's easier to reason about and keeps the API simple. Let me know what you think. I'll update the KIP accordingly if we think that's the best option. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@mimaison The change makes sense to me if you are ok with it. I will review the patch again shortly.
@hachikuji Yes I think it's better this way. I'll update the KIP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, left a few small comments.
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/ConsumerGroupListing.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
Outdated
Show resolved
Hide resolved
clients/src/main/java/org/apache/kafka/clients/admin/ListConsumerGroupsOptions.java
Outdated
Show resolved
Hide resolved
@@ -34,7 +36,9 @@ | |||
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", | |||
"about": "The group ID." }, | |||
{ "name": "ProtocolType", "type": "string", "versions": "0+", | |||
"about": "The group protocol type." } | |||
"about": "The group protocol type." }, | |||
{ "name": "GroupState", "type": "string", "versions": "4+", "nullableVersions": "0+", "ignorable": true, "default": "null", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it intentional to use nullable versions 0+? I'm surprised the generator doesn't fail.
core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListGroupsRequest.json
Outdated
Show resolved
Hide resolved
clients/src/main/resources/common/message/ListGroupsRequest.json
Outdated
Show resolved
Hide resolved
@hachikuji Thanks for the feedback. I've pushed updates addressing your comments. I've also updated the KIP accordingly and I'll send a message to the VOTE thread on the mailing list. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few nitpicks. This is ready to merge once addressed.
this.groupId = groupId; | ||
this.isSimpleConsumerGroup = isSimpleConsumerGroup; | ||
this.state = state; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: usually we would write this is this.state = requireNonNull(state);
private Optional<Set<ConsumerGroupState>> states = Optional.empty(); | ||
|
||
/** | ||
* Only groups in these states will be returned by listConsumerGroups() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you address this comment?
|
||
object ConsumerGroupCommand extends Logging { | ||
|
||
val allStates = ConsumerGroupState.values.toList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this used?
}, s"Expected to be able to list $testGroupId") | ||
|
||
TestUtils.waitUntilTrue(() => { | ||
val options = new ListConsumerGroupsOptions() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems the same as the previous case. It might be more interesting if we request only groups that are stable? Then we have covered both successful and unsuccessful matching.
val service = getConsumerGroupService(cgcArgs) | ||
|
||
val expectedListing = Set( | ||
new ConsumerGroupListing(simpleGroup, true, Optional.of(ConsumerGroupState.EMPTY)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: looks misaligned
assertEquals(Set(ConsumerGroupState.DEAD, ConsumerGroupState.COMPLETING_REBALANCE), result) | ||
|
||
try { | ||
ConsumerGroupCommand.consumerGroupStatesFromString("bad, wrong") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: we could use assertThrows
or intercept
for all of these
Thanks @hachikuji. I've pushed another update addressing your comments. I also sent an update to the thread on the mailing list. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Note I am also picking this into 2.6. |
Implementation of KIP-518: https://cwiki.apache.org/confluence/display/KAFKA/KIP-518%3A+Allow+listing+consumer+groups+per+state. Reviewers: David Jacot <djacot@confluent.io>, Jason Gustafson <jason@confluent.io> Co-authored-by: Mickael Maison <mickael.maison@gmail.com> Co-authored-by: Edoardo Comar <ecomar@uk.ibm.com>
* 'trunk' of github.com:apache/kafka: (36 commits) Remove redundant `containsKey` call in KafkaProducer (apache#8761) KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723) KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749) KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238) KAFKA-9501: convert between active and standby without closing stores (apache#8248) KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739) MINOR: Log the reason for coordinator discovery failure (apache#8747) KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705) MINOR: remove unnecessary timeout for admin request (apache#8738) MINOR: Relax Percentiles test (apache#8748) MINOR: regression test for task assignor config (apache#8743) MINOR: Update documentation.html to refer to 2.6 (apache#8745) MINOR: Update documentation.html to refer to 2.5 (apache#8744) KAFKA-9673: Filter and Conditional SMTs (apache#8699) KAFKA-9971: Error Reporting in Sink Connectors (KIP-610) (apache#8720) KAFKA-10052: Harden assertion of topic settings in Connect integration tests (apache#8735) MINOR: Slight MetadataCache tweaks to avoid unnecessary work (apache#8728) KAFKA-9802; Increase transaction timeout in system tests to reduce flakiness (apache#8736) KAFKA-10050: kafka_log4j_appender.py fixed for JDK11 (apache#8731) KAFKA-9146: Add option to force delete active members in StreamsResetter (apache#8589) ... # Conflicts: # core/src/main/scala/kafka/log/Log.scala
* apache-github/2.6: (32 commits) KAFKA-10083: fix failed testReassignmentWithRandomSubscriptionsAndChanges tests (apache#8786) KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used (apache#8737) KAFKA-9320: Enable TLSv1.3 by default (KIP-573) (apache#8695) KAFKA-10082: Fix the failed testMultiConsumerStickyAssignment (apache#8777) MINOR: Remove unused variable to fix spotBugs failure (apache#8779) MINOR: ChangelogReader should poll for duration 0 for standby restore (apache#8773) KAFKA-10030: Allow fetching a key from a single partition (apache#8706) Kafka-10064 Add documentation for KIP-571 (apache#8760) MINOR: Code cleanup and assertion message fixes in Connect integration tests (apache#8750) KAFKA-9987: optimize sticky assignment algorithm for same-subscription case (apache#8668) KAFKA-9392; Clarify deleteAcls javadoc and add test for create/delete timing (apache#7956) KAFKA-10074: Improve performance of `matchingAcls` (apache#8769) KAFKA-9494; Include additional metadata information in DescribeConfig response (KIP-569) (apache#8723) KAFKA-10056; Ensure consumer metadata contains new topics on subscription change (apache#8739) KAFKA-10029; Don't update completedReceives when channels are closed to avoid ConcurrentModificationException (apache#8705) KAFKA-10061; Fix flaky `ReassignPartitionsIntegrationTest.testCancellation` (apache#8749) KAFKA-9130; KIP-518 Allow listing consumer groups per state (apache#8238) KAFKA-9501: convert between active and standby without closing stores (apache#8248) MINOR: Relax Percentiles test (apache#8748) MINOR: regression test for task assignor config (apache#8743) ...
Committer Checklist (excluded from commit message)