From 9748efcc3c39107eae3c1e197b7a2d14dcb3fcaa Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Tue, 22 Aug 2023 11:39:59 +0800 Subject: [PATCH] feat(s3): define stream and object protocol between controller and broker 1. define stream and object protocol between controller and broker Signed-off-by: TheR1sing3un --- .../message/CommitCompactObjectRequest.json | 125 ++++++++++++++++++ .../message/CommitCompactObjectResponse.json | 30 +++++ .../message/CommitStreamObjectRequest.json | 65 +++++++++ .../message/CommitStreamObjectResponse.json | 30 +++++ .../message/CommitWALObjectRequest.json | 80 +++++++++++ .../message/CommitWALObjectResponse.json | 36 +++++ .../common/message/OpenStreamRequest.json | 30 ++++- .../common/message/OpenStreamResponse.json | 14 +- .../message/PrepareS3ObjectRequest.json | 47 +++++++ .../message/PrepareS3ObjectResponse.json | 36 +++++ .../objects/CommitCompactObjectRequest.java | 20 +++ .../s3/objects/CommitStreamObjectRequest.java | 13 +- .../s3/objects/CommitWalObjectRequest.java | 13 +- .../kafka/log/s3/objects/StreamObject.java | 13 ++ .../org/apache/kafka/message/EntityType.java | 5 +- 15 files changed, 539 insertions(+), 18 deletions(-) create mode 100644 clients/src/main/resources/common/message/CommitCompactObjectRequest.json create mode 100644 clients/src/main/resources/common/message/CommitCompactObjectResponse.json create mode 100644 clients/src/main/resources/common/message/CommitStreamObjectRequest.json create mode 100644 clients/src/main/resources/common/message/CommitStreamObjectResponse.json create mode 100644 clients/src/main/resources/common/message/CommitWALObjectRequest.json create mode 100644 clients/src/main/resources/common/message/CommitWALObjectResponse.json create mode 100644 clients/src/main/resources/common/message/PrepareS3ObjectRequest.json create mode 100644 clients/src/main/resources/common/message/PrepareS3ObjectResponse.json diff --git a/clients/src/main/resources/common/message/CommitCompactObjectRequest.json b/clients/src/main/resources/common/message/CommitCompactObjectRequest.json new file mode 100644 index 0000000000..caf49ca9ae --- /dev/null +++ b/clients/src/main/resources/common/message/CommitCompactObjectRequest.json @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 74, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "CommitCompactObjectRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "entityType": "brokerId", + "about": "The ID of the requesting broker" + }, + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The ID of the WAL S3 object to commit" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The size of the WAL S3 object to commit" + }, + { + "name": "ObjectStreamRanges", + "type": "[]ObjectStreamRange", + "versions": "0+", + "about": "The stream ranges of the WAL S3 object to commit", + "fields": [ + { + "name": "StreamId", + "type": "int32", + "versions": "0+", + "about": "The ID of the stream" + }, + { + "name": "StreamEpoch", + "type": "int64", + "versions": "0+", + "entityType": "streamEpoch", + "about": "The epoch of the requesting stream in the requesting broker" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream range" + } + ] + }, + { + "name": "StreamObjects", + "type": "[]StreamObject", + "versions": "0+", + "about": "The stream objects to commit", + "fields": [ + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The ID of the WAL S3 object to commit" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The size of the WAL S3 object to commit" + }, + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The ID of the stream", + "entityType": "streamId" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream range" + }, + { + "name": "sourceObjectIds", + "type": "[]int64", + "versions": "0+", + "about": "The IDs of the source S3 objects" + } + ] + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitCompactObjectResponse.json b/clients/src/main/resources/common/message/CommitCompactObjectResponse.json new file mode 100644 index 0000000000..bba9db249b --- /dev/null +++ b/clients/src/main/resources/common/message/CommitCompactObjectResponse.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 74, + "type": "response", + "name": "CommitCompactObjectResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitStreamObjectRequest.json b/clients/src/main/resources/common/message/CommitStreamObjectRequest.json new file mode 100644 index 0000000000..179fcf5210 --- /dev/null +++ b/clients/src/main/resources/common/message/CommitStreamObjectRequest.json @@ -0,0 +1,65 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 75, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "CommitStreamObjectRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The ID of the Stream S3 object to commit" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The size of the Stream S3 object to commit" + }, + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The ID of the stream", + "entityType": "streamId" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream range" + }, + { + "name": "sourceObjectIds", + "type": "[]int64", + "versions": "0+", + "about": "The IDs of the source S3 objects" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitStreamObjectResponse.json b/clients/src/main/resources/common/message/CommitStreamObjectResponse.json new file mode 100644 index 0000000000..591c99621d --- /dev/null +++ b/clients/src/main/resources/common/message/CommitStreamObjectResponse.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 75, + "type": "response", + "name": "CommitStreamObjectResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitWALObjectRequest.json b/clients/src/main/resources/common/message/CommitWALObjectRequest.json new file mode 100644 index 0000000000..77ab3b83de --- /dev/null +++ b/clients/src/main/resources/common/message/CommitWALObjectRequest.json @@ -0,0 +1,80 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 73, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "CommitWALObjectRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "entityType": "brokerId", + "about": "The ID of the requesting broker" + }, + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The ID of the WAL S3 object to commit" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The size of the WAL S3 object to commit" + }, + { + "name": "ObjectStreamRanges", + "type": "[]ObjectStreamRange", + "versions": "0+", + "about": "The stream ranges of the WAL S3 object to commit", + "fields": [ + { + "name": "StreamId", + "type": "int32", + "versions": "0+", + "about": "The ID of the stream" + }, + { + "name": "StreamEpoch", + "type": "int64", + "versions": "0+", + "entityType": "streamEpoch", + "about": "The epoch of the requesting stream in the requesting broker" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream range" + } + ] + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CommitWALObjectResponse.json b/clients/src/main/resources/common/message/CommitWALObjectResponse.json new file mode 100644 index 0000000000..4ee72de0d5 --- /dev/null +++ b/clients/src/main/resources/common/message/CommitWALObjectResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 73, + "type": "response", + "name": "CommitWALObjectResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "FailedStreamIds", + "type": "[]int64", + "versions": "0+", + "about": "Failed to commit WAL objects' id" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/OpenStreamRequest.json b/clients/src/main/resources/common/message/OpenStreamRequest.json index 28c8f8e3a3..40cdd10ff5 100644 --- a/clients/src/main/resources/common/message/OpenStreamRequest.json +++ b/clients/src/main/resources/common/message/OpenStreamRequest.json @@ -16,14 +16,34 @@ { "apiKey": 69, "type": "request", - "listeners": ["controller", "broker"], + "listeners": [ + "controller", + "broker" + ], "name": "OpenStreamRequest", "validVersions": "0", "flexibleVersions": "0+", "fields": [ - { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", - "about": "The ID of the requesting broker" }, - { "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId", - "about": "The id of the requesting stream" } + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "entityType": "streamId", + "about": "The id of the requesting stream" + }, + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "entityType": "brokerId", + "about": "The ID of the requesting broker" + }, + { + "name": "StreamEpoch", + "type": "int64", + "versions": "0+", + "entityType": "streamEpoch", + "about": "The epoch of the requesting stream in the requesting broker" + } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/OpenStreamResponse.json b/clients/src/main/resources/common/message/OpenStreamResponse.json index 75175ea985..356fce75cd 100644 --- a/clients/src/main/resources/common/message/OpenStreamResponse.json +++ b/clients/src/main/resources/common/message/OpenStreamResponse.json @@ -20,7 +20,17 @@ "validVersions": "0", "flexibleVersions": "0+", "fields": [ - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The top level response error code" } + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the opened stream" + } ] } \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json b/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json new file mode 100644 index 0000000000..29970d8787 --- /dev/null +++ b/clients/src/main/resources/common/message/PrepareS3ObjectRequest.json @@ -0,0 +1,47 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 72, + "type": "request", + "listeners": [ + "controller", + "broker" + ], + "name": "PrepareS3ObjectRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "entityType": "brokerId", + "about": "The ID of the requesting broker" + }, + { + "name": "PreparedCount", + "type": "int32", + "versions": "0+", + "about": "The S3 object count to prepare" + }, + { + "name": "TimeToLiveInMs", + "type": "int64", + "versions": "0+", + "about": "The time to live in milliseconds for the prepared S3 object" + } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json b/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json new file mode 100644 index 0000000000..3d71bc7082 --- /dev/null +++ b/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 72, + "type": "response", + "name": "PrepareS3ObjectResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "ErrorCode", + "type": "int16", + "versions": "0+", + "about": "The top level response error code" + }, + { + "name": "S3ObjectIds", + "type": "[]int64", + "versions": "0+", + "about": "The prepared S3 objects' id" + } + ] +} \ No newline at end of file diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java index 848c80878e..aab72f9ad3 100644 --- a/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java +++ b/core/src/main/scala/kafka/log/s3/objects/CommitCompactObjectRequest.java @@ -22,7 +22,19 @@ public class CommitCompactObjectRequest { private long objectId; private long objectSize; + /** + * The stream ranges of the compacted object. + */ + private List streamRanges; + + /** + * The stream objects which split from the compacted object. + */ private List streamObjects; + + /** + * The object ids which are compacted by the compacted object. + */ private List compactedObjectIds; public long getObjectId() { @@ -56,4 +68,12 @@ public List getCompactedObjectIds() { public void setCompactedObjectIds(List compactedObjectIds) { this.compactedObjectIds = compactedObjectIds; } + + public List getStreamRanges() { + return streamRanges; + } + + public void setStreamRanges(List streamRanges) { + this.streamRanges = streamRanges; + } } diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java index 8f8e6823a3..6df190b868 100644 --- a/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java +++ b/core/src/main/scala/kafka/log/s3/objects/CommitStreamObjectRequest.java @@ -23,7 +23,10 @@ public class CommitStreamObjectRequest { private long streamId; private long startOffset; private long endOffset; - private long sourceObjectId; + /** + * The source objects' id of the stream object. + */ + private long[] sourceObjectIds; public long getObjectId() { return objectId; @@ -65,11 +68,11 @@ public void setEndOffset(long endOffset) { this.endOffset = endOffset; } - public long getSourceObjectId() { - return sourceObjectId; + public long[] getSourceObjectIds() { + return sourceObjectIds; } - public void setSourceObjectId(long sourceObjectId) { - this.sourceObjectId = sourceObjectId; + public void setSourceObjectIds(long[] sourceObjectIds) { + this.sourceObjectIds = sourceObjectIds; } } diff --git a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java index 1f33c04108..92f6fab04c 100644 --- a/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java +++ b/core/src/main/scala/kafka/log/s3/objects/CommitWalObjectRequest.java @@ -22,7 +22,10 @@ public class CommitWalObjectRequest { private long objectId; private long objectSize; - private List streams; + /** + * The stream ranges of the compacted object. + */ + private List streamRanges; public long getObjectId() { return objectId; @@ -40,11 +43,11 @@ public void setObjectSize(long objectSize) { this.objectSize = objectSize; } - public List getStreams() { - return streams; + public List getStreamRanges() { + return streamRanges; } - public void setStreams(List streams) { - this.streams = streams; + public void setStreamRanges(List streamRanges) { + this.streamRanges = streamRanges; } } diff --git a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java index 63d8f37474..175a5baa96 100644 --- a/core/src/main/scala/kafka/log/s3/objects/StreamObject.java +++ b/core/src/main/scala/kafka/log/s3/objects/StreamObject.java @@ -24,6 +24,11 @@ public class StreamObject { private long startOffset; private long endOffset; + /** + * The source objects' id of the stream object. + */ + private long[] sourceObjectIds; + public long getObjectId() { return objectId; } @@ -63,4 +68,12 @@ public long getEndOffset() { public void setEndOffset(long endOffset) { this.endOffset = endOffset; } + + public long[] getSourceObjectIds() { + return sourceObjectIds; + } + + public void setSourceObjectIds(long[] sourceObjectIds) { + this.sourceObjectIds = sourceObjectIds; + } } diff --git a/generator/src/main/java/org/apache/kafka/message/EntityType.java b/generator/src/main/java/org/apache/kafka/message/EntityType.java index be756481f1..9fd35aab44 100644 --- a/generator/src/main/java/org/apache/kafka/message/EntityType.java +++ b/generator/src/main/java/org/apache/kafka/message/EntityType.java @@ -41,7 +41,10 @@ public enum EntityType { // Kafka on S3 inject start @JsonProperty("streamId") - STREAM_ID(Int64FieldType.INSTANCE); + STREAM_ID(Int64FieldType.INSTANCE), + + @JsonProperty("streamEpoch") + STREAM_EPOCH(Int64FieldType.INSTANCE); // Kafka on S3 inject end private final FieldType baseType;