Skip to content

KAFKA-19369: Add group.share.assignors config and integration test #19900

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Jun 6, 2025

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented Jun 4, 2025

  • Add group.share.assignors config to GroupCoordinatorConfig.
  • Send rackId in share group heartbeat request if it's not null.
  • Add integration test testShareConsumerWithRackAwareAssignor.

Reviewers: Lan Ding 53332773+DL1231@users.noreply.github.com, Andrew
Schofield aschofield@confluent.io

@FrankYang0529
Copy link
Member Author

I will remove wip and rebase trunk after #19856 is merged.

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 changed the title KAFKA-19369: Add group.share.assignors config and integration test (wip) KAFKA-19369: Add group.share.assignors config and integration test Jun 5, 2025
@FrankYang0529
Copy link
Member Author

The flaky test PlaintextConsumerTest#testCloseLeavesGroupOnInterrupt is not related to this PR.

https://issues.apache.org/jira/browse/KAFKA-18031

Copy link
Contributor

@DL1231 DL1231 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, LGTM, left a small comment.

Comment on lines 575 to 587
ShareGroupPartitionAssignor assignor;

if (Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) {
assignor = SHARE_GROUP_BUILTIN_ASSIGNOR;
} else {
try {
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class);
} catch (ClassNotFoundException e) {
throw new KafkaException("Class " + kclass + " cannot be found", e);
} catch (ClassCastException e) {
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName());
}
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
ShareGroupPartitionAssignor assignor;
if (Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) {
assignor = SHARE_GROUP_BUILTIN_ASSIGNOR;
} else {
try {
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class);
} catch (ClassNotFoundException e) {
throw new KafkaException("Class " + kclass + " cannot be found", e);
} catch (ClassCastException e) {
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName());
}
}
ShareGroupPartitionAssignor assignor = SHARE_GROUP_BUILTIN_ASSIGNOR;
if (!Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) {
try {
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class);
} catch (ClassNotFoundException e) {
throw new KafkaException("Class " + kclass + " cannot be found", e);
} catch (ClassCastException e) {
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName());
}
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review. Updated it.

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Member

@AndrewJSchofield AndrewJSchofield left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR. Just one comment but worth getting it right to start with I think. If you agree, I'll adjust the KIP before I merge.

@@ -235,6 +238,13 @@ public class GroupCoordinatorConfig {
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000;
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members.";

private static final ShareGroupPartitionAssignor SHARE_GROUP_BUILTIN_ASSIGNOR = new SimpleAssignor();
public static final String SHARE_GROUP_ASSIGNORS_CONFIG = "group.share.assignors";
public static final String SHARE_GROUP_ASSIGNORS_DOC = "The server-side assignors as a list of full class names. " +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you've done a slightly different thing here than it says in the KIP. I think it's an improvement so I want to adjust the KIP too. The doc should say "The server-side assignors as a list of either names for built-in assignors or full class names for custom assignors. The list must contain only a single entry which is used by all groups. The supported built-in assignors are: " + SHARE_GROUP_BUILTIN_ASSIGNOR.name() + ".". I don't think there's any need to include the sentence about future plans. That's more helpful as commentary in the KIP. In this way, I think we have a futureproof behaviour and accurate documentation.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree. I will update the PR later. Thanks.

Signed-off-by: PoAn Yang <payang@apache.org>
@AndrewJSchofield AndrewJSchofield merged commit 844b0e6 into apache:trunk Jun 6, 2025
21 checks passed
@FrankYang0529 FrankYang0529 deleted the KAFKA-19369 branch June 6, 2025 13:22
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build Gradle build or GitHub Actions ci-approved clients connect consumer KIP-932 Queues for Kafka
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants