Skip to content

KAFKA-17747: [7/N] Add consumer group integration test for rack aware assignment #19856

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Jun 4, 2025

Conversation

FrankYang0529
Copy link
Member

@FrankYang0529 FrankYang0529 commented May 30, 2025

  • Add RackAwareAssignor. It uses racksForPartition to check the rack
    id of a partition and assign it to a member which has the same rack id.
  • Add ConsumerIntegrationTest#testRackAwareAssignment to check
    racksForPartition works correctly.

Reviewers: David Jacot djacot@confluent.io

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 changed the title KAFKA-17747: Add integration test for rack aware assignment (wip) KAFKA-17747: [7/N] Add integration test for rack aware assignment (wip) May 30, 2025
@github-actions github-actions bot added consumer build Gradle build or GitHub Actions clients labels May 30, 2025
@FrankYang0529 FrankYang0529 changed the title KAFKA-17747: [7/N] Add integration test for rack aware assignment (wip) KAFKA-17747: [7/N] Add integration test for rack aware assignment May 30, 2025
@FrankYang0529 FrankYang0529 requested review from dajac and lianetm May 30, 2025 23:46
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for the PR. I left a few comments.

Comment on lines 265 to 266
// RackId - set if present
membershipManager.rackId().ifPresent(data::setRackId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that it should only be set if it did not change since the last request or if we send a full request.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated it. Thanks.

@@ -142,7 +142,7 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro

private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's extend unit tests to cover the rack field.

Comment on lines 77 to 85
// The WorkerGroupMember in connect module also uses this class, but there is no client.rack in DistributedConfig.
// Ignore the rackId in that case to avoid ConfigException.
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
// Skip empty rackId to avoid InvalidRequestException.
if (config.values().containsKey(CommonClientConfigs.CLIENT_RACK_CONFIG) && !config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty()) {
this.rackId = Optional.ofNullable(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG));
} else {
this.rackId = Optional.empty();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Should we only do this if the protocolType is CONSUMER?
  • It may be simpler to get the config in a local variable and to set this.rack if non-null and non-empty;

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I change to do this when protocol type is CONSUMER or SHARE, because they all support the field.

@@ -216,6 +222,92 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
}
}

@ClusterTest(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder whether we could add another test which verifies that a rebalance is triggered when the racks of a partition has changed. Have you considered it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I added it to the same case. It uses alterPartitionReassignments to change partitions to different brokers and make sure consumers get new assignments.

Signed-off-by: PoAn Yang <payang@apache.org>
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for the update. A few more comments.

Comment on lines 57 to 62
// Connect doesn't support rack id.
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
// Skip empty rackId to avoid InvalidRequestException.
this.rackId = config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG).isEmpty() ?
Optional.empty() :
Optional.of(config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In ConsumerCoordinator, we have the following code:

this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);

Hence, we can use the same logic for the classic protocol. I wonder if we could just do the following here for all protocols.

String rackId = config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG);
this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);

We may also want to update ConsumerCoordinator to get it from GroupRebalanceConfig too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update ConsumerCoordinator to get rack id from GroupRebalanceConfig. Thanks for the suggestion.

Comment on lines 310 to 315
// RackId - sent when joining or if it has changed since the last heartbeat
String rackId = membershipManager.rackId().orElse(null);
if (sendAllFields || !Objects.equals(rackId, sentFields.rackId)) {
data.setRackId(rackId);
sentFields.rackId = rackId;
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also test this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added related test testRackIdInHeartbeatLifecycle. BTW, I remove rackId from sentFields. Users cannot reconfigure it, so we don't need to keep last sent data in sentFields.

Signed-off-by: PoAn Yang <payang@apache.org>
@FrankYang0529 FrankYang0529 requested a review from dajac June 3, 2025 14:09
@FrankYang0529 FrankYang0529 changed the title KAFKA-17747: [7/N] Add integration test for rack aware assignment KAFKA-17747: [7/N] Add consumer group integration test for rack aware assignment Jun 4, 2025
Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 Thanks for the update. I left a few nits.

Comment on lines 57 to 59
// Connect doesn't support rack id.
// The GroupCoordinatorService throws error if the rackId is empty. The default value of client.rack is empty string.
// Skip empty rackId to avoid InvalidRequestException.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: We can remove this comment now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it. Thanks.

when(membershipManager.rackId()).thenReturn(Optional.empty());
data = heartbeatState.buildRequestData();
assertNull(data.rackId());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we verify that it is sent if all fields are sent again?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I add a case: if member state changes to JOINING again, it sends rackId again. Thanks.

Copy link
Member

@dajac dajac left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm, thanks

@dajac dajac merged commit 949617b into apache:trunk Jun 4, 2025
25 checks passed
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FrankYang0529 thanks for this patch. it is wonderful, and only two small comments are left. PTAL

new ConsumerConfig(Map.of(
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
ConsumerConfig.CLIENT_RACK_CONFIG, "rack1"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since an empty rack is illegal in the heartbeat RPC, the empty rack should be handled as no rack. That is a critical behavior, so could you please add unit test for it?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Followup PR: #19906

Thanks for the suggestion.

/**
* The RackAwareAssignor is a consumer group partition assignor that takes into account the rack
* information of the members when assigning partitions to them.
* It needs all brokers and members to have rack information available.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also requires all member to subscribe same topics, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this only checks subscription of first member and assume all members use same subscription. This is a sample assignor under test module. Do we want to make it support heterogeneous group?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to make it support heterogeneous group?

no, it is used by testing only so it should be fine.

@FrankYang0529 FrankYang0529 deleted the KAFKA-17747-7 branch June 5, 2025 02:41
Mirai1129 pushed a commit to Mirai1129/kafka that referenced this pull request Jun 5, 2025
… assignment (apache#19856)

* Add `RackAwareAssignor`. It uses `racksForPartition` to check the rack
id of a partition and assign it to a member which has the same rack id.
* Add `ConsumerIntegrationTest#testRackAwareAssignment` to check
`racksForPartition` works correctly.

Reviewers: David Jacot <djacot@confluent.io>

---------

Signed-off-by: PoAn Yang <payang@apache.org>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
build Gradle build or GitHub Actions clients connect consumer
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants