diff --git a/build.gradle b/build.gradle
index 968eda48e8852..a3f1965f81a8b 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1995,6 +1995,7 @@ project(':clients:clients-integration-tests') {
     implementation project(':server-common')
     testImplementation project(':metadata')
     implementation project(':group-coordinator')
+    implementation project(':group-coordinator:group-coordinator-api')
     implementation project(':transaction-coordinator')
 
     testImplementation libs.junitJupiter
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9fc1bcd7eff37..b1ef62ca3a26b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -235,6 +235,7 @@
+
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
index a56de229318dd..8a41c1a8beb02 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/ConsumerIntegrationTest.java
@@ -16,7 +16,12 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.NewPartitionReassignment;
+import org.apache.kafka.clients.admin.NewPartitions;
+import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.consumer.internals.AbstractHeartbeatRequestManager;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.KafkaException;
@@ -31,12 +36,16 @@ import org.apache.kafka.common.test.api.ClusterTest;
 import org.apache.kafka.common.test.api.ClusterTests;
 import org.apache.kafka.common.test.api.Type;
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig;
 
 import java.time.Duration;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -216,6 +225,116 @@ public void testLeaderEpoch(ClusterInstance clusterInstance) throws Exception {
         }
     }
 
+    @ClusterTest(
+        types = {Type.KRAFT},
+        brokers = 3,
+        serverProperties = {
+            @ClusterConfigProperty(id = 0, key = "broker.rack", value = "rack0"),
+            @ClusterConfigProperty(id = 1, key = "broker.rack", value = "rack1"),
+            @ClusterConfigProperty(id = 2, key = "broker.rack", value = "rack2"),
+            @ClusterConfigProperty(key = GroupCoordinatorConfig.CONSUMER_GROUP_ASSIGNORS_CONFIG, value = "org.apache.kafka.clients.consumer.RackAwareAssignor")
+        }
+    )
+    public void testRackAwareAssignment(ClusterInstance clusterInstance) throws ExecutionException, InterruptedException {
+        String topic = "test-topic";
+        try (Admin admin = clusterInstance.admin();
+             Producer<byte[], byte[]> producer = clusterInstance.producer();
+             Consumer<byte[], byte[]> consumer0 = clusterInstance.consumer(Map.of(
+                 ConsumerConfig.GROUP_ID_CONFIG, "group0",
+                 ConsumerConfig.CLIENT_RACK_CONFIG, "rack0",
+                 ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
+             ));
+             Consumer<byte[], byte[]> consumer1 = clusterInstance.consumer(Map.of(
+                 ConsumerConfig.GROUP_ID_CONFIG, "group0",
+                 ConsumerConfig.CLIENT_RACK_CONFIG, "rack1",
+                 ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
+             ));
+             Consumer<byte[], byte[]> consumer2 = clusterInstance.consumer(Map.of(
+                 ConsumerConfig.GROUP_ID_CONFIG, "group0",
+                 ConsumerConfig.CLIENT_RACK_CONFIG, "rack2",
+                 ConsumerConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.CONSUMER.name()
+             ))
+        ) {
+            // Create a new topic with 1 partition on broker 0.
+            admin.createTopics(List.of(new NewTopic(topic, Map.of(0, List.of(0)))));
+            clusterInstance.waitForTopic(topic, 1);
+
+            producer.send(new ProducerRecord<>(topic, "key".getBytes(), "value".getBytes()));
+            producer.flush();
+
+            consumer0.subscribe(List.of(topic));
+            consumer1.subscribe(List.of(topic));
+            consumer2.subscribe(List.of(topic));
+
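+            // Partition 0 is hosted only on broker 0 (rack0), so the rack-aware assignor should give it to consumer0.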
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
+                    consumer1.assignment().isEmpty() &&
+                    consumer2.assignment().isEmpty();
+            }, "Consumer 0 should be assigned to topic partition 0");
+
+            // Add new partitions 1 and 2 on broker 1.
+            admin.createPartitions(
+                Map.of(
+                    topic,
+                    NewPartitions.increaseTo(3, List.of(List.of(1), List.of(1)))
+                )
+            );
+            clusterInstance.waitForTopic(topic, 3);
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
+                    consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
+                    consumer2.assignment().isEmpty();
+            }, "Consumer 1 should be assigned to topic partitions 1 and 2");
+
+            // Add new partitions 3, 4, and 5 on broker 2.
+            admin.createPartitions(
+                Map.of(
+                    topic,
+                    NewPartitions.increaseTo(6, List.of(List.of(2), List.of(2), List.of(2)))
+                )
+            );
+            clusterInstance.waitForTopic(topic, 6);
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 0))) &&
+                    consumer1.assignment().equals(Set.of(new TopicPartition(topic, 1), new TopicPartition(topic, 2))) &&
+                    consumer2.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4), new TopicPartition(topic, 5)));
+            }, "Consumer 2 should be assigned to topic partitions 3, 4, and 5");
+
+            // Reassign partitions to different brokers.
+            // partition 0 -> broker 2
+            // partition 1 -> broker 2
+            // partition 2 -> broker 2
+            // partition 3 -> broker 1
+            // partition 4 -> broker 1
+            // partition 5 -> broker 0
+            admin.alterPartitionReassignments(Map.of(
+                new TopicPartition(topic, 0), Optional.of(new NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 1), Optional.of(new NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 2), Optional.of(new NewPartitionReassignment(List.of(2))),
+                new TopicPartition(topic, 3), Optional.of(new NewPartitionReassignment(List.of(1))),
+                new TopicPartition(topic, 4), Optional.of(new NewPartitionReassignment(List.of(1))),
+                new TopicPartition(topic, 5), Optional.of(new NewPartitionReassignment(List.of(0)))
+            )).all().get();
+            TestUtils.waitForCondition(() -> {
+                consumer0.poll(Duration.ofMillis(1000));
+                consumer1.poll(Duration.ofMillis(1000));
+                consumer2.poll(Duration.ofMillis(1000));
+                return consumer0.assignment().equals(Set.of(new TopicPartition(topic, 5))) &&
+                    consumer1.assignment().equals(Set.of(new TopicPartition(topic, 3), new TopicPartition(topic, 4))) &&
+                    consumer2.assignment().equals(Set.of(new TopicPartition(topic, 0), new TopicPartition(topic, 1), new TopicPartition(topic, 2)));
+            }, "Consumer to topic partition mapping should be 0 -> 5 | 1 -> 3, 4 | 2 -> 0, 1, 2");
+        }
+    }
+
     private void sendMsg(ClusterInstance clusterInstance, String topic, int sendMsgNum) {
         try (var producer = clusterInstance.producer(Map.of(
                 ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
new file mode 100644
index 0000000000000..e71e1f8a1a3f3
--- /dev/null
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/consumer/RackAwareAssignor.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.ConsumerGroupPartitionAssignor;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The RackAwareAssignor is a consumer group partition assignor that takes the rack information
+ * of the members into account when assigning partitions to them.
+ * It requires all brokers and members to have rack information available.
+ */
+public class RackAwareAssignor implements ConsumerGroupPartitionAssignor {
+    @Override
+    public String name() {
+        return "rack-aware-assignor";
+    }
+
+    @Override
+    public GroupAssignment assign(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) throws PartitionAssignorException {
+        Map<String, String> rackIdToMemberId = new HashMap<>();
+        List<String> memberIds = new ArrayList<>(groupSpec.memberIds());
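+        // Map each rack id to a member in that rack; fail if any member does not report a rack.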
+        for (String memberId : memberIds) {
+            if (groupSpec.memberSubscription(memberId).rackId().isEmpty()) {
+                throw new PartitionAssignorException("Member " + memberId + " does not have rack information available.");
+            }
+            rackIdToMemberId.put(
+                groupSpec.memberSubscription(memberId).rackId().get(),
+                memberId
+            );
+        }
+
+        Map<String, Map<Uuid, Set<Integer>>> assignments = new HashMap<>();
+        for (Uuid topicId : groupSpec.memberSubscription(memberIds.get(0)).subscribedTopicIds()) {
+            int numPartitions = subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
+            }
+
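+            // Assign each partition to the member whose rack hosts one of the partition's replicas.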
+            for (int partitionId = 0; partitionId < numPartitions; partitionId++) {
+                Set<String> racks = subscribedTopicDescriber.racksForPartition(topicId, partitionId);
+                if (racks.isEmpty()) {
+                    throw new PartitionAssignorException("No racks available for partition " + partitionId + " of topic " + topicId);
+                }
+
+                String assignedRack = null;
+                for (String rack : racks) {
+                    String memberId = rackIdToMemberId.get(rack);
+                    if (memberId == null) {
+                        continue;
+                    }
+                    assignedRack = rack;
+                    break;
+                }
+
+                if (assignedRack == null) {
+                    throw new PartitionAssignorException("No member found for racks " + racks + " for partition " + partitionId + " of topic " + topicId);
+                }
+
+                Map<Uuid, Set<Integer>> assignment = assignments.computeIfAbsent(
+                    rackIdToMemberId.get(assignedRack),
+                    k -> new HashMap<>()
+                );
+                Set<Integer> partitions = assignment.computeIfAbsent(
+                    topicId,
+                    k -> new java.util.HashSet<>()
+                );
+                partitions.add(partitionId);
+            }
+        }
+
+        Map<String, MemberAssignment> memberAssignments = new HashMap<>();
+        for (Map.Entry<String, Map<Uuid, Set<Integer>>> entry : assignments.entrySet()) {
+            memberAssignments.put(entry.getKey(), new MemberAssignmentImpl(entry.getValue()));
+        }
+        return new GroupAssignment(memberAssignments);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
index 4aff7c8c0a88f..fce243ebc6461 100644
--- a/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/GroupRebalanceConfig.java
@@ -43,6 +43,7 @@ public String toString() {
     public final int heartbeatIntervalMs;
     public final String groupId;
     public final Optional<String> groupInstanceId;
+    public final Optional<String> rackId;
     public final long retryBackoffMs;
     public final long retryBackoffMaxMs;
     public final boolean leaveGroupOnClose;
@@ -53,8 +54,12 @@ public GroupRebalanceConfig(AbstractConfig config, ProtocolType protocolType) {
         // Consumer and Connect use different config names for defining rebalance timeout
         if ((protocolType == ProtocolType.CONSUMER) || (protocolType == ProtocolType.SHARE)) {
             this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.MAX_POLL_INTERVAL_MS_CONFIG);
+
+            String rackId = config.getString(CommonClientConfigs.CLIENT_RACK_CONFIG);
+            this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
         } else {
             this.rebalanceTimeoutMs = config.getInt(CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG);
+            this.rackId = Optional.empty();
         }
         this.heartbeatIntervalMs = config.getInt(CommonClientConfigs.HEARTBEAT_INTERVAL_MS_CONFIG);
@@ -90,6 +95,7 @@ public GroupRebalanceConfig(final int sessionTimeoutMs,
                                 final int heartbeatIntervalMs,
                                 String groupId,
                                 Optional<String> groupInstanceId,
+                                String rackId,
                                 long retryBackoffMs,
                                 long retryBackoffMaxMs,
                                 boolean leaveGroupOnClose) {
@@ -98,6 +104,7 @@ public GroupRebalanceConfig(final int sessionTimeoutMs,
         this.heartbeatIntervalMs = heartbeatIntervalMs;
         this.groupId = groupId;
         this.groupInstanceId = groupInstanceId;
+        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
         this.retryBackoffMs = retryBackoffMs;
         this.retryBackoffMaxMs = retryBackoffMaxMs;
         this.leaveGroupOnClose = leaveGroupOnClose;
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
index e17ae8ae97238..3a50ff037abaa 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ClassicKafkaConsumer.java
@@ -230,7 +230,6 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 config.getInt(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG),
                 this.interceptors,
                 config.getBoolean(THROW_ON_FETCH_STABLE_OFFSET_UNSUPPORTED),
-                config.getString(ConsumerConfig.CLIENT_RACK_CONFIG),
                 clientTelemetryReporter);
         }
         this.fetcher = new Fetcher<>(
@@ -330,6 +329,7 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 heartbeatIntervalMs,
                 groupId.get(),
                 groupInstanceId,
+                rackId,
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 true
@@ -348,7 +348,6 @@ public class ClassicKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
                 autoCommitIntervalMs,
                 interceptors,
                 throwOnStableOffsetNotSupported,
-                rackId,
                 clientTelemetryReporter
             );
         } else {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 499d7363f6b73..4956d64228dbb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -178,7 +178,6 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
                                boolean throwOnFetchStableOffsetsUnsupported,
-                               String rackId,
                                Optional<ClientTelemetryReporter> clientTelemetryReporter) {
         this(rebalanceConfig,
             logContext,
@@ -193,7 +192,6 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
             autoCommitIntervalMs,
             interceptors,
             throwOnFetchStableOffsetsUnsupported,
-            rackId,
             clientTelemetryReporter,
             Optional.empty());
     }
@@ -214,7 +212,6 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
                                int autoCommitIntervalMs,
                                ConsumerInterceptors<?, ?> interceptors,
                                boolean throwOnFetchStableOffsetsUnsupported,
-                               String rackId,
                                Optional<ClientTelemetryReporter> clientTelemetryReporter,
                                Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
         super(rebalanceConfig,
@@ -228,7 +225,7 @@ public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
         this.rebalanceConfig = rebalanceConfig;
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
-        this.rackId = rackId == null || rackId.isEmpty() ? Optional.empty() : Optional.of(rackId);
+        this.rackId = rebalanceConfig.rackId;
         this.metadataSnapshot = new MetadataSnapshot(this.rackId, subscriptions, metadata.fetch(), metadata.updateVersion());
         this.subscriptions = subscriptions;
         this.defaultOffsetCommitCallback = new DefaultOffsetCommitCallback();
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
index 2845f4bc9ee51..c5f95305a4747 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManager.java
@@ -247,6 +247,7 @@ public void reset() {
             sentFields.reset();
         }
 
+        @SuppressWarnings("NPathComplexity")
         public ConsumerGroupHeartbeatRequestData buildRequestData() {
             ConsumerGroupHeartbeatRequestData data = new ConsumerGroupHeartbeatRequestData();
 
@@ -306,6 +307,12 @@ public ConsumerGroupHeartbeatRequestData buildRequestData() {
                 sentFields.localAssignment = local;
             }
 
+            // RackId - sent when joining
+            String rackId = membershipManager.rackId().orElse(null);
+            if (sendAllFields) {
+                data.setRackId(rackId);
+            }
+
             return data;
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
index bf0f593d8edd6..b615977075441 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java
@@ -112,6 +112,8 @@ public class ConsumerMembershipManager extends AbstractMembershipManager {
     private final Optional<String> groupInstanceId;
 
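+    /**
+     * Rack id of the member, if the consumer was configured with client.rack. It is sent to the
+     * coordinator when the member joins so that rack-aware server-side assignors can use it.
+     */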
+    private final Optional<String> rackId;
+
     /**
      * Rebalance timeout. To be used as time limit for the commit request issued
      * when a new assignment is received, that is retried until it succeeds, fails with a
@@ -140,6 +142,7 @@ public class ConsumerMembershipManager extends AbstractMembershipManager
                                      Optional<String> groupInstanceId,
+                                     Optional<String> rackId,
                                      int rebalanceTimeoutMs,
                                      Optional<String> serverAssignor,
                                      SubscriptionState subscriptions,
@@ -152,6 +155,7 @@ public ConsumerMembershipManager(String groupId,
                                      boolean autoCommitEnabled) {
         this(groupId,
             groupInstanceId,
+            rackId,
             rebalanceTimeoutMs,
             serverAssignor,
             subscriptions,
@@ -167,6 +171,7 @@ public ConsumerMembershipManager(String groupId,
     // Visible for testing
     ConsumerMembershipManager(String groupId,
                               Optional<String> groupInstanceId,
+                              Optional<String> rackId,
                               int rebalanceTimeoutMs,
                               Optional<String> serverAssignor,
                               SubscriptionState subscriptions,
@@ -185,6 +190,7 @@ public ConsumerMembershipManager(String groupId,
             metricsManager,
             autoCommitEnabled);
         this.groupInstanceId = groupInstanceId;
+        this.rackId = rackId;
         this.rebalanceTimeoutMs = rebalanceTimeoutMs;
         this.serverAssignor = serverAssignor;
         this.commitRequestManager = commitRequestManager;
@@ -199,6 +205,10 @@ public Optional<String> groupInstanceId() {
         return groupInstanceId;
     }
 
+    public Optional<String> rackId() {
+        return rackId;
+    }
+
     /**
      * {@inheritDoc}
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
index f341dc35a4a53..32c76f8c73244 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/RequestManagers.java
@@ -248,6 +248,7 @@ protected RequestManagers create() {
                 membershipManager = new ConsumerMembershipManager(
                     groupRebalanceConfig.groupId,
                     groupRebalanceConfig.groupInstanceId,
+                    groupRebalanceConfig.rackId,
                     groupRebalanceConfig.rebalanceTimeoutMs,
                     serverAssignor,
                     subscriptions,
diff --git a/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
new file mode 100644
index 0000000000000..bd9bc2252056a
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/GroupRebalanceConfigTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.EnumSource;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class GroupRebalanceConfigTest {
+
+    @ParameterizedTest
+    @EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names = {"CONSUMER", "SHARE"})
+    void testRackIdIsEmptyIfNotDefined(GroupRebalanceConfig.ProtocolType protocolType) {
+        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+            new ConsumerConfig(Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"
+            )),
+            protocolType
+        );
+        assertTrue(groupRebalanceConfig.rackId.isEmpty());
+    }
+
+    @ParameterizedTest
+    @EnumSource(value = GroupRebalanceConfig.ProtocolType.class, names = {"CONSUMER", "SHARE"})
+    void testRackIdIsNotEmptyIfDefined(GroupRebalanceConfig.ProtocolType protocolType) {
+        GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(
+            new ConsumerConfig(Map.of(
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer",
+                ConsumerConfig.CLIENT_RACK_CONFIG, "rack1"
+            )),
+            protocolType
+        );
+        assertTrue(groupRebalanceConfig.rackId.isPresent());
+        assertEquals("rack1", groupRebalanceConfig.rackId.get());
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index df784da79379c..6aa3095fadaae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -166,6 +166,7 @@ false, false, new SubscriptionState(logContext, AutoOffsetResetStrategy.EARLIEST
                                                          HEARTBEAT_INTERVAL_MS,
                                                          GROUP_ID,
                                                          groupInstanceId,
+                                                         null,
                                                          retryBackoffMs,
                                                          retryBackoffMaxMs,
                                                          leaveOnClose);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8d364b2ae30ce..683a25a3e1c1f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -208,7 +208,7 @@ public void setup() {
         this.rebalanceListener = new MockRebalanceListener();
         this.mockOffsetCommitCallback = new MockCommitCallback();
         this.partitionAssignor.clear();
-        this.rebalanceConfig = buildRebalanceConfig(Optional.empty());
+        this.rebalanceConfig = buildRebalanceConfig(Optional.empty(), null);
         this.coordinator = buildCoordinator(rebalanceConfig,
                                             metrics,
                                             assignors,
@@ -216,12 +216,13 @@ public void setup() {
                                             subscriptions);
     }
 
-    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId) {
+    private GroupRebalanceConfig buildRebalanceConfig(Optional<String> groupInstanceId, String rackId) {
         return new GroupRebalanceConfig(sessionTimeoutMs,
                                         rebalanceTimeoutMs,
                                         heartbeatIntervalMs,
                                         groupId,
                                         groupInstanceId,
+                                        rackId,
                                         retryBackoffMs,
                                         retryBackoffMaxMs,
                                         groupInstanceId.isEmpty());
@@ -2974,7 +2975,7 @@ public void testCommitOffsetFencedInstanceWithNewGeneration() {
 
     @Test
     public void testCommitOffsetShouldNotSetInstanceIdIfMemberIdIsUnknown() {
-        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
         ConsumerCoordinator coordinator = buildCoordinator(
             rebalanceConfig,
             new Metrics(),
@@ -3699,7 +3700,6 @@ private void supportStableFlag(final short upperVersion, final boolean expectThr
                                                            autoCommitIntervalMs,
                                                            null,
                                                            true,
-                                                           null,
                                                            Optional.empty());
 
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
@@ -3750,7 +3750,7 @@ private ConsumerCoordinator prepareCoordinatorForCloseTest(final boolean useGrou
                                                                final boolean autoCommit,
                                                                final Optional<String> groupInstanceId,
                                                                final boolean shouldPoll) {
-        rebalanceConfig = buildRebalanceConfig(groupInstanceId);
+        rebalanceConfig = buildRebalanceConfig(groupInstanceId, null);
         ConsumerCoordinator coordinator = buildCoordinator(rebalanceConfig,
                                                            new Metrics(),
                                                            assignors,
@@ -3868,7 +3868,6 @@ private ConsumerCoordinator buildCoordinator(final GroupRebalanceConfig rebalanc
                                        autoCommitIntervalMs,
                                        null,
                                        false,
-                                       null,
                                        Optional.empty());
     }
 
@@ -4112,9 +4111,10 @@ private void createRackAwareCoordinator(String rackId, MockPartitionAssignor ass
 
         metrics = new Metrics(time);
 
+        rebalanceConfig = buildRebalanceConfig(rebalanceConfig.groupInstanceId, rackId);
         coordinator = new ConsumerCoordinator(rebalanceConfig, new LogContext(), consumerClient,
                 Collections.singletonList(assignor), metadata, subscriptions,
-                metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, rackId, Optional.empty());
+                metrics, consumerId + groupId, time, false, autoCommitIntervalMs, null, false, Optional.empty());
     }
 
     private static MetadataResponse rackAwareMetadata(int numNodes,
@@ -4193,7 +4193,6 @@ private void createMockHeartbeatThreadCoordinator() {
             autoCommitIntervalMs,
             null,
             false,
-            null,
             Optional.empty(),
             Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)));
     }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
index 2bdc8819aec64..9063ae5ab5bf4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerHeartbeatRequestManagerTest.java
@@ -1009,6 +1009,34 @@ public void testRegexInJoiningHeartbeat() {
         assertNull(data.subscribedTopicRegex());
     }
 
+    @Test
+    public void testRackIdInHeartbeatLifecycle() {
+        heartbeatState = new HeartbeatState(subscriptions, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
+        createHeartbeatRequestStateWithZeroHeartbeatInterval();
+
+        // Initial heartbeat with rackId
+        mockJoiningMemberData(null);
+        when(membershipManager.rackId()).thenReturn(Optional.of("rack1"));
+        ConsumerGroupHeartbeatRequestData data = heartbeatState.buildRequestData();
+        assertEquals("rack1", data.rackId());
+
+        // RackId not included in HB if member state is not JOINING
+        when(membershipManager.state()).thenReturn(MemberState.STABLE);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.rackId());
+
+        // RackId included in HB if member state changes to JOINING again
+        when(membershipManager.state()).thenReturn(MemberState.JOINING);
+        data = heartbeatState.buildRequestData();
+        assertEquals("rack1", data.rackId());
+
+        // Empty rackId not included in HB
+        when(membershipManager.rackId()).thenReturn(Optional.empty());
+        heartbeatState = new HeartbeatState(subscriptions, membershipManager, DEFAULT_MAX_POLL_INTERVAL_MS);
+        data = heartbeatState.buildRequestData();
+        assertNull(data.rackId());
+    }
+
     private void assertHeartbeat(ConsumerHeartbeatRequestManager hrm, int nextPollMs) {
         NetworkClientDelegate.PollResult pollResult = hrm.poll(time.milliseconds());
         assertEquals(1, pollResult.unsentRequests.size());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
index b8557e3701565..aa8c7bdb1dcae 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManagerTest.java
@@ -142,17 +142,20 @@ private ConsumerMembershipManager createMembershipManagerJoiningGroup(String gro
 
     private ConsumerMembershipManager createMembershipManager(String groupInstanceId) {
         ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
-                GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT, Optional.empty(),
+                GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, backgroundEventHandler,
                 time, rebalanceMetricsManager, true));
         assertMemberIdIsGenerated(manager.memberId());
         return manager;
     }
 
-    private ConsumerMembershipManager createMembershipManagerJoiningGroup(String groupInstanceId,
-                                                                          String serverAssignor) {
+    private ConsumerMembershipManager createMembershipManagerJoiningGroup(
+        String groupInstanceId,
+        String serverAssignor,
+        String rackId
+    ) {
         ConsumerMembershipManager manager = spy(new ConsumerMembershipManager(
-                GROUP_ID, Optional.ofNullable(groupInstanceId), REBALANCE_TIMEOUT,
+                GROUP_ID, Optional.ofNullable(groupInstanceId), Optional.ofNullable(rackId), REBALANCE_TIMEOUT,
                 Optional.ofNullable(serverAssignor), subscriptionState, commitRequestManager, metadata,
                 LOG_CONTEXT, backgroundEventHandler, time, rebalanceMetricsManager, true));
         assertMemberIdIsGenerated(manager.memberId());
@@ -165,10 +168,19 @@ public void testMembershipManagerServerAssignor() {
         ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
         assertEquals(Optional.empty(), membershipManager.serverAssignor());
 
-        membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform");
+        membershipManager = createMembershipManagerJoiningGroup("instance1", "Uniform", null);
         assertEquals(Optional.of("Uniform"), membershipManager.serverAssignor());
     }
 
+    @Test
+    public void testMembershipManagerRackId() {
+        ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup();
+        assertEquals(Optional.empty(), membershipManager.rackId());
+
+        membershipManager = createMembershipManagerJoiningGroup(null, null, "rack1");
+        assertEquals(Optional.of("rack1"), membershipManager.rackId());
+    }
+
     @Test
     public void testMembershipManagerInitSupportsEmptyGroupInstanceId() {
         createMembershipManagerJoiningGroup();
@@ -231,7 +243,7 @@ public void testTransitionToFatal() {
     @Test
     public void testTransitionToFailedWhenTryingToJoin() {
         ConsumerMembershipManager membershipManager = new ConsumerMembershipManager(
-                GROUP_ID, Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
+                GROUP_ID, Optional.empty(), Optional.empty(), REBALANCE_TIMEOUT, Optional.empty(),
                 subscriptionState, commitRequestManager, metadata, LOG_CONTEXT, backgroundEventHandler,
                 time, rebalanceMetricsManager, true);
 
         assertEquals(MemberState.UNSUBSCRIBED, membershipManager.state());
@@ -2737,7 +2749,7 @@ private ConsumerMembershipManager createMemberInStableState() {
     }
 
     private ConsumerMembershipManager createMemberInStableState(String groupInstanceId) {
-        ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup(groupInstanceId, null);
+        ConsumerMembershipManager membershipManager = createMembershipManagerJoiningGroup(groupInstanceId, null, null);
         ConsumerGroupHeartbeatResponse heartbeatResponse = createConsumerGroupHeartbeatResponse(new Assignment(), membershipManager.memberId());
         when(subscriptionState.hasAutoAssignedPartitions()).thenReturn(true);
         when(subscriptionState.rebalanceListener()).thenReturn(Optional.empty());
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
index 49102da976603..de7937673c888 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/HeartbeatTest.java
@@ -45,6 +45,7 @@ public void setUp() {
                 heartbeatIntervalMs,
                 "group_id",
                 Optional.empty(),
+                null,
                 retryBackoffMs,
                 retryBackoffMaxMs,
                 true);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index a8e7cd465529f..b0408b1273584 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -149,6 +149,7 @@ public void init(ConnectProtocolCompatibility compatibility) {
                                                         heartbeatIntervalMs,
                                                         groupId,
                                                         Optional.empty(),
+                                                        null,
                                                         retryBackoffMs,
                                                         retryBackoffMaxMs,
                                                         true);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 4122578266aaf..9b6d591522522 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -139,6 +139,7 @@ public void setup(ConnectProtocolCompatibility compatibility) {
                                                     heartbeatIntervalMs,
                                                     groupId,
                                                     Optional.empty(),
+                                                    null,
                                                     retryBackoffMs,
                                                     retryBackoffMaxMs,
                                                     true);