Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-14499: [1/N] Introduce OffsetCommit API version 9 and add new StaleMemberEpochException error #14046

Merged
merged 9 commits into from Jul 21, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -215,7 +215,7 @@ public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.groupId)
.setGenerationId(generation.generationId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values())));
Expand Down
Expand Up @@ -1361,7 +1361,7 @@ RequestFuture<Void> sendOffsetCommitRequest(final Map<TopicPartition, OffsetAndM
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId(this.rebalanceConfig.groupId)
.setGenerationId(generation.generationId)
.setGenerationIdOrMemberEpoch(generation.generationId)
.setMemberId(generation.memberId)
.setGroupInstanceId(groupInstanceId)
.setTopics(new ArrayList<>(requestTopicDataMap.values()))
Expand Down
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.common.errors;

import org.apache.kafka.common.annotation.InterfaceStability;

@InterfaceStability.Evolving
public class StaleMemberEpochException extends ApiException {
dajac marked this conversation as resolved.
Show resolved Hide resolved
public StaleMemberEpochException(String message) {
super(message);
}
}
Expand Up @@ -108,6 +108,7 @@
import org.apache.kafka.common.errors.SecurityDisabledException;
import org.apache.kafka.common.errors.SnapshotNotFoundException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.StaleMemberEpochException;
import org.apache.kafka.common.errors.ThrottlingQuotaExceededException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
Expand Down Expand Up @@ -378,7 +379,8 @@ public enum Errors {
OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new),
FENCED_MEMBER_EPOCH(110, "The member epoch is fenced by the group coordinator. The member must abandon all its partitions and rejoin.", FencedMemberEpochException::new),
UNRELEASED_INSTANCE_ID(111, "The instance ID is still used by another member in the consumer group. That member must leave first.", UnreleasedInstanceIdException::new),
UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new);
UNSUPPORTED_ASSIGNOR(112, "The assignor or its version range is not supported by the consumer group.", UnsupportedAssignorException::new),
STALE_MEMBER_EPOCH(113, "The member epoch is stale. The member must retry after receiving its updated member epoch via the ConsumerGroupHeartbeat API.", StaleMemberEpochException::new);

private static final Logger log = LoggerFactory.getLogger(Errors.class);

Expand Down
Expand Up @@ -28,10 +28,9 @@
import org.apache.kafka.common.protocol.Errors;

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class OffsetCommitRequest extends AbstractRequest {
// default values for the current version
Expand Down Expand Up @@ -89,33 +88,29 @@ public Map<TopicPartition, Long> offsets() {
return offsets;
}

public static List<OffsetCommitResponseTopic> getErrorResponseTopics(
List<OffsetCommitRequestTopic> requestTopics,
dajac marked this conversation as resolved.
Show resolved Hide resolved
Errors e) {
List<OffsetCommitResponseTopic> responseTopicData = new ArrayList<>();
for (OffsetCommitRequestTopic entry : requestTopics) {
List<OffsetCommitResponsePartition> responsePartitions =
new ArrayList<>();
for (OffsetCommitRequestData.OffsetCommitRequestPartition requestPartition : entry.partitions()) {
responsePartitions.add(new OffsetCommitResponsePartition()
.setPartitionIndex(requestPartition.partitionIndex())
.setErrorCode(e.code()));
}
responseTopicData.add(new OffsetCommitResponseTopic()
.setName(entry.name())
.setPartitions(responsePartitions)
);
}
return responseTopicData;
public static OffsetCommitResponseData getErrorResponse(
OffsetCommitRequestData request,
Errors error
) {
OffsetCommitResponseData response = new OffsetCommitResponseData();
request.topics().forEach(topic -> {
OffsetCommitResponseTopic responseTopic = new OffsetCommitResponseTopic()
.setName(topic.name());
response.topics().add(responseTopic);

topic.partitions().forEach(partition -> {
responseTopic.partitions().add(new OffsetCommitResponsePartition()
.setPartitionIndex(partition.partitionIndex())
.setErrorCode(error.code()));
});
});
return response;
}

@Override
public OffsetCommitResponse getErrorResponse(int throttleTimeMs, Throwable e) {
List<OffsetCommitResponseTopic>
responseTopicData = getErrorResponseTopics(data.topics(), Errors.forException(e));
return new OffsetCommitResponse(new OffsetCommitResponseData()
.setTopics(responseTopicData)
.setThrottleTimeMs(throttleTimeMs));
return new OffsetCommitResponse(getErrorResponse(data, Errors.forException(e))
.setThrottleTimeMs(throttleTimeMs));
}

@Override
Expand All @@ -126,4 +121,8 @@ public OffsetCommitResponse getErrorResponse(Throwable e) {
public static OffsetCommitRequest parse(ByteBuffer buffer, short version) {
return new OffsetCommitRequest(new OffsetCommitRequestData(new ByteBufferAccessor(buffer), version), version);
}

public static Optional<String> groupInstanceId(OffsetCommitRequestData request) {
return Optional.ofNullable(request.groupInstanceId());
}
}
Expand Up @@ -31,13 +31,19 @@
// version 7 adds a new field called groupInstanceId to indicate member identity across restarts.
//
// Version 8 is the first flexible version.
"validVersions": "0-8",
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The
// request is the same as version 8.
// Version 9 is added as part of KIP-848 and is still under development. Hence, the last version of the
// API is not exposed by default by brokers unless explicitly enabled.
"latestVersionUnstable": true,
"validVersions": "0-9",
"flexibleVersions": "8+",
"fields": [
{ "name": "GroupId", "type": "string", "versions": "0+", "entityType": "groupId",
"about": "The unique group identifier." },
{ "name": "GenerationId", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
"about": "The generation of the group." },
{ "name": "GenerationIdOrMemberEpoch", "type": "int32", "versions": "1+", "default": "-1", "ignorable": true,
"about": "The generation of the group if the generic group protocol or the member epoch if the consumer protocol." },
dajac marked this conversation as resolved.
Show resolved Hide resolved
{ "name": "MemberId", "type": "string", "versions": "1+", "ignorable": true,
"about": "The member ID assigned by the group coordinator." },
{ "name": "GroupInstanceId", "type": "string", "versions": "7+",
Expand Down
Expand Up @@ -28,7 +28,10 @@
// Version 7 offsetCommitRequest supports a new field called groupInstanceId to indicate member identity across restarts.
//
// Version 8 is the first flexible version.
"validVersions": "0-8",
//
// Version 9 is the first version that can be used with the new consumer group protocol (KIP-848). The response is
// the same as version 8 but can return STALE_MEMBER_EPOCH when the new consumer group protocol is used.
"validVersions": "0-9",
"flexibleVersions": "8+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
Expand Down
Expand Up @@ -2575,7 +2575,7 @@ public void testCommitAfterLeaveGroup() {
client.prepareResponse(body -> {
OffsetCommitRequest commitRequest = (OffsetCommitRequest) body;
return commitRequest.data().memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
commitRequest.data().generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
commitRequest.data().generationIdOrMemberEpoch() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
}, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));

AtomicBoolean success = new AtomicBoolean(false);
Expand Down
Expand Up @@ -276,7 +276,7 @@ public void testOffsetCommitDefaultGroupInstanceId() throws Exception {
.setGroupId("groupId")
.setMemberId(memberId)
.setTopics(new ArrayList<>())
.setGenerationId(15);
.setGenerationIdOrMemberEpoch(15);
testAllMessageRoundTripsFromVersion((short) 1, request.get());
testAllMessageRoundTripsFromVersion((short) 1, request.get().setGroupInstanceId(null));
testAllMessageRoundTripsFromVersion((short) 7, request.get().setGroupInstanceId(instanceId));
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testOffsetCommitRequestVersions() throws Exception {
OffsetCommitRequestData requestData = request.get();
if (version < 1) {
requestData.setMemberId("");
requestData.setGenerationId(-1);
requestData.setGenerationIdOrMemberEpoch(-1);
}

if (version != 1) {
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.kafka.common.message.OffsetCommitRequestData;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestPartition;
import org.apache.kafka.common.message.OffsetCommitRequestData.OffsetCommitRequestTopic;
import org.apache.kafka.common.message.OffsetCommitResponseData;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponsePartition;
import org.apache.kafka.common.message.OffsetCommitResponseData.OffsetCommitResponseTopic;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -34,7 +35,7 @@
import java.util.List;
import java.util.Map;

import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponseTopics;
import static org.apache.kafka.common.requests.OffsetCommitRequest.getErrorResponse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

Expand Down Expand Up @@ -102,25 +103,6 @@ public void testConstructor() {
}
}

@Test
public void testGetErrorResponseTopics() {
List<OffsetCommitResponseTopic> expectedTopics = Arrays.asList(
new OffsetCommitResponseTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new OffsetCommitResponseTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionTwo)))
);
assertEquals(expectedTopics, getErrorResponseTopics(topics, Errors.UNKNOWN_MEMBER_ID));
}

@Test
public void testVersionSupportForGroupInstanceId() {
OffsetCommitRequest.Builder builder = new OffsetCommitRequest.Builder(
Expand All @@ -139,4 +121,24 @@ public void testVersionSupportForGroupInstanceId() {
}
}
}

@Test
public void testGetErrorResponse() {
OffsetCommitResponseData expectedResponse = new OffsetCommitResponseData()
.setTopics(Arrays.asList(
new OffsetCommitResponseTopic()
.setName(topicOne)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionOne))),
new OffsetCommitResponseTopic()
.setName(topicTwo)
.setPartitions(Collections.singletonList(
new OffsetCommitResponsePartition()
.setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())
.setPartitionIndex(partitionTwo)))));

assertEquals(expectedResponse, getErrorResponse(data, Errors.UNKNOWN_MEMBER_ID));
}
}
Expand Up @@ -2038,7 +2038,7 @@ private OffsetCommitRequest createOffsetCommitRequest(short version) {
.setGroupId("group1")
.setMemberId("consumer1")
.setGroupInstanceId(null)
.setGenerationId(100)
.setGenerationIdOrMemberEpoch(100)
.setTopics(singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("test")
Expand Down
Expand Up @@ -407,7 +407,7 @@ private[group] class GroupCoordinatorAdapter(
request.groupId,
request.memberId,
Option(request.groupInstanceId),
request.generationId,
request.generationIdOrMemberEpoch,
partitions.toMap,
callback,
RequestLocal(bufferSupplier)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/server/KafkaApis.scala
Expand Up @@ -534,7 +534,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val offsetCommitRequestData = new OffsetCommitRequestData()
.setGroupId(offsetCommitRequest.data.groupId)
.setMemberId(offsetCommitRequest.data.memberId)
.setGenerationId(offsetCommitRequest.data.generationId)
.setGenerationIdOrMemberEpoch(offsetCommitRequest.data.generationIdOrMemberEpoch)
.setRetentionTimeMs(offsetCommitRequest.data.retentionTimeMs)
.setGroupInstanceId(offsetCommitRequest.data.groupInstanceId)
.setTopics(authorizedTopicsRequest.asJava)
Expand Down
Expand Up @@ -493,7 +493,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
new OffsetCommitRequestData()
.setGroupId(group)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setGenerationId(1)
.setGenerationIdOrMemberEpoch(1)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName(topic)
Expand Down
Expand Up @@ -644,7 +644,7 @@ class GroupCoordinatorAdapterTest {
val data = new OffsetCommitRequestData()
.setGroupId("group")
.setMemberId("member")
.setGenerationId(10)
.setGenerationIdOrMemberEpoch(10)
.setRetentionTimeMs(1000)
.setTopics(List(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
Expand All @@ -669,7 +669,7 @@ class GroupCoordinatorAdapterTest {
ArgumentMatchers.eq(data.groupId),
ArgumentMatchers.eq(data.memberId),
ArgumentMatchers.eq(None),
ArgumentMatchers.eq(data.generationId),
ArgumentMatchers.eq(data.generationIdOrMemberEpoch),
ArgumentMatchers.eq(Map(
new TopicIdPartition(Uuid.ZERO_UUID, 0 , "foo") -> new OffsetAndMetadata(
offset = 100,
Expand Down
2 changes: 1 addition & 1 deletion core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
Expand Up @@ -4080,7 +4080,7 @@ class KafkaApisTest {
.setGroupId("test")
.setMemberId("test")
.setGroupInstanceId("instanceId")
.setGenerationId(100)
.setGenerationIdOrMemberEpoch(100)
.setTopics(Collections.singletonList(
new OffsetCommitRequestData.OffsetCommitRequestTopic()
.setName("test")
Expand Down
Expand Up @@ -302,7 +302,7 @@ class RequestQuotaTest extends BaseRequestTest {
new OffsetCommitRequest.Builder(
new OffsetCommitRequestData()
.setGroupId("test-group")
.setGenerationId(1)
.setGenerationIdOrMemberEpoch(1)
.setMemberId(JoinGroupRequest.UNKNOWN_MEMBER_ID)
.setTopics(
Collections.singletonList(
Expand Down