KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets #19820

Open
wants to merge 19 commits into
base: trunk

Conversation

JimmyWang6
Contributor

@JimmyWang6 JimmyWang6 commented May 27, 2025

KAFKA-16717 aims to finish the AlterShareGroupOffsets support in ShareGroupCommand.

@JimmyWang6 JimmyWang6 marked this pull request as draft May 27, 2025 01:39
@github-actions github-actions bot added triage PRs from the community tools small Small PRs labels May 27, 2025
@JimmyWang6 JimmyWang6 changed the title KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets WIP KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets May 27, 2025
@AndrewJSchofield AndrewJSchofield added KIP-932 Queues for Kafka ci-approved and removed triage PRs from the community labels May 27, 2025
@AndrewJSchofield AndrewJSchofield self-requested a review May 27, 2025 07:58
@github-actions github-actions bot removed the small Small PRs label Jun 2, 2025
@JimmyWang6 JimmyWang6 marked this pull request as ready for review June 4, 2025 02:18
@JimmyWang6 JimmyWang6 changed the title WIP KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets KAFKA-16717 [3/N]: Add AdminClient.alterShareGroupOffsets Jun 4, 2025
@JimmyWang6 JimmyWang6 marked this pull request as draft June 4, 2025 07:27
@JimmyWang6 JimmyWang6 marked this pull request as ready for review June 15, 2025 18:39
@JimmyWang6
Contributor Author

Hi @AndrewJSchofield @smjn @apoorvmittal10,
Sorry for the late response. Could you please take a look and let me know if there is anything wrong with the current approach? Thanks in advance.

@AndrewJSchofield
Member

@JimmyWang6 There's a build failure due to the spotless check. Please resolve it and merge the latest changes.

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the updated PR.

The overall approach of obtaining the offsets seems about right. It needs a lot of work to get the behaviour neat and tidy in all of the various cases it can be used in (such as non-existent topic, no topics specified, list of topics specified, incorrect arguments, and so on).

Even with a couple of local changes, I was not able to get the offsets to reset, but I think it's close.

Please continue in this direction and I'll continue to review as the changes appear. Thanks.

@chia7712
Member

@JimmyWang6 could you please fix the build error?

@JimmyWang6
Contributor Author

Hi @chia7712,
Thanks for your comment. I have updated the PR; PTAL if you have time.

@github-actions github-actions bot added the core Kafka Broker label Jun 24, 2025
@JimmyWang6
Contributor Author

JimmyWang6 commented Jun 24, 2025

Hi @AndrewJSchofield @chia7712
I have updated the PR and resolved several bugs.
Here are some of my test scenarios:

  1. When a single shareConsumer subscribes to a single topic, consumes some messages, and then exits, it can subsequently consume all previously consumed messages again after resetting the offset to earliest.
  2. When a single shareConsumer subscribes to multiple topics and I then execute bin/kafka-share-groups.sh --reset-offsets --execute --to-earliest --all-topics --group test-group --bootstrap-server localhost:9092, the offsets of all the topics are reset.
  3. When a single shareConsumer subscribes to multiple topics, consumes some messages, and then exits, it can subsequently consume all previously consumed messages again after resetting the consumption offset.
  4. If no topic argument is provided when running bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --group test-group --reset-offsets --dry-run, the following error occurs: Option [reset-offsets] requires one of these options: [all-topics] or [topic].
  5. When none of to-datetime, to-earliest or to-latest is specified, execution fails with the error Option '[reset-offsets]' requires one of the following scenarios: [[to-datetime], [to-earliest], [to-latest]]
  6. For a non-existent topic, execution fails with Some topics none do not exist in share group 'test-group'.
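
For readers following along, scenarios 4 and 5 amount to two checks on the supplied option names. The following is only a rough sketch of that logic, not the code in this PR: the class `ResetOffsetsOptionCheck` and its `validate` method are hypothetical, and only the option names and the two messages are taken from the scenarios above.

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Hypothetical stand-in for the option checks described in scenarios 4 and 5. */
final class ResetOffsetsOptionCheck {

    // Option names as they appear in the messages quoted in the scenarios.
    private static final List<String> TOPIC_SCOPE = List.of("all-topics", "topic");
    private static final List<String> RESET_SPEC = List.of("to-datetime", "to-earliest", "to-latest");

    /** Returns an error message if the supplied option names are incomplete, otherwise empty. */
    static Optional<String> validate(Set<String> supplied) {
        if (!supplied.contains("reset-offsets")) {
            return Optional.empty(); // these checks only apply to --reset-offsets
        }
        if (TOPIC_SCOPE.stream().noneMatch(supplied::contains)) {
            return Optional.of(
                "Option [reset-offsets] requires one of these options: [all-topics] or [topic].");
        }
        if (RESET_SPEC.stream().noneMatch(supplied::contains)) {
            return Optional.of("Option '[reset-offsets]' requires one of the following scenarios: "
                + "[[to-datetime], [to-earliest], [to-latest]]");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Scenario 4: no topic scope given.
        System.out.println(validate(Set.of("reset-offsets", "dry-run")));
        // Scenario 5: topic scope given, but no reset specification.
        System.out.println(validate(Set.of("reset-offsets", "all-topics", "execute")));
        // Complete invocation.
        System.out.println(validate(Set.of("reset-offsets", "all-topics", "to-earliest", "execute")));
    }
}
```

The topic-scope check runs first in this sketch because scenario 4 reports the missing topic option even though no reset specification was given either.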

Member

@chia7712 chia7712 left a comment

@JimmyWang6 thanks for this patch

.setTopicName(topic.topicName());
topic.partitions().forEach(partition -> {
AlterShareGroupOffsetsResponseData.AlterShareGroupOffsetsResponsePartition partitionData;
PartitionErrorData error = result.getErrors().get(topic.topicId()).get(partition.partitionIndex());

Member

result.getErrors() will create a new collection by grouping data. Perhaps it should be moved out of loop.

Member

Furthermore, how do we ensure that the topic from AlterShareGroupOffsetsResponseData must exist in InitializeShareGroupStateResult?

If users attempt to alter the nonexistent partitions within an existing topic, the AlterShareGroupOffsetsResponseData will contain the topic but InitializeShareGroupStateResult will not. Is this correct?

Contributor Author

Good catch! My idea is that if users try to alter a nonexistent partition, the error code and message are added to AlterShareGroupOffsetsResponseData when groupMetadataManager.completeAlterShareGroupOffsets executes. So I plan to skip any topic-partition that already has an error code; the remaining partitions should all exist in InitializeShareGroupStateResult.
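
To make the two points in this thread concrete, here is a minimal sketch under stated assumptions. It does not use the real Kafka classes: `PartitionResult` and `merge` are hypothetical stand-ins, the grouped error map plays the role of what `result.getErrors()` returns, and the error code values are arbitrary placeholders. The map is built once by the caller instead of inside the loop, partitions that already carry an error are skipped, and the lookup tolerates a topic that is missing from the grouped map.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class MergeInitializeErrors {

    /** Hypothetical, simplified stand-in for one partition entry in the response. */
    static final class PartitionResult {
        final String topicId;
        final int partition;
        short errorCode; // 0 means "no error"

        PartitionResult(String topicId, int partition, short errorCode) {
            this.topicId = topicId;
            this.partition = partition;
            this.errorCode = errorCode;
        }
    }

    /**
     * Copies error codes from the initialize step into the response.
     * groupedErrors (topicId -> partition -> error code) is computed once by the caller.
     */
    static void merge(List<PartitionResult> response,
                      Map<String, Map<Integer, Short>> groupedErrors) {
        for (PartitionResult p : response) {
            if (p.errorCode != 0) {
                continue; // already failed earlier validation, keep that error
            }
            Map<Integer, Short> byPartition = groupedErrors.get(p.topicId);
            Short code = byPartition == null ? null : byPartition.get(p.partition);
            if (code != null) {
                p.errorCode = code;
            }
        }
    }

    public static void main(String[] args) {
        List<PartitionResult> response = new ArrayList<>();
        response.add(new PartitionResult("t1", 0, (short) 0));
        response.add(new PartitionResult("t1", 9, (short) 3)); // pre-existing error
        Map<Integer, Short> t1Errors = Map.of(0, (short) 42);  // placeholder code
        merge(response, Map.of("t1", t1Errors));
        for (PartitionResult p : response) {
            System.out.println(p.topicId + "-" + p.partition + " errorCode=" + p.errorCode);
        }
    }
}
```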

Contributor

@DL1231 DL1231 left a comment

Overall LGTM, left some small comments.

jiafu1115 pushed a commit to jiafu1115/kafka that referenced this pull request Jul 3, 2025
…pache#20049)

While testing the code in apache#19820, it
became clear that the error handling problems were due to the underlying
Admin API. This PR fixes the error handling for top-level errors in the
AlterShareGroupOffsets RPC.

Reviewers: Apoorv Mittal <apoorvmittal10@gmail.com>, Lan Ding
 <isDing_L@163.com>, TaiJuWu <tjwu1217@gmail.com>
@JimmyWang6
Contributor Author

@AndrewJSchofield Thank you so much for your help in resolving the issues. I sincerely apologize for the bugs I introduced earlier. I'll resolve the conflicts and thoroughly test the script as soon as possible to ensure everything works smoothly.

@JimmyWang6
Contributor Author

Hi @chia7712 @AndrewJSchofield,
I have updated the PR and addressed the comments. PTAL when you get a chance, thanks.

@AndrewJSchofield
Member

Thanks for the updates. I'm starting to review and test it. Here's a small problem I've found so far. SG1 is an existing share group with 1 member. T2 doesn't exist, so the message about "Not subscribed" is fine. However, the usage message for the tool is printed even though it's a regular expected error.

bin/kafka-share-groups.sh --bootstrap-server localhost:9092 --reset-offsets --group SG1 --execute --topic T2 --to-earliest 
Share group 'SG1' is not subscribed to topic 'T2'
Option                                 Description                           
------                                 -----------                           
--all-groups                           Apply to all share groups.
(( And so on ))

Member

@AndrewJSchofield AndrewJSchofield left a comment

Thanks for the updates. This is much closer now. It looks to me like it's working properly when it resets the offsets.

There's quite a lot of tidying up to do for the error cases. Here are some I tried and it was a bit messy:

  • Attempt to reset offsets for a consumer group
  • Attempt to reset offsets for a non-existent group
  • Attempt to reset offsets when there are members in the share group
  • Attempt to reset offsets for a non-existent topic
  • Attempt to reset offsets for an existing topic which is not subscribed in the share group

I think the code structure will change a little when you sort out the error handling. Too often, we can see a stack trace when, for expected errors, a simple message would be neater. Also, the usage message sometimes comes out when the command options were valid.
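
One way to picture the separation described here is to give invalid command lines and expected failures different exception types, so that only the former triggers the usage text. This is just an illustrative sketch and not the structure of ShareGroupCommand: `UsageException`, `ExpectedFailure` and `render` are hypothetical names.

```java
final class ExpectedErrorHandling {

    /** Hypothetical: the command line itself is invalid, so the usage text is helpful. */
    static final class UsageException extends RuntimeException {
        UsageException(String message) {
            super(message);
        }
    }

    /** Hypothetical: an expected failure such as an unknown group or unsubscribed topic. */
    static final class ExpectedFailure extends RuntimeException {
        ExpectedFailure(String message) {
            super(message);
        }
    }

    /**
     * Runs the action and returns what the tool would print.
     * Only a UsageException appends the usage text; an ExpectedFailure yields a
     * one-line message with no stack trace. Anything else propagates unchanged.
     */
    static String render(Runnable action, String usage) {
        try {
            action.run();
            return "";
        } catch (UsageException e) {
            return e.getMessage() + "\n" + usage;
        } catch (ExpectedFailure e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        String usage = "Option   Description";
        System.out.println(render(() -> {
            throw new ExpectedFailure("Share group 'SG1' is not subscribed to topic 'T2'");
        }, usage));
    }
}
```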

Map<String, ShareGroupDescription> shareGroups = describeShareGroups(Collections.singleton(groupId));
TreeMap<String, Entry<ShareGroupDescription, Collection<SharePartitionOffsetInformation>>> groupOffsets = new TreeMap<>();
shareGroups.forEach((id, description) -> groupOffsets.put(id, new SimpleImmutableEntry<>(description, partitionOffsets)));
printOffsets(groupOffsets, opts.options.has(opts.verboseOpt));

Member

The --verbose option is not valid with --reset-offsets so this is a bit weird. But actually, I think it would be better if there was something like the consumer group tool printOffsetsToReset. This is just because I think having a column heading NEW-START-OFFSET would make it clear that it's the new start offset.

Contributor Author

Good idea! I will use the same format as the consumer group tool's printOffsetsToReset.
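
As a rough illustration of the suggested output, a dedicated table with a NEW-START-OFFSET heading could be produced along these lines. This is a sketch only: `ResetOffsetsPrinter`, the other column names and the column widths are assumptions for illustration, not the format the consumer group tool or this PR actually uses.

```java
import java.util.Map;
import java.util.TreeMap;

final class ResetOffsetsPrinter {

    // Column widths are arbitrary choices for this sketch.
    private static final String ROW_FORMAT = "%-30s %-30s %-10s %-16s%n";

    /** Formats one row per topic-partition, sorted by topic and then by partition. */
    static String format(String groupId, Map<String, Map<Integer, Long>> newStartOffsets) {
        StringBuilder sb = new StringBuilder();
        sb.append(String.format(ROW_FORMAT, "GROUP", "TOPIC", "PARTITION", "NEW-START-OFFSET"));
        new TreeMap<String, Map<Integer, Long>>(newStartOffsets).forEach((topic, partitions) ->
            new TreeMap<Integer, Long>(partitions).forEach((partition, offset) ->
                sb.append(String.format(ROW_FORMAT, groupId, topic, partition, offset))));
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<Integer, Long> t1 = Map.of(0, 0L, 1, 0L);
        System.out.print(format("SG1", Map.of("T1", t1)));
    }
}
```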

.collect(Collectors.toSet());
if (!existsTopics.containsAll(topics)) {
CommandLineUtils
.printUsageAndExit(opts.parser, String.format("Share group '%s' is not subscribed to topic '%s'",

Member

This doesn't distinguish between non-existent topics and topics which are just not subscribed. This is probably a debatable point, so I wonder what you think.

Contributor Author

@JimmyWang6 JimmyWang6 Jul 15, 2025

I agree with your point. I've already added validation for non-existent topics, which requires an extra adminClient.listTopics() request.
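
The distinction discussed in this thread can be sketched with plain sets. This is an illustration rather than the code in the PR: `TopicCheck` and `check` are hypothetical, `clusterTopics` stands for the names obtained from the extra adminClient.listTopics() request, and the wording of the "does not exist" message is an assumption (only the "not subscribed" message appears in the tool output quoted earlier).

```java
import java.util.Optional;
import java.util.Set;
import java.util.TreeSet;

final class TopicCheck {

    /**
     * Returns the first problem found among the requested topics, or empty if all are usable.
     * A topic missing from the cluster is reported differently from one that exists
     * but is not subscribed by the share group.
     */
    static Optional<String> check(String groupId,
                                  Set<String> requested,
                                  Set<String> clusterTopics,
                                  Set<String> subscribedTopics) {
        for (String topic : new TreeSet<>(requested)) { // sorted for a stable message
            if (!clusterTopics.contains(topic)) {
                // Hypothetical wording for the non-existent case.
                return Optional.of(String.format("Topic '%s' does not exist", topic));
            }
            if (!subscribedTopics.contains(topic)) {
                return Optional.of(String.format(
                    "Share group '%s' is not subscribed to topic '%s'", groupId, topic));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Set<String> cluster = Set.of("T1", "T3");
        Set<String> subscribed = Set.of("T1");
        System.out.println(check("SG1", Set.of("T2"), cluster, subscribed));
        System.out.println(check("SG1", Set.of("T3"), cluster, subscribed));
        System.out.println(check("SG1", Set.of("T1"), cluster, subscribed));
    }
}
```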

@AndrewJSchofield
Member

Please fix the checkstyle build error.

@JimmyWang6
Contributor Author

JimmyWang6 commented Jul 15, 2025

Hi @AndrewJSchofield,
I have improved the error handling structure. Please take another look when you get a chance, thanks! Here are my test results:

  • Attempt to reset offsets for a consumer group (screenshot)
  • Attempt to reset offsets for a non-existent group (screenshot)
  • Attempt to reset offsets when there are members in the share group (screenshot)
  • Attempt to reset offsets for a non-existent topic (screenshot)
  • Attempt to reset offsets for an existing topic which is not subscribed in the share group (screenshot)
  • Successfully reset offsets to earliest (screenshot)

4 participants