-
Notifications
You must be signed in to change notification settings - Fork 14.5k
KAFKA-19369: Add group.share.assignors config and integration test #19900
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
I will remove wip and rebase trunk after #19856 is merged. |
c0bcb0e
to
4e03d22
Compare
Signed-off-by: PoAn Yang <payang@apache.org>
4e03d22
to
088a5ca
Compare
The flaky test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR, LGTM, left a small comment.
ShareGroupPartitionAssignor assignor; | ||
|
||
if (Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) { | ||
assignor = SHARE_GROUP_BUILTIN_ASSIGNOR; | ||
} else { | ||
try { | ||
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class); | ||
} catch (ClassNotFoundException e) { | ||
throw new KafkaException("Class " + kclass + " cannot be found", e); | ||
} catch (ClassCastException e) { | ||
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName()); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ShareGroupPartitionAssignor assignor; | |
if (Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) { | |
assignor = SHARE_GROUP_BUILTIN_ASSIGNOR; | |
} else { | |
try { | |
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class); | |
} catch (ClassNotFoundException e) { | |
throw new KafkaException("Class " + kclass + " cannot be found", e); | |
} catch (ClassCastException e) { | |
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName()); | |
} | |
} | |
ShareGroupPartitionAssignor assignor = SHARE_GROUP_BUILTIN_ASSIGNOR; | |
if (!Objects.equals(kclass, SHARE_GROUP_ASSIGNORS_DEFAULT)) { | |
try { | |
assignor = Utils.newInstance(kclass, ShareGroupPartitionAssignor.class); | |
} catch (ClassNotFoundException e) { | |
throw new KafkaException("Class " + kclass + " cannot be found", e); | |
} catch (ClassCastException e) { | |
throw new KafkaException(kclass + " is not an instance of " + ShareGroupPartitionAssignor.class.getName()); | |
} | |
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the review. Updated it.
Signed-off-by: PoAn Yang <payang@apache.org>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the PR. Just one comment but worth getting it right to start with I think. If you agree, I'll adjust the KIP before I merge.
@@ -235,6 +238,13 @@ public class GroupCoordinatorConfig { | |||
public static final int SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT = 15000; | |||
public static final String SHARE_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC = "The maximum heartbeat interval for share group members."; | |||
|
|||
private static final ShareGroupPartitionAssignor SHARE_GROUP_BUILTIN_ASSIGNOR = new SimpleAssignor(); | |||
public static final String SHARE_GROUP_ASSIGNORS_CONFIG = "group.share.assignors"; | |||
public static final String SHARE_GROUP_ASSIGNORS_DOC = "The server-side assignors as a list of full class names. " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you've done a slightly different thing here than it says in the KIP. I think it's an improvement so I want to adjust the KIP too. The doc should say "The server-side assignors as a list of either names for built-in assignors or full class names for custom assignors. The list must contain only a single entry which is used by all groups. The supported built-in assignors are: " + SHARE_GROUP_BUILTIN_ASSIGNOR.name() + "."
. I don't think there's any need to include the sentence about future plans. That's more helpful as commentary in the KIP. In this way, I think we have a futureproof behaviour and accurate documentation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Agree. I will update the PR later. Thanks.
Signed-off-by: PoAn Yang <payang@apache.org>
group.share.assignors
config toGroupCoordinatorConfig
.rackId
in share group heartbeat request if it's not null.testShareConsumerWithRackAwareAssignor
.Reviewers: Lan Ding 53332773+DL1231@users.noreply.github.com, Andrew
Schofield aschofield@confluent.io