diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java index 962b1e6cba95..82959d26ce35 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java @@ -215,7 +215,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() { OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( new OffsetCommitRequestData() .setGroupId(this.groupId) - .setGenerationId(generation.generationId) + .setGenerationIdOrMemberEpoch(generation.generationId) .setMemberId(generation.memberId) .setGroupInstanceId(groupInstanceId) .setTopics(new ArrayList<>(requestTopicDataMap.values()))); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index bcd4b377881e..82125be17052 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -1361,7 +1361,7 @@ RequestFuture sendOffsetCommitRequest(final Map(requestTopicDataMap.values())) diff --git a/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java new file mode 100644 index 000000000000..7b373c5b5c11 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/errors/StaleMemberEpochException.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.errors; + +import org.apache.kafka.common.annotation.InterfaceStability; + +/** + * The StaleMemberEpochException is used in the context of the new + * consumer group protocol (KIP-848). This error is returned in the + * OffsetCommit/Fetch APIs when the member epoch received does not + * match the current member epoch. + */ +@InterfaceStability.Evolving +public class StaleMemberEpochException extends ApiException { + public StaleMemberEpochException(String message) { + super(message); + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java index b5ea650b1663..1ccdcd0627cb 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java @@ -108,6 +108,7 @@ import org.apache.kafka.common.errors.SecurityDisabledException; import org.apache.kafka.common.errors.SnapshotNotFoundException; import org.apache.kafka.common.errors.StaleBrokerEpochException; +import org.apache.kafka.common.errors.StaleMemberEpochException; import org.apache.kafka.common.errors.ThrottlingQuotaExceededException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; @@ -378,7 +379,8 @@ public enum Errors { OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new), FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoin.", FencedMemberEpochException::new), UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new), - UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new); + UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new), + STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new); private static final Logger log = LoggerFactory.getLogger(Errors.class); diff --git a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java index 1988347a9741..64f1c2e4a2f8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java @@ -37,8 +37,15 @@ public static abstract class Builder { /** * Construct a new builder which allows any supported version */ + public Builder(ApiKeys apiKey, boolean enableUnstableLastVersion) { + this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion(enableUnstableLastVersion)); + } + + /** + * Construct a new builder which allows any supported and released version + */ public Builder(ApiKeys apiKey) { - this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion()); + this(apiKey, false); } /** diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java index 215e18ea2de8..6e42111670ab 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/ConsumerGroupHeartbeatRequest.java @@ -30,7 +30,11 @@ public static class Builder extends AbstractRequest.Builder offsets() { return offsets; } - public static List getErrorResponseTopics( - List requestTopics, - Errors e) { - List responseTopicData = new ArrayList<>(); - for (OffsetCommitRequestTopic entry : requestTopics) { - List responsePartitions = - new ArrayList<>(); - for (OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition : entry.partitions()) { - responsePartitions.add(new OffsetCommitResponsePartition() - .setPartitionIndex(requestPartition.partitionIndex()) - .setErrorCode(e.code())); - } - responseTopicData.add(new OffsetCommitResponseTopic() - .setName(entry.name()) - .setPartitions(responsePartitions) - ); - } - return responseTopicData; + public static OffsetCommitResponseData getErrorResponse( + OffsetCommitRequestData request, + Errors error + ) { + OffsetCommitResponseData response = new OffsetCommitResponseData(); + request.topics().forEach(topic -> { + OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic() + .setName(topic.name()); + response.topics().add(responseTopic); + + topic.partitions().forEach(partition -> { + responseTopic.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partition.partitionIndex()) + .setErrorCode(error.code())); + }); + }); + return response; } @Override public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) { - List - responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e)); - return new OffsetCommitResponse(new OffsetCommitResponseData() - .setTopics(responseTopicData) - .setThrottleTimeMs(throttleTimeMs)); + return new OffsetCommitResponse(getErrorResponse(data, Errors.forException(e)) + .setThrottleTimeMs(throttleTimeMs)); } @Override @@ -126,4 +121,8 @@ public OffsetCommitResponse getErrorResponse(Throwable e) { public static OffsetCommitRequest parse(ByteBuffer buffer, short version) { return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version); } + + public static Optional groupInstanceId(OffsetCommitRequestData request) { + return Optional.ofNullable(request.groupInstanceId()); + } } diff --git a/clients/src/main/resources/common/message/OffsetCommitRequest.json b/clients/src/main/resources/common/message/OffsetCommitRequest.json index cf112e1ed72c..c11b56c95448 100644 --- a/clients/src/main/resources/common/message/OffsetCommitRequest.json +++ b/clients/src/main/resources/common/message/OffsetCommitRequest.json @@ -31,13 +31,19 @@ // version 7 adds a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The + // request is the same as version 8. + // Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the + // API is not exposed by default by brokers unless explicitly enabled. + "latestVersionUnstable": true, + "validVersions": "0-9", "flexibleVersions": "8+", "fields": [ { "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId", "about": "The unique group identifier." }, - { "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, - "about": "The generation of the group." }, + { "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true, + "about": "The generation of the group if using the generic group protocol or the member epoch if using the consumer protocol." }, { "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true, "about": "The member ID assigned by the group coordinator." }, { "name": "GroupInstanceId", "type": "string", "versions": "7+", diff --git a/clients/src/main/resources/common/message/OffsetCommitResponse.json b/clients/src/main/resources/common/message/OffsetCommitResponse.json index 3d547794cfb4..797c7bfe515c 100644 --- a/clients/src/main/resources/common/message/OffsetCommitResponse.json +++ b/clients/src/main/resources/common/message/OffsetCommitResponse.json @@ -28,8 +28,21 @@ // Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts. // // Version 8 is the first flexible version. - "validVersions": "0-8", + // + // Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is + // the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used. + "validVersions": "0-9", "flexibleVersions": "8+", + // Supported errors: + // - GROUP_AUTHORIZATION_FAILED (version 0+) + // - NOT_COORDINATOR (version 0+) + // - COORDINATOR_NOT_AVAILABLE (version 0+) + // - COORDINATOR_LOAD_IN_PROGRESS (version 0+) + // - ILLEGAL_GENERATION (version 1+) + // - UNKNOWN_MEMBER_ID (version 1+) + // - INVALID_COMMIT_OFFSET_SIZE (version 0+) + // - FENCED_MEMBER_EPOCH (version 7+) + // - STALE_MEMBER_EPOCH (version 9+) "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 03493a122902..fdf6cf1d73b5 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -2575,7 +2575,7 @@ public void testCommitAfterLeaveGroup() { client.prepareResponse(body -> { OffsetCommitRequest commitRequest = (OffsetCommitRequest) body; return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) && - commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID; + commitRequest.data().generationIdOrMemberEpoch() == OffsetCommitRequest.DEFAULT_GENERATION_ID; }, offsetCommitResponse(singletonMap(t1p, Errors.NONE))); AtomicBoolean success = new AtomicBoolean(false); diff --git a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java index 5762e84f60bb..478bfa0668d3 100644 --- a/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java +++ b/clients/src/test/java/org/apache/kafka/common/message/MessageTest.java @@ -276,7 +276,7 @@ public void testOffsetCommitDefaultGroupInstanceId() throws Exception { .setGroupId("groupId") .setMemberId(memberId) .setTopics(new ArrayList<>()) - .setGenerationId(15); + .setGenerationIdOrMemberEpoch(15); testAllMessageRoundTripsFromVersion((short) 1, request.get()); testAllMessageRoundTripsFromVersion((short) 1, request.get().setGroupInstanceId(null)); testAllMessageRoundTripsFromVersion((short) 7, request.get().setGroupInstanceId(instanceId)); @@ -482,7 +482,7 @@ public void testOffsetCommitRequestVersions() throws Exception { OffsetCommitRequestData requestData = request.get(); if (version < 1) { requestData.setMemberId(""); - requestData.setGenerationId(-1); + requestData.setGenerationIdOrMemberEpoch(-1); } if (version != 1) { diff --git a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java index 08ae7a3fbd57..7da2271d97c6 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/OffsetCommitRequestTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.message.OffsetCommitRequestData; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition; import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic; +import org.apache.kafka.common.message.OffsetCommitResponseData; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition; import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic; import org.apache.kafka.common.protocol.ApiKeys; @@ -34,7 +35,7 @@ import java.util.List; import java.util.Map; -import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics; +import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponse; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -102,25 +103,6 @@ public void testConstructor() { } } - @Test - public void testGetErrorResponseTopics() { - List expectedTopics = Arrays.asList( - new OffsetCommitResponseTopic() - .setName(topicOne) - .setPartitions(Collections.singletonList( - new OffsetCommitResponsePartition() - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) - .setPartitionIndex(partitionOne))), - new OffsetCommitResponseTopic() - .setName(topicTwo) - .setPartitions(Collections.singletonList( - new OffsetCommitResponsePartition() - .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) - .setPartitionIndex(partitionTwo))) - ); - assertEquals(expectedTopics, getErrorResponseTopics(topics, Errors.UNKNOWN_MEMBER_ID)); - } - @Test public void testVersionSupportForGroupInstanceId() { OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder( @@ -139,4 +121,24 @@ public void testVersionSupportForGroupInstanceId() { } } } + + @Test + public void testGetErrorResponse() { + OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData() + .setTopics(Arrays.asList( + new OffsetCommitResponseTopic() + .setName(topicOne) + .setPartitions(Collections.singletonList( + new OffsetCommitResponsePartition() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + .setPartitionIndex(partitionOne))), + new OffsetCommitResponseTopic() + .setName(topicTwo) + .setPartitions(Collections.singletonList( + new OffsetCommitResponsePartition() + .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code()) + .setPartitionIndex(partitionTwo))))); + + assertEquals(expectedResponse, getErrorResponse(data, Errors.UNKNOWN_MEMBER_ID)); + } } diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java index 64a7300935b4..be4cd2244b6f 100644 --- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java +++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java @@ -2038,7 +2038,7 @@ private OffsetCommitRequest createOffsetCommitRequest(short version) { .setGroupId("group1") .setMemberId("consumer1") .setGroupInstanceId(null) - .setGenerationId(100) + .setGenerationIdOrMemberEpoch(100) .setTopics(singletonList( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName("test") diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala index 87dba8b10199..86ef919c93e2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinatorAdapter.scala @@ -407,7 +407,7 @@ private[group] class GroupCoordinatorAdapter( request.groupId, request.memberId, Option(request.groupInstanceId), - request.generationId, + request.generationIdOrMemberEpoch, partitions.toMap, callback, RequestLocal(bufferSupplier) diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala index 6a4971c44d35..5d6233d11000 100644 --- a/core/src/main/scala/kafka/server/KafkaApis.scala +++ b/core/src/main/scala/kafka/server/KafkaApis.scala @@ -534,7 +534,7 @@ class KafkaApis(val requestChannel: RequestChannel, val offsetCommitRequestData = new OffsetCommitRequestData() .setGroupId(offsetCommitRequest.data.groupId) .setMemberId(offsetCommitRequest.data.memberId) - .setGenerationId(offsetCommitRequest.data.generationId) + .setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch) .setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs) .setGroupInstanceId(offsetCommitRequest.data.groupInstanceId) .setTopics(authorizedTopicsRequest.asJava) diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala index 084d3f7be93a..c7e7fb0ac0c5 100644 --- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala +++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala @@ -173,6 +173,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1") properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1") properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1") + properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true") properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName) } @@ -493,7 +494,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest { new OffsetCommitRequestData() .setGroupId(group) .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) - .setGenerationId(1) + .setGenerationIdOrMemberEpoch(1) .setTopics(Collections.singletonList( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName(topic) diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala index 0a7b6f2ba248..7b7e10b1c94f 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorAdapterTest.scala @@ -644,7 +644,7 @@ class GroupCoordinatorAdapterTest { val data = new OffsetCommitRequestData() .setGroupId("group") .setMemberId("member") - .setGenerationId(10) + .setGenerationIdOrMemberEpoch(10) .setRetentionTimeMs(1000) .setTopics(List( new OffsetCommitRequestData.OffsetCommitRequestTopic() @@ -669,7 +669,7 @@ class GroupCoordinatorAdapterTest { ArgumentMatchers.eq(data.groupId), ArgumentMatchers.eq(data.memberId), ArgumentMatchers.eq(None), - ArgumentMatchers.eq(data.generationId), + ArgumentMatchers.eq(data.generationIdOrMemberEpoch), ArgumentMatchers.eq(Map( new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new OffsetAndMetadata( offset = 100, diff --git a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala index 7982fa00885b..89cddfe97e72 100644 --- a/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ConsumerGroupHeartbeatRequestTest.scala @@ -42,7 +42,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest def testConsumerGroupHeartbeatIsDisabledByDefault(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() + new ConsumerGroupHeartbeatRequestData(), + true ).build() assertThrows(classOf[EOFException], () => connectAndReceive(consumerGroupHeartbeatRequest)) } @@ -50,7 +51,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { @ClusterTest(serverProperties = Array(new ClusterConfigProperty(key = "unstable.api.versions.enable", value = "true"))) def testConsumerGroupHeartbeatIsAccessibleWhenEnabled(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequest.Builder( - new ConsumerGroupHeartbeatRequestData() + new ConsumerGroupHeartbeatRequestData(), + true ).build() val consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) @@ -83,7 +85,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { .setMemberEpoch(0) .setRebalanceTimeoutMs(5 * 60 * 1000) .setSubscribedTopicNames(List("foo").asJava) - .setTopicPartitions(List.empty.asJava) + .setTopicPartitions(List.empty.asJava), + true ).build() // Send the request until receiving a successful response. There is a delay @@ -111,7 +114,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ConsumerGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch) + .setMemberEpoch(consumerGroupHeartbeatResponse.data.memberEpoch), + true ).build() // This is the expected assignment. @@ -137,7 +141,8 @@ class ConsumerGroupHeartbeatRequestTest(cluster: ClusterInstance) { new ConsumerGroupHeartbeatRequestData() .setGroupId("grp") .setMemberId(consumerGroupHeartbeatResponse.data.memberId) - .setMemberEpoch(-1) + .setMemberEpoch(-1), + true ).build() consumerGroupHeartbeatResponse = connectAndReceive(consumerGroupHeartbeatRequest) diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala index 6f135af363af..9c6c2c73bffb 100644 --- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala +++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala @@ -4080,7 +4080,7 @@ class KafkaApisTest { .setGroupId("test") .setMemberId("test") .setGroupInstanceId("instanceId") - .setGenerationId(100) + .setGenerationIdOrMemberEpoch(100) .setTopics(Collections.singletonList( new OffsetCommitRequestData.OffsetCommitRequestTopic() .setName("test") @@ -6118,7 +6118,7 @@ class KafkaApisTest { def testConsumerGroupHeartbeatReturnsUnsupportedVersion(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) + val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) createKafkaApis().handle(requestChannelRequest, RequestLocal.NoCaching) @@ -6132,7 +6132,7 @@ class KafkaApisTest { def testConsumerGroupHeartbeatRequest(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) + val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]() when(groupCoordinator.consumerGroupHeartbeat( @@ -6156,7 +6156,7 @@ class KafkaApisTest { def testConsumerGroupHeartbeatRequestFutureFailed(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) + val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) val future = new CompletableFuture[ConsumerGroupHeartbeatResponseData]() when(groupCoordinator.consumerGroupHeartbeat( @@ -6177,7 +6177,7 @@ class KafkaApisTest { def testConsumerGroupHeartbeatRequestAuthorizationFailed(): Unit = { val consumerGroupHeartbeatRequest = new ConsumerGroupHeartbeatRequestData().setGroupId("group") - val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest).build()) + val requestChannelRequest = buildRequest(new ConsumerGroupHeartbeatRequest.Builder(consumerGroupHeartbeatRequest, true).build()) val authorizer: Authorizer = mock(classOf[Authorizer]) when(authorizer.authorize(any[RequestContext], any[util.List[Action]])) diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala index 9bb6e3cee9a7..c06bde91f91a 100644 --- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala +++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala @@ -302,7 +302,7 @@ class RequestQuotaTest extends BaseRequestTest { new OffsetCommitRequest.Builder( new OffsetCommitRequestData() .setGroupId("test-group") - .setGenerationId(1) + .setGenerationIdOrMemberEpoch(1) .setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID) .setTopics( Collections.singletonList( @@ -646,7 +646,7 @@ class RequestQuotaTest extends BaseRequestTest { new AllocateProducerIdsRequest.Builder(new AllocateProducerIdsRequestData()) case ApiKeys.CONSUMER_GROUP_HEARTBEAT => - new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData()) + new ConsumerGroupHeartbeatRequest.Builder(new ConsumerGroupHeartbeatRequestData(), true) case _ => throw new IllegalArgumentException("Unsupported API key " + apiKey)