Skip to content

KAFKA-19468: Ignore unsubscribed topics when computing share assignment #20101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 4, 2025

Conversation

AndrewJSchofield
Copy link
Member

@AndrewJSchofield AndrewJSchofield commented Jul 3, 2025

When the group coordinator is processing a heartbeat from a share
consumer, it must decide whether the recompute the assignment. Part of
this decision hinges on whether the assigned partitions match the
partitions initialised by the share coordinator. However, when the set
of subscribed topics changes, there may be initialised partitions which
are not currently assigned. Topics which are not subscribed should be
omitted from the calculation about whether to recompute the assignment.

Co-authored-by: Sushant Mahajan smahajan@confluent.io

Reviewers: Lan Ding 53332773+DL1231@users.noreply.github.com, Sushant
Mahajan smahajan@confluent.io, Apoorv Mittal
apoorvmittal10@gmail.com

Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, left some comments.

Comment on lines 2699 to 2705
if (currentAssignedPartitions.equals(entry.getValue().partitions())) {
// The assigned and initialized partitions match, so assignment does not need to be recomputed.
continue;
} else {
// The assigned and initialized partitions do not match, so recompute the assignment.
return true;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of the implementation below?

Suggested change
if (currentAssignedPartitions.equals(entry.getValue().partitions())) {
// The assigned and initialized partitions match, so assignment does not need to be recomputed.
continue;
} else {
// The assigned and initialized partitions do not match, so recompute the assignment.
return true;
}
if (!currentAssignedPartitions.equals(entry.getValue().partitions())) {
// The assigned and initialized partitions do not match, so recompute the assignment.
return true;
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code restructured slightly. I originally wrote it in a very simplistic style to make all of the cases really clear. But that was a bit verbose so tightened up now.

@@ -2692,7 +2690,27 @@ private boolean initializedAssignmentPending(ShareGroup group) {
}
}

return !initializedTps.equals(currentAssigned);
Set<String> subscribedTopicNames = group.subscribedTopicNames().keySet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add integration tests for this scenario?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just added unit tests which verify that this method returns the expected value in all of the interesting cases. I suggest an integration test which verifies that the epoch does not increase continuously would be a good candidate for a future task.

@github-actions github-actions bot removed the small Small PRs label Jul 4, 2025
Copy link
Collaborator

@smjn smjn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, Thanks for the PR

Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the patch, LGTM

@AndrewJSchofield AndrewJSchofield merged commit da4fbba into apache:trunk Jul 4, 2025
38 of 40 checks passed
@AndrewJSchofield AndrewJSchofield deleted the KAFKA-19468 branch July 4, 2025 13:55
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants