-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19468: Ignore unsubscribed topics when computing share assignment #20101
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch, left some comments.
if (currentAssignedPartitions.equals(entry.getValue().partitions())) { | ||
// The assigned and initialized partitions match, so assignment does not need to be recomputed. | ||
continue; | ||
} else { | ||
// The assigned and initialized partitions do not match, so recompute the assignment. | ||
return true; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What do you think of the implementation below?
if (currentAssignedPartitions.equals(entry.getValue().partitions())) { | |
// The assigned and initialized partitions match, so assignment does not need to be recomputed. | |
continue; | |
} else { | |
// The assigned and initialized partitions do not match, so recompute the assignment. | |
return true; | |
} | |
if (!currentAssignedPartitions.equals(entry.getValue().partitions())) { | |
// The assigned and initialized partitions do not match, so recompute the assignment. | |
return true; | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code restructured slightly. I originally wrote it in a very simplistic style to make all of the cases really clear. But that was a bit verbose so tightened up now.
@@ -2692,7 +2690,27 @@ private boolean initializedAssignmentPending(ShareGroup group) { | |||
} | |||
} | |||
|
|||
return !initializedTps.equals(currentAssigned); | |||
Set<String> subscribedTopicNames = group.subscribedTopicNames().keySet(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we add integration tests for this scenario?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just added unit tests which verify that this method returns the expected value in all of the interesting cases. I suggest an integration test which verifies that the epoch does not increase continuously would be a good candidate for a future task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Thanks for the PR
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the patch, LGTM
When the group coordinator is processing a heartbeat from a share
consumer, it must decide whether the recompute the assignment. Part of
this decision hinges on whether the assigned partitions match the
partitions initialised by the share coordinator. However, when the set
of subscribed topics changes, there may be initialised partitions which
are not currently assigned. Topics which are not subscribed should be
omitted from the calculation about whether to recompute the assignment.
Co-authored-by: Sushant Mahajan smahajan@confluent.io
Reviewers: Lan Ding 53332773+DL1231@users.noreply.github.com, Sushant
Mahajan smahajan@confluent.io, Apoorv Mittal
apoorvmittal10@gmail.com