Skip to content

Commit

Permalink
KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new S…
Browse files Browse the repository at this point in the history
…taleMemberEpochException error (#14046)

This patch does a few things:
1) It introduces version 9 of the OffsetCommit API. This new version has no schema changes but it can return a StaleMemberEpochException if the new consumer group protocol is used. Note the use of `"latestVersionUnstable": true` in the request schema. This means that this new version is not available yet unless activated.
2) It renames the `generationId` field in the request to `GenerationIdOrMemberEpoch`. This is backward compatible change.
3) It introduces the new StaleMemberEpochException error.
4) It does a minor refactoring in OffsetCommitRequest class.

Reviewers: Jeff Kim <jeff.kim@confluent.io>, David Arthur <mumrah@gmail.com>, Justine Olshan <jolshan@confluent.io>
  • Loading branch information
dajac committed Jul 21, 2023
1 parent 4daeb27 commit 69659b7
Show file tree
Hide file tree
Showing 20 changed files with 146 additions and 75 deletions.
Expand Up @@ -215,7 +215,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.groupId)
.setGenerationId(generation.generationId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
Expand Down
Expand Up @@ -1361,7 +1361,7 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationId(generation.generationId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
Expand Down
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

import org.apache.kafka.common.annotation.InterfaceStability;

/**
* The StaleMemberEpochException is used in the context of the new
* consumer group protocol (KIP-848). This error is returned in the
* OffsetCommit/Fetch APIs when the member epoch received does not
* match the current member epoch.
*/
@InterfaceStability.Evolving
public class StaleMemberEpochException extends ApiException {
public StaleMemberEpochException(String message) {
super(message);
}
}
Expand Up @@ -108,6 +108,7 @@
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
Expand Down Expand Up @@ -378,7 +379,8 @@ public enum Errors {
OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoin.", FencedMemberEpochException::new),
UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new),
UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new);
UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new),
STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Expand Up @@ -37,8 +37,15 @@ public static abstract class Builder<T extends AbstractRequest> {
/**
* Construct a new builder which allows any supported version
*/
public Builder(ApiKeys apiKey, boolean enableUnstableLastVersion) {
this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion(enableUnstableLastVersion));
}

/**
* Construct a new builder which allows any supported and released version
*/
public Builder(ApiKeys apiKey) {
this(apiKey, apiKey.oldestVersion(), apiKey.latestVersion());
this(apiKey, false);
}

/**
Expand Down
Expand Up @@ -30,7 +30,11 @@ public static class Builder extends AbstractRequest.Builder<ConsumerGroupHeartbe
private final ConsumerGroupHeartbeatRequestData data;

public Builder(ConsumerGroupHeartbeatRequestData data) {
super(ApiKeys.CONSUMER_GROUP_HEARTBEAT);
this(data, false);
}

public Builder(ConsumerGroupHeartbeatRequestData data, boolean enableUnstableLastVersion) {
super(ApiKeys.CONSUMER_GROUP_HEARTBEAT, enableUnstableLastVersion);
this.data = data;
}

Expand Down
Expand Up @@ -28,10 +28,9 @@
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
Expand Down Expand Up @@ -89,33 +88,29 @@ public Map<TopicPartition, Long> offsets() {
return offsets;
}

public static List<OffsetCommitResponseTopic> getErrorResponseTopics(
List<OffsetCommitRequestTopic> requestTopics,
Errors e) {
List<OffsetCommitResponseTopic> responseTopicData = new ArrayList<>();
for (OffsetCommitRequestTopic entry : requestTopics) {
List<OffsetCommitResponsePartition> responsePartitions =
new ArrayList<>();
for (OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition : entry.partitions()) {
responsePartitions.add(new OffsetCommitResponsePartition()
.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(e.code()));
}
responseTopicData.add(new OffsetCommitResponseTopic()
.setName(entry.name())
.setPartitions(responsePartitions)
);
}
return responseTopicData;
public static OffsetCommitResponseData getErrorResponse(
OffsetCommitRequestData request,
Errors error
) {
OffsetCommitResponseData response = new OffsetCommitResponseData();
request.topics().forEach(topic -> {
OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic()
.setName(topic.name());
response.topics().add(responseTopic);

topic.partitions().forEach(partition -> {
responseTopic.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
});
});
return response;
}

@Override
public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<OffsetCommitResponseTopic>
responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e));
return new OffsetCommitResponse(new OffsetCommitResponseData()
.setTopics(responseTopicData)
.setThrottleTimeMs(throttleTimeMs));
return new OffsetCommitResponse(getErrorResponse(data, Errors.forException(e))
.setThrottleTimeMs(throttleTimeMs));
}

@Override
Expand All @@ -126,4 +121,8 @@ public OffsetCommitResponse getErrorResponse(Throwable e) {
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
}

public static Optional<String> groupInstanceId(OffsetCommitRequestData request) {
return Optional.ofNullable(request.groupInstanceId());
}
}
Expand Up @@ -31,13 +31,19 @@
// version 7 adds a new field called groupInstanceId to indicate member identity across restarts.
//
// Version 8 is the first flexible version.
"validVersions": "0-8",
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The
// request is the same as version 8.
// Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the
// API is not exposed by default by brokers unless explicitly enabled.
"latestVersionUnstable": true,
"validVersions": "0-9",
"flexibleVersions": "8+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." },
{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
"about": "The generation of the group." },
{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
"about": "The generation of the group if using the generic group protocol or the member epoch if using the consumer protocol." },
{ "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
"about": "The member ID assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "7+",
Expand Down
Expand Up @@ -28,8 +28,21 @@
// Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts.
//
// Version 8 is the first flexible version.
"validVersions": "0-8",
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used.
"validVersions": "0-9",
"flexibleVersions": "8+",
// Supported errors:
// - GROUP_AUTHORIZATION_FAILED (version 0+)
// - NOT_COORDINATOR (version 0+)
// - COORDINATOR_NOT_AVAILABLE (version 0+)
// - COORDINATOR_LOAD_IN_PROGRESS (version 0+)
// - ILLEGAL_GENERATION (version 1+)
// - UNKNOWN_MEMBER_ID (version 1+)
// - INVALID_COMMIT_OFFSET_SIZE (version 0+)
// - FENCED_MEMBER_EPOCH (version 7+)
// - STALE_MEMBER_EPOCH (version 9+)
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
Expand Down
Expand Up @@ -2575,7 +2575,7 @@ public void testCommitAfterLeaveGroup() {
client.prepareResponse(body -> {
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
commitRequest.data().generationIdOrMemberEpoch() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
}, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));

AtomicBoolean success = new AtomicBoolean(false);
Expand Down
Expand Up @@ -276,7 +276,7 @@ public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
.setGroupId("groupId")
.setMemberId(memberId)
.setTopics(new ArrayList<>())
.setGenerationId(15);
.setGenerationIdOrMemberEpoch(15);
testAllMessageRoundTripsFromVersion((short) 1, request.get());
testAllMessageRoundTripsFromVersion((short) 1, request.get().setGroupInstanceId(null));
testAllMessageRoundTripsFromVersion((short) 7, request.get().setGroupInstanceId(instanceId));
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testOffsetCommitRequestVersions() throws Exception {
OffsetCommitRequestData requestData = request.get();
if (version < 1) {
requestData.setMemberId("");
requestData.setGenerationId(-1);
requestData.setGenerationIdOrMemberEpoch(-1);
}

if (version != 1) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -34,7 +35,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics;
import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -102,25 +103,6 @@ public void testConstructor() {
}
}

@Test
public void testGetErrorResponseTopics() {
List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList(
new OffsetCommitResponseTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new OffsetCommitResponseTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionTwo)))
);
assertEquals(expectedTopics, getErrorResponseTopics(topics, Errors.UNKNOWN_MEMBER_ID));
}

@Test
public void testVersionSupportForGroupInstanceId() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
Expand All @@ -139,4 +121,24 @@ public void testVersionSupportForGroupInstanceId() {
}
}
}

@Test
public void testGetErrorResponse() {
OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
.setTopics(Arrays.asList(
new OffsetCommitResponseTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new OffsetCommitResponseTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionTwo)))));

assertEquals(expectedResponse, getErrorResponse(data, Errors.UNKNOWN_MEMBER_ID));
}
}
Expand Up @@ -2038,7 +2038,7 @@ private OffsetCommitRequest createOffsetCommitRequest(short version) {
.setGroupId("group1")
.setMemberId("consumer1")
.setGroupInstanceId(null)
.setGenerationId(100)
.setGenerationIdOrMemberEpoch(100)
.setTopics(singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("test")
Expand Down
Expand Up @@ -407,7 +407,7 @@ private[group] class GroupCoordinatorAdapter(
request.groupId,
request.memberId,
Option(request.groupInstanceId),
request.generationId,
request.generationIdOrMemberEpoch,
partitions.toMap,
callback,
RequestLocal(bufferSupplier)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Expand Up @@ -534,7 +534,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetCommitRequestData = new OffsetCommitRequestData()
.setGroupId(offsetCommitRequest.data.groupId)
.setMemberId(offsetCommitRequest.data.memberId)
.setGenerationId(offsetCommitRequest.data.generationId)
.setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
.setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava)
Expand Down
Expand Up @@ -173,6 +173,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
properties.put(KafkaConfig.TransactionsTopicPartitionsProp, "1")
properties.put(KafkaConfig.TransactionsTopicReplicationFactorProp, "1")
properties.put(KafkaConfig.TransactionsTopicMinISRProp, "1")
properties.put(KafkaConfig.UnstableApiVersionsEnableProp, "true")
properties.put(BrokerSecurityConfigs.PRINCIPAL_BUILDER_CLASS_CONFIG, classOf[PrincipalBuilder].getName)
}

Expand Down Expand Up @@ -493,7 +494,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new OffsetCommitRequestData()
.setGroupId(group)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setGenerationId(1)
.setGenerationIdOrMemberEpoch(1)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topic)
Expand Down
Expand Up @@ -644,7 +644,7 @@ class GroupCoordinatorAdapterTest {
val data = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(10)
.setGenerationIdOrMemberEpoch(10)
.setRetentionTimeMs(1000)
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
Expand All @@ -669,7 +669,7 @@ class GroupCoordinatorAdapterTest {
ArgumentMatchers.eq(data.groupId),
ArgumentMatchers.eq(data.memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(data.generationId),
ArgumentMatchers.eq(data.generationIdOrMemberEpoch),
ArgumentMatchers.eq(Map(
new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new OffsetAndMetadata(
offset = 100,
Expand Down

0 comments on commit 69659b7

Please sign in to comment.