-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets #19820
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Conversation
Hi @AndrewJSchofield @smjn @apoorvmittal10 , |
@JimmyWang6 There's a build failure due to spotless check. Please resolve it and merge latest changes. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updated PR.
The overall approach of obtaining the offsets seems about right. It needs a lot of work to get the behaviour neat and tidy in all of the various cases it can be used in (such as non-existent topic, no topics specified, list of topics specified, incorrect arguments, and so on).
Even with a couple of local changes, I was not able to get the offsets to reset, but I think it's close.
Please continue in this direction and I'll continue to review as the changes appear. Thanks.
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
Outdated
Show resolved
Hide resolved
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
Outdated
Show resolved
Hide resolved
53e850e
to
24ecd7d
Compare
@JimmyWang6 could you please fix the build error? |
96cc251
to
1eab475
Compare
Hi @chia7712, |
Hi @AndrewJSchofield @chia7712
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JimmyWang6 thanks for this patch
.setTopicName(topic.topicName()); | ||
topic.partitions().forEach(partition -> { | ||
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData; | ||
PartitionErrorData error = result.getErrors().get(topic.topicId()).get(partition.partitionIndex()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
result.getErrors()
will create a new collection by grouping data. Perhaps it should be moved out of loop.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Furthermore, how do we ensure that the topic from AlterShareGroupOffsetsResponseData
must exist in InitializeShareGroupStateResult
?
If users attempt to alter the nonexistent partitions within an existing topic, the AlterShareGroupOffsetsResponseData
will contain the topic but InitializeShareGroupStateResult
will not. Is this correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch! My idea is if users want to alter one nonexistent partition, the error code and message will be added to AlterShareGroupOffsetsResponseData
when executing groupMetadataManager.completeAlterShareGroupOffsets
. So I plan to skip the processed topicPartition which already had an error code, for which the rest of the partitions should exist in InitializeShareGroupStateResult.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall LGTM, left some small comments.
tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
Outdated
Show resolved
Hide resolved
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorService.java
Outdated
Show resolved
Hide resolved
…pache#20049) While testing the code in apache#19820, it became clear that the error handling problems were due to the underlying Admin API. This PR fixes the error handling for top-level errors in the AlterShareGroupOffsets RPC. Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>
@AndrewJSchofield So much thanks for your help to resolving the issues. I sincerely apologize for the previous bugs I introduced. I’ll resolve the conflicts and thoroughly test the script as soon as possible to ensure everything works smoothly. |
Hi @chia7712 @AndrewJSchofield , |
Thanks for the updates. I'm starting to review and test it. Here's a small problem I've found so far.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the updates. This is much closer now. It looks to me like it's working properly when it resets the offsets.
There's quite a lot of tidying up to do for the error cases. Here are some I tried and it was a bit messy:
- Attempt to reset offsets for a consumer group
- Attempt to reset offsets for a non-existent group
- Attempt to reset offsets when there are members in the share group
- Attempt to reset offsets for non-existent topic
- Attempt to reset offsets for existing topic which is not subscribed in the share group
I think the code structure will change a little when you sort out the error handling. Too often, we can see a stack trace when, for expected errors, a simple message would be neater. Also, the usage message sometimes comes out when the command options were valid.
Map<String, ShareGroupDescription> shareGroups = describeShareGroups(Collections.singleton(groupId)); | ||
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>(); | ||
shareGroups.forEach((id, description) -> groupOffsets.put(id, new SimpleImmutableEntry<>(description, partitionOffsets))); | ||
printOffsets(groupOffsets, opts.options.has(opts.verboseOpt)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The --verbose
option is not valid with --reset-offsets
so this is a bit weird. But actually, I think it would be better if there was something like the consumer group tool printOffsetsToReset
. This is just because I think having a column heading NEW-START-OFFSET
would make it clear that it's the new start offset.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good idea! I will use the same format of printOffsetsToReset
of consumer group tool.
.collect(Collectors.toSet()); | ||
if (!existsTopics.containsAll(topics)) { | ||
CommandLineUtils | ||
.printUsageAndExit(opts.parser, String.format("Share group '%s' is not subscribed to topic '%s'", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This doesn't distinguish between non-existent topics and topics which are just not subscribed. This is probably a debatable point, so I wonder what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with your point. I've already added validation for non-existent topics, which require an extra adminClient.listTopics()
request.
Please fix the checkstyle build error. |
Hi @AndrewJSchofield ,
![]()
![]()
![]()
![]()
![]()
![]() |
45d2d7d
to
b95b40e
Compare
KAFKA-16717 aims to
finish the AlterShareGroupOffsets for ShareGroupCommand part.