KAFKA-17747: [7/N] Add consumer group integration test for rack aware assignment #19856
Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 Thanks for the PR. I left a few comments.
// RackId - set if present
membershipManager.rackId().ifPresent(data::setRackId);
I think that it should only be set if it has changed since the last request or if we send a full request.
Updated it. Thanks.
@@ -142,7 +142,7 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro

private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {
Let's extend unit tests to cover the rack field.
// The WorkerGroupMember in connect module also uses this class, but there is no client.rack in DistributedConfig.
// Ignore the rackId in that case to avoid ConfigException.
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
// Skip empty rackId to avoid InvalidRequestException.
if (config.values().containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG) && !config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty()) {
    this.rackId = Optional.ofNullable(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG));
} else {
    this.rackId = Optional.empty();
}
- Should we only do this if the protocolType is CONSUMER?
- It may be simpler to get the config in a local variable and to set this.rack only if it is non-null and non-empty.
I changed it to do this only when the protocol type is CONSUMER or SHARE, because both support the field.
@@ -216,6 +222,92 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
    }
}

@ClusterTest(
I wonder whether we could add another test which verifies that a rebalance is triggered when the racks of a partition have changed. Have you considered it?
Yes, I added it to the same case. It uses alterPartitionReassignments to move partitions to different brokers and verifies that consumers get new assignments.
Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 Thanks for the update. A few more comments.
// Connect doesn't support rack id.
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
// Skip empty rackId to avoid InvalidRequestException.
this.rackId = config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty() ?
    Optional.empty() :
    Optional.of(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG));
In ConsumerCoordinator, we have the following code:
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
Hence, we can use the same logic for the classic protocol. I wonder if we could just do the following here for all protocols.
String rackId = config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG);
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
We may also want to update ConsumerCoordinator to get it from GroupRebalanceConfig too.
Updated ConsumerCoordinator to get the rack id from GroupRebalanceConfig. Thanks for the suggestion.
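For readers following the thread, the normalization discussed above can be sketched in isolation. This is a minimal illustration, not the code from the patch: the class name RackIdNormalizer and the method normalize are hypothetical, and only the null-or-empty check itself comes from the review comment.

```java
import java.util.Optional;

// Hypothetical helper for illustration only; it is not part of Kafka.
final class RackIdNormalizer {

    // Treats a null or empty client.rack value as "no rack", mirroring the
    // expression quoted in the review comment.
    static Optional<String> normalize(String rackId) {
        return rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
    }

    public static void main(String[] args) {
        // The default value of client.rack is the empty string, which maps to no rack.
        System.out.println(normalize(""));
        System.out.println(normalize(null));
        System.out.println(normalize("rack1"));
    }
}
```

Keeping the check in one place means the classic and consumer protocols cannot disagree on what counts as a missing rack.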
// RackId - sent when joining or if it has changed since the last heartbeat
String rackId = membershipManager.rackId().orElse(null);
if (sendAllFields || !Objects.equals(rackId, sentFields.rackId)) {
    data.setRackId(rackId);
    sentFields.rackId = rackId;
}
Could we also test this?
Added a related test, testRackIdInHeartbeatLifecycle. BTW, I removed rackId from sentFields. Users cannot reconfigure it, so we don't need to keep the last sent value in sentFields.
Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 Thanks for the update. I left a few nits.
// Connect doesn't support rack id.
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
// Skip empty rackId to avoid InvalidRequestException.
nit: We can remove this comment now.
Removed it. Thanks.
    when(membershipManager.rackId()).thenReturn(Optional.empty());
    data = heartbeatState.buildRequestData();
    assertNull(data.rackId());
}
Should we verify that it is sent if all fields are sent again?
I added a case: if the member state changes to JOINING again, it sends rackId again. Thanks.
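The lifecycle described in this exchange can be sketched without any Kafka classes. Everything below is a simplified stand-in: HeartbeatRackSketch, MemberState and RequestData are hypothetical names, and the sketch assumes that a full request corresponds to the JOINING state, which is a simplification of the real heartbeat state handling.

```java
import java.util.Optional;

// Hypothetical, simplified model of the behavior discussed above; not Kafka code.
final class HeartbeatRackSketch {

    enum MemberState { JOINING, STABLE }

    // Stand-in for the request data; only the rack field is modeled.
    static final class RequestData {
        String rackId; // null means the field is omitted from the request
    }

    private final Optional<String> rackId;

    HeartbeatRackSketch(Optional<String> rackId) {
        this.rackId = rackId;
    }

    // Assumption: all fields are sent only while the member is joining.
    RequestData buildRequestData(MemberState state) {
        RequestData data = new RequestData();
        boolean sendAllFields = state == MemberState.JOINING;
        if (sendAllFields) {
            rackId.ifPresent(r -> data.rackId = r);
        }
        return data;
    }

    public static void main(String[] args) {
        HeartbeatRackSketch state = new HeartbeatRackSketch(Optional.of("rack1"));
        System.out.println(state.buildRequestData(MemberState.JOINING).rackId);
        System.out.println(state.buildRequestData(MemberState.STABLE).rackId);
        // Rejoining sends the rack again.
        System.out.println(state.buildRequestData(MemberState.JOINING).rackId);
    }
}
```

Because the rack cannot be reconfigured at runtime, the sketch keeps no record of the last value sent, which matches the reasoning given for dropping rackId from sentFields.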
…state is joining Signed-off-by: PoAn Yang <payang@apache.org>
lgtm, thanks
@FrankYang0529 Thanks for this patch. It is wonderful, and only two small comments are left. PTAL
new ConsumerConfig(Map.of(
    ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
    ConsumerConfig.CLIENT_RACK_CONFIG, "rack1"
Since an empty rack is illegal in the heartbeat RPC, the empty rack should be handled as no rack. That is a critical behavior, so could you please add a unit test for it?
Followup PR: #19906
Thanks for the suggestion.
/**
 * The RackAwareAssignor is a consumer group partition assignor that takes into account the rack
 * information of the members when assigning partitions to them.
 * It needs all brokers and members to have rack information available.
It also requires all members to subscribe to the same topics, right?
Yes, this only checks the subscription of the first member and assumes all members use the same subscription. This is a sample assignor in the test module. Do we want to make it support heterogeneous groups?
"Do we want to make it support heterogeneous groups?"
No, it is used for testing only, so it should be fine.
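The idea behind the test assignor, matching a partition to a member in one of the partition's racks, can be sketched with plain collections. This is an illustrative approximation and not the RackAwareAssignor from the patch: the class RackAwareAssignSketch, its assign method, and the tie-breaking rule (the same-rack member with the fewest partitions wins) are assumptions, and partitions with no same-rack member are simply left unassigned here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of rack-aware assignment; not the assignor from the patch.
final class RackAwareAssignSketch {

    // racksForPartition: partition id -> racks hosting a replica of that partition.
    // memberRacks: member id -> rack of that member (every member is assumed to have one).
    static Map<String, List<Integer>> assign(Map<Integer, Set<String>> racksForPartition,
                                             Map<String, String> memberRacks) {
        // Sorted copies keep the result deterministic regardless of input map ordering.
        Map<String, String> members = new TreeMap<>(memberRacks);
        Map<String, List<Integer>> assignment = new TreeMap<>();
        for (String member : members.keySet()) {
            assignment.put(member, new ArrayList<>());
        }
        for (Map.Entry<Integer, Set<String>> partition : new TreeMap<>(racksForPartition).entrySet()) {
            String best = null;
            for (Map.Entry<String, String> member : members.entrySet()) {
                if (!partition.getValue().contains(member.getValue())) {
                    continue; // member is not in any rack hosting this partition
                }
                if (best == null || assignment.get(member.getKey()).size() < assignment.get(best).size()) {
                    best = member.getKey();
                }
            }
            if (best != null) {
                assignment.get(best).add(partition.getKey());
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, String> members = Map.of("a", "rack1", "b", "rack2");
        // Initial placement: partition 0 lives in rack1, partition 1 in rack2.
        System.out.println(assign(Map.of(0, Set.of("rack1"), 1, Set.of("rack2")), members));
        // After a reassignment moves partition 0 to rack2, the assignment changes.
        System.out.println(assign(Map.of(0, Set.of("rack2"), 1, Set.of("rack2")), members));
    }
}
```

The second call in main mirrors what the integration test checks after a partition reassignment: once a partition's racks change, recomputing the assignment moves it to a member in the new rack.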
… assignment (apache#19856)

* Add `RackAwareAssignor`. It uses `racksForPartition` to check the rack id of a partition and assign it to a member which has the same rack id.
* Add `ConsumerIntegrationTest#testRackAwareAssignment` to check `racksForPartition` works correctly.

Reviewers: David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>