diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index d0eaddcad2..b270523334 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -118,8 +118,8 @@ public enum ApiKeys { CLOSE_STREAM(ApiMessageType.CLOSE_STREAM, false, true), PREPARE_S3_OBJECT(ApiMessageType.PREPARE_S3_OBJECT, false, true), COMMIT_WALOBJECT(ApiMessageType.COMMIT_WALOBJECT, false, true), - COMMIT_COMPACT_OBJECT(ApiMessageType.COMMIT_COMPACT_OBJECT, false, true), - COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true); + COMMIT_STREAM_OBJECT(ApiMessageType.COMMIT_STREAM_OBJECT, false, true), + GET_STREAMS_OFFSET(ApiMessageType.GET_STREAMS_OFFSET, false, true); // Kafka on S3 inject end private static final Map> APIS_BY_LISTENER = diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CloseStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java similarity index 93% rename from clients/src/main/java/org/apache/kafka/common/requests/CloseStreamRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java index 3523f3cc3d..b1d4691089 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CloseStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamRequest.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; public class CloseStreamRequest extends AbstractRequest { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CloseStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/requests/CloseStreamResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java index 0a11f16820..9a6ca9ad92 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CloseStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CloseStreamResponse.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; import org.apache.kafka.common.message.CloseStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; public class CloseStreamResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CommitStreamObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java similarity index 93% rename from clients/src/main/java/org/apache/kafka/common/requests/CommitStreamObjectRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java index e0b400a19f..a29720c355 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CommitStreamObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectRequest.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; public class CommitStreamObjectRequest extends AbstractRequest { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CommitStreamObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/requests/CommitStreamObjectResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java index 97d8134fe4..b26c1ec2dd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CommitStreamObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitStreamObjectResponse.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; public class CommitStreamObjectResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CommitWALObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java similarity index 93% rename from clients/src/main/java/org/apache/kafka/common/requests/CommitWALObjectRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java index 9ccf56c464..d789ae9fd8 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CommitWALObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectRequest.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import org.apache.kafka.common.message.CommitWALObjectRequestData; import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; public class CommitWALObjectRequest extends AbstractRequest { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CommitWALObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/requests/CommitWALObjectResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java index 340423b512..c1ef215e3a 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CommitWALObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/CommitWALObjectResponse.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; public class CommitWALObjectResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java similarity index 93% rename from clients/src/main/java/org/apache/kafka/common/requests/DeleteStreamRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java index 0119a94f8a..1ff86e7790 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamRequest.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; public class DeleteStreamRequest extends AbstractRequest { public static class Builder extends AbstractRequest.Builder { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DeleteStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/requests/DeleteStreamResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java index 543b3fe25c..804ba4cbcd 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/DeleteStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/DeleteStreamResponse.java @@ -14,12 +14,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; public class DeleteStreamResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CommitCompactObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java similarity index 54% rename from clients/src/main/java/org/apache/kafka/common/requests/CommitCompactObjectRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java index 1e42cc8d7e..1d6ca499b2 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CommitCompactObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetRequest.java @@ -15,24 +15,27 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; -import org.apache.kafka.common.message.CommitCompactObjectRequestData; -import org.apache.kafka.common.message.CommitCompactObjectResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; -public class CommitCompactObjectRequest extends AbstractRequest { - public static class Builder extends AbstractRequest.Builder { +public class GetStreamsOffsetRequest extends AbstractRequest { - private final CommitCompactObjectRequestData data; - public Builder(CommitCompactObjectRequestData data) { - super(ApiKeys.COMMIT_COMPACT_OBJECT); + public static class Builder extends AbstractRequest.Builder { + + private final GetStreamsOffsetRequestData data; + public Builder(GetStreamsOffsetRequestData data) { + super(ApiKeys.GET_STREAMS_OFFSET); this.data = data; } @Override - public CommitCompactObjectRequest build(short version) { - return new CommitCompactObjectRequest(data, version); + public GetStreamsOffsetRequest build(short version) { + return new GetStreamsOffsetRequest(data, version); } @Override @@ -40,24 +43,26 @@ public String toString() { return data.toString(); } } - private final CommitCompactObjectRequestData data; - public CommitCompactObjectRequest(CommitCompactObjectRequestData data, short version) { - super(ApiKeys.DELETE_STREAM, version); + private final GetStreamsOffsetRequestData data; + + public GetStreamsOffsetRequest(GetStreamsOffsetRequestData data, short version) { + super(ApiKeys.GET_STREAMS_OFFSET, version); this.data = data; } @Override - public CommitCompactObjectResponse getErrorResponse(int throttleTimeMs, Throwable e) { + public CreateStreamResponse getErrorResponse(int throttleTimeMs, Throwable e) { ApiError apiError = ApiError.fromThrowable(e); - CommitCompactObjectResponseData response = new CommitCompactObjectResponseData() + CreateStreamResponseData response = new CreateStreamResponseData() .setErrorCode(apiError.error().code()) .setThrottleTimeMs(throttleTimeMs); - return new CommitCompactObjectResponse(response); + return new CreateStreamResponse(response); } @Override - public CommitCompactObjectRequestData data() { + public GetStreamsOffsetRequestData data() { return data; } + } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CommitCompactObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java similarity index 74% rename from clients/src/main/java/org/apache/kafka/common/requests/CommitCompactObjectResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java index b7c4722d65..6342e34b9c 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CommitCompactObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/GetStreamsOffsetResponse.java @@ -15,24 +15,24 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; -import org.apache.kafka.common.message.CommitCompactObjectResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; -public class CommitCompactObjectResponse extends AbstractResponse { +public class GetStreamsOffsetResponse extends AbstractResponse { + private final GetStreamsOffsetResponseData data; - private final CommitCompactObjectResponseData data; - - public CommitCompactObjectResponse(CommitCompactObjectResponseData data) { - super(ApiKeys.COMMIT_COMPACT_OBJECT); + public GetStreamsOffsetResponse(GetStreamsOffsetResponseData data) { + super(ApiKeys.GET_STREAMS_OFFSET); this.data = data; } @Override - public CommitCompactObjectResponseData data() { + public GetStreamsOffsetResponseData data() { return data; } @@ -50,5 +50,4 @@ public int throttleTimeMs() { public void maybeSetThrottleTimeMs(int throttleTimeMs) { data.setThrottleTimeMs(throttleTimeMs); } - } diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java similarity index 93% rename from clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java index 63f48d74f8..9dfc825346 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamRequest.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; public class OpenStreamRequest extends AbstractRequest { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java index 9d1b070f98..8cdc047af9 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/OpenStreamResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/OpenStreamResponse.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; public class OpenStreamResponse extends AbstractResponse { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PrepareS3ObjectRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java similarity index 93% rename from clients/src/main/java/org/apache/kafka/common/requests/PrepareS3ObjectRequest.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java index 6304af4ca9..340d4dcc21 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/PrepareS3ObjectRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectRequest.java @@ -15,11 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.requests.AbstractRequest; +import org.apache.kafka.common.requests.ApiError; public class PrepareS3ObjectRequest extends AbstractRequest { diff --git a/clients/src/main/java/org/apache/kafka/common/requests/PrepareS3ObjectResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectResponse.java similarity index 94% rename from clients/src/main/java/org/apache/kafka/common/requests/PrepareS3ObjectResponse.java rename to clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectResponse.java index 3c87a14123..b983cfe1a7 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/PrepareS3ObjectResponse.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/s3/PrepareS3ObjectResponse.java @@ -15,12 +15,13 @@ * limitations under the License. */ -package org.apache.kafka.common.requests; +package org.apache.kafka.common.requests.s3; import java.util.Map; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.requests.AbstractResponse; public class PrepareS3ObjectResponse extends AbstractResponse { private final PrepareS3ObjectResponseData data; diff --git a/clients/src/main/resources/common/message/CloseStreamRequest.json b/clients/src/main/resources/common/message/CloseStreamRequest.json index 3342936820..b9fd0690db 100644 --- a/clients/src/main/resources/common/message/CloseStreamRequest.json +++ b/clients/src/main/resources/common/message/CloseStreamRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 70, + "apiKey": 503, "type": "request", "listeners": [ "controller", diff --git a/clients/src/main/resources/common/message/CloseStreamResponse.json b/clients/src/main/resources/common/message/CloseStreamResponse.json index 1cf6aec35f..fa4ea77519 100644 --- a/clients/src/main/resources/common/message/CloseStreamResponse.json +++ b/clients/src/main/resources/common/message/CloseStreamResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 70, + "apiKey": 503, "type": "response", "name": "CloseStreamResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/CommitCompactObjectRequest.json b/clients/src/main/resources/common/message/CommitCompactObjectRequest.json deleted file mode 100644 index 8528694449..0000000000 --- a/clients/src/main/resources/common/message/CommitCompactObjectRequest.json +++ /dev/null @@ -1,125 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -{ - "apiKey": 74, - "type": "request", - "listeners": [ - "controller", - "broker" - ], - "name": "CommitCompactObjectRequest", - "validVersions": "0", - "flexibleVersions": "0+", - "fields": [ - { - "name": "BrokerId", - "type": "int32", - "versions": "0+", - "entityType": "brokerId", - "about": "The ID of the requesting broker" - }, - { - "name": "ObjectId", - "type": "int64", - "versions": "0+", - "about": "The ID of the WAL S3 object to commit" - }, - { - "name": "ObjectSize", - "type": "int64", - "versions": "0+", - "about": "The size of the WAL S3 object to commit" - }, - { - "name": "ObjectStreamRanges", - "type": "[]ObjectStreamRange", - "versions": "0+", - "about": "The stream ranges of the WAL S3 object to commit", - "fields": [ - { - "name": "StreamId", - "type": "int64", - "versions": "0+", - "about": "The ID of the stream" - }, - { - "name": "StreamEpoch", - "type": "int64", - "versions": "0+", - "entityType": "streamEpoch", - "about": "The epoch of the requesting stream in the requesting broker" - }, - { - "name": "StartOffset", - "type": "int64", - "versions": "0+", - "about": "The start offset of the stream range" - }, - { - "name": "EndOffset", - "type": "int64", - "versions": "0+", - "about": "The end offset of the stream range" - } - ] - }, - { - "name": "StreamObjects", - "type": "[]StreamObject", - "versions": "0+", - "about": "The stream objects to commit", - "fields": [ - { - "name": "ObjectId", - "type": "int64", - "versions": "0+", - "about": "The ID of the WAL S3 object to commit" - }, - { - "name": "ObjectSize", - "type": "int64", - "versions": "0+", - "about": "The size of the WAL S3 object to commit" - }, - { - "name": "StreamId", - "type": "int64", - "versions": "0+", - "about": "The ID of the stream", - "entityType": "streamId" - }, - { - "name": "StartOffset", - "type": "int64", - "versions": "0+", - "about": "The start offset of the stream range" - }, - { - "name": "EndOffset", - "type": "int64", - "versions": "0+", - "about": "The end offset of the stream range" - }, - { - "name": "sourceObjectIds", - "type": "[]int64", - "versions": "0+", - "about": "The IDs of the source S3 objects" - } - ] - } - ] -} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitStreamObjectRequest.json b/clients/src/main/resources/common/message/CommitStreamObjectRequest.json index 179fcf5210..34f2881d99 100644 --- a/clients/src/main/resources/common/message/CommitStreamObjectRequest.json +++ b/clients/src/main/resources/common/message/CommitStreamObjectRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 75, + "apiKey": 507, "type": "request", "listeners": [ "controller", diff --git a/clients/src/main/resources/common/message/CommitStreamObjectResponse.json b/clients/src/main/resources/common/message/CommitStreamObjectResponse.json index a0e4801aea..7a1bd49e7a 100644 --- a/clients/src/main/resources/common/message/CommitStreamObjectResponse.json +++ b/clients/src/main/resources/common/message/CommitStreamObjectResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 75, + "apiKey": 507, "type": "response", "name": "CommitStreamObjectResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/CommitWALObjectRequest.json b/clients/src/main/resources/common/message/CommitWALObjectRequest.json index 972e84ace3..0a53e0586e 100644 --- a/clients/src/main/resources/common/message/CommitWALObjectRequest.json +++ b/clients/src/main/resources/common/message/CommitWALObjectRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 73, + "apiKey": 506, "type": "request", "listeners": [ "controller", @@ -75,6 +75,57 @@ "about": "The end offset of the stream range" } ] + }, + { + "name": "StreamObjects", + "type": "[]StreamObject", + "versions": "0+", + "about": "The stream objects to commit", + "fields": [ + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The ID of the WAL S3 object to commit" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The size of the WAL S3 object to commit" + }, + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The ID of the stream", + "entityType": "streamId" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream range" + }, + { + "name": "sourceObjectIds", + "type": "[]int64", + "versions": "0+", + "about": "The IDs of the source S3 objects" + } + ] + }, + { + "name": "CompactedObjectIds", + "type": "[]int64", + "versions": "0+", + "about": "The IDs of the compacted S3 objects" } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitWALObjectResponse.json b/clients/src/main/resources/common/message/CommitWALObjectResponse.json index 7f1b0b4efc..83a436542a 100644 --- a/clients/src/main/resources/common/message/CommitWALObjectResponse.json +++ b/clients/src/main/resources/common/message/CommitWALObjectResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 73, + "apiKey": 506, "type": "response", "name": "CommitWALObjectResponse", "validVersions": "0", @@ -31,12 +31,6 @@ "type": "int32", "versions": "0+", "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." - }, - { - "name": "FailedStreamIds", - "type": "[]int64", - "versions": "0+", - "about": "Failed to commit WAL objects' id" } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CreateStreamRequest.json b/clients/src/main/resources/common/message/CreateStreamRequest.json index 09deb54b57..ce908598fa 100644 --- a/clients/src/main/resources/common/message/CreateStreamRequest.json +++ b/clients/src/main/resources/common/message/CreateStreamRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 68, + "apiKey": 501, "type": "request", "listeners": [ "controller", diff --git a/clients/src/main/resources/common/message/CreateStreamResponse.json b/clients/src/main/resources/common/message/CreateStreamResponse.json index 9c1743c01e..7fa87d69b3 100644 --- a/clients/src/main/resources/common/message/CreateStreamResponse.json +++ b/clients/src/main/resources/common/message/CreateStreamResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 68, + "apiKey": 501, "type": "response", "name": "CreateStreamResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/DeleteStreamRequest.json b/clients/src/main/resources/common/message/DeleteStreamRequest.json index 400901e99c..18a6a1562f 100644 --- a/clients/src/main/resources/common/message/DeleteStreamRequest.json +++ b/clients/src/main/resources/common/message/DeleteStreamRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 71, + "apiKey": 504, "type": "request", "listeners": [ "controller", diff --git a/clients/src/main/resources/common/message/DeleteStreamResponse.json b/clients/src/main/resources/common/message/DeleteStreamResponse.json index 4d0fc66d21..2ee5f7420e 100644 --- a/clients/src/main/resources/common/message/DeleteStreamResponse.json +++ b/clients/src/main/resources/common/message/DeleteStreamResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 71, + "apiKey": 504, "type": "response", "name": "DeleteStreamResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/CommitCompactObjectResponse.json b/clients/src/main/resources/common/message/GetStreamsOffsetRequest.json similarity index 67% rename from clients/src/main/resources/common/message/CommitCompactObjectResponse.json rename to clients/src/main/resources/common/message/GetStreamsOffsetRequest.json index 5222fd0812..d976cf9e90 100644 --- a/clients/src/main/resources/common/message/CommitCompactObjectResponse.json +++ b/clients/src/main/resources/common/message/GetStreamsOffsetRequest.json @@ -14,23 +14,22 @@ // limitations under the License. { - "apiKey": 74, - "type": "response", - "name": "CommitCompactObjectResponse", + "apiKey": 508, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "GetStreamsOffsetRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ { - "name": "ErrorCode", - "type": "int16", + "name": "StreamIds", + "type": "[]int64", "versions": "0+", - "about": "The top level response error code" - }, - { - "name": "ThrottleTimeMs", - "type": "int32", - "versions": "0+", - "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + "entityType": "streamId", + "about": "The ids of the requesting stream" } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/GetStreamsOffsetResponse.json b/clients/src/main/resources/common/message/GetStreamsOffsetResponse.json new file mode 100644 index 0000000000..bd1ea25517 --- /dev/null +++ b/clients/src/main/resources/common/message/GetStreamsOffsetResponse.json @@ -0,0 +1,62 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 508, + "type": "response", + "name": "GetStreamsOffsetResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "ThrottleTimeMs", + "type": "int32", + "versions": "0+", + "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." + }, + { + "name": "StreamsOffset", + "type": "[]StreamOffset", + "versions": "0+", + "about": "The responses for each topic.", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The stream id" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream" + } + ] + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/OpenStreamRequest.json b/clients/src/main/resources/common/message/OpenStreamRequest.json index 40cdd10ff5..c739b07916 100644 --- a/clients/src/main/resources/common/message/OpenStreamRequest.json +++ b/clients/src/main/resources/common/message/OpenStreamRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 69, + "apiKey": 502, "type": "request", "listeners": [ "controller", diff --git a/clients/src/main/resources/common/message/OpenStreamResponse.json b/clients/src/main/resources/common/message/OpenStreamResponse.json index 03cf51cc85..e7a720241d 100644 --- a/clients/src/main/resources/common/message/OpenStreamResponse.json +++ b/clients/src/main/resources/common/message/OpenStreamResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 69, + "apiKey": 502, "type": "response", "name": "OpenStreamResponse", "validVersions": "0", diff --git a/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json b/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json index 29970d8787..46b81f3aca 100644 --- a/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json +++ b/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 72, + "apiKey": 505, "type": "request", "listeners": [ "controller", diff --git a/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json b/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json index 7e392aef0d..4da2272990 100644 --- a/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json +++ b/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json @@ -14,7 +14,7 @@ // limitations under the License. { - "apiKey": 72, + "apiKey": 505, "type": "response", "name": "PrepareS3ObjectResponse", "validVersions": "0", diff --git a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java index ba774088f7..c2787a273e 100644 --- a/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java +++ b/core/src/main/scala/kafka/log/s3/memory/MemoryMetadataManager.java @@ -18,7 +18,6 @@ package kafka.log.s3.memory; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.LinkedList; @@ -32,13 +31,11 @@ import java.util.concurrent.TimeUnit; import java.util.function.Supplier; import java.util.stream.Collectors; +import kafka.log.s3.model.StreamOffset; import kafka.log.s3.objects.CommitCompactObjectRequest; import kafka.log.s3.objects.CommitStreamObjectRequest; import kafka.log.s3.objects.CommitWalObjectRequest; import kafka.log.s3.objects.CommitWalObjectResponse; -import kafka.log.s3.objects.GetStreamsOffsetRequest; -import kafka.log.s3.objects.GetStreamsOffsetResponse; -import kafka.log.s3.objects.GetStreamsOffsetResponse.StreamRange; import kafka.log.s3.objects.ObjectManager; import kafka.log.s3.objects.ObjectStreamRange; import kafka.log.s3.objects.OpenStreamMetadata; @@ -364,14 +361,11 @@ public CompletableFuture deleteStream(long streamId, long epoch) { } @Override - public CompletableFuture getStreamsOffset(GetStreamsOffsetRequest request) { + public CompletableFuture> getStreamsOffset(List streamIds) { return this.submitEvent(() -> { - GetStreamsOffsetResponse response = new GetStreamsOffsetResponse(); - StreamRange[] ranges = Arrays.stream(request.streamIds()).filter(this.streamsMetadata::containsKey).mapToObj(id -> { - return new StreamRange(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset); - }).toArray(StreamRange[]::new); - response.setStreamRanges(ranges); - return response; + return streamIds.stream().filter(this.streamsMetadata::containsKey).map(id -> { + return new StreamOffset(id, this.streamsMetadata.get(id).startOffset, this.streamsMetadata.get(id).endOffset); + }).collect(Collectors.toList()); }); } diff --git a/core/src/main/scala/kafka/log/s3/objects/GetStreamsOffsetRequest.java b/core/src/main/scala/kafka/log/s3/model/StreamOffset.java similarity index 60% rename from core/src/main/scala/kafka/log/s3/objects/GetStreamsOffsetRequest.java rename to core/src/main/scala/kafka/log/s3/model/StreamOffset.java index 3b7b0b6efe..0cfdf340df 100644 --- a/core/src/main/scala/kafka/log/s3/objects/GetStreamsOffsetRequest.java +++ b/core/src/main/scala/kafka/log/s3/model/StreamOffset.java @@ -15,17 +15,28 @@ * limitations under the License. */ -package kafka.log.s3.objects; +package kafka.log.s3.model; -public class GetStreamsOffsetRequest { +public class StreamOffset { + private final long streamId; + private final long startOffset; + private final long endOffset; - private long[] streamIds; + public StreamOffset(long streamId, long startOffset, long endOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + public long streamId() { + return streamId; + } - public long[] streamIds() { - return streamIds; + public long startOffset() { + return startOffset; } - public void setStreamIds(long[] streamIds) { - this.streamIds = streamIds; + public long endOffset() { + return endOffset; } -} +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java index 005dd492fb..f6b106dd7f 100644 --- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java +++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java @@ -27,8 +27,8 @@ import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.PrepareS3ObjectRequest; -import org.apache.kafka.common.requests.PrepareS3ObjectRequest.Builder; +import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest; +import org.apache.kafka.common.requests.s3.PrepareS3ObjectRequest.Builder; import org.apache.kafka.metadata.stream.S3ObjectMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/core/src/main/scala/kafka/log/s3/objects/GetStreamsOffsetResponse.java b/core/src/main/scala/kafka/log/s3/objects/GetStreamsOffsetResponse.java deleted file mode 100644 index 15ec26c4d9..0000000000 --- a/core/src/main/scala/kafka/log/s3/objects/GetStreamsOffsetResponse.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.log.s3.objects; - -public class GetStreamsOffsetResponse { - private StreamRange[] streamRanges; - - public StreamRange[] streamRanges() { - return streamRanges; - } - - public void setStreamRanges(StreamRange[] streamRanges) { - this.streamRanges = streamRanges; - } - - public static class StreamRange { - private final long streamId; - private final long startOffset; - private final long endOffset; - - public StreamRange(long streamId, long startOffset, long endOffset) { - this.streamId = streamId; - this.startOffset = startOffset; - this.endOffset = endOffset; - } - - public long streamId() { - return streamId; - } - - public long startOffset() { - return startOffset; - } - - public long endOffset() { - return endOffset; - } - } -} diff --git a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java index bbdf48f918..a4e6ca99a8 100644 --- a/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/ControllerStreamManager.java @@ -17,18 +17,22 @@ package kafka.log.s3.streams; +import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import kafka.log.s3.model.StreamOffset; import kafka.log.s3.network.ControllerRequestSender; -import kafka.log.s3.objects.GetStreamsOffsetRequest; -import kafka.log.s3.objects.GetStreamsOffsetResponse; import kafka.log.s3.objects.OpenStreamMetadata; import kafka.server.KafkaConfig; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.requests.OpenStreamRequest; +import org.apache.kafka.common.requests.s3.GetStreamsOffsetRequest; +import org.apache.kafka.common.requests.s3.OpenStreamRequest; import org.apache.kafka.common.requests.s3.CreateStreamRequest; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,9 +76,15 @@ public CompletableFuture openStream(long streamId, long epoc switch (Errors.forCode(resp.errorCode())) { case NONE: return new OpenStreamMetadata(streamId, epoch, resp.startOffset(), resp.nextOffset()); + case STREAM_NOT_EXIST: + LOGGER.error("Stream not exist while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); + throw Errors.forCode(resp.errorCode()).exception(); case STREAM_FENCED: LOGGER.error("Stream fenced while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); throw Errors.forCode(resp.errorCode()).exception(); + case STREAM_NOT_CLOSED: + LOGGER.error("Stream not closed while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); + throw Errors.forCode(resp.errorCode()).exception(); default: LOGGER.error("Error while opening stream: {}, code: {}", request, Errors.forCode(resp.errorCode())); throw Errors.forCode(resp.errorCode()).exception(); @@ -98,7 +108,20 @@ public CompletableFuture deleteStream(long streamId, long epoch) { } @Override - public CompletableFuture getStreamsOffset(GetStreamsOffsetRequest request) { - return null; + public CompletableFuture> getStreamsOffset(List streamIds) { + GetStreamsOffsetRequest.Builder request = new GetStreamsOffsetRequest.Builder( + new GetStreamsOffsetRequestData() + .setStreamIds(streamIds)); + return this.requestSender.send(request, GetStreamsOffsetResponseData.class).thenApply(resp -> { + switch (Errors.forCode(resp.errorCode())) { + case NONE: + return resp.streamsOffset().stream() + .map(streamOffset -> new StreamOffset(streamOffset.streamId(), streamOffset.startOffset(), streamOffset.endOffset())) + .collect(Collectors.toList()); + default: + LOGGER.error("Error while getting streams offset: {}, code: {}", request, Errors.forCode(resp.errorCode())); + throw Errors.forCode(resp.errorCode()).exception(); + } + }); } } diff --git a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java index 12d362a82e..175aee16cf 100644 --- a/core/src/main/scala/kafka/log/s3/streams/StreamManager.java +++ b/core/src/main/scala/kafka/log/s3/streams/StreamManager.java @@ -17,9 +17,9 @@ package kafka.log.s3.streams; +import java.util.List; import java.util.concurrent.CompletableFuture; -import kafka.log.s3.objects.GetStreamsOffsetRequest; -import kafka.log.s3.objects.GetStreamsOffsetResponse; +import kafka.log.s3.model.StreamOffset; import kafka.log.s3.objects.OpenStreamMetadata; public interface StreamManager { @@ -72,9 +72,10 @@ public interface StreamManager { * Get streams offset. * * When server is starting or recovering, wal in EBS need streams offset to determine the recover point. - * @param request {@link GetStreamsOffsetRequest} - * @return {@link GetStreamsOffsetResponse} + * + * @param streamIds stream ids. + * @return {@link StreamOffset} */ - CompletableFuture getStreamsOffset(GetStreamsOffsetRequest request); + CompletableFuture> getStreamsOffset(List streamIds); } diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala index d717a56361..31e04152c9 100644 --- a/core/src/main/scala/kafka/server/ControllerApis.scala +++ b/core/src/main/scala/kafka/server/ControllerApis.scala @@ -42,7 +42,7 @@ import org.apache.kafka.common.message.{CreateTopicsRequestData, _} import org.apache.kafka.common.protocol.Errors._ import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors} import org.apache.kafka.common.requests._ -import org.apache.kafka.common.requests.s3.{CreateStreamRequest, CreateStreamResponse} +import org.apache.kafka.common.requests.s3.{CloseStreamRequest, CloseStreamResponse, CommitStreamObjectRequest, CommitStreamObjectResponse, CommitWALObjectRequest, CommitWALObjectResponse, CreateStreamRequest, CreateStreamResponse, DeleteStreamRequest, DeleteStreamResponse, GetStreamsOffsetRequest, GetStreamsOffsetResponse, OpenStreamRequest, OpenStreamResponse, PrepareS3ObjectRequest, PrepareS3ObjectResponse} import org.apache.kafka.common.resource.Resource.CLUSTER_NAME import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC} import org.apache.kafka.common.utils.Time @@ -117,8 +117,8 @@ class ControllerApis(val requestChannel: RequestChannel, case ApiKeys.DELETE_STREAM => handleDeleteStream(request) case ApiKeys.PREPARE_S3_OBJECT => handlePrepareS3Object(request) case ApiKeys.COMMIT_WALOBJECT => handleCommitWALObject(request) - case ApiKeys.COMMIT_COMPACT_OBJECT => handleCommitCompactObject(request) case ApiKeys.COMMIT_STREAM_OBJECT => handleCommitStreamObject(request) + case ApiKeys.GET_STREAMS_OFFSET => handleGetStreamsOffset(request) // Kafka on S3 inject end case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey}") } @@ -973,33 +973,33 @@ class ControllerApis(val requestChannel: RequestChannel, } } - def handleCommitCompactObject(request: RequestChannel.Request): CompletableFuture[Unit] = { - val commitCompactObjectRequest = request.body[CommitCompactObjectRequest] + def handleCommitStreamObject(request: RequestChannel.Request): CompletableFuture[Unit] = { + val commitStreamObjectRequest = request.body[CommitStreamObjectRequest] val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty()) - controller.commitCompactObject(context, commitCompactObjectRequest.data) + controller.commitStreamObject(context, commitStreamObjectRequest.data) .handle[Unit] { (result, exception) => if (exception != null) { requestHelper.handleError(request, exception) } else { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - new CommitCompactObjectResponse(result.setThrottleTimeMs(requestThrottleMs)) + new CommitStreamObjectResponse(result.setThrottleTimeMs(requestThrottleMs)) }) } } } - def handleCommitStreamObject(request: RequestChannel.Request): CompletableFuture[Unit] = { - val commitStreamObjectRequest = request.body[CommitStreamObjectRequest] + def handleGetStreamsOffset(request: RequestChannel.Request): CompletableFuture[Unit] = { + val getStreamsOffsetRequest = request.body[GetStreamsOffsetRequest] val context = new ControllerRequestContext(request.context.header.data, request.context.principal, OptionalLong.empty()) - controller.commitStreamObject(context, commitStreamObjectRequest.data) + controller.getStreamsOffset(context, getStreamsOffsetRequest.data) .handle[Unit] { (result, exception) => if (exception != null) { requestHelper.handleError(request, exception) } else { requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => { - new CommitStreamObjectResponse(result.setThrottleTimeMs(requestThrottleMs)) + new GetStreamsOffsetResponse(result.setThrottleTimeMs(requestThrottleMs)) }) } } diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 6c1024c299..f1beba9bd1 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -33,8 +33,6 @@ import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; -import org.apache.kafka.common.message.CommitCompactObjectRequestData; -import org.apache.kafka.common.message.CommitCompactObjectResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.message.CommitWALObjectRequestData; @@ -51,6 +49,8 @@ import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; @@ -513,14 +513,13 @@ public CompletableFuture commitWALObject(Controller } @Override - public CompletableFuture commitCompactObject(ControllerRequestContext context, - CommitCompactObjectRequestData request) { + public CompletableFuture commitStreamObject(ControllerRequestContext context, + CommitStreamObjectRequestData request) { throw new UnsupportedOperationException(); } @Override - public CompletableFuture commitStreamObject(ControllerRequestContext context, - CommitStreamObjectRequestData request) { + public CompletableFuture getStreamsOffset(ControllerRequestContext context, GetStreamsOffsetRequestData request) { throw new UnsupportedOperationException(); } // Kafka on S3 inject end diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 9ba7a6a91a..9cf248128b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -30,8 +30,6 @@ import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; -import org.apache.kafka.common.message.CommitCompactObjectRequestData; -import org.apache.kafka.common.message.CommitCompactObjectResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.message.CommitWALObjectRequestData; @@ -46,6 +44,8 @@ import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; @@ -436,21 +436,20 @@ CompletableFuture commitWALObject( ); /** - * Broker trys to commit a compact object. + * Broker trys to commit a stream object */ - CompletableFuture commitCompactObject( + CompletableFuture commitStreamObject( ControllerRequestContext context, - CommitCompactObjectRequestData request + CommitStreamObjectRequestData request ); /** - * Broker trys to commit a stream object + * Broker trys to get the offset: [startOffset, endOffset) of the stream. */ - CompletableFuture commitStreamObject( + CompletableFuture getStreamsOffset( ControllerRequestContext context, - CommitStreamObjectRequestData request + GetStreamsOffsetRequestData request ); - // Kafka on S3 inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index 4f3b5e7239..85c1e5a26a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -40,8 +40,6 @@ import org.apache.kafka.common.message.BrokerRegistrationRequestData; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; -import org.apache.kafka.common.message.CommitCompactObjectRequestData; -import org.apache.kafka.common.message.CommitCompactObjectResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.message.CommitWALObjectRequestData; @@ -56,6 +54,8 @@ import org.apache.kafka.common.message.DeleteStreamResponseData; import org.apache.kafka.common.message.ElectLeadersRequestData; import org.apache.kafka.common.message.ElectLeadersResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.ListPartitionReassignmentsRequestData; import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; @@ -2222,19 +2222,19 @@ public CompletableFuture commitWALObject(Controller () -> streamControlManager.commitWALObject(request)); } - @Override - public CompletableFuture commitCompactObject(ControllerRequestContext context, - CommitCompactObjectRequestData request) { - return appendWriteEvent("commitCompactObject", context.deadlineNs(), - () -> streamControlManager.commitCompactObject(request)); - } - @Override public CompletableFuture commitStreamObject(ControllerRequestContext context, CommitStreamObjectRequestData request) { return appendWriteEvent("commitStreamObject", context.deadlineNs(), () -> streamControlManager.commitStreamObject(request)); } + + @Override + public CompletableFuture getStreamsOffset(ControllerRequestContext context, GetStreamsOffsetRequestData request) { + return appendReadEvent("getStreamsOffset", context.deadlineNs(), + () -> streamControlManager.getStreamsOffset(request)); + } + // Kafka on S3 inject end // VisibleForTesting diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 6cb230cfcc..41ce96a463 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -26,8 +26,6 @@ import java.util.stream.Collectors; import org.apache.kafka.common.message.CloseStreamRequestData; import org.apache.kafka.common.message.CloseStreamResponseData; -import org.apache.kafka.common.message.CommitCompactObjectRequestData; -import org.apache.kafka.common.message.CommitCompactObjectResponseData; import org.apache.kafka.common.message.CommitStreamObjectRequestData; import org.apache.kafka.common.message.CommitStreamObjectResponseData; import org.apache.kafka.common.message.CommitWALObjectRequestData; @@ -37,6 +35,9 @@ import org.apache.kafka.common.message.CreateStreamResponseData; import org.apache.kafka.common.message.DeleteStreamRequestData; import org.apache.kafka.common.message.DeleteStreamResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData.StreamOffset; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; @@ -343,21 +344,15 @@ public ControllerResult deleteStream(DeleteStreamReque } public ControllerResult commitWALObject(CommitWALObjectRequestData data) { + // TODO: deal with compacted objects, mark delete compacted object + // TODO: deal with stream objects, replay streamObjectRecord to advance stream's end offset + // TODO: generate order id to ensure the order of all wal object CommitWALObjectResponseData resp = new CommitWALObjectResponseData(); List records = new ArrayList<>(); - List failedStreamIds = new ArrayList<>(); - resp.setFailedStreamIds(failedStreamIds); long objectId = data.objectId(); int brokerId = data.brokerId(); long objectSize = data.objectSize(); List streamRanges = data.objectStreamRanges(); - // verify stream epoch - streamRanges.stream().filter(range -> !verifyWalStreamRanges(range, brokerId)) - .mapToLong(ObjectStreamRange::streamId).forEach(failedStreamIds::add); - if (!failedStreamIds.isEmpty()) { - log.error("stream is invalid when commit wal object, failed stream ids [{}]", - String.join(",", failedStreamIds.stream().map(String::valueOf).collect(Collectors.toList()))); - } // commit object ControllerResult commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize); if (!commitResult.response()) { @@ -367,7 +362,6 @@ public ControllerResult commitWALObject(CommitWALOb } records.addAll(commitResult.records()); List indexes = streamRanges.stream() - .filter(range -> !failedStreamIds.contains(range.streamId())) .map(range -> new S3ObjectStreamIndex(range.streamId(), range.startOffset(), range.endOffset())) .collect(Collectors.toList()); // update broker's wal object @@ -388,12 +382,27 @@ public ControllerResult commitWALObject(CommitWALOb return ControllerResult.atomicOf(records, resp); } - public ControllerResult commitCompactObject(CommitCompactObjectRequestData data) { + public ControllerResult commitStreamObject(CommitStreamObjectRequestData data) { throw new UnsupportedOperationException(); } - public ControllerResult commitStreamObject(CommitStreamObjectRequestData data) { - throw new UnsupportedOperationException(); + public GetStreamsOffsetResponseData getStreamsOffset(GetStreamsOffsetRequestData data) { + List streamIds = data.streamIds(); + GetStreamsOffsetResponseData resp = new GetStreamsOffsetResponseData(); + List streamOffsets = streamIds.stream() + .filter(this.streamsMetadata::containsKey) + .map(id -> { + S3StreamMetadata streamMetadata = this.streamsMetadata.get(id); + RangeMetadata range = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); + long startOffset = streamMetadata.startOffset(); + long endOffset = range == null ? startOffset : range.endOffset(); + return new StreamOffset() + .setStreamId(id) + .setStartOffset(startOffset) + .setEndOffset(endOffset); + }).collect(Collectors.toList()); + resp.setStreamsOffset(streamOffsets); + return resp; } public void replay(AssignedStreamIdRecord record) { diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java index 0f88b96f7f..0f12ad17c4 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java @@ -33,6 +33,8 @@ import org.apache.kafka.common.message.CommitWALObjectResponseData; import org.apache.kafka.common.message.CreateStreamRequestData; import org.apache.kafka.common.message.CreateStreamResponseData; +import org.apache.kafka.common.message.GetStreamsOffsetRequestData; +import org.apache.kafka.common.message.GetStreamsOffsetResponseData; import org.apache.kafka.common.message.OpenStreamRequestData; import org.apache.kafka.common.message.OpenStreamResponseData; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; @@ -310,49 +312,7 @@ public void testCommitWal() { .setObjectStreamRanges(streamRanges1); ControllerResult result4 = manager.commitWALObject(commitRequest1); assertEquals(Errors.OBJECT_NOT_EXIST.code(), result4.response().errorCode()); - // 4. commit a wal object that doesn't match the next offset - List streamRanges2 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(99) - .setEndOffset(200)); - CommitWALObjectRequestData commitRequest2 = new CommitWALObjectRequestData() - .setObjectId(2L) - .setBrokerId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges2); - ControllerResult result5 = manager.commitWALObject(commitRequest2); - assertEquals(Errors.NONE.code(), result5.response().errorCode()); - assertEquals(1, result5.response().failedStreamIds().size()); - assertEquals(STREAM0, result5.response().failedStreamIds().get(0).longValue()); - // 5. commit a wal object that contains a stream which doesn't exist - List streamRanges3 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(100) - .setEndOffset(200), - new ObjectStreamRange() - .setStreamId(STREAM1) - .setStreamEpoch(EPOCH0) - .setStartOffset(0) - .setEndOffset(100)); - CommitWALObjectRequestData commitRequest3 = new CommitWALObjectRequestData() - .setObjectId(3L) - .setBrokerId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges3); - ControllerResult result6 = manager.commitWALObject(commitRequest3); - assertEquals(Errors.NONE.code(), result6.response().errorCode()); - assertEquals(1, result6.response().failedStreamIds().size()); - assertEquals(STREAM1, result6.response().failedStreamIds().get(0).longValue()); - replay(manager, result6.records()); - // verify range's end offset advanced and wal object is added - streamMetadata0 = manager.streamsMetadata().get(STREAM0); - assertEquals(1, streamMetadata0.ranges().size()); - assertEquals(0L, streamMetadata0.ranges().get(0).startOffset()); - assertEquals(200L, streamMetadata0.ranges().get(0).endOffset()); - assertEquals(2, manager.brokersMetadata().get(BROKER0).walObjects().size()); - // 6. broker_0 close stream_0 with epoch_0 and broker_1 open stream_0 with epoch_1 + // 4. broker_0 close stream_0 with epoch_0 and broker_1 open stream_0 with epoch_1 ControllerResult result7 = manager.closeStream( new CloseStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH0).setBrokerId(BROKER0)); assertEquals(Errors.NONE.code(), result7.response().errorCode()); @@ -361,28 +321,13 @@ public void testCommitWal() { new OpenStreamRequestData().setStreamId(STREAM0).setStreamEpoch(EPOCH1).setBrokerId(BROKER1)); assertEquals(Errors.NONE.code(), result8.response().errorCode()); assertEquals(0L, result8.response().startOffset()); - assertEquals(200L, result8.response().nextOffset()); + assertEquals(100L, result8.response().nextOffset()); replay(manager, result8.records()); - // 7. broker_0 try to keep committing wal object which contains stream_0's data - List streamRanges5 = List.of(new ObjectStreamRange() - .setStreamId(STREAM0) - .setStreamEpoch(EPOCH0) - .setStartOffset(200) - .setEndOffset(300)); - CommitWALObjectRequestData commitRequest5 = new CommitWALObjectRequestData() - .setObjectId(5L) - .setBrokerId(BROKER0) - .setObjectSize(999) - .setObjectStreamRanges(streamRanges5); - ControllerResult result9 = manager.commitWALObject(commitRequest5); - assertEquals(Errors.NONE.code(), result9.response().errorCode()); - assertEquals(1, result9.response().failedStreamIds().size()); - assertEquals(STREAM0, result9.response().failedStreamIds().get(0).longValue()); - // 8. broker_1 successfully commit wal object which contains stream_0's data + // 5. broker_1 successfully commit wal object which contains stream_0's data List streamRanges6 = List.of(new ObjectStreamRange() .setStreamId(STREAM0) .setStreamEpoch(EPOCH1) - .setStartOffset(200) + .setStartOffset(100) .setEndOffset(300)); CommitWALObjectRequestData commitRequest6 = new CommitWALObjectRequestData() .setBrokerId(BROKER1) @@ -396,11 +341,21 @@ public void testCommitWal() { streamMetadata0 = manager.streamsMetadata().get(STREAM0); assertEquals(2, streamMetadata0.ranges().size()); assertEquals(0L, streamMetadata0.ranges().get(0).startOffset()); - assertEquals(200L, streamMetadata0.ranges().get(0).endOffset()); + assertEquals(100L, streamMetadata0.ranges().get(0).endOffset()); RangeMetadata rangeMetadata1 = streamMetadata0.ranges().get(1); - assertEquals(200L, rangeMetadata1.startOffset()); + assertEquals(100L, rangeMetadata1.startOffset()); assertEquals(300L, rangeMetadata1.endOffset()); assertEquals(1, manager.brokersMetadata().get(BROKER1).walObjects().size()); + + // 6. get stream's offset + GetStreamsOffsetRequestData request = new GetStreamsOffsetRequestData() + .setStreamIds(List.of(STREAM0)); + GetStreamsOffsetResponseData streamsOffset = manager.getStreamsOffset(request); + assertEquals(1, streamsOffset.streamsOffset().size()); + assertEquals(STREAM0, streamsOffset.streamsOffset().get(0).streamId()); + assertEquals(0L, streamsOffset.streamsOffset().get(0).startOffset()); + assertEquals(300L, streamsOffset.streamsOffset().get(0).endOffset()); + } private void replay(StreamControlManager manager, List records) {