diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java index f727bd18e5..477a15e1bc 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/ApiKeys.java @@ -108,7 +108,15 @@ public enum ApiKeys { UNREGISTER_BROKER(ApiMessageType.UNREGISTER_BROKER, false, RecordBatch.MAGIC_VALUE_V0, true), DESCRIBE_TRANSACTIONS(ApiMessageType.DESCRIBE_TRANSACTIONS), LIST_TRANSACTIONS(ApiMessageType.LIST_TRANSACTIONS), - ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true); + ALLOCATE_PRODUCER_IDS(ApiMessageType.ALLOCATE_PRODUCER_IDS, true, true), + + // Kafka on S3 inject start + + CREATE_STREAM(ApiMessageType.CREATE_STREAM, false, true), + DELETE_STREAM(ApiMessageType.DELETE_STREAM, false, true), + OPEN_STREAM(ApiMessageType.OPEN_STREAM, false, true), + CLOSE_STREAM(ApiMessageType.CLOSE_STREAM, false, true); + // Kafka on S3 inject end private static final Map<ApiMessageType.ListenerType, EnumSet<ApiKeys>> APIS_BY_LISTENER = new EnumMap<>(ApiMessageType.ListenerType.class); diff --git a/clients/src/main/resources/common/message/CloseStreamRequest.json b/clients/src/main/resources/common/message/CloseStreamRequest.json new file mode 100644 index 0000000000..1cfee577b6 --- /dev/null +++ b/clients/src/main/resources/common/message/CloseStreamRequest.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 70, + "type": "request", + "listeners": ["controller", "broker"], + "name": "CloseStreamRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, + { "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId", + "about": "The id of the requesting stream" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CloseStreamResponse.json b/clients/src/main/resources/common/message/CloseStreamResponse.json new file mode 100644 index 0000000000..6d954b0e9e --- /dev/null +++ b/clients/src/main/resources/common/message/CloseStreamResponse.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 70, + "type": "response", + "name": "CloseStreamResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top level response error code" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CreateStreamRequest.json b/clients/src/main/resources/common/message/CreateStreamRequest.json new file mode 100644 index 0000000000..4a086c26dc --- /dev/null +++ b/clients/src/main/resources/common/message/CreateStreamRequest.json @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 68, + "type": "request", + "listeners": ["controller", "broker"], + "name": "CreateStreamRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId", + "about": "The id of the requesting stream" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/CreateStreamResponse.json b/clients/src/main/resources/common/message/CreateStreamResponse.json new file mode 100644 index 0000000000..65d50ec3b2 --- /dev/null +++ b/clients/src/main/resources/common/message/CreateStreamResponse.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 68, + "type": "response", + "name": "CreateStreamResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top level response error code" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/DeleteStreamRequest.json b/clients/src/main/resources/common/message/DeleteStreamRequest.json new file mode 100644 index 0000000000..5d70bc6428 --- /dev/null +++ b/clients/src/main/resources/common/message/DeleteStreamRequest.json @@ -0,0 +1,27 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 71, + "type": "request", + "listeners": ["controller", "broker"], + "name": "DeleteStreamRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId", + "about": "The id of the requesting stream" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/DeleteStreamResponse.json b/clients/src/main/resources/common/message/DeleteStreamResponse.json new file mode 100644 index 0000000000..1cd1e5f570 --- /dev/null +++ b/clients/src/main/resources/common/message/DeleteStreamResponse.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 71, + "type": "response", + "name": "DeleteStreamResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top level response error code" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/OpenStreamRequest.json b/clients/src/main/resources/common/message/OpenStreamRequest.json new file mode 100644 index 0000000000..28c8f8e3a3 --- /dev/null +++ b/clients/src/main/resources/common/message/OpenStreamRequest.json @@ -0,0 +1,29 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 69, + "type": "request", + "listeners": ["controller", "broker"], + "name": "OpenStreamRequest", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "BrokerId", "type": "int32", "versions": "0+", "entityType": "brokerId", + "about": "The ID of the requesting broker" }, + { "name": "StreamId", "type": "int64", "versions": "0+", "entityType": "streamId", + "about": "The id of the requesting stream" } + ] +} \ No newline at end of file diff --git a/clients/src/main/resources/common/message/OpenStreamResponse.json b/clients/src/main/resources/common/message/OpenStreamResponse.json new file mode 100644 index 0000000000..75175ea985 --- /dev/null +++ b/clients/src/main/resources/common/message/OpenStreamResponse.json @@ -0,0 +1,26 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 69, + "type": "response", + "name": "OpenStreamResponse", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The top level response error code" } + ] +} \ No newline at end of file diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala index d2df68f6da..8b0cdd640a 100644 --- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala +++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala @@ -71,7 +71,8 @@ object MetadataCacheTest { image.configs(), image.clientQuotas(), image.producerIds(), - image.acls()) + image.acls(), + image.streamsMetadata()) val delta = new MetadataDelta.Builder().setImage(partialImage).build() def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = { diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala index e623816c39..040481546d 100644 --- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala +++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala @@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._ import org.apache.kafka.common.security.auth.KafkaPrincipal import org.apache.kafka.common.utils.{LogContext, Time, Utils} import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid} -import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage} +import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage, S3StreamsMetadataImage} import org.apache.kafka.metadata.LeaderConstants.NO_LEADER import 
org.apache.kafka.metadata.LeaderRecoveryState import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0 @@ -4128,7 +4128,8 @@ class ReplicaManagerTest { ConfigurationsImage.EMPTY, ClientQuotasImage.EMPTY, ProducerIdsImage.EMPTY, - AclsImage.EMPTY + AclsImage.EMPTY, + S3StreamsMetadataImage.EMPTY ) } diff --git a/generator/src/main/java/org/apache/kafka/message/EntityType.java b/generator/src/main/java/org/apache/kafka/message/EntityType.java index 225c987873..be756481f1 100644 --- a/generator/src/main/java/org/apache/kafka/message/EntityType.java +++ b/generator/src/main/java/org/apache/kafka/message/EntityType.java @@ -18,6 +18,7 @@ package org.apache.kafka.message; import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.message.FieldType.Int64FieldType; public enum EntityType { @JsonProperty("unknown") @@ -36,8 +37,12 @@ public enum EntityType { TOPIC_NAME(FieldType.StringFieldType.INSTANCE), @JsonProperty("brokerId") - BROKER_ID(FieldType.Int32FieldType.INSTANCE); + BROKER_ID(FieldType.Int32FieldType.INSTANCE), + // Kafka on S3 inject start + @JsonProperty("streamId") + STREAM_ID(Int64FieldType.INSTANCE); + // Kafka on S3 inject end private final FieldType baseType; EntityType(FieldType baseType) { diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java new file mode 100644 index 0000000000..044acc7f76 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.stream; + +import java.util.LinkedList; +import java.util.Queue; +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.stream.S3Object; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.slf4j.Logger; + +/** + * The S3ObjectControlManager manages all S3Object's lifecycle, such as apply, create, destroy, etc. + */ +public class S3ObjectControlManager { + private final SnapshotRegistry snapshotRegistry; + private final Logger log; + + private final TimelineHashMap objectsMetadata; + + /** + * The objectId of the next object to be applied. 
(start from 0) + */ + private Long nextApplyObjectId = 0L; + + // TODO: add timer task to periodically check if there are objects to be destroyed or created + private final Queue appliedObjects; + private final Queue markDestroyedObjects; + + public S3ObjectControlManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(S3ObjectControlManager.class); + this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + this.appliedObjects = new LinkedList<>(); + this.markDestroyedObjects = new LinkedList<>(); + } + + public Long appliedObjectNum() { + return nextApplyObjectId; + } + +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java new file mode 100644 index 0000000000..9d586e8bcb --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.controller.stream; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.apache.kafka.timeline.TimelineHashMap; +import org.apache.kafka.timeline.TimelineHashSet; +import org.slf4j.Logger; + +/** + * The StreamControlManager manages all Stream's lifecycle, such as create, open, delete, etc. + */ +public class StreamControlManager { + + static class S3StreamMetadata { + private Long streamId; + private Long epoch; + private Long startOffset; + private TimelineHashSet<RangeMetadata> ranges; + private TimelineHashSet<S3StreamObject> streamObjects; + } + + static class BrokerS3WALMetadata { + private Integer brokerId; + private TimelineHashSet<S3WALObject> walObjects; + } + + private final SnapshotRegistry snapshotRegistry; + + private final Logger log; + + private final S3ObjectControlManager s3ObjectControlManager; + + private final TimelineHashMap<Long, S3StreamMetadata> streamsMetadata; + + private final TimelineHashMap<Integer, BrokerS3WALMetadata> brokersMetadata; + + public StreamControlManager( + SnapshotRegistry snapshotRegistry, + LogContext logContext, + S3ObjectControlManager s3ObjectControlManager) { + this.snapshotRegistry = snapshotRegistry; + this.log = logContext.logger(StreamControlManager.class); + this.s3ObjectControlManager = s3ObjectControlManager; + this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0); + } + + + +} diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java new file mode 100644 index 0000000000..5315a41524 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.metadata.stream.S3WALObject; + +public class BrokerS3WALMetadataDelta { + + private final BrokerS3WALMetadataImage image; + private final Set<S3WALObject> changedS3WALObjects = new HashSet<>(); + + private final Set<S3WALObject> removedS3WALObjects = new HashSet<>(); + + public BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage image) { + this.image = image; + } + + public void replay(WALObjectRecord record) { + changedS3WALObjects.add(S3WALObject.of(record)); + } + + public void replay(RemoveWALObjectRecord record) { + removedS3WALObjects.add(new S3WALObject(record.objectId())); + } + + public BrokerS3WALMetadataImage apply() { + List<S3WALObject> newS3WALObjects = new ArrayList<>(image.getWalObjects()); + // remove all removed WAL objects + newS3WALObjects.removeAll(removedS3WALObjects); + // add all changed WAL objects + newS3WALObjects.addAll(changedS3WALObjects); + return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects); + } + +} diff --git 
a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java new file mode 100644 index 0000000000..a5a48a40ec --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.kafka.image; + +import java.util.List; +import java.util.Objects; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; + +public class BrokerS3WALMetadataImage { + private final Integer brokerId; + private final List<S3WALObject> s3WalObjects; + + public BrokerS3WALMetadataImage(Integer brokerId, List<S3WALObject> s3WalObjects) { + this.brokerId = brokerId; + this.s3WalObjects = s3WalObjects; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + BrokerS3WALMetadataImage that = (BrokerS3WALMetadataImage) o; + return Objects.equals(brokerId, that.brokerId) && Objects.equals(s3WalObjects, that.s3WalObjects); + } + + @Override + public int hashCode() { + return Objects.hash(brokerId, s3WalObjects); + } + + public void write(ImageWriter writer, ImageWriterOptions options) { + s3WalObjects.forEach(walObject -> writer.write(walObject.toRecord())); + } + + public List<S3WALObject> getWalObjects() { + return s3WalObjects; + } + + public Integer getBrokerId() { + return brokerId; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java index ab4fd68f41..5fc776d9ed 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java @@ -27,12 +27,20 @@ import org.apache.kafka.common.metadata.PartitionChangeRecord; import org.apache.kafka.common.metadata.PartitionRecord; import org.apache.kafka.common.metadata.ProducerIdsRecord; +import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RegisterBrokerRecord; import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord; +import org.apache.kafka.common.metadata.RemoveRangeRecord; +import 
org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; +import org.apache.kafka.common.metadata.RemoveS3StreamRecord; import org.apache.kafka.common.metadata.RemoveTopicRecord; +import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.S3StreamObjectRecord; +import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.metadata.TopicRecord; import org.apache.kafka.common.metadata.UnfenceBrokerRecord; import org.apache.kafka.common.metadata.UnregisterBrokerRecord; +import org.apache.kafka.common.metadata.WALObjectRecord; import org.apache.kafka.common.protocol.ApiMessage; import org.apache.kafka.server.common.MetadataVersion; @@ -72,6 +80,8 @@ public MetadataDelta build() { private AclsDelta aclsDelta = null; + private S3StreamsMetadataDelta s3StreamsMetadataDelta = null; + public MetadataDelta(MetadataImage image) { this.image = image; } @@ -145,6 +155,17 @@ public AclsDelta getOrCreateAclsDelta() { return aclsDelta; } + public S3StreamsMetadataDelta streamMetadataDelta() { + return s3StreamsMetadataDelta; + } + + public S3StreamsMetadataDelta getOrCreateStreamsMetadataDelta() { + if (s3StreamsMetadataDelta == null) { + s3StreamsMetadataDelta = new S3StreamsMetadataDelta(image.streamsMetadata()); + } + return s3StreamsMetadataDelta; + } + public Optional metadataVersionChanged() { if (featuresDelta == null) { return Optional.empty(); @@ -209,6 +230,32 @@ public void replay(ApiMessage record) { case ZK_MIGRATION_STATE_RECORD: // TODO handle this break; + // Kafka on S3 inject start + case S3_STREAM_RECORD: + replay((S3StreamRecord) record); + break; + case REMOVE_S3_STREAM_RECORD: + replay((RemoveS3StreamRecord) record); + break; + case RANGE_RECORD: + replay((RangeRecord) record); + break; + case REMOVE_RANGE_RECORD: + replay((RemoveRangeRecord) record); + break; + case S3_STREAM_OBJECT_RECORD: + replay((S3StreamObjectRecord) record); + break; + case REMOVE_S3_STREAM_OBJECT_RECORD: + 
replay((RemoveS3StreamObjectRecord) record); + break; + case WALOBJECT_RECORD: + replay((WALObjectRecord) record); + break; + case REMOVE_WALOBJECT_RECORD: + replay((RemoveWALObjectRecord) record); + break; + // Kafka on S3 inject end default: throw new RuntimeException("Unknown metadata record type " + type); } @@ -284,6 +331,38 @@ public void replay(RemoveAccessControlEntryRecord record) { getOrCreateAclsDelta().replay(record); } + public void replay(S3StreamRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(RemoveS3StreamRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(RangeRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(RemoveRangeRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(S3StreamObjectRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(RemoveS3StreamObjectRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(WALObjectRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + + public void replay(RemoveWALObjectRecord record) { + getOrCreateStreamsMetadataDelta().replay(record); + } + /** * Create removal deltas for anything which was in the base image, but which was not * referenced in the snapshot records we just applied. 
@@ -341,6 +420,12 @@ public MetadataImage apply(MetadataProvenance provenance) { } else { newAcls = aclsDelta.apply(); } + S3StreamsMetadataImage newStreamMetadata; + if (s3StreamsMetadataDelta == null) { + newStreamMetadata = image.streamsMetadata(); + } else { + newStreamMetadata = s3StreamsMetadataDelta.apply(); + } return new MetadataImage( provenance, newFeatures, @@ -349,7 +434,8 @@ public MetadataImage apply(MetadataProvenance provenance) { newConfigs, newClientQuotas, newProducerIds, - newAcls + newAcls, + newStreamMetadata ); } @@ -363,6 +449,7 @@ public String toString() { ", clientQuotasDelta=" + clientQuotasDelta + ", producerIdsDelta=" + producerIdsDelta + ", aclsDelta=" + aclsDelta + + ", streamMetadataDelta=" + s3StreamsMetadataDelta + ')'; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java index 2202b4fe2f..0df8b32f5d 100644 --- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java @@ -38,7 +38,8 @@ public final class MetadataImage { ConfigurationsImage.EMPTY, ClientQuotasImage.EMPTY, ProducerIdsImage.EMPTY, - AclsImage.EMPTY); + AclsImage.EMPTY, + S3StreamsMetadataImage.EMPTY); private final MetadataProvenance provenance; @@ -56,6 +57,12 @@ public final class MetadataImage { private final AclsImage acls; + // Kafka on S3 inject start + + private final S3StreamsMetadataImage streamMetadata; + + // Kafka on S3 inject end + public MetadataImage( MetadataProvenance provenance, FeaturesImage features, @@ -64,7 +71,8 @@ public MetadataImage( ConfigurationsImage configs, ClientQuotasImage clientQuotas, ProducerIdsImage producerIds, - AclsImage acls + AclsImage acls, + S3StreamsMetadataImage streamMetadata ) { this.provenance = provenance; this.features = features; @@ -74,6 +82,7 @@ public MetadataImage( this.clientQuotas = clientQuotas; this.producerIds = producerIds; 
this.acls = acls; + this.streamMetadata = streamMetadata; } public boolean isEmpty() { @@ -83,7 +92,8 @@ public boolean isEmpty() { configs.isEmpty() && clientQuotas.isEmpty() && producerIds.isEmpty() && - acls.isEmpty(); + acls.isEmpty() && + streamMetadata.isEmpty(); } public MetadataProvenance provenance() { @@ -126,6 +136,10 @@ public AclsImage acls() { return acls; } + public S3StreamsMetadataImage streamsMetadata() { + return streamMetadata; + } + public void write(ImageWriter writer, ImageWriterOptions options) { // Features should be written out first so we can include the metadata.version at the beginning of the // snapshot @@ -136,6 +150,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) { clientQuotas.write(writer, options); producerIds.write(writer, options); acls.write(writer, options); + streamMetadata.write(writer, options); writer.close(true); } @@ -150,7 +165,8 @@ public boolean equals(Object o) { configs.equals(other.configs) && clientQuotas.equals(other.clientQuotas) && producerIds.equals(other.producerIds) && - acls.equals(other.acls); + acls.equals(other.acls) && + streamMetadata.equals(other.streamMetadata); } @Override @@ -163,7 +179,8 @@ public int hashCode() { configs, clientQuotas, producerIds, - acls); + acls, + streamMetadata); } @Override @@ -177,6 +194,7 @@ public String toString() { ", clientQuotas=" + clientQuotas + ", producerIdsImage=" + producerIds + ", acls=" + acls + + ", streamMetadata=" + streamMetadata + ")"; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java new file mode 100644 index 0000000000..e7867e4bb9 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.image;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;

/**
 * Accumulates pending range and stream-object changes against a single
 * {@link S3StreamMetadataImage}, then {@link #apply()} produces the next image.
 *
 * <p>Not thread-safe; replayed from a single metadata-loading thread.
 */
public class S3StreamMetadataDelta {

    private final S3StreamMetadataImage image;

    // Epoch carried into the next image. It is initialized from the source image
    // and currently never updated afterwards, so the applied image keeps the
    // source image's epoch.
    private Long newEpoch;

    private final Map<Integer, RangeMetadata> changedRanges = new HashMap<>();
    private final Set<Integer> removedRanges = new HashSet<>();
    private final Set<S3StreamObject> changedS3StreamObjects = new HashSet<>();
    private final Set<S3StreamObject> removedS3StreamObjects = new HashSet<>();

    public S3StreamMetadataDelta(S3StreamMetadataImage image) {
        this.image = image;
        this.newEpoch = image.getEpoch();
    }

    public void replay(RangeRecord record) {
        changedRanges.put(record.rangeIndex(), RangeMetadata.of(record));
        // A (re-)creation replayed after a removal in the same delta supersedes it;
        // keeping the two sets disjoint preserves replay order.
        removedRanges.remove(record.rangeIndex());
    }

    public void replay(RemoveRangeRecord record) {
        removedRanges.add(record.rangeIndex());
        // Drop any pending change so a removal replayed last actually removes.
        changedRanges.remove(record.rangeIndex());
    }

    public void replay(S3StreamObjectRecord record) {
        S3StreamObject object = S3StreamObject.of(record);
        changedS3StreamObjects.add(object);
        // S3StreamObject equality is by objectId, so this clears a prior removal
        // of the same object replayed earlier in this delta.
        removedS3StreamObjects.remove(object);
    }

    public void replay(RemoveS3StreamObjectRecord record) {
        S3StreamObject object = new S3StreamObject(record.objectId());
        removedS3StreamObjects.add(object);
        changedS3StreamObjects.remove(object);
    }

    /**
     * Builds the next image: previous state minus removals, overlaid with changes.
     */
    public S3StreamMetadataImage apply() {
        // Ranges: replay() keeps changedRanges/removedRanges disjoint, so the
        // remove-then-overlay order below is safe.
        Map<Integer, RangeMetadata> newRanges = new HashMap<>(image.getRanges());
        removedRanges.forEach(newRanges::remove);
        newRanges.putAll(changedRanges);

        List<S3StreamObject> newS3StreamObjects = new ArrayList<>(image.getStreamObjects());
        // Remove deleted objects AND stale versions of changed objects (equality
        // is by objectId) before re-adding, otherwise an updated object would be
        // present twice in the resulting list.
        newS3StreamObjects.removeAll(removedS3StreamObjects);
        newS3StreamObjects.removeAll(changedS3StreamObjects);
        newS3StreamObjects.addAll(changedS3StreamObjects);
        return new S3StreamMetadataImage(image.getStreamId(), newEpoch, image.getStartOffset(), newRanges, newS3StreamObjects);
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.image;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;

/**
 * Immutable snapshot of a single stream's metadata: its epoch, start offset,
 * ranges (keyed by range index) and stream objects.
 */
public class S3StreamMetadataImage {

    private final Long streamId;
    private final Long epoch;
    private final Long startOffset;
    private final Map<Integer, RangeMetadata> ranges;
    private final List<S3StreamObject> streamObjects;

    public S3StreamMetadataImage(
        Long streamId,
        Long epoch,
        Long startOffset,
        Map<Integer, RangeMetadata> ranges,
        List<S3StreamObject> streamObjects) {
        this.streamId = streamId;
        this.epoch = epoch;
        this.startOffset = startOffset;
        this.ranges = ranges;
        this.streamObjects = streamObjects;
    }

    /**
     * Emits the stream record first, then one record per range and per stream object.
     */
    public void write(ImageWriter writer, ImageWriterOptions options) {
        writer.write(0, new S3StreamRecord()
            .setStreamId(streamId)
            .setEpoch(epoch)
            .setStartOffset(startOffset));
        for (RangeMetadata range : ranges.values()) {
            writer.write(range.toRecord());
        }
        for (S3StreamObject streamObject : streamObjects) {
            writer.write(streamObject.toRecord());
        }
    }

    public Long getStreamId() {
        return streamId;
    }

    public Long getEpoch() {
        return epoch;
    }

    public Long getStartOffset() {
        return startOffset;
    }

    public Map<Integer, RangeMetadata> getRanges() {
        return ranges;
    }

    public List<S3StreamObject> getStreamObjects() {
        return streamObjects;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        S3StreamMetadataImage other = (S3StreamMetadataImage) o;
        return Objects.equals(streamId, other.streamId)
            && Objects.equals(epoch, other.epoch)
            && Objects.equals(startOffset, other.startOffset)
            && Objects.equals(ranges, other.ranges)
            && Objects.equals(streamObjects, other.streamObjects);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamId, epoch, startOffset, ranges, streamObjects);
    }
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.image;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.common.metadata.WALObjectRecord;

/**
 * Accumulates changes to all streams' metadata and per-broker WAL metadata
 * against an {@link S3StreamsMetadataImage}; {@link #apply()} produces the
 * next image.
 *
 * <p>Not thread-safe; replayed from a single metadata-loading thread.
 */
public final class S3StreamsMetadataDelta {

    private final S3StreamsMetadataImage image;

    private final Map<Long, S3StreamMetadataDelta> changedStreams = new HashMap<>();

    private final Map<Integer, BrokerS3WALMetadataDelta> changedBrokers = new HashMap<>();

    private final Set<Long> deletedStreams = new HashSet<>();
    // TODO: when we recycle the broker's memory data structure
    // We don't use pair of specify BrokerCreateRecord and BrokerRemoveRecord to create or remove brokers, and
    // we create BrokerStreamMetadataImage when we create the first WALObjectRecord for a broker,
    // so we should decide when to recycle the broker's memory data structure
    private final Set<Integer> deletedBrokers = new HashSet<>();

    public S3StreamsMetadataDelta(S3StreamsMetadataImage image) {
        this.image = image;
    }

    public void replay(S3StreamRecord record) {
        S3StreamMetadataImage current = image.getStreamsMetadata().get(record.streamId());
        // TODO(review): if this stream already has a pending delta in this batch
        // (e.g. create + ranges + epoch bump in one delta), the pending range /
        // object changes are discarded here — confirm records never arrive in
        // that order, or merge into the existing delta instead.
        S3StreamMetadataImage base;
        if (current == null) {
            // New stream: start from an empty image with the record's epoch/offset.
            base = new S3StreamMetadataImage(record.streamId(), record.epoch(), record.startOffset(),
                Collections.emptyMap(), Collections.emptyList());
        } else {
            // Existing stream: refresh epoch/offset, keep ranges and objects.
            base = new S3StreamMetadataImage(record.streamId(), record.epoch(), record.startOffset(),
                current.getRanges(), current.getStreamObjects());
        }
        changedStreams.put(record.streamId(), new S3StreamMetadataDelta(base));
        // A stream re-created in this delta is no longer deleted.
        deletedStreams.remove(record.streamId());
    }

    public void replay(RemoveS3StreamRecord record) {
        deletedStreams.add(record.streamId());
        // Drop any pending change so a removal replayed last actually removes.
        changedStreams.remove(record.streamId());
    }

    public void replay(RangeRecord record) {
        getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
    }

    public void replay(RemoveRangeRecord record) {
        getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
    }

    public void replay(S3StreamObjectRecord record) {
        getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
    }

    public void replay(RemoveS3StreamObjectRecord record) {
        getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
    }

    public void replay(WALObjectRecord record) {
        getOrCreateBrokerStreamMetadataDelta(record.brokerId()).replay(record);
    }

    public void replay(RemoveWALObjectRecord record) {
        getOrCreateBrokerStreamMetadataDelta(record.brokerId()).replay(record);
    }

    private S3StreamMetadataDelta getOrCreateStreamMetadataDelta(Long streamId) {
        return changedStreams.computeIfAbsent(streamId, id -> {
            S3StreamMetadataImage streamImage = image.getStreamsMetadata().get(id);
            if (streamImage == null) {
                // Range/object records must be preceded by the stream's S3StreamRecord;
                // fail loudly instead of NPE-ing deep inside the delta constructor.
                throw new IllegalStateException("Cannot replay record for unknown stream " + id);
            }
            return new S3StreamMetadataDelta(streamImage);
        });
    }

    private BrokerS3WALMetadataDelta getOrCreateBrokerStreamMetadataDelta(Integer brokerId) {
        // Brokers are created implicitly on their first WAL object record.
        return changedBrokers.computeIfAbsent(brokerId, id ->
            new BrokerS3WALMetadataDelta(
                image.getBrokerWALMetadata().getOrDefault(id, new BrokerS3WALMetadataImage(id, Collections.emptyList()))));
    }

    S3StreamsMetadataImage apply() {
        // replay() keeps changedStreams/deletedStreams disjoint, so the
        // copy-remove-overlay order below respects replay order.
        Map<Long, S3StreamMetadataImage> newStreams = new HashMap<>(image.getStreamsMetadata());
        deletedStreams.forEach(newStreams::remove);
        changedStreams.forEach((streamId, delta) -> newStreams.put(streamId, delta.apply()));

        // Apply the delta changes of old brokers and the newly created brokers.
        Map<Integer, BrokerS3WALMetadataImage> newBrokerStreams = new HashMap<>(image.getBrokerWALMetadata());
        deletedBrokers.forEach(newBrokerStreams::remove);
        changedBrokers.forEach((brokerId, delta) -> newBrokerStreams.put(brokerId, delta.apply()));

        return new S3StreamsMetadataImage(newStreams, newBrokerStreams);
    }

}
+ */ + +package org.apache.kafka.image; + +import java.util.Collections; +import java.util.Map; +import java.util.Objects; +import org.apache.kafka.image.writer.ImageWriter; +import org.apache.kafka.image.writer.ImageWriterOptions; + +public final class S3StreamsMetadataImage { + + public static final S3StreamsMetadataImage EMPTY = + new S3StreamsMetadataImage(Collections.emptyMap(), Collections.emptyMap()); + + private final Map streamsMetadata; + + private final Map brokerWALMetadata; + + public S3StreamsMetadataImage( + Map streamsMetadata, + Map brokerWALMetadata) { + this.streamsMetadata = streamsMetadata; + this.brokerWALMetadata = brokerWALMetadata; + } + + + boolean isEmpty() { + return this.brokerWALMetadata.isEmpty() && this.streamsMetadata.isEmpty(); + } + + public void write(ImageWriter writer, ImageWriterOptions options) { + streamsMetadata.values().forEach(image -> image.write(writer, options)); + brokerWALMetadata.values().forEach(image -> image.write(writer, options)); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof S3StreamsMetadataImage)) return false; + S3StreamsMetadataImage other = (S3StreamsMetadataImage) obj; + return this.streamsMetadata.equals(other.streamsMetadata) + && this.brokerWALMetadata.equals(other.brokerWALMetadata); + } + + @Override + public int hashCode() { + return Objects.hash(streamsMetadata, brokerWALMetadata); + } + + public Map getBrokerWALMetadata() { + return brokerWALMetadata; + } + + public Map getStreamsMetadata() { + return streamsMetadata; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java new file mode 100644 index 0000000000..328dc46e4f --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.kafka.metadata.stream;

import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;

/**
 * Metadata of one range of a stream: which broker owns it, its epoch, and its
 * offset span. {@code endOffset} is empty while the range is still open.
 * Ordered by {@code rangeIndex}.
 */
public class RangeMetadata implements Comparable<RangeMetadata> {
    private Long streamId;
    private Long epoch;
    private Integer rangeIndex;
    private Long startOffset;
    private Optional<Long> endOffset;
    private Integer brokerId;

    @Override
    public int compareTo(RangeMetadata o) {
        return this.rangeIndex.compareTo(o.rangeIndex);
    }

    public Long getStreamId() {
        return streamId;
    }

    public Long getEpoch() {
        return epoch;
    }

    public Integer getRangeIndex() {
        return rangeIndex;
    }

    public Long getStartOffset() {
        return startOffset;
    }

    public Optional<Long> getEndOffset() {
        return endOffset;
    }

    public Integer getBrokerId() {
        return brokerId;
    }

    /**
     * Serializes this range to a record.
     *
     * @throws IllegalStateException if the range is still open (no end offset);
     *         only sealed ranges can be written out
     */
    public ApiMessageAndVersion toRecord() {
        return new ApiMessageAndVersion(new RangeRecord()
            .setStreamId(streamId)
            .setEpoch(epoch)
            .setBrokerId(brokerId)
            .setRangeIndex(rangeIndex)
            .setStartOffset(startOffset)
            // Fail with a clear message instead of a bare NoSuchElementException.
            .setEndOffset(endOffset.orElseThrow(
                () -> new IllegalStateException("Range " + rangeIndex + " of stream " + streamId + " has no end offset"))),
            (short) 0);
    }

    public static RangeMetadata of(RangeRecord record) {
        RangeMetadata rangeMetadata = new RangeMetadata();
        rangeMetadata.streamId = record.streamId();
        rangeMetadata.epoch = record.epoch();
        rangeMetadata.rangeIndex = record.rangeIndex();
        rangeMetadata.startOffset = record.startOffset();
        rangeMetadata.endOffset = Optional.ofNullable(record.endOffset());
        rangeMetadata.brokerId = record.brokerId();
        return rangeMetadata;
    }

    // equals/hashCode are required because RangeMetadata instances are map
    // values compared via S3StreamMetadataImage.equals; without them, rebuilt
    // images would never compare equal (identity comparison).
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        RangeMetadata that = (RangeMetadata) o;
        return Objects.equals(streamId, that.streamId)
            && Objects.equals(epoch, that.epoch)
            && Objects.equals(rangeIndex, that.rangeIndex)
            && Objects.equals(startOffset, that.startOffset)
            && Objects.equals(endOffset, that.endOffset)
            && Objects.equals(brokerId, that.brokerId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(streamId, epoch, rangeIndex, startOffset, endOffset, brokerId);
    }
}
+ */ +public abstract class S3Object implements Comparable { + + protected final Long objectId; + + protected Optional objectSize = Optional.empty(); + + protected Optional objectAddress = Optional.empty(); + + protected Optional applyTimeInMs = Optional.empty(); + + protected Optional createTimeInMs = Optional.empty(); + + protected Optional destroyTimeInMs = Optional.empty(); + + protected S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED; + + protected S3ObjectType objectType = S3ObjectType.UNKNOWN; + + protected S3Object(final Long objectId) { + this.objectId = objectId; + } + + protected S3Object( + final Long objectId, + final Long objectSize, + final String objectAddress, + final Long applyTimeInMs, + final Long createTimeInMs, + final Long destroyTimeInMs, + final S3ObjectState s3ObjectState, + final S3ObjectType objectType) { + this.objectId = objectId; + this.objectSize = Optional.of(objectSize); + this.objectAddress = Optional.of(objectAddress); + this.applyTimeInMs = Optional.of(applyTimeInMs); + this.createTimeInMs = Optional.of(createTimeInMs); + this.destroyTimeInMs = Optional.of(destroyTimeInMs); + this.objectType = objectType; + this.s3ObjectState = s3ObjectState; + } + + public void onApply() { + if (this.s3ObjectState != S3ObjectState.UNINITIALIZED) { + throw new IllegalStateException("Object is not in UNINITIALIZED state"); + } + this.s3ObjectState = S3ObjectState.APPLIED; + this.applyTimeInMs = Optional.of(System.currentTimeMillis()); + } + + public void onCreate(S3ObjectCreateContext createContext) { + // TODO: decide fetch object metadata from S3 or let broker send it to controller + if (this.s3ObjectState != S3ObjectState.APPLIED) { + throw new IllegalStateException("Object is not in APPLIED state"); + } + this.s3ObjectState = S3ObjectState.CREATED; + this.createTimeInMs = Optional.of(createContext.createTimeInMs); + this.objectSize = Optional.of(createContext.objectSize); + this.objectAddress = 
Optional.of(createContext.objectAddress); + this.objectType = createContext.objectType; + } + + public void onMarkDestroy() { + if (this.s3ObjectState != S3ObjectState.CREATED) { + throw new IllegalStateException("Object is not in CREATED state"); + } + this.s3ObjectState = S3ObjectState.MARK_DESTROYED; + } + + public void onDestroy() { + if (this.s3ObjectState != S3ObjectState.CREATED) { + throw new IllegalStateException("Object is not in CREATED state"); + } + // TODO: trigger destroy + + } + + public S3ObjectType getObjectType() { + return objectType; + } + + public class S3ObjectCreateContext { + + private final Long createTimeInMs; + private final Long objectSize; + private final String objectAddress; + private final S3ObjectType objectType; + + public S3ObjectCreateContext( + final Long createTimeInMs, + final Long objectSize, + final String objectAddress, + final S3ObjectType objectType) { + this.createTimeInMs = createTimeInMs; + this.objectSize = objectSize; + this.objectAddress = objectAddress; + this.objectType = objectType; + } + } + + @Override + public int compareTo(S3Object o) { + return this.objectId.compareTo(o.objectId); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + S3Object s3Object = (S3Object) o; + return Objects.equals(objectId, s3Object.objectId); + } + + @Override + public int hashCode() { + return Objects.hash(objectId); + } + + public Long getObjectId() { + return objectId; + } + + public Optional getObjectSize() { + return objectSize; + } + + public Optional getObjectAddress() { + return objectAddress; + } + + public Optional getApplyTimeInMs() { + return applyTimeInMs; + } + + public Optional getCreateTimeInMs() { + return createTimeInMs; + } + + public Optional getDestroyTimeInMs() { + return destroyTimeInMs; + } + + public S3ObjectState getS3ObjectState() { + return s3ObjectState; + } +} diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java new file mode 100644 index 0000000000..228e682aea --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.stream; + +public enum S3ObjectState { + UNINITIALIZED, + APPLIED, + CREATED, + MARK_DESTROYED, + DESTROYED; + + public static S3ObjectState fromByte(Byte b) { + int ordinal = b.intValue(); + if (ordinal < 0 || ordinal >= values().length) { + throw new IllegalArgumentException("Invalid ObjectState ordinal " + ordinal); + } + return values()[ordinal]; + } +} \ No newline at end of file diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java new file mode 100644 index 0000000000..5c8fd88071 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectStreamIndex.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex; + +/** + * ObjectStreamIndex is the index of a stream range in a WAL object or STREAM object. 
+ */ +public class S3ObjectStreamIndex implements Comparable { + + private final Long streamId; + + private final Long startOffset; + + private final Long endOffset; + + public S3ObjectStreamIndex(Long streamId, Long startOffset, Long endOffset) { + this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; + } + + public Long getStreamId() { + return streamId; + } + + public Long getStartOffset() { + return startOffset; + } + + public Long getEndOffset() { + return endOffset; + } + + @Override + public int compareTo(S3ObjectStreamIndex o) { + int res = this.streamId.compareTo(o.streamId); + return res == 0 ? this.startOffset.compareTo(o.startOffset) : res; + } + + public StreamIndex toRecordStreamIndex() { + return new StreamIndex() + .setStreamId(streamId) + .setStartOffset(startOffset) + .setEndOffset(endOffset); + } + + public static S3ObjectStreamIndex of(StreamIndex index) { + return new S3ObjectStreamIndex(index.streamId(), index.startOffset(), index.endOffset()); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectType.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectType.java new file mode 100644 index 0000000000..30cdaf31a5 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectType.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
/**
 * Classifies the payload layout of an S3 object. Persisted as the ordinal in a
 * single byte, so the declaration order is part of the serialized format and
 * must not be changed.
 */
public enum S3ObjectType {
    /**
     * WAL object with loose records
     */
    WAL_LOOSE,

    /**
     * WAL object with minor compaction records
     */
    WAL_MINOR,

    /**
     * WAL object with major compaction records
     */
    WAL_MAJOR,

    /**
     * STREAM object with stream records of one stream
     */
    STREAM,

    /**
     * UNKNOWN object type
     */
    UNKNOWN;

    /**
     * Decodes a type from its serialized ordinal byte; any out-of-range value
     * maps to {@link #UNKNOWN} rather than throwing.
     *
     * @param b the ordinal byte read from a metadata record
     * @return the matching type, or UNKNOWN for unrecognized ordinals
     */
    public static S3ObjectType fromByte(Byte b) {
        int ordinal = b.intValue();
        return (ordinal >= 0 && ordinal < values().length) ? values()[ordinal] : UNKNOWN;
    }
}
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.metadata.stream; + +import java.util.Optional; +import org.apache.kafka.common.metadata.S3StreamObjectRecord; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +public class S3StreamObject extends S3Object { + + private S3ObjectStreamIndex streamIndex; + + public S3StreamObject(final Long objectId) { + super(objectId); + } + + @Override + public void onCreate(S3ObjectCreateContext createContext) { + super.onCreate(createContext); + if (!(createContext instanceof StreamObjectCreateContext)) { + throw new IllegalArgumentException(); + } + this.streamIndex = ((StreamObjectCreateContext) createContext).streamIndex; + } + + @Override + public int compareTo(S3Object o) { + if (!(o instanceof S3StreamObject)) { + throw new IllegalArgumentException("Cannot compare StreamObject with non-StreamObject"); + } + S3StreamObject s3StreamObject = (S3StreamObject) o; + // order by streamId first, then startOffset + int res = this.streamIndex.getStreamId().compareTo(s3StreamObject.streamIndex.getStreamId()); + return res == 0 ? 
this.streamIndex.getStartOffset().compareTo(s3StreamObject.streamIndex.getStartOffset()) : res; + } + + class StreamObjectCreateContext extends S3ObjectCreateContext { + + private final S3ObjectStreamIndex streamIndex; + + public StreamObjectCreateContext( + final Long createTimeInMs, + final Long objectSize, + final String objectAddress, + final S3ObjectType objectType, + final S3ObjectStreamIndex streamIndex) { + super(createTimeInMs, objectSize, objectAddress, objectType); + this.streamIndex = streamIndex; + } + } + + public S3ObjectStreamIndex getStreamIndex() { + return streamIndex; + } + + public ApiMessageAndVersion toRecord() { + return new ApiMessageAndVersion(new S3StreamObjectRecord() + .setObjectId(objectId) + .setStreamId(streamIndex.getStreamId()) + .setObjectState((byte) s3ObjectState.ordinal()) + .setObjectType((byte) objectType.ordinal()) + .setApplyTimeInMs(applyTimeInMs.get()) + .setCreateTimeInMs(createTimeInMs.get()) + .setDestroyTimeInMs(destroyTimeInMs.get()) + .setObjectSize(objectSize.get()) + .setStartOffset(streamIndex.getStartOffset()) + .setEndOffset(streamIndex.getEndOffset()), (short) 0); + } + + public static S3StreamObject of(S3StreamObjectRecord record) { + S3StreamObject s3StreamObject = new S3StreamObject(record.objectId()); + s3StreamObject.objectType = S3ObjectType.fromByte(record.objectType()); + s3StreamObject.s3ObjectState = S3ObjectState.fromByte(record.objectState()); + s3StreamObject.applyTimeInMs = Optional.of(record.applyTimeInMs()); + s3StreamObject.createTimeInMs = Optional.of(record.createTimeInMs()); + s3StreamObject.destroyTimeInMs = Optional.of(record.destroyTimeInMs()); + s3StreamObject.objectSize = Optional.of(record.objectSize()); + s3StreamObject.streamIndex = new S3ObjectStreamIndex(record.streamId(), record.startOffset(), record.endOffset()); + return s3StreamObject; + } +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java 
b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java new file mode 100644 index 0000000000..13e48d2312 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.metadata.stream; + +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.server.common.ApiMessageAndVersion; + +public class S3WALObject extends S3Object { + + private Integer brokerId; + private Map streamsIndex; + + private S3ObjectType objectType = S3ObjectType.UNKNOWN; + + public S3WALObject(Long objectId) { + super(objectId); + } + + private S3WALObject( + final Long objectId, + final Long objectSize, + final String objectAddress, + final Long applyTimeInMs, + final Long createTimeInMs, + final Long destroyTimeInMs, + final S3ObjectState s3ObjectState, + final S3ObjectType objectType, + final Integer brokerId, + final List streamsIndex) { + super(objectId, objectSize, objectAddress, applyTimeInMs, createTimeInMs, destroyTimeInMs, s3ObjectState, objectType); + this.objectType = objectType; + this.brokerId = brokerId; + this.streamsIndex = streamsIndex.stream().collect( + Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index)); + } + + @Override + public void onCreate(S3ObjectCreateContext createContext) { + super.onCreate(createContext); + if (!(createContext instanceof WALObjectCreateContext)) { + throw new IllegalArgumentException(); + } + WALObjectCreateContext walCreateContext = (WALObjectCreateContext) createContext; + this.streamsIndex = walCreateContext.streamIndexList.stream().collect(Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index)); + this.brokerId = walCreateContext.brokerId; + } + + class WALObjectCreateContext extends S3ObjectCreateContext { + + private final List streamIndexList; + private final Integer brokerId; + + public WALObjectCreateContext( + final Long createTimeInMs, + final Long objectSize, + final String objectAddress, + final S3ObjectType objectType, + final List streamIndexList, + final Integer brokerId) { + super(createTimeInMs, objectSize, objectAddress, 
objectType); + this.streamIndexList = streamIndexList; + this.brokerId = brokerId; + } + } + + public ApiMessageAndVersion toRecord() { + return new ApiMessageAndVersion(new WALObjectRecord() + .setObjectId(objectId) + .setObjectState((byte) s3ObjectState.ordinal()) + .setObjectType((byte) objectType.ordinal()) + .setApplyTimeInMs(applyTimeInMs.get()) + .setCreateTimeInMs(createTimeInMs.get()) + .setDestroyTimeInMs(destroyTimeInMs.get()) + .setObjectSize(objectSize.get()) + .setStreamsIndex( + streamsIndex.values().stream() + .map(S3ObjectStreamIndex::toRecordStreamIndex) + .collect(Collectors.toList())), (short) 0); + } + + public static S3WALObject of(WALObjectRecord record) { + S3WALObject s3WalObject = new S3WALObject( + record.objectId(), record.objectSize(), null, + record.applyTimeInMs(), record.createTimeInMs(), record.destroyTimeInMs(), + S3ObjectState.fromByte(record.objectState()), S3ObjectType.fromByte(record.objectType()), + record.brokerId(), record.streamsIndex().stream().map(S3ObjectStreamIndex::of).collect(Collectors.toList())); + return s3WalObject; + } + + public Integer getBrokerId() { + return brokerId; + } + + public Map getStreamsIndex() { + return streamsIndex; + } + + @Override + public S3ObjectType getObjectType() { + return objectType; + } +} diff --git a/metadata/src/main/resources/common/metadata/RangeRecord.json b/metadata/src/main/resources/common/metadata/RangeRecord.json new file mode 100644 index 0000000000..8721c65752 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/RangeRecord.json @@ -0,0 +1,60 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 25, + "type": "metadata", + "name": "RangeRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The Stream ID of the range" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "0+", + "about": "The epoch of the range" + }, + { + "name": "RangeIndex", + "type": "int32", + "versions": "0+", + "about": "The index of the range" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the range" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the range" + }, + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "about": "The Broker which created this range" + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json b/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json new file mode 100644 index 0000000000..dce6c5243c --- /dev/null +++ b/metadata/src/main/resources/common/metadata/RemoveRangeRecord.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 26, + "type": "metadata", + "name": "RemoveRangeRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The Stream ID of the range" + }, + { + "name": "RangeIndex", + "type": "int32", + "versions": "0+", + "about": "The index of the range" + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json new file mode 100644 index 0000000000..729a0fc472 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/RemoveS3StreamObjectRecord.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 28, + "type": "metadata", + "name": "RemoveS3StreamObjectRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The Stream ID of the stream in this object" + }, + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The object id of this object" + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json b/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json new file mode 100644 index 0000000000..30156373a7 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/RemoveS3StreamRecord.json @@ -0,0 +1,30 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 23, + "type": "metadata", + "name": "RemoveS3StreamRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The ID of the stream to be removed" + } + ] +} diff --git a/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json new file mode 100644 index 0000000000..ae126bd42e --- /dev/null +++ b/metadata/src/main/resources/common/metadata/RemoveWALObjectRecord.json @@ -0,0 +1,36 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 30, + "type": "metadata", + "name": "RemoveWALObjectRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "about": "The broker which owns the object" + }, + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The object id of this object" + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json new file mode 100644 index 0000000000..4ae99fb78d --- /dev/null +++ b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +{ + "apiKey": 27, + "type": "metadata", + "name": "S3StreamObjectRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The Stream ID of the stream in this object" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream in this object" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream in this object" + }, + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The object id of the S3 object" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The object size of the S3 object" + }, + { + "name": "ApplyTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be applied timestamp" + }, + { + "name": "CreateTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be created timestamp" + }, + { + "name": "DestroyTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be destroyed timestamp" + }, + { + "name": "ObjectState", + "type": "int8", + "versions": "0+", + "about": "The object state" + }, + { + "name": "ObjectType", + "type": "int8", + "versions": "0+", + "about": "The object type" + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/S3StreamRecord.json b/metadata/src/main/resources/common/metadata/S3StreamRecord.json new file mode 100644 index 0000000000..b4d4a08f31 --- /dev/null +++ b/metadata/src/main/resources/common/metadata/S3StreamRecord.json @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 22, + "type": "metadata", + "name": "S3StreamRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The Stream ID" + }, + { + "name": "Epoch", + "type": "int64", + "versions": "0+", + "about": "The epoch" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream" + } + ] +} \ No newline at end of file diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/WALObjectRecord.json new file mode 100644 index 0000000000..51b0a2316a --- /dev/null +++ b/metadata/src/main/resources/common/metadata/WALObjectRecord.json @@ -0,0 +1,98 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +{ + "apiKey": 29, + "type": "metadata", + "name": "WALObjectRecord", + "validVersions": "0", + "flexibleVersions": "0+", + "fields": [ + { + "name": "BrokerId", + "type": "int32", + "versions": "0+", + "about": "The broker which owns the object" + }, + { + "name": "ObjectId", + "type": "int64", + "versions": "0+", + "about": "The object id of the S3 object" + }, + { + "name": "ObjectSize", + "type": "int64", + "versions": "0+", + "about": "The object size of the S3 object" + }, + { + "name": "ApplyTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be applied timestamp" + }, + { + "name": "CreateTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be created timestamp" + }, + { + "name": "DestroyTimeInMs", + "type": "int64", + "versions": "0+", + "about": "The object be destroyed timestamp" + }, + { + "name": "ObjectState", + "type": "int8", + "versions": "0+", + "about": "The object state" + }, + { + "name": "ObjectType", + "type": "int8", + "versions": "0+", + "about": "The object type" + }, + { + "name": "StreamsIndex", + "type": "[]StreamIndex", + "versions": "0+", + "about": "The streams index in this object", + "fields": [ + { + "name": "StreamId", + "type": "int64", + "versions": "0+", + "about": "The Stream ID of the stream in this object" + }, + { + "name": "StartOffset", + "type": "int64", + "versions": "0+", + "about": "The start offset of the stream in this object" + }, + { + "name": "EndOffset", + "type": "int64", + "versions": "0+", + "about": "The end offset of the stream in this object" + } + 
] + } + ] +} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java index be21a87bd6..0e9afe7240 100644 --- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java @@ -43,7 +43,8 @@ public class MetadataImageTest { ConfigurationsImageTest.IMAGE1, ClientQuotasImageTest.IMAGE1, ProducerIdsImageTest.IMAGE1, - AclsImageTest.IMAGE1); + AclsImageTest.IMAGE1, + S3StreamsMetadataImageTest.IMAGE1); DELTA1 = new MetadataDelta.Builder(). setImage(IMAGE1). @@ -55,6 +56,7 @@ public class MetadataImageTest { RecordTestUtils.replayAll(DELTA1, ClientQuotasImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, ProducerIdsImageTest.DELTA1_RECORDS); RecordTestUtils.replayAll(DELTA1, AclsImageTest.DELTA1_RECORDS); + RecordTestUtils.replayAll(DELTA1, S3StreamsMetadataImageTest.DELTA1_RECORDS); IMAGE2 = new MetadataImage( new MetadataProvenance(200, 5, 4000), @@ -64,7 +66,8 @@ public class MetadataImageTest { ConfigurationsImageTest.IMAGE2, ClientQuotasImageTest.IMAGE2, ProducerIdsImageTest.IMAGE2, - AclsImageTest.IMAGE2); + AclsImageTest.IMAGE2, + S3StreamsMetadataImageTest.IMAGE2); } @Test diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java new file mode 100644 index 0000000000..c79b390483 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -0,0 +1,434 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.image; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import org.apache.kafka.common.metadata.RangeRecord; +import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; +import org.apache.kafka.common.metadata.RemoveS3StreamRecord; +import org.apache.kafka.common.metadata.RemoveWALObjectRecord; +import org.apache.kafka.common.metadata.S3StreamObjectRecord; +import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.common.metadata.WALObjectRecord; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; +import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.metadata.stream.S3ObjectType; +import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.image.writer.RecordListWriter; +import org.apache.kafka.metadata.RecordTestUtils; +import org.apache.kafka.server.common.ApiMessageAndVersion; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; + +@Timeout(value = 40) +public 
class S3StreamsMetadataImageTest { + + private static final long KB = 1024; + + private static final long MB = 1024 * KB; + + private static final long GB = 1024 * MB; + private static final long WAL_LOOSE_SIZE = 40 * MB; + + private static final long WAL_MINOR_COMPACT_SIZE = 5 * GB; + + private static final long WAL_MAJOR_COMPACT_SIZE = 320 * GB; + + private static final long STREAM_OBJECT_SIZE = 320 * GB; + + static final S3StreamsMetadataImage IMAGE1; + + static final List DELTA1_RECORDS; + + static final S3StreamsMetadataImage IMAGE2; + + // TODO: complete the test for StreamsMetadataImage + + static { + IMAGE1 = null; + DELTA1_RECORDS = null; + IMAGE2 = null; + } + + @Test + public void testBasicChange() { + List s3StreamMetadataImages = new ArrayList<>(); + Integer brokerId0 = 0; + Integer brokerId1 = 1; + Integer brokerId2 = 2; + + // 1. empty image + S3StreamsMetadataImage image0 = S3StreamsMetadataImage.EMPTY; + + // 2. create stream and create range + Long streamId0 = 0L; + Long streamId1 = 1L; + List records = new ArrayList<>(); + S3StreamRecord streamRecord00 = new S3StreamRecord() + .setStreamId(streamId0) + .setEpoch(1) + .setStartOffset(0L); + records.add(new ApiMessageAndVersion(streamRecord00, (short) 0)); + RangeRecord rangeRecord00 = new RangeRecord() + .setStreamId(streamId0) + .setRangeIndex(0) + .setStartOffset(0L) + .setBrokerId(brokerId1) + .setEpoch(1); + records.add(new ApiMessageAndVersion(rangeRecord00, (short) 0)); + S3StreamRecord streamRecord01 = new S3StreamRecord() + .setStreamId(streamId1) + .setEpoch(1) + .setStartOffset(0L); + records.add(new ApiMessageAndVersion(streamRecord01, (short) 0)); + RangeRecord rangeRecord01 = new RangeRecord() + .setStreamId(streamId1) + .setRangeIndex(0) + .setStartOffset(0L) + .setBrokerId(brokerId1) + .setEpoch(1); + records.add(new ApiMessageAndVersion(rangeRecord01, (short) 0)); + S3StreamsMetadataDelta delta0 = new S3StreamsMetadataDelta(image0); + RecordTestUtils.replayAll(delta0, records); + 
S3StreamsMetadataImage image1 = delta0.apply(); + + // check the image1 + assertEquals(2, image1.getStreamsMetadata().size()); + S3StreamMetadataImage s3StreamMetadataImage1 = image1.getStreamsMetadata().get(streamId0); + assertNotNull(s3StreamMetadataImage1); + assertEquals(1, s3StreamMetadataImage1.getRanges().size()); + assertEquals(1, s3StreamMetadataImage1.getEpoch()); + assertEquals(0, s3StreamMetadataImage1.getStartOffset()); + RangeMetadata rangeMetadata1 = s3StreamMetadataImage1.getRanges().get(0); + assertNotNull(rangeMetadata1); + assertEquals(RangeMetadata.of(rangeRecord00), rangeMetadata1); + + S3StreamMetadataImage s3StreamMetadataImage11 = image1.getStreamsMetadata().get(streamId1); + assertNotNull(s3StreamMetadataImage11); + assertEquals(1, s3StreamMetadataImage11.getRanges().size()); + assertEquals(1, s3StreamMetadataImage11.getEpoch()); + assertEquals(0, s3StreamMetadataImage11.getStartOffset()); + RangeMetadata rangeMetadata11 = s3StreamMetadataImage11.getRanges().get(0); + assertNotNull(rangeMetadata11); + assertEquals(RangeMetadata.of(rangeRecord01), rangeMetadata11); + + // 3. 
apply WALObject0, WALObject1, WALObject2 + WALObjectRecord walObjectRecord0 = new WALObjectRecord() + .setBrokerId(brokerId0) + .setObjectId(0L) + .setApplyTimeInMs(System.currentTimeMillis()) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setObjectState((byte) S3ObjectState.APPLIED.ordinal()); + WALObjectRecord walObjectRecord1 = new WALObjectRecord() + .setBrokerId(brokerId1) + .setObjectId(1L) + .setApplyTimeInMs(System.currentTimeMillis()) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setObjectState((byte) S3ObjectState.APPLIED.ordinal()); + WALObjectRecord walObjectRecord2 = new WALObjectRecord() + .setBrokerId(brokerId1) + .setObjectId(2L) + .setApplyTimeInMs(System.currentTimeMillis()) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setObjectState((byte) S3ObjectState.APPLIED.ordinal()); + records.clear(); + records.add(new ApiMessageAndVersion(walObjectRecord0, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord1, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord2, (short) 0)); + S3StreamsMetadataDelta delta1 = new S3StreamsMetadataDelta(image1); + RecordTestUtils.replayAll(delta1, records); + S3StreamsMetadataImage image2 = delta1.apply(); + + // check the image2 + assertEquals(2, image2.getBrokerWALMetadata().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage20 = image2.getBrokerWALMetadata().get(brokerId0); + assertNotNull(brokerS3WALMetadataImage20); + assertEquals(1, brokerS3WALMetadataImage20.getWalObjects().size()); + S3WALObject s3WalObject0 = brokerS3WALMetadataImage20.getWalObjects().get(0); + assertEquals(brokerId0, s3WalObject0.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject0.getObjectType()); + assertEquals(S3ObjectState.APPLIED, s3WalObject0.getS3ObjectState()); + assertEquals(0L, s3WalObject0.getObjectId()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage21 = image2.getBrokerWALMetadata().get(brokerId1); + 
assertNotNull(brokerS3WALMetadataImage21); + assertEquals(2, brokerS3WALMetadataImage21.getWalObjects().size()); + S3WALObject s3WalObject1 = brokerS3WALMetadataImage21.getWalObjects().get(0); + assertEquals(brokerId1, s3WalObject1.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject1.getObjectType()); + assertEquals(S3ObjectState.APPLIED, s3WalObject1.getS3ObjectState()); + assertEquals(1L, s3WalObject1.getObjectId()); + S3WALObject s3WalObject2 = brokerS3WALMetadataImage21.getWalObjects().get(1); + assertEquals(brokerId1, s3WalObject2.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject2.getObjectType()); + assertEquals(S3ObjectState.APPLIED, s3WalObject2.getS3ObjectState()); + assertEquals(2L, s3WalObject2.getObjectId()); + + // 4. create WALObject1, WALObject2, mark delete WALObject0 + List streamIndicesInWALObject1 = Arrays.asList( + new S3ObjectStreamIndex(streamId0, 0L, 100L), + new S3ObjectStreamIndex(streamId1, 0L, 200L) + ); + WALObjectRecord walObjectRecord11 = new WALObjectRecord() + .setBrokerId(brokerId1) + .setObjectId(1L) + .setObjectSize(WAL_LOOSE_SIZE) + .setCreateTimeInMs(System.currentTimeMillis()) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setStreamsIndex(streamIndicesInWALObject1.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect( + Collectors.toList())) + .setObjectState((byte) S3ObjectState.CREATED.ordinal()); + + List streamIndicesInWALObject2 = Arrays.asList( + new S3ObjectStreamIndex(streamId0, 101L, 200L), + new S3ObjectStreamIndex(streamId1, 201L, 300L) + ); + WALObjectRecord walObjectRecord21 = new WALObjectRecord() + .setBrokerId(brokerId1) + .setObjectId(2L) + .setObjectSize(WAL_LOOSE_SIZE) + .setCreateTimeInMs(System.currentTimeMillis()) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setStreamsIndex(streamIndicesInWALObject2.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect( + Collectors.toList())) + .setObjectState((byte) 
S3ObjectState.CREATED.ordinal()); + WALObjectRecord walObjectRecord01 = new WALObjectRecord() + .setBrokerId(brokerId0) + .setObjectId(0L) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()); + records.clear(); + records.add(new ApiMessageAndVersion(walObjectRecord11, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord21, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord01, (short) 0)); + S3StreamsMetadataDelta delta2 = new S3StreamsMetadataDelta(image2); + RecordTestUtils.replayAll(delta2, records); + S3StreamsMetadataImage image3 = delta2.apply(); + + // check the image3 + assertEquals(2, image3.getBrokerWALMetadata().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage30 = image3.getBrokerWALMetadata().get(brokerId0); + assertNotNull(brokerS3WALMetadataImage30); + assertEquals(1, brokerS3WALMetadataImage30.getWalObjects().size()); + S3WALObject s3WalObject01 = brokerS3WALMetadataImage30.getWalObjects().get(0); + assertEquals(brokerId0, s3WalObject01.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject01.getObjectType()); + assertEquals(S3ObjectState.MARK_DESTROYED, s3WalObject01.getS3ObjectState()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage31 = image3.getBrokerWALMetadata().get(brokerId1); + assertNotNull(brokerS3WALMetadataImage31); + assertEquals(2, brokerS3WALMetadataImage31.getWalObjects().size()); + S3WALObject s3WalObject11 = brokerS3WALMetadataImage31.getWalObjects().get(0); + assertEquals(brokerId1, s3WalObject11.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject11.getObjectType()); + assertEquals(S3ObjectState.CREATED, s3WalObject11.getS3ObjectState()); + Map streamIndexVerify1 = s3WalObject11.getStreamsIndex(); + assertEquals(2, streamIndexVerify1.size()); + assertEquals(0L, streamIndexVerify1.get(streamId0).getStartOffset()); + assertEquals(100L, streamIndexVerify1.get(streamId0).getEndOffset()); + 
assertEquals(0L, streamIndexVerify1.get(streamId1).getStartOffset()); + assertEquals(200L, streamIndexVerify1.get(streamId1).getEndOffset()); + S3WALObject s3WalObject21 = brokerS3WALMetadataImage31.getWalObjects().get(1); + assertEquals(brokerId1, s3WalObject21.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject21.getObjectType()); + assertEquals(S3ObjectState.CREATED, s3WalObject21.getS3ObjectState()); + Map streamIndexVerify2 = s3WalObject21.getStreamsIndex(); + assertEquals(2, streamIndexVerify2.size()); + assertEquals(101L, streamIndexVerify2.get(streamId0).getStartOffset()); + assertEquals(200L, streamIndexVerify2.get(streamId0).getEndOffset()); + assertEquals(201L, streamIndexVerify2.get(streamId1).getStartOffset()); + assertEquals(300L, streamIndexVerify2.get(streamId1).getEndOffset()); + + // 5. destroy WALObject0, mark delete WALObject1 and WALObject2, compact these to WALObject3 + RemoveWALObjectRecord removeWALObjectRecord0 = new RemoveWALObjectRecord() + .setObjectId(0L) + .setBrokerId(brokerId0); + WALObjectRecord walObjectRecord12 = new WALObjectRecord() + .setObjectId(1L) + .setBrokerId(brokerId1) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()); + WALObjectRecord walObjectRecord22 = new WALObjectRecord() + .setObjectId(2L) + .setBrokerId(brokerId1) + .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal()) + .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()); + List streamIndicesInWALObject3 = Arrays.asList( + new S3ObjectStreamIndex(streamId0, 0L, 200L), + new S3ObjectStreamIndex(streamId1, 0L, 300L) + ); + WALObjectRecord walObjectRecord3 = new WALObjectRecord() + .setObjectId(3L) + .setBrokerId(brokerId1) + .setObjectType((byte) S3ObjectType.WAL_MINOR.ordinal()) + .setCreateTimeInMs(System.currentTimeMillis()) + .setObjectState((byte) S3ObjectState.CREATED.ordinal()) + .setApplyTimeInMs(System.currentTimeMillis()) + 
.setObjectSize(WAL_MINOR_COMPACT_SIZE) + .setStreamsIndex(streamIndicesInWALObject3.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect( + Collectors.toList())); + records.clear(); + records.add(new ApiMessageAndVersion(removeWALObjectRecord0, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord12, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord22, (short) 0)); + records.add(new ApiMessageAndVersion(walObjectRecord3, (short) 0)); + S3StreamsMetadataDelta delta3 = new S3StreamsMetadataDelta(image3); + RecordTestUtils.replayAll(delta3, records); + S3StreamsMetadataImage image4 = delta3.apply(); + + // check the image4 + assertEquals(2, image4.getBrokerWALMetadata().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage40 = image4.getBrokerWALMetadata().get(brokerId0); + assertNotNull(brokerS3WALMetadataImage40); + assertEquals(0, brokerS3WALMetadataImage40.getWalObjects().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage41 = image4.getBrokerWALMetadata().get(brokerId1); + assertNotNull(brokerS3WALMetadataImage41); + assertEquals(3, brokerS3WALMetadataImage41.getWalObjects().size()); + S3WALObject s3WalObject12 = brokerS3WALMetadataImage41.getWalObjects().get(0); + assertEquals(brokerId1, s3WalObject12.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject12.getObjectType()); + assertEquals(S3ObjectState.MARK_DESTROYED, s3WalObject12.getS3ObjectState()); + S3WALObject s3WalObject22 = brokerS3WALMetadataImage41.getWalObjects().get(1); + assertEquals(brokerId1, s3WalObject22.getBrokerId()); + assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject22.getObjectType()); + assertEquals(S3ObjectState.MARK_DESTROYED, s3WalObject22.getS3ObjectState()); + S3WALObject s3WalObject3 = brokerS3WALMetadataImage41.getWalObjects().get(2); + assertEquals(brokerId1, s3WalObject3.getBrokerId()); + assertEquals(S3ObjectType.WAL_MINOR, s3WalObject3.getObjectType()); + assertEquals(S3ObjectState.CREATED, 
s3WalObject3.getS3ObjectState()); + assertEquals(3L, s3WalObject3.getObjectId()); + Map streamIndexVerify3 = s3WalObject3.getStreamsIndex(); + assertEquals(2, streamIndexVerify3.size()); + assertEquals(0L, streamIndexVerify3.get(streamId0).getStartOffset()); + assertEquals(200L, streamIndexVerify3.get(streamId0).getEndOffset()); + assertEquals(0L, streamIndexVerify3.get(streamId1).getStartOffset()); + assertEquals(300L, streamIndexVerify3.get(streamId1).getEndOffset()); + + // 6. split WALObject3 by streamId to StreamObject4 and StreamObject5 + S3ObjectStreamIndex s3ObjectStreamIndex4 = new S3ObjectStreamIndex(streamId0, 0L, 200L); + S3ObjectStreamIndex s3ObjectStreamIndex5 = new S3ObjectStreamIndex(streamId1, 0L, 300L); + S3StreamObjectRecord streamObjectRecord4 = new S3StreamObjectRecord() + .setObjectId(4L) + .setStreamId(streamId0) + .setObjectSize(STREAM_OBJECT_SIZE) + .setObjectType((byte) S3ObjectType.STREAM.ordinal()) + .setCreateTimeInMs(System.currentTimeMillis()) + .setStartOffset(s3ObjectStreamIndex4.getStartOffset()) + .setEndOffset(s3ObjectStreamIndex4.getEndOffset()); + S3StreamObjectRecord streamObjectRecord5 = new S3StreamObjectRecord() + .setObjectId(5L) + .setStreamId(streamId1) + .setObjectSize(STREAM_OBJECT_SIZE) + .setObjectType((byte) S3ObjectType.STREAM.ordinal()) + .setCreateTimeInMs(System.currentTimeMillis()) + .setStartOffset(s3ObjectStreamIndex5.getStartOffset()) + .setEndOffset(s3ObjectStreamIndex5.getEndOffset()); + RemoveWALObjectRecord removeWALObjectRecord3 = new RemoveWALObjectRecord() + .setObjectId(3L) + .setBrokerId(brokerId1); + records.clear(); + records.add(new ApiMessageAndVersion(streamObjectRecord4, (short) 0)); + records.add(new ApiMessageAndVersion(streamObjectRecord5, (short) 0)); + records.add(new ApiMessageAndVersion(removeWALObjectRecord3, (short) 0)); + S3StreamsMetadataDelta delta4 = new S3StreamsMetadataDelta(image4); + RecordTestUtils.replayAll(delta4, records); + S3StreamsMetadataImage image5 = delta4.apply(); 
+ + // check the image5 + assertEquals(2, image5.getBrokerWALMetadata().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage50 = image5.getBrokerWALMetadata().get(brokerId0); + assertNotNull(brokerS3WALMetadataImage50); + assertEquals(0, brokerS3WALMetadataImage50.getWalObjects().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage51 = image5.getBrokerWALMetadata().get(brokerId1); + assertNotNull(brokerS3WALMetadataImage51); + assertEquals(0, brokerS3WALMetadataImage51.getWalObjects().size()); + assertEquals(2, image5.getStreamsMetadata().size()); + + S3StreamMetadataImage s3StreamMetadataImage50 = image5.getStreamsMetadata().get(streamId0); + assertNotNull(s3StreamMetadataImage50); + assertEquals(1, s3StreamMetadataImage50.getRanges().size()); + assertEquals(1, s3StreamMetadataImage50.getEpoch()); + assertEquals(0, s3StreamMetadataImage50.getStartOffset()); + assertEquals(1, s3StreamMetadataImage50.getStreamObjects()); + S3StreamObject s3StreamObject4 = s3StreamMetadataImage50.getStreamObjects().get(0); + assertEquals(4L, s3StreamObject4.getObjectId()); + assertEquals(STREAM_OBJECT_SIZE, s3StreamObject4.getObjectSize()); + assertEquals(S3ObjectType.STREAM, s3StreamObject4.getObjectType()); + assertEquals(S3ObjectState.CREATED, s3StreamObject4.getS3ObjectState()); + assertEquals(s3ObjectStreamIndex4, s3StreamObject4.getStreamIndex()); + + S3StreamMetadataImage s3StreamMetadataImage51 = image5.getStreamsMetadata().get(streamId1); + assertNotNull(s3StreamMetadataImage51); + assertEquals(1, s3StreamMetadataImage51.getRanges().size()); + assertEquals(1, s3StreamMetadataImage51.getEpoch()); + assertEquals(0, s3StreamMetadataImage51.getStartOffset()); + assertEquals(1, s3StreamMetadataImage51.getStreamObjects()); + S3StreamObject s3StreamObject5 = s3StreamMetadataImage51.getStreamObjects().get(0); + assertEquals(5L, s3StreamObject5.getObjectId()); + assertEquals(STREAM_OBJECT_SIZE, s3StreamObject5.getObjectSize()); + assertEquals(S3ObjectType.STREAM, 
s3StreamObject5.getObjectType()); + assertEquals(S3ObjectState.CREATED, s3StreamObject5.getS3ObjectState()); + assertEquals(s3ObjectStreamIndex5, s3StreamObject5.getStreamIndex()); + + // 7. remove streamObject4 and remove stream1 + RemoveS3StreamObjectRecord removeStreamObjectRecord4 = new RemoveS3StreamObjectRecord() + .setObjectId(4L) + .setStreamId(streamId0); + RemoveS3StreamRecord removeStreamRecord = new RemoveS3StreamRecord() + .setStreamId(streamId1); + records.clear(); + records.add(new ApiMessageAndVersion(removeStreamObjectRecord4, (short) 0)); + records.add(new ApiMessageAndVersion(removeStreamRecord, (short) 0)); + S3StreamsMetadataDelta delta5 = new S3StreamsMetadataDelta(image5); + RecordTestUtils.replayAll(delta5, records); + S3StreamsMetadataImage image6 = delta5.apply(); + + // check the image6 + assertEquals(2, image6.getBrokerWALMetadata().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage60 = image6.getBrokerWALMetadata().get(brokerId0); + assertNotNull(brokerS3WALMetadataImage60); + assertEquals(0, brokerS3WALMetadataImage60.getWalObjects().size()); + BrokerS3WALMetadataImage brokerS3WALMetadataImage61 = image6.getBrokerWALMetadata().get(brokerId1); + assertNotNull(brokerS3WALMetadataImage61); + assertEquals(0, brokerS3WALMetadataImage61.getWalObjects().size()); + + assertEquals(1, image6.getStreamsMetadata().size()); + S3StreamMetadataImage s3StreamMetadataImage60 = image6.getStreamsMetadata().get(streamId0); + assertNotNull(s3StreamMetadataImage60); + assertEquals(1, s3StreamMetadataImage60.getRanges().size()); + assertEquals(0, s3StreamMetadataImage60.getStreamObjects().size()); + } + + + private void testToImageAndBack(S3StreamsMetadataImage image) { + RecordListWriter writer = new RecordListWriter(); + image.write(writer, new ImageWriterOptions.Builder().build()); + S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(S3StreamsMetadataImage.EMPTY); + RecordTestUtils.replayAll(delta, writer.records()); + 
S3StreamsMetadataImage newImage = delta.apply(); + assertEquals(image, newImage); + } +}