diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 87b94e69c1..95a7719018 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.migration.{KRaftMigrationDriver, LegacyPropagator}
+import org.apache.kafka.metadata.stream.S3Config
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -206,7 +207,7 @@ class ControllerServer(
     }
     val maxIdleIntervalNs = config.metadataMaxIdleIntervalNs.fold(OptionalLong.empty)(OptionalLong.of)
-
+    val s3Config = new S3Config(config.s3Region, config.s3Bucket)
     new QuorumController.Builder(config.nodeId, sharedServer.metaProps.clusterId).
       setTime(time).
       setThreadNamePrefix(threadNamePrefix).
@@ -227,6 +228,7 @@ class ControllerServer(
       setBootstrapMetadata(bootstrapMetadata).
       setFatalFaultHandler(sharedServer.quorumControllerFaultHandler).
       setZkMigrationEnabled(config.migrationEnabled)
+        .setS3Config(s3Config)
     }
     authorizer match {
       case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index ff13bea308..0d0ca18629 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -669,6 +669,14 @@ object KafkaConfig {
   val ElasticStreamNamespaceDoc = "The elastic stream namespace of this Kafka cluster, which should not conflict with that of other Kafka clusters sharing the same elastic stream."
   // elastic stream inject end

+  // Kafka on S3 inject start
+  val S3RegionProp = "s3.region"
+  val S3BucketProp = "s3.bucket"
+
+  val S3RegionDoc = "Specifies the S3 region, e.g. us-east-1.\n"
+  val S3BucketDoc = "Specifies the S3 bucket, e.g. my-bucket.\n"
+
+  // Kafka on S3 inject end
   /* Documentation */
   /** ********* Zookeeper Configuration ***********/
   val ZkConnectDoc = "Specifies the ZooKeeper connection string in the form hostname:port where host and port are the " +
@@ -1461,6 +1469,11 @@ object KafkaConfig {
       .define(ElasticStreamKvEndpointProp, STRING, null, HIGH, ElasticStreamKvEndpointDoc)
       .define(ElasticStreamNamespaceProp, STRING, null, MEDIUM, ElasticStreamNamespaceDoc)
       // elastic stream inject end
+
+      // Kafka on S3 inject start
+      .define(S3RegionProp, STRING, null, HIGH, S3RegionDoc)
+      .define(S3BucketProp, STRING, null, HIGH, S3BucketDoc)
+      // Kafka on S3 inject end
   }

   /** ********* Remote Log Management Configuration *********/
@@ -1569,7 +1582,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   // We make it part of each instance rather than the object to facilitate testing.
   private val zkClientConfigViaSystemProperties = new ZKClientConfig()

   override def originals: util.Map[String, AnyRef] =
     if (this eq currentConfig) super.originals else currentConfig.originals
   override def values: util.Map[String, _] =
     if (this eq currentConfig) super.values else currentConfig.values
@@ -1992,6 +2005,12 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   val elasticStreamNamespace = getString(KafkaConfig.ElasticStreamNamespaceProp)
   // elastic stream inject end

+  // Kafka on S3 inject start
+  /** ********* Kafka on S3 Configuration *********/
+  val s3Region = getString(KafkaConfig.S3RegionProp)
+  val s3Bucket = getString(KafkaConfig.S3BucketProp)
+  // Kafka on S3 inject end
+
   def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
     dynamicConfig.addReconfigurable(reconfigurable)
   }
diff --git a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
index 8b0cdd640a..84de4814b8 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataCacheTest.scala
@@ -72,7 +72,8 @@ object MetadataCacheTest {
       image.clientQuotas(),
       image.producerIds(),
       image.acls(),
-      image.streamsMetadata())
+      image.streamsMetadata(),
+      image.objectsMetadata())
     val delta = new MetadataDelta.Builder().setImage(partialImage).build()

     def toRecord(broker: UpdateMetadataBroker): RegisterBrokerRecord = {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 040481546d..37a85bda62 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -54,7 +54,7 @@ import org.apache.kafka.common.requests._
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, TopicPartition, Uuid}
-import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, TopicsDelta, TopicsImage, S3StreamsMetadataImage}
+import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest, ConfigurationsImage, FeaturesImage, MetadataImage, MetadataProvenance, ProducerIdsImage, S3ObjectsImage, S3StreamsMetadataImage, TopicsDelta, TopicsImage}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0
@@ -4129,7 +4129,8 @@ class ReplicaManagerTest {
       ClientQuotasImage.EMPTY,
       ProducerIdsImage.EMPTY,
       AclsImage.EMPTY,
-      S3StreamsMetadataImage.EMPTY
+      S3StreamsMetadataImage.EMPTY,
+      S3ObjectsImage.EMPTY
     )
   }
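The two new broker properties travel into the controller as a plain value object rather than as raw strings. A minimal, self-contained sketch of that hand-off, using only the S3Config API added by this patch (the region and bucket literals are invented):

    import org.apache.kafka.metadata.stream.S3Config;

    public class S3ConfigHandOffSketch {
        public static void main(String[] args) {
            // Mirrors ControllerServer: new S3Config(config.s3Region, config.s3Bucket)
            S3Config s3Config = new S3Config("us-east-1", "my-bucket");
            System.out.println(s3Config.getRegion());      // us-east-1
            System.out.println(s3Config.getBucketName());  // my-bucket
        }
    }
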
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index f880de07b6..dc8941eb03 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -73,6 +73,8 @@ import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.stream.S3ObjectControlManager;
+import org.apache.kafka.controller.stream.StreamControlManager;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
@@ -83,6 +85,7 @@ import org.apache.kafka.metadata.migration.ZkRecordConsumer;
 import org.apache.kafka.metadata.placement.ReplicaPlacer;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
+import org.apache.kafka.metadata.stream.S3Config;
 import org.apache.kafka.queue.EventQueue.EarliestDeadlineFunction;
 import org.apache.kafka.queue.EventQueue;
 import org.apache.kafka.queue.KafkaEventQueue;
@@ -180,6 +183,12 @@ static public class Builder {
         private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
         private boolean zkMigrationEnabled = false;

+        // Kafka on S3 inject start
+
+        private S3Config s3Config;
+
+        // Kafka on S3 inject end
+
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
             this.clusterId = clusterId;
@@ -299,6 +308,15 @@ public Builder setZkMigrationEnabled(boolean zkMigrationEnabled) {
             return this;
         }

+        // Kafka on S3 inject start
+
+        public Builder setS3Config(S3Config s3Config) {
+            this.s3Config = s3Config;
+            return this;
+        }
+
+        // Kafka on S3 inject end
+
         @SuppressWarnings("unchecked")
         public QuorumController build() throws Exception {
             if (raftClient == null) {
@@ -349,7 +367,8 @@ public QuorumController build() throws Exception {
                     staticConfig,
                     bootstrapMetadata,
                     maxRecordsPerBatch,
-                    zkMigrationEnabled
+                    zkMigrationEnabled,
+                    s3Config
                 );
             } catch (Exception e) {
                 Utils.closeQuietly(queue, "event queue");
@@ -1644,6 +1663,24 @@ private enum ImbalanceSchedule {
      */
     private final int maxRecordsPerBatch;

+    // Kafka on S3 inject start
+
+    private final S3Config s3Config;
+
+    /**
+     * An object which stores the controller's view of the S3 objects.
+     * This must be accessed only by the event queue thread.
+     */
+    private final S3ObjectControlManager s3ObjectControlManager;
+
+    /**
+     * An object which stores the controller's view of the streams.
+     * This must be accessed only by the event queue thread.
+     */
+    private final StreamControlManager streamControlManager;
+
+    // Kafka on S3 inject end
+
     private QuorumController(
         FaultHandler fatalFaultHandler,
         LogContext logContext,
@@ -1668,7 +1705,8 @@ private QuorumController(
         Map<String, Object> staticConfig,
         BootstrapMetadata bootstrapMetadata,
         int maxRecordsPerBatch,
-        boolean zkMigrationEnabled
+        boolean zkMigrationEnabled,
+        S3Config s3Config
     ) {
         this.fatalFaultHandler = fatalFaultHandler;
         this.logContext = logContext;
@@ -1744,6 +1782,12 @@ private QuorumController(
         this.curClaimEpoch = -1;
         this.needToCompleteAuthorizerLoad = authorizer.isPresent();
         this.zkRecordConsumer = new MigrationRecordConsumer();
+
+        // Kafka on S3 inject start
+        this.s3Config = s3Config;
+        this.s3ObjectControlManager = new S3ObjectControlManager(snapshotRegistry, logContext, clusterId, s3Config);
+        this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager);
+        // Kafka on S3 inject end

         updateWriteOffset(-1);
         resetToEmptyState();
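Inside the controller, the config fans out to the two new event-queue-confined managers. A statement-level sketch of the builder-side wiring under this patch's API (node id and cluster id are placeholders; a real build() also needs the raft client, event queue, and fault handlers that the full builder normally supplies):

    S3Config s3Config = new S3Config("us-east-1", "my-bucket");
    QuorumController.Builder controllerBuilder =
        new QuorumController.Builder(0, "test-cluster")
            .setS3Config(s3Config);
    // The private constructor then creates, in order:
    //   s3ObjectControlManager = new S3ObjectControlManager(snapshotRegistry, logContext, clusterId, s3Config)
    //   streamControlManager   = new StreamControlManager(snapshotRegistry, logContext, s3ObjectControlManager)
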
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index 044acc7f76..21d06533f4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -19,8 +19,13 @@ import java.util.LinkedList;
 import java.util.Queue;
+import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
+import org.apache.kafka.common.metadata.S3ObjectRecord;
 import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.controller.stream.S3ObjectKeyGeneratorManager.GenerateContextV0;
+import org.apache.kafka.metadata.stream.S3Config;
 import org.apache.kafka.metadata.stream.S3Object;
+import org.apache.kafka.metadata.stream.S3ObjectState;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
 import org.slf4j.Logger;
@@ -34,20 +39,28 @@ public class S3ObjectControlManager {

     private final TimelineHashMap<Long, S3Object> objectsMetadata;

+    private final String clusterId;
+
+    private final S3Config config;
+
     /**
     * The objectId of the next object to be applied. (start from 0)
     */
     private Long nextApplyObjectId = 0L;

-    // TODO: add timer task to periodically check if there are objects to be destroyed or created
+    // TODO: add timer task to periodically check if there are objects to be destroyed or expired
     private final Queue<Long> appliedObjects;
     private final Queue<Long> markDestroyedObjects;

     public S3ObjectControlManager(
         SnapshotRegistry snapshotRegistry,
-        LogContext logContext) {
+        LogContext logContext,
+        String clusterId,
+        S3Config config) {
         this.snapshotRegistry = snapshotRegistry;
         this.log = logContext.logger(S3ObjectControlManager.class);
+        this.clusterId = clusterId;
+        this.config = config;
         this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
         this.appliedObjects = new LinkedList<>();
         this.markDestroyedObjects = new LinkedList<>();
@@ -56,5 +69,18 @@ public S3ObjectControlManager(
     public Long appliedObjectNum() {
         return nextApplyObjectId;
     }
+
+    public void replay(S3ObjectRecord record) {
+        GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
+        String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
+        S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
+            record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(),
+            record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState()));
+        objectsMetadata.put(record.objectId(), object);
+    }
+
+    public void replay(RemoveS3ObjectRecord record) {
+        objectsMetadata.remove(record.objectId());
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java
new file mode 100644
index 0000000000..220ec223c4
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectKeyGeneratorManager.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller.stream;
+
+/**
+ * The S3ObjectKeyGeneratorManager manages the versioned generators of S3 object keys.
+ */
+public final class S3ObjectKeyGeneratorManager {
+
+    public static class GenerateContext {
+        protected Long objectId;
+        protected GenerateContext(Long objectId) {
+            this.objectId = objectId;
+        }
+    }
+
+    interface S3ObjectKeyGenerator {
+        String generate(GenerateContext context);
+    }
+
+    public static S3ObjectKeyGenerator getByVersion(int version) {
+        switch (version) {
+            case 0: return generatorV0;
+            default: throw new IllegalArgumentException("Unsupported version " + version);
+        }
+    }
+
+    public static class GenerateContextV0 extends GenerateContext {
+        private String clusterName;
+
+        GenerateContextV0(String clusterName, Long objectId) {
+            super(objectId);
+            this.clusterName = clusterName;
+        }
+    }
+
+    static final S3ObjectKeyGenerator generatorV0 = (GenerateContext ctx) -> {
+        if (!(ctx instanceof GenerateContextV0)) {
+            throw new IllegalArgumentException("Unsupported context " + ctx.getClass().getName());
+        }
+        GenerateContextV0 ctx0 = (GenerateContextV0) ctx;
+        return String.format("%s/%s/%d", ctx0.objectId.hashCode(), ctx0.clusterName, ctx0.objectId);
+    };
+}
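Version dispatch keeps the key layout evolvable: getByVersion(0) returns generatorV0, which renders "<objectId.hashCode()>/<clusterName>/<objectId>", so the hash prefix spreads keys across the S3 key space. A small usage sketch (cluster name and id are invented; like S3ObjectControlManager.replay, it must live in the org.apache.kafka.controller.stream package because GenerateContextV0's constructor is package-private):

    GenerateContextV0 ctx = new GenerateContextV0("my-cluster", 42L);
    String key = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
    // Long.valueOf(42L).hashCode() == 42, so key == "42/my-cluster/42"
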
diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
index 5315a41524..2ef674cc54 100644
--- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataDelta.java
@@ -28,28 +28,32 @@ public class BrokerS3WALMetadataDelta {

     private final BrokerS3WALMetadataImage image;

-    private final Set<S3WALObject> changedS3WALObjects = new HashSet<>();
+    private final Set<S3WALObject> addedS3WALObjects = new HashSet<>();

-    private final Set<S3WALObject> removedS3WALObjects = new HashSet<>();
+    private final Set<Long> removedS3WALObjects = new HashSet<>();

     public BrokerS3WALMetadataDelta(BrokerS3WALMetadataImage image) {
         this.image = image;
     }

     public void replay(WALObjectRecord record) {
-        changedS3WALObjects.add(S3WALObject.of(record));
+        addedS3WALObjects.add(S3WALObject.of(record));
+        // new add or update, so remove from removedObjects
+        removedS3WALObjects.remove(record.objectId());
     }

     public void replay(RemoveWALObjectRecord record) {
-        removedS3WALObjects.add(new S3WALObject(record.objectId()));
+        removedS3WALObjects.add(record.objectId());
+        // new remove, so remove from addedObjects
+        addedS3WALObjects.removeIf(walObject -> walObject.getObjectId() == record.objectId());
     }

     public BrokerS3WALMetadataImage apply() {
         List<S3WALObject> newS3WALObjects = new ArrayList<>(image.getWalObjects());
+        // add all changed WAL objects
+        newS3WALObjects.addAll(addedS3WALObjects);
         // remove all removed WAL objects
-        newS3WALObjects.removeAll(removedS3WALObjects);
+        newS3WALObjects.removeIf(walObject -> removedS3WALObjects.contains(walObject.getObjectId()));
-        // add all changed WAL objects
-        newS3WALObjects.addAll(changedS3WALObjects);
         return new BrokerS3WALMetadataImage(image.getBrokerId(), newS3WALObjects);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
index a5a48a40ec..e99a01b22b 100644
--- a/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/BrokerS3WALMetadataImage.java
@@ -25,6 +25,7 @@ import org.apache.kafka.image.writer.ImageWriterOptions;

 public class BrokerS3WALMetadataImage {
+    public static final BrokerS3WALMetadataImage EMPTY = new BrokerS3WALMetadataImage(-1, List.of());
     private final Integer brokerId;
     private final List<S3WALObject> s3WalObjects;
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
index 5fc776d9ed..8ad7612cdc 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataDelta.java
@@ -31,10 +31,12 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.common.metadata.RemoveRangeRecord;
+import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
 import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
+import org.apache.kafka.common.metadata.S3ObjectRecord;
 import org.apache.kafka.common.metadata.S3StreamObjectRecord;
 import org.apache.kafka.common.metadata.S3StreamRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
@@ -80,8 +82,13 @@ public MetadataDelta build() {

     private AclsDelta aclsDelta = null;

+    // Kafka on S3 inject start
     private S3StreamsMetadataDelta s3StreamsMetadataDelta = null;

+    private S3ObjectsDelta s3ObjectsDelta = null;
+
+    // Kafka on S3 inject end
+
     public MetadataDelta(MetadataImage image) {
         this.image = image;
     }
@@ -155,6 +162,8 @@ public AclsDelta getOrCreateAclsDelta() {
         return aclsDelta;
     }

+    // Kafka on S3 inject start
+
     public S3StreamsMetadataDelta streamMetadataDelta() {
         return s3StreamsMetadataDelta;
     }
@@ -166,6 +175,19 @@ public S3StreamsMetadataDelta getOrCreateStreamsMetadataDelta() {
         return s3StreamsMetadataDelta;
     }

+    public S3ObjectsDelta objectsMetadataDelta() {
+        return s3ObjectsDelta;
+    }
+
+    public S3ObjectsDelta getOrCreateObjectsMetadataDelta() {
+        if (s3ObjectsDelta == null) {
+            s3ObjectsDelta = new S3ObjectsDelta(image.objectsMetadata());
+        }
+        return s3ObjectsDelta;
+    }
+
+    // Kafka on S3 inject end
+
     public Optional<MetadataVersion> metadataVersionChanged() {
         if (featuresDelta == null) {
             return Optional.empty();
@@ -255,6 +277,12 @@ public void replay(ApiMessage record) {
             case REMOVE_WALOBJECT_RECORD:
                 replay((RemoveWALObjectRecord) record);
                 break;
+            case S3_OBJECT_RECORD:
+                replay((S3ObjectRecord) record);
+                break;
+            case REMOVE_S3_OBJECT_RECORD:
+                replay((RemoveS3ObjectRecord) record);
+                break;
             // Kafka on S3 inject end
             default:
                 throw new RuntimeException("Unknown metadata record type " + type);
@@ -331,6 +359,8 @@ public void replay(RemoveAccessControlEntryRecord record) {
         getOrCreateAclsDelta().replay(record);
     }

+    // Kafka on S3 inject start
+
     public void replay(S3StreamRecord record) {
         getOrCreateStreamsMetadataDelta().replay(record);
     }
@@ -363,6 +393,16 @@ public void replay(RemoveWALObjectRecord record) {
         getOrCreateStreamsMetadataDelta().replay(record);
     }

+    public void replay(S3ObjectRecord record) {
+        getOrCreateObjectsMetadataDelta().replay(record);
+    }
+
+    public void replay(RemoveS3ObjectRecord record) {
+        getOrCreateObjectsMetadataDelta().replay(record);
+    }
+
+    // Kafka on S3 inject end
+
     /**
      * Create removal deltas for anything which was in the base image, but which was not
      * referenced in the snapshot records we just applied.
@@ -420,12 +460,21 @@ public MetadataImage apply(MetadataProvenance provenance) {
         } else {
             newAcls = aclsDelta.apply();
         }
+
+        // Kafka on S3 inject start
         S3StreamsMetadataImage newStreamMetadata;
         if (s3StreamsMetadataDelta == null) {
             newStreamMetadata = image.streamsMetadata();
         } else {
             newStreamMetadata = s3StreamsMetadataDelta.apply();
         }
+        S3ObjectsImage newS3ObjectsMetadata;
+        if (s3ObjectsDelta == null) {
+            newS3ObjectsMetadata = image.objectsMetadata();
+        } else {
+            newS3ObjectsMetadata = s3ObjectsDelta.apply();
+        }
+        // Kafka on S3 inject end
         return new MetadataImage(
             provenance,
             newFeatures,
@@ -435,7 +484,8 @@ public MetadataImage apply(MetadataProvenance provenance) {
             newClientQuotas,
             newProducerIds,
             newAcls,
-            newStreamMetadata
+            newStreamMetadata,
+            newS3ObjectsMetadata
         );
     }

@@ -450,6 +500,7 @@ public String toString() {
             ", producerIdsDelta=" + producerIdsDelta +
             ", aclsDelta=" + aclsDelta +
             ", streamMetadataDelta=" + s3StreamsMetadataDelta +
+            ", objectsMetadataDelta=" + s3ObjectsDelta +
             ')';
     }
 }
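The dispatch follows the usual delta pattern: the first S3 object record lazily creates an S3ObjectsDelta, and apply() folds it into the next image alongside the stream metadata. A minimal end-to-end sketch, assuming the patch's classes on the classpath (record contents are illustrative):

    MetadataDelta delta = new MetadataDelta(MetadataImage.EMPTY);
    delta.replay(new S3ObjectRecord()
        .setObjectId(1L)
        .setObjectState((byte) S3ObjectState.APPLIED.ordinal()));
    MetadataImage image = delta.apply(MetadataProvenance.EMPTY);
    // image.objectsMetadata() now holds a SimplifiedS3Object for id 1 in APPLIED state
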
diff --git a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
index 0df8b32f5d..744d6f041a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/MetadataImage.java
@@ -39,7 +39,8 @@ public final class MetadataImage {
         ClientQuotasImage.EMPTY,
         ProducerIdsImage.EMPTY,
         AclsImage.EMPTY,
-        S3StreamsMetadataImage.EMPTY);
+        S3StreamsMetadataImage.EMPTY,
+        S3ObjectsImage.EMPTY);

     private final MetadataProvenance provenance;
@@ -61,6 +62,8 @@ public final class MetadataImage {

     private final S3StreamsMetadataImage streamMetadata;

+    private final S3ObjectsImage objectsMetadata;
+
     // Kafka on S3 inject end

     public MetadataImage(
@@ -72,7 +75,8 @@ public MetadataImage(
         ClientQuotasImage clientQuotas,
         ProducerIdsImage producerIds,
         AclsImage acls,
-        S3StreamsMetadataImage streamMetadata
+        S3StreamsMetadataImage streamMetadata,
+        S3ObjectsImage s3ObjectsImage
     ) {
         this.provenance = provenance;
         this.features = features;
@@ -83,6 +87,7 @@ public MetadataImage(
         this.producerIds = producerIds;
         this.acls = acls;
         this.streamMetadata = streamMetadata;
+        this.objectsMetadata = s3ObjectsImage;
     }

     public boolean isEmpty() {
@@ -136,10 +141,18 @@ public AclsImage acls() {
         return acls;
     }

+    // Kafka on S3 inject start
+
     public S3StreamsMetadataImage streamsMetadata() {
         return streamMetadata;
     }

+    public S3ObjectsImage objectsMetadata() {
+        return objectsMetadata;
+    }
+
+    // Kafka on S3 inject end
+
     public void write(ImageWriter writer, ImageWriterOptions options) {
         // Features should be written out first so we can include the metadata.version at the beginning of the
         // snapshot
@@ -150,7 +163,10 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
         clientQuotas.write(writer, options);
         producerIds.write(writer, options);
         acls.write(writer, options);
+        // Kafka on S3 inject start
         streamMetadata.write(writer, options);
+        objectsMetadata.write(writer, options);
+        // Kafka on S3 inject end
         writer.close(true);
     }

@@ -166,7 +182,8 @@ public boolean equals(Object o) {
             clientQuotas.equals(other.clientQuotas) &&
             producerIds.equals(other.producerIds) &&
             acls.equals(other.acls) &&
-            streamMetadata.equals(other.streamMetadata);
+            streamMetadata.equals(other.streamMetadata) &&
+            objectsMetadata.equals(other.objectsMetadata);
     }

     @Override
@@ -180,7 +197,8 @@ public int hashCode() {
             clientQuotas,
             producerIds,
             acls,
-            streamMetadata);
+            streamMetadata,
+            objectsMetadata);
     }

     @Override
@@ -195,6 +213,7 @@ public String toString() {
             ", producerIdsImage=" + producerIds +
             ", acls=" + acls +
             ", streamMetadata=" + streamMetadata +
+            ", objectsMetadata=" + objectsMetadata +
             ")";
     }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java
new file mode 100644
index 0000000000..32ca01c57f
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsDelta.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
+import org.apache.kafka.common.metadata.S3ObjectRecord;
+import org.apache.kafka.metadata.stream.SimplifiedS3Object;
+
+/**
+ * Represents changes to the S3 objects in the metadata image.
+ */
+public final class S3ObjectsDelta {
+
+    private final S3ObjectsImage image;
+
+    private final Set<SimplifiedS3Object> addedObjects = new HashSet<>();
+
+    private final Set<Long> removedObjectIds = new HashSet<>();
+
+    public S3ObjectsDelta(S3ObjectsImage image) {
+        this.image = image;
+    }
+
+    public S3ObjectsImage image() {
+        return image;
+    }
+
+    public Set<SimplifiedS3Object> addedObjects() {
+        return addedObjects;
+    }
+
+    public Set<Long> removedObjects() {
+        return removedObjectIds;
+    }
+
+    public void replay(S3ObjectRecord record) {
+        addedObjects.add(SimplifiedS3Object.of(record));
+        // new add or update, so remove from removedObjects
+        removedObjectIds.remove(record.objectId());
+    }
+
+    public void replay(RemoveS3ObjectRecord record) {
+        removedObjectIds.add(record.objectId());
+        // new remove, so remove from addedObjects
+        addedObjects.removeIf(obj -> obj.objectId() == record.objectId());
+    }
+
+    public S3ObjectsImage apply() {
+        // get original objects first
+        Map<Long, SimplifiedS3Object> newObjectsMetadata = new HashMap<>(image.objectsMetadata());
+        // put all new added objects
+        addedObjects.forEach(obj -> newObjectsMetadata.put(obj.objectId(), obj));
+        // remove all removed objects
+        removedObjectIds.forEach(newObjectsMetadata::remove);
+        return new S3ObjectsImage(newObjectsMetadata);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
new file mode 100644
index 0000000000..c7339f6b0b
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/S3ObjectsImage.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Objects;
+import org.apache.kafka.image.writer.ImageWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.stream.SimplifiedS3Object;
+
+/**
+ * Represents the S3 objects in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class S3ObjectsImage {
+
+    public static final S3ObjectsImage EMPTY =
+        new S3ObjectsImage(Collections.emptyMap());
+
+    private final Map<Long, SimplifiedS3Object> objectsMetadata;
+
+    public S3ObjectsImage(final Map<Long, SimplifiedS3Object> objectsMetadata) {
+        this.objectsMetadata = objectsMetadata;
+    }
+
+    public Map<Long, SimplifiedS3Object> objectsMetadata() {
+        return objectsMetadata;
+    }
+
+    public void write(ImageWriter writer, ImageWriterOptions options) {
+        objectsMetadata.values().stream().map(SimplifiedS3Object::toRecord).forEach(writer::write);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        S3ObjectsImage that = (S3ObjectsImage) o;
+        return Objects.equals(objectsMetadata, that.objectsMetadata);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(objectsMetadata);
+    }
+}
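Within a single delta, adds and removes of the same object id cancel rather than accumulate, so replay order cannot resurrect a deleted object. A short sketch of that behaviour (the id is arbitrary):

    S3ObjectsDelta delta = new S3ObjectsDelta(S3ObjectsImage.EMPTY);
    delta.replay(new S3ObjectRecord().setObjectId(7L)
        .setObjectState((byte) S3ObjectState.APPLIED.ordinal()));
    delta.replay(new RemoveS3ObjectRecord().setObjectId(7L)); // cancels the pending add
    S3ObjectsImage image = delta.apply();
    // image.objectsMetadata().isEmpty() == true
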
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
index e7867e4bb9..b4338d03e1 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
@@ -38,7 +38,7 @@ public class S3StreamMetadataDelta {
     private final Map<Integer, RangeMetadata> changedRanges = new HashMap<>();
     private final Set<Integer> removedRanges = new HashSet<>();
     private final Set<S3StreamObject> changedS3StreamObjects = new HashSet<>();
-    private final Set<S3StreamObject> removedS3StreamObjects = new HashSet<>();
+    private final Set<Long> removedS3StreamObjectIds = new HashSet<>();

     public S3StreamMetadataDelta(S3StreamMetadataImage image) {
         this.image = image;
@@ -47,44 +47,39 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) {

     public void replay(RangeRecord record) {
         changedRanges.put(record.rangeIndex(), RangeMetadata.of(record));
+        // new add or update, so remove from removedRanges
+        removedRanges.remove(record.rangeIndex());
     }

     public void replay(RemoveRangeRecord record) {
         removedRanges.add(record.rangeIndex());
+        // new remove, so remove from changedRanges
+        changedRanges.remove(record.rangeIndex());
     }

     public void replay(S3StreamObjectRecord record) {
         changedS3StreamObjects.add(S3StreamObject.of(record));
+        // new add or update, so remove from removedObjects
+        removedS3StreamObjectIds.remove(record.objectId());
     }

     public void replay(RemoveS3StreamObjectRecord record) {
-        removedS3StreamObjects.add(new S3StreamObject(record.objectId()));
+        removedS3StreamObjectIds.add(record.objectId());
+        // new remove, so remove from addedObjects
+        changedS3StreamObjects.removeIf(streamObject -> streamObject.objectId() == record.objectId());
     }

     public S3StreamMetadataImage apply() {
-        Map<Integer, RangeMetadata> newRanges = new HashMap<>(image.getRanges().size());
-        // apply the delta changes of old ranges since the last image
-        image.getRanges().forEach((rangeIndex, range) -> {
-            RangeMetadata changedRange = changedRanges.get(rangeIndex);
-            if (changedRange == null) {
-                // no change, check if deleted
-                if (!removedRanges.contains(rangeIndex)) {
-                    newRanges.put(rangeIndex, range);
-                }
-            } else {
-                // changed, apply the delta
-                newRanges.put(rangeIndex, changedRange);
-            }
-        });
-        // apply the new created ranges
-        changedRanges.entrySet().stream().filter(entry -> !newRanges.containsKey(entry.getKey()))
-            .forEach(entry -> newRanges.put(entry.getKey(), entry.getValue()));
-
+        Map<Integer, RangeMetadata> newRanges = new HashMap<>(image.getRanges());
+        // add all new changed ranges
+        newRanges.putAll(changedRanges);
+        // remove all removed ranges
+        removedRanges.forEach(newRanges::remove);
         List<S3StreamObject> newS3StreamObjects = new ArrayList<>(image.getStreamObjects());
-        // remove all removed stream-objects
-        newS3StreamObjects.removeAll(removedS3StreamObjects);
         // add all changed stream-objects
         newS3StreamObjects.addAll(changedS3StreamObjects);
+        // remove all removed stream-objects
+        newS3StreamObjects.removeIf(streamObject -> removedS3StreamObjectIds.contains(streamObject.objectId()));
         return new S3StreamMetadataImage(image.getStreamId(), newEpoch, image.getStartOffset(), newRanges, newS3StreamObjects);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index af30fad0c2..0ce386d53a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -28,6 +28,9 @@ import org.apache.kafka.image.writer.ImageWriterOptions;

 public class S3StreamMetadataImage {

+    public static final S3StreamMetadataImage EMPTY =
+        new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), List.of());
+
     private final Long streamId;

     private final Long epoch;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
index 328dc46e4f..cc73897be7 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java
@@ -28,6 +28,19 @@ public class RangeMetadata implements Comparable<RangeMetadata> {
     private Long startOffset;
     private Optional<Long> endOffset;
     private Integer brokerId;
+
+    private RangeMetadata() {
+    }
+
+    public RangeMetadata(Long streamId, Long epoch, Integer rangeIndex, Long startOffset, Optional<Long> endOffset, Integer brokerId) {
+        this.streamId = streamId;
+        this.epoch = epoch;
+        this.rangeIndex = rangeIndex;
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+        this.brokerId = brokerId;
+    }
+
     @Override
     public int compareTo(RangeMetadata o) {
         return this.rangeIndex.compareTo(o.rangeIndex);
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Config.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Config.java
new file mode 100644
index 0000000000..a49766c845
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Config.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.stream;
+
+/**
+ * S3Config contains the configuration of S3, such as the bucket name, the region, etc.
+ */
+public class S3Config {
+
+    private final String region;
+
+    private final String bucketName;
+
+    public S3Config(final String region, final String bucketName) {
+        this.region = region;
+        this.bucketName = bucketName;
+    }
+
+    public String getRegion() {
+        return region;
+    }
+
+    public String getBucketName() {
+        return bucketName;
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
index dd85f6dab1..81c4d25f47 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java
@@ -19,17 +19,19 @@

 import java.util.Objects;
 import java.util.Optional;
+import org.apache.kafka.common.metadata.S3ObjectRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;

 /**
  * S3Object is the base class of object in S3. Manages the lifecycle of S3Object.
  */
-public abstract class S3Object implements Comparable<S3Object> {
+public class S3Object implements Comparable<S3Object> {

     protected final Long objectId;

     protected Optional<Long> objectSize = Optional.empty();

-    protected Optional<String> objectAddress = Optional.empty();
+    protected Optional<String> objectKey = Optional.empty();

     /**
      * The time when broker apply the object
      */
@@ -53,30 +55,26 @@ public abstract class S3Object implements Comparable<S3Object> {

     protected S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED;

-    protected S3ObjectType objectType = S3ObjectType.UNKNOWN;
-
     protected S3Object(final Long objectId) {
         this.objectId = objectId;
     }

-    protected S3Object(
+    public S3Object(
         final Long objectId,
         final Long objectSize,
-        final String objectAddress,
+        final String objectKey,
         final Long appliedTimeInMs,
         final Long expiredTimeInMs,
         final Long committedTimeInMs,
         final Long destroyedTimeInMs,
-        final S3ObjectState s3ObjectState,
-        final S3ObjectType objectType) {
+        final S3ObjectState s3ObjectState) {
         this.objectId = objectId;
         this.objectSize = Optional.of(objectSize);
-        this.objectAddress = Optional.of(objectAddress);
+        this.objectKey = Optional.of(objectKey);
         this.appliedTimeInMs = Optional.of(appliedTimeInMs);
         this.expiredTimeInMs = Optional.of(expiredTimeInMs);
         this.committedTimeInMs = Optional.of(committedTimeInMs);
         this.destroyedTimeInMs = Optional.of(destroyedTimeInMs);
-        this.objectType = objectType;
         this.s3ObjectState = s3ObjectState;
     }
@@ -93,30 +91,37 @@ public void onCreate(S3ObjectCommitContext createContext) {
         if (this.s3ObjectState != S3ObjectState.APPLIED) {
             throw new IllegalStateException("Object is not in APPLIED state");
         }
-        this.s3ObjectState = S3ObjectState.CREATED;
+        this.s3ObjectState = S3ObjectState.COMMITTED;
         this.committedTimeInMs = Optional.of(createContext.committedTimeInMs);
         this.objectSize = Optional.of(createContext.objectSize);
-        this.objectAddress = Optional.of(createContext.objectAddress);
-        this.objectType = createContext.objectType;
+        this.objectKey = Optional.of(createContext.objectAddress);
     }

     public void onMarkDestroy() {
-        if (this.s3ObjectState != S3ObjectState.CREATED) {
-            throw new IllegalStateException("Object is not in CREATED state");
+        if (this.s3ObjectState != S3ObjectState.COMMITTED) {
+            throw new IllegalStateException("Object is not in COMMITTED state");
         }
         this.s3ObjectState = S3ObjectState.MARK_DESTROYED;
     }

     public void onDestroy() {
-        if (this.s3ObjectState != S3ObjectState.CREATED) {
-            throw new IllegalStateException("Object is not in CREATED state");
+        if (this.s3ObjectState != S3ObjectState.COMMITTED) {
+            throw new IllegalStateException("Object is not in COMMITTED state");
         }
         // TODO: trigger destroy
     }

-    public S3ObjectType getObjectType() {
-        return objectType;
+    public ApiMessageAndVersion toRecord() {
+        return new ApiMessageAndVersion(new S3ObjectRecord()
+            .setObjectId(objectId)
+            .setObjectSize(objectSize.orElse(null))
+            .setObjectState((byte) s3ObjectState.ordinal())
+            .setAppliedTimeInMs(appliedTimeInMs.orElse(null))
+            .setExpiredTimeInMs(expiredTimeInMs.orElse(null))
+            .setCommittedTimeInMs(committedTimeInMs.orElse(null))
+            .setDestroyedTimeInMs(destroyedTimeInMs.orElse(null)),
+            (short) 0);
     }

     public class S3ObjectCommitContext {
@@ -124,17 +129,14 @@ public class S3ObjectCommitContext {
         private final Long committedTimeInMs;
         private final Long objectSize;
         private final String objectAddress;
-        private final S3ObjectType objectType;

         public S3ObjectCommitContext(
             final Long committedTimeInMs,
             final Long objectSize,
-            final String objectAddress,
-            final S3ObjectType objectType) {
+            final String objectAddress) {
             this.committedTimeInMs = committedTimeInMs;
             this.objectSize = objectSize;
             this.objectAddress = objectAddress;
-            this.objectType = objectType;
         }
     }
@@ -168,8 +170,8 @@ public Optional<Long> getObjectSize() {
         return objectSize;
     }

-    public Optional<String> getObjectAddress() {
-        return objectAddress;
+    public Optional<String> getObjectKey() {
+        return objectKey;
     }

     public Optional<Long> getAppliedTimeInMs() {
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
index 228e682aea..cdffc10f49 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
@@ -20,7 +20,7 @@
 public enum S3ObjectState {
     UNINITIALIZED,
     APPLIED,
-    CREATED,
+    COMMITTED,
     MARK_DESTROYED,
     DESTROYED;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
index 218bf44702..323c68a698 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
@@ -17,82 +17,44 @@

 package org.apache.kafka.metadata.stream;

-import java.util.Optional;
 import org.apache.kafka.common.metadata.S3StreamObjectRecord;
 import org.apache.kafka.server.common.ApiMessageAndVersion;

-public class S3StreamObject extends S3Object {
+public class S3StreamObject {

-    private S3ObjectStreamIndex streamIndex;
+    private final long objectId;

-    public S3StreamObject(final Long objectId) {
-        super(objectId);
-    }
+    private final S3ObjectStreamIndex streamIndex;

-    @Override
-    public void onCreate(S3ObjectCommitContext createContext) {
-        super.onCreate(createContext);
-        if (!(createContext instanceof StreamObjectCommitContext)) {
-            throw new IllegalArgumentException();
-        }
-        this.streamIndex = ((StreamObjectCommitContext) createContext).streamIndex;
+    public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset) {
+        this.objectId = objectId;
+        this.streamIndex = new S3ObjectStreamIndex(streamId, startOffset, endOffset);
     }

-    @Override
-    public int compareTo(S3Object o) {
-        if (!(o instanceof S3StreamObject)) {
-            throw new IllegalArgumentException("Cannot compare StreamObject with non-StreamObject");
-        }
-        S3StreamObject s3StreamObject = (S3StreamObject) o;
-        // order by streamId first, then startOffset
-        int res = this.streamIndex.getStreamId().compareTo(s3StreamObject.streamIndex.getStreamId());
-        return res == 0 ? this.streamIndex.getStartOffset().compareTo(s3StreamObject.streamIndex.getStartOffset()) : res;
+    public S3StreamObject(long objectId, S3ObjectStreamIndex streamIndex) {
+        this.objectId = objectId;
+        this.streamIndex = streamIndex;
     }

-    class StreamObjectCommitContext extends S3ObjectCommitContext {
-
-        private final S3ObjectStreamIndex streamIndex;
-
-        public StreamObjectCommitContext(
-            final Long createTimeInMs,
-            final Long objectSize,
-            final String objectAddress,
-            final S3ObjectType objectType,
-            final S3ObjectStreamIndex streamIndex) {
-            super(createTimeInMs, objectSize, objectAddress, objectType);
-            this.streamIndex = streamIndex;
-        }
+    public S3ObjectStreamIndex streamIndex() {
+        return streamIndex;
     }

-    public S3ObjectStreamIndex getStreamIndex() {
-        return streamIndex;
+    public long objectId() {
+        return objectId;
     }

     public ApiMessageAndVersion toRecord() {
         return new ApiMessageAndVersion(new S3StreamObjectRecord()
             .setObjectId(objectId)
             .setStreamId(streamIndex.getStreamId())
-            .setObjectState((byte) s3ObjectState.ordinal())
-            .setObjectType((byte) objectType.ordinal())
-            .setAppliedTimeInMs(appliedTimeInMs.get())
-            .setExpiredTimeInMs(expiredTimeInMs.get())
-            .setCommittedTimeInMs(committedTimeInMs.get())
-            .setDestroyedTimeInMs(destroyedTimeInMs.get())
-            .setObjectSize(objectSize.get())
             .setStartOffset(streamIndex.getStartOffset())
             .setEndOffset(streamIndex.getEndOffset()), (short) 0);
     }

     public static S3StreamObject of(S3StreamObjectRecord record) {
-        S3StreamObject s3StreamObject = new S3StreamObject(record.objectId());
-        s3StreamObject.objectType = S3ObjectType.fromByte(record.objectType());
-        s3StreamObject.s3ObjectState = S3ObjectState.fromByte(record.objectState());
-        s3StreamObject.appliedTimeInMs = Optional.of(record.appliedTimeInMs());
-        s3StreamObject.expiredTimeInMs = Optional.of(record.expiredTimeInMs());
-        s3StreamObject.committedTimeInMs = Optional.of(record.committedTimeInMs());
-        s3StreamObject.destroyedTimeInMs = Optional.of(record.destroyedTimeInMs());
-        s3StreamObject.objectSize = Optional.of(record.objectSize());
-        s3StreamObject.streamIndex = new S3ObjectStreamIndex(record.streamId(), record.startOffset(), record.endOffset());
-        return s3StreamObject;
+        S3ObjectStreamIndex index = new S3ObjectStreamIndex(record.streamId(), record.startOffset(), record.endOffset());
+        return new S3StreamObject(record.objectId(), index);
     }
 }
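With the type tag and shared base class gone, an object's lifecycle is carried entirely by S3ObjectState: APPLIED -> COMMITTED (onCreate) -> MARK_DESTROYED (onMarkDestroy) -> DESTROYED. A sketch of the transition guards using the now-public S3Object constructor (ids, sizes, key, and timestamps are placeholders):

    S3Object object = new S3Object(1L, 1024L, "42/my-cluster/1",
        1000L, 9000L, 1500L, 0L, S3ObjectState.COMMITTED);
    object.onMarkDestroy();  // COMMITTED -> MARK_DESTROYED
    // Note: onDestroy() also insists on COMMITTED, so calling it after
    // onMarkDestroy() throws; the actual destroy hook is still a TODO in the patch.
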
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
index 6871688630..763b86205a 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3WALObject.java
@@ -17,81 +17,31 @@

 package org.apache.kafka.metadata.stream;

-import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 import org.apache.kafka.common.metadata.WALObjectRecord;
+import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
 import org.apache.kafka.server.common.ApiMessageAndVersion;

-public class S3WALObject extends S3Object {
+public class S3WALObject {

-    private Integer brokerId;
+    private final long objectId;
+
+    private final int brokerId;

     private Map<Long, S3ObjectStreamIndex> streamsIndex;

     private S3ObjectType objectType = S3ObjectType.UNKNOWN;

-    public S3WALObject(Long objectId) {
-        super(objectId);
-    }
-
-    private S3WALObject(
-        final Long objectId,
-        final Long objectSize,
-        final String objectAddress,
-        final Long applyTimeInMs,
-        final Long expiredTimeImMs,
-        final Long commitTimeInMs,
-        final Long destroyTimeInMs,
-        final S3ObjectState s3ObjectState,
-        final S3ObjectType objectType,
-        final Integer brokerId,
-        final List<S3ObjectStreamIndex> streamsIndex) {
-        super(objectId, objectSize, objectAddress, applyTimeInMs, expiredTimeImMs, commitTimeInMs, destroyTimeInMs, s3ObjectState, objectType);
-        this.objectType = objectType;
+    public S3WALObject(long objectId, int brokerId, final Map<Long, S3ObjectStreamIndex> streamsIndex) {
+        this.objectId = objectId;
         this.brokerId = brokerId;
-        this.streamsIndex = streamsIndex.stream().collect(
-            Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index));
-    }
-
-    @Override
-    public void onCreate(S3ObjectCommitContext createContext) {
-        super.onCreate(createContext);
-        if (!(createContext instanceof WALObjectCommitContext)) {
-            throw new IllegalArgumentException();
-        }
-        WALObjectCommitContext walCreateContext = (WALObjectCommitContext) createContext;
-        this.streamsIndex = walCreateContext.streamIndexList.stream().collect(Collectors.toMap(S3ObjectStreamIndex::getStreamId, index -> index));
-        this.brokerId = walCreateContext.brokerId;
-    }
-
-    class WALObjectCommitContext extends S3ObjectCommitContext {
-
-        private final List<S3ObjectStreamIndex> streamIndexList;
-        private final Integer brokerId;
-
-        public WALObjectCommitContext(
-            final Long createTimeInMs,
-            final Long objectSize,
-            final String objectAddress,
-            final S3ObjectType objectType,
-            final List<S3ObjectStreamIndex> streamIndexList,
-            final Integer brokerId) {
-            super(createTimeInMs, objectSize, objectAddress, objectType);
-            this.streamIndexList = streamIndexList;
-            this.brokerId = brokerId;
-        }
+        this.streamsIndex = streamsIndex;
     }

     public ApiMessageAndVersion toRecord() {
         return new ApiMessageAndVersion(new WALObjectRecord()
             .setObjectId(objectId)
-            .setObjectState((byte) s3ObjectState.ordinal())
-            .setObjectType((byte) objectType.ordinal())
-            .setAppliedTimeInMs(appliedTimeInMs.get())
-            .setExpiredTimeInMs(expiredTimeInMs.get())
-            .setCommittedTimeInMs(committedTimeInMs.get())
-            .setDestroyedTimeInMs(destroyedTimeInMs.get())
-            .setObjectSize(objectSize.get())
+            .setBrokerId(brokerId)
             .setStreamsIndex(
                 streamsIndex.values().stream()
                     .map(S3ObjectStreamIndex::toRecordStreamIndex)
@@ -99,11 +49,9 @@ public ApiMessageAndVersion toRecord() {
     }

     public static S3WALObject of(WALObjectRecord record) {
-        S3WALObject s3WalObject = new S3WALObject(
-            record.objectId(), record.objectSize(), null,
-            record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
-            S3ObjectState.fromByte(record.objectState()), S3ObjectType.fromByte(record.objectType()),
-            record.brokerId(), record.streamsIndex().stream().map(S3ObjectStreamIndex::of).collect(Collectors.toList()));
+        S3WALObject s3WalObject = new S3WALObject(record.objectId(), record.brokerId(),
+            record.streamsIndex().stream().collect(Collectors.toMap(
+                StreamIndex::streamId, S3ObjectStreamIndex::of)));
         return s3WalObject;
     }

@@ -115,8 +63,4 @@ public Map<Long, S3ObjectStreamIndex> getStreamsIndex() {
         return streamsIndex;
     }

-    @Override
-    public S3ObjectType getObjectType() {
-        return objectType;
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java
new file mode 100644
index 0000000000..9e79d07ba8
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/SimplifiedS3Object.java
@@ -0,0 +1,35 @@
+package org.apache.kafka.metadata.stream;
+
+import org.apache.kafka.common.metadata.S3ObjectRecord;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+/**
+ * Simplified S3 object metadata; only used in the broker's metadata cache.
+ */
+public class SimplifiedS3Object {
+
+    private final long objectId;
+
+    private final S3ObjectState state;
+
+    public SimplifiedS3Object(final long objectId, final S3ObjectState state) {
+        this.objectId = objectId;
+        this.state = state;
+    }
+
+    public long objectId() {
+        return objectId;
+    }
+
+    public S3ObjectState state() {
+        return state;
+    }
+
+    public ApiMessageAndVersion toRecord() {
+        return new ApiMessageAndVersion(new S3ObjectRecord().
+            setObjectId(objectId).
+            setObjectState((byte) state.ordinal()), (short) 0);
+    }
+
+    public static SimplifiedS3Object of(final S3ObjectRecord record) {
+        return new SimplifiedS3Object(record.objectId(), S3ObjectState.fromByte(record.objectState()));
+    }
+}
diff --git a/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json b/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json
new file mode 100644
index 0000000000..465d40c49e
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/RemoveS3ObjectRecord.json
@@ -0,0 +1,30 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 32,
+  "type": "metadata",
+  "name": "RemoveS3ObjectRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {
+      "name": "ObjectId",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The object id of the S3 object"
+    }
+  ]
+}
\ No newline at end of file
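Brokers cache only identity and state, which is why SimplifiedS3Object serializes to a sparse S3ObjectRecord. A round-trip sketch through the record type introduced below (the id is arbitrary):

    SimplifiedS3Object object = new SimplifiedS3Object(5L, S3ObjectState.COMMITTED);
    ApiMessageAndVersion record = object.toRecord();
    SimplifiedS3Object copy = SimplifiedS3Object.of((S3ObjectRecord) record.message());
    // copy.objectId() == 5L and copy.state() == S3ObjectState.COMMITTED
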
diff --git a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
new file mode 100644
index 0000000000..1d620125b3
--- /dev/null
+++ b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 31,
+  "type": "metadata",
+  "name": "S3ObjectRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {
+      "name": "ObjectId",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The object id of the S3 object"
+    },
+    {
+      "name": "ObjectSize",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The object size of the S3 object"
+    },
+    {
+      "name": "AppliedTimeInMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The timestamp when the object was applied"
+    },
+    {
+      "name": "ExpiredTimeInMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The timestamp when the object expires"
+    },
+    {
+      "name": "CommittedTimeInMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The timestamp when the object was committed"
+    },
+    {
+      "name": "DestroyedTimeInMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "The timestamp when the object was destroyed"
+    },
+    {
+      "name": "ObjectState",
+      "type": "int8",
+      "versions": "0+",
+      "about": "The object state"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
index 30fea687a4..2d55d4cf47 100644
--- a/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/S3StreamObjectRecord.json
@@ -43,48 +43,6 @@
       "type": "int64",
       "versions": "0+",
       "about": "The object id of the S3 object"
-    },
-    {
-      "name": "ObjectSize",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object size of the S3 object"
-    },
-    {
-      "name": "AppliedTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be applied timestamp"
-    },
-    {
-      "name": "ExpiredTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be expired timestamp"
-    },
-    {
-      "name": "CommittedTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be committed timestamp"
-    },
-    {
-      "name": "DestroyedTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be destroyed timestamp"
-    },
-    {
-      "name": "ObjectState",
-      "type": "int8",
-      "versions": "0+",
-      "about": "The object state"
-    },
-    {
-      "name": "ObjectType",
-      "type": "int8",
-      "versions": "0+",
-      "about": "The object type"
     }
   ]
 }
\ No newline at end of file
diff --git a/metadata/src/main/resources/common/metadata/WALObjectRecord.json b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
index b7715b6257..2724b023a3 100644
--- a/metadata/src/main/resources/common/metadata/WALObjectRecord.json
+++ b/metadata/src/main/resources/common/metadata/WALObjectRecord.json
@@ -32,48 +32,6 @@
       "versions": "0+",
       "about": "The object id of the S3 object"
     },
-    {
-      "name": "ObjectSize",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object size of the S3 object"
-    },
-    {
-      "name": "AppliedTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be applied timestamp"
-    },
-    {
-      "name": "ExpiredTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be expired timestamp"
-    },
-    {
-      "name": "CommittedTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be committed timestamp"
-    },
-    {
-      "name": "DestroyedTimeInMs",
-      "type": "int64",
-      "versions": "0+",
-      "about": "The object be destroyed timestamp"
-    },
-    {
-      "name": "ObjectState",
-      "type": "int8",
-      "versions": "0+",
-      "about": "The object state"
-    },
-    {
-      "name": "ObjectType",
-      "type": "int8",
-      "versions": "0+",
-      "about": "The object type"
-    },
     {
       "name": "StreamsIndex",
       "type": "[]StreamIndex",
+        delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord()
+            .setObjectId(0L)
+            .setBrokerId(BROKER0)
+            .setStreamsIndex(List.of(
+                new WALObjectRecord.StreamIndex()
+                    .setStreamId(STREAM0)
+                    .setStartOffset(0L)
+                    .setEndOffset(100L),
+                new WALObjectRecord.StreamIndex()
+                    .setStreamId(STREAM1)
+                    .setStartOffset(0L)
+                    .setEndOffset(200L))), (short) 0));
+        delta0Records.add(new ApiMessageAndVersion(new WALObjectRecord()
+            .setObjectId(1L)
+            .setBrokerId(BROKER0)
+            .setStreamsIndex(List.of(
+                new WALObjectRecord.StreamIndex()
+                    .setStreamId(STREAM0)
+                    .setStartOffset(101L)
+                    .setEndOffset(200L))), (short) 0));
+        RecordTestUtils.replayAll(delta0, delta0Records);
+        // verify the delta and check the image's write
+        BrokerS3WALMetadataImage image1 = new BrokerS3WALMetadataImage(BROKER0, List.of(
+            new S3WALObject(0L, BROKER0, Map.of(
+                STREAM0, new S3ObjectStreamIndex(STREAM0, 0L, 100L),
+                STREAM1, new S3ObjectStreamIndex(STREAM1, 0L, 200L))),
+            new S3WALObject(1L, BROKER0, Map.of(
+                STREAM0, new S3ObjectStreamIndex(STREAM0, 101L, 200L)))));
+        assertEquals(image1, delta0.apply());
+        testToImageAndBack(image1);
+
+        // 2. remove WALObject0
+        List<ApiMessageAndVersion> delta1Records = new ArrayList<>();
+        BrokerS3WALMetadataDelta delta1 = new BrokerS3WALMetadataDelta(image1);
+        delta1Records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
+            .setObjectId(0L), (short) 0));
+        RecordTestUtils.replayAll(delta1, delta1Records);
+        // verify the delta and check the image's write
+        BrokerS3WALMetadataImage image2 = new BrokerS3WALMetadataImage(BROKER0, List.of(
+            new S3WALObject(1L, BROKER0, Map.of(
+                STREAM0, new S3ObjectStreamIndex(STREAM0, 101L, 200L)))));
+        assertEquals(image2, delta1.apply());
+        testToImageAndBack(image2);
+    }
+
+    private void testToImageAndBack(BrokerS3WALMetadataImage image) {
+        RecordListWriter writer = new RecordListWriter();
+        ImageWriterOptions options = new ImageWriterOptions.Builder().build();
+        image.write(writer, options);
+        // replay the written records into a delta over an empty image of the
+        // same broker and expect the original image back
+        BrokerS3WALMetadataDelta delta = new BrokerS3WALMetadataDelta(new BrokerS3WALMetadataImage(BROKER0, List.of()));
+        RecordTestUtils.replayAll(delta, writer.records());
+        BrokerS3WALMetadataImage newImage = delta.apply();
+        assertEquals(image, newImage);
+    }
+
+}
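The test above pins down the shape of a broker's WAL metadata: each WAL object carries a per-stream index of the offset range it covers, and the point of that index is range lookup. A standalone sketch of such a lookup, with stand-in types rather than the patch's S3WALObject/S3ObjectStreamIndex, and offset ranges treated as inclusive to match the [0, 100], [101, 200] style of the test data:

// Sketch only: which WAL objects hold part of a requested stream range?
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public final class WalIndexLookupSketch {
    record OffsetRange(long start, long end) {}
    record WalObject(long objectId, Map<Long, OffsetRange> streamsIndex) {}

    static List<WalObject> objectsFor(List<WalObject> wal, long streamId, long from, long to) {
        return wal.stream()
            .filter(o -> {
                OffsetRange r = o.streamsIndex().get(streamId);
                return r != null && r.start() <= to && from <= r.end(); // inclusive overlap
            })
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<WalObject> wal = List.of(
            new WalObject(0L, Map.of(0L, new OffsetRange(0, 100), 1L, new OffsetRange(0, 200))),
            new WalObject(1L, Map.of(0L, new OffsetRange(101, 200))));
        // Offsets 50..150 of stream 0 span both WAL objects:
        System.out.println(objectsFor(wal, 0L, 50, 150)); // objects 0 and 1
    }
}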
diff --git a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
index 0e9afe7240..b11ec57d0f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java
@@ -44,7 +44,8 @@ public class MetadataImageTest {
         ClientQuotasImageTest.IMAGE1,
         ProducerIdsImageTest.IMAGE1,
         AclsImageTest.IMAGE1,
-        S3StreamsMetadataImageTest.IMAGE1);
+        S3StreamsMetadataImageTest.IMAGE1,
+        S3ObjectsImageTest.IMAGE1);
 
     DELTA1 = new MetadataDelta.Builder().
         setImage(IMAGE1).
@@ -67,7 +68,8 @@ public class MetadataImageTest {
         ClientQuotasImageTest.IMAGE2,
         ProducerIdsImageTest.IMAGE2,
         AclsImageTest.IMAGE2,
-        S3StreamsMetadataImageTest.IMAGE2);
+        S3StreamsMetadataImageTest.IMAGE2,
+        S3ObjectsImageTest.IMAGE2);
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java
new file mode 100644
index 0000000000..d4422bd0d2
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/S3ObjectsImageTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
+import org.apache.kafka.common.metadata.S3ObjectRecord;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.stream.S3ObjectState;
+import org.apache.kafka.metadata.stream.SimplifiedS3Object;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 40)
+public class S3ObjectsImageTest {
+
+    final static S3ObjectsImage IMAGE1;
+
+    final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+
+    final static S3ObjectsDelta DELTA1;
+
+    final static S3ObjectsImage IMAGE2;
+
+    static {
+        Map<Long, SimplifiedS3Object> map = new HashMap<>();
+        for (int i = 0; i < 4; i++) {
+            SimplifiedS3Object object = new SimplifiedS3Object(i, S3ObjectState.APPLIED);
+            map.put(object.objectId(), object);
+        }
+        IMAGE1 = new S3ObjectsImage(map);
+        DELTA1_RECORDS = new ArrayList<>();
+        // update object0 and object1 to committed,
+        // expire object2 and mark it to be destroyed,
+        // remove the destroyed object3,
+        // and add the newly applied object4
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
+            setObjectId(0L).
+            setObjectState((byte) S3ObjectState.COMMITTED.ordinal()), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
+            setObjectId(1L).
+            setObjectState((byte) S3ObjectState.COMMITTED.ordinal()), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
+            setObjectId(2L).
+            setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveS3ObjectRecord()
+            .setObjectId(3L), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new S3ObjectRecord().
+            setObjectId(4L).
+            setObjectState((byte) S3ObjectState.APPLIED.ordinal()), (short) 0));
+        DELTA1 = new S3ObjectsDelta(IMAGE1);
+        RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+        Map<Long, SimplifiedS3Object> map2 = new HashMap<>();
+        map2.put(0L, new SimplifiedS3Object(0L, S3ObjectState.COMMITTED));
+        map2.put(1L, new SimplifiedS3Object(1L, S3ObjectState.COMMITTED));
+        map2.put(2L, new SimplifiedS3Object(2L, S3ObjectState.MARK_DESTROYED));
+        map2.put(4L, new SimplifiedS3Object(4L, S3ObjectState.APPLIED));
+
+        IMAGE2 = new S3ObjectsImage(map2);
+    }
+
+    @Test
+    public void testEmptyImageRoundTrip() {
+        testToImageAndBack(S3ObjectsImage.EMPTY);
+    }
+
+    @Test
+    public void testImage1RoundTrip() {
+        testToImageAndBack(IMAGE1);
+    }
+
+    @Test
+    public void testApplyDelta1() {
+        assertEquals(IMAGE2, DELTA1.apply());
+    }
+
+    @Test
+    public void testImage2RoundTrip() {
+        testToImageAndBack(IMAGE2);
+    }
+
+    private void testToImageAndBack(S3ObjectsImage image) {
+        RecordListWriter writer = new RecordListWriter();
+        ImageWriterOptions options = new ImageWriterOptions.Builder().build();
+        image.write(writer, options);
+        S3ObjectsDelta delta = new S3ObjectsDelta(S3ObjectsImage.EMPTY);
+        RecordTestUtils.replayAll(delta, writer.records());
+        S3ObjectsImage newImage = delta.apply();
+        assertEquals(image, newImage);
+    }
+
+}
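Every new test file follows the same write-replay-apply round trip: serialize an image to records, replay them into a delta over the empty image, and expect the original image back. A minimal stand-in sketch of that immutable-image / accumulating-delta pattern (not the real S3ObjectsImage/S3ObjectsDelta API):

// Sketch only: the image/delta pattern the new tests exercise.
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public final class ImageDeltaSketch {
    record Image(Map<Long, String> objects) {}          // objectId -> state

    static final class Delta {
        private final Image base;
        private final Map<Long, String> changed = new HashMap<>();
        private final Set<Long> removed = new HashSet<>();

        Delta(Image base) { this.base = base; }

        // replay of an S3ObjectRecord-like update
        void replayPut(long objectId, String state) {
            changed.put(objectId, state);
            removed.remove(objectId);
        }

        // replay of a RemoveS3ObjectRecord-like tombstone
        void replayRemove(long objectId) {
            changed.remove(objectId);
            removed.add(objectId);
        }

        // apply() never mutates the base image; it builds the next one
        Image apply() {
            Map<Long, String> next = new HashMap<>(base.objects());
            next.keySet().removeAll(removed);
            next.putAll(changed);
            return new Image(Map.copyOf(next));
        }
    }

    public static void main(String[] args) {
        Delta delta = new Delta(new Image(Map.of(3L, "MARK_DESTROYED")));
        delta.replayPut(0L, "COMMITTED");
        delta.replayRemove(3L);
        System.out.println(delta.apply()); // Image[objects={0=COMMITTED}]
    }
}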
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
new file mode 100644
index 0000000000..b8ed032853
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.kafka.common.metadata.RangeRecord;
+import org.apache.kafka.common.metadata.RemoveRangeRecord;
+import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamObjectRecord;
+import org.apache.kafka.common.metadata.S3StreamRecord;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.stream.RangeMetadata;
+import org.apache.kafka.metadata.stream.S3StreamObject;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+@Timeout(value = 40)
+public class S3StreamMetadataImageTest {
+
+    private static final long STREAM0 = 0L;
+
+    private static final int BROKER0 = 0;
+
+    private static final int BROKER1 = 1;
+
+    @Test
+    public void testRanges() {
+        S3StreamMetadataImage image0 = S3StreamMetadataImage.EMPTY;
+        List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
+        S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
+        // 1. create stream0
+        delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord()
+            .setStreamId(STREAM0)
+            .setEpoch(0L)
+            .setStartOffset(0L), (short) 0));
+        RecordTestUtils.replayAll(delta0, delta0Records);
+        // verify the delta and check the image's write
+        S3StreamMetadataImage image1 = new S3StreamMetadataImage(
+            STREAM0, 0L, 0L, Map.of(), List.of());
+        assertEquals(image1, delta0.apply());
+        testToImageAndBack(image1);
+
+        // 2. update stream0's epoch to 1
+        // and create range0_0, format: range{streamId}_{rangeIndex}
+        List<ApiMessageAndVersion> delta1Records = new ArrayList<>();
+        S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
+        delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord()
+            .setStreamId(STREAM0)
+            .setEpoch(1L)
+            .setStartOffset(0L), (short) 0));
+        delta1Records.add(new ApiMessageAndVersion(new RangeRecord()
+            .setStreamId(STREAM0)
+            .setRangeIndex(0)
+            .setEpoch(1L)
+            .setBrokerId(BROKER0)
+            .setStartOffset(0L), (short) 0));
+        RecordTestUtils.replayAll(delta1, delta1Records);
+        // verify the delta and check the image's write
+        S3StreamMetadataImage image2 = new S3StreamMetadataImage(
+            STREAM0, 1L, 0L, Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, Optional.empty(), BROKER0)), List.of());
+        assertEquals(image2, delta1.apply());
+        testToImageAndBack(image2);
+
+        // 3. seal range 0_0, broker1 is the new leader, and create range0_1
+        List<ApiMessageAndVersion> delta2Records = new ArrayList<>();
+        S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2);
+        delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord()
+            .setStreamId(STREAM0)
+            .setEpoch(2L)
+            .setStartOffset(0L), (short) 0));
+        delta2Records.add(new ApiMessageAndVersion(new RangeRecord()
+            .setStreamId(STREAM0)
+            .setRangeIndex(0)
+            .setEpoch(1L)
+            .setBrokerId(BROKER0)
+            .setStartOffset(0L)
+            .setEndOffset(100L), (short) 0));
+        delta2Records.add(new ApiMessageAndVersion(new RangeRecord()
+            .setStreamId(STREAM0)
+            .setRangeIndex(1)
+            .setEpoch(2L)
+            .setBrokerId(BROKER1)
+            .setStartOffset(101L), (short) 0));
+        RecordTestUtils.replayAll(delta2, delta2Records);
+        // verify the delta and check the image's write
+        S3StreamMetadataImage image3 = new S3StreamMetadataImage(
+            STREAM0, 2L, 0L, Map.of(
+            0, new RangeMetadata(STREAM0, 1L, 0, 0L, Optional.of(100L), BROKER0),
+            1, new RangeMetadata(STREAM0, 2L, 1, 101L, Optional.empty(), BROKER1)), List.of());
+        assertEquals(image3, delta2.apply());
+        testToImageAndBack(image3);
+
+        // 4. trim the stream to start offset 101 and remove range 0_0
+        List<ApiMessageAndVersion> delta3Records = new ArrayList<>();
+        S3StreamMetadataDelta delta3 = new S3StreamMetadataDelta(image3);
+        delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord()
+            .setStreamId(STREAM0)
+            .setEpoch(2L)
+            .setStartOffset(101L), (short) 0));
+        delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord()
+            .setStreamId(STREAM0)
+            .setRangeIndex(0), (short) 0));
+        RecordTestUtils.replayAll(delta3, delta3Records);
+        // verify the delta and check the image's write
+        S3StreamMetadataImage image4 = new S3StreamMetadataImage(
+            STREAM0, 2L, 101L, Map.of(
+            1, new RangeMetadata(STREAM0, 2L, 1, 101L, Optional.empty(), BROKER1)), List.of());
+        assertEquals(image4, delta3.apply());
+        testToImageAndBack(image4);
+    }
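testRanges encodes a leadership history: each sealed range has a fixed end offset, while the open range (empty end offset) belongs to the current leader. Resolving a stream offset to the broker that owns it is then a scan over the ranges. A stand-in sketch (not the patch's RangeMetadata); end offsets are treated as inclusive, matching the sealed-at-100 / next-starts-at-101 data above:

// Sketch only: resolve a stream offset to the broker whose range covers it.
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;

public final class RangeLookupSketch {
    record Range(int index, long startOffset, Optional<Long> endOffset, int brokerId) {}

    static OptionalInt brokerFor(List<Range> ranges, long offset) {
        return ranges.stream()
            .filter(r -> offset >= r.startOffset()
                && r.endOffset().map(end -> offset <= end).orElse(true)) // open range: current leader
            .mapToInt(Range::brokerId)
            .findFirst();
    }

    public static void main(String[] args) {
        // Mirrors step 3: range 0 sealed at offset 100 on broker 0,
        // range 1 open on the new leader, broker 1.
        List<Range> ranges = List.of(
            new Range(0, 0L, Optional.of(100L), 0),
            new Range(1, 101L, Optional.empty(), 1));
        System.out.println(brokerFor(ranges, 50L));  // OptionalInt[0]
        System.out.println(brokerFor(ranges, 150L)); // OptionalInt[1]
    }
}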
+
+    @Test
+    public void testStreamObjects() {
+        S3StreamMetadataImage image0 = new S3StreamMetadataImage(
+            STREAM0, 0L, 0L, Map.of(), List.of());
+        List<ApiMessageAndVersion> delta0Records = new ArrayList<>();
+        S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
+        // 1. create streamObject0 and streamObject1
+        delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
+            .setObjectId(0L)
+            .setStreamId(STREAM0)
+            .setStartOffset(0L)
+            .setEndOffset(100L), (short) 0));
+        delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
+            .setObjectId(1L)
+            .setStreamId(STREAM0)
+            .setStartOffset(101L)
+            .setEndOffset(200L), (short) 0));
+        RecordTestUtils.replayAll(delta0, delta0Records);
+        // verify the delta and check the image's write
+        S3StreamMetadataImage image1 = new S3StreamMetadataImage(
+            STREAM0, 0L, 0L, Map.of(), List.of(
+            new S3StreamObject(0L, STREAM0, 0L, 100L),
+            new S3StreamObject(1L, STREAM0, 101L, 200L)));
+        assertEquals(image1, delta0.apply());
+        testToImageAndBack(image1);
+
+        // 2. remove streamObject0
+        List<ApiMessageAndVersion> delta1Records = new ArrayList<>();
+        S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
+        delta1Records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord()
+            .setObjectId(0L), (short) 0));
+        RecordTestUtils.replayAll(delta1, delta1Records);
+        // verify the delta and check the image's write
+        S3StreamMetadataImage image2 = new S3StreamMetadataImage(
+            STREAM0, 0L, 0L, Map.of(), List.of(
+            new S3StreamObject(1L, STREAM0, 101L, 200L)));
+        assertEquals(image2, delta1.apply());
+        testToImageAndBack(image2);
+    }
+
+    private void testToImageAndBack(S3StreamMetadataImage image) {
+        RecordListWriter writer = new RecordListWriter();
+        ImageWriterOptions options = new ImageWriterOptions.Builder().build();
+        image.write(writer, options);
+        // replay into a delta over an empty stream image and expect the
+        // original image back
+        S3StreamMetadataDelta delta = new S3StreamMetadataDelta(S3StreamMetadataImage.EMPTY);
+        RecordTestUtils.replayAll(delta, writer.records());
+        S3StreamMetadataImage newImage = delta.apply();
+        assertEquals(image, newImage);
+    }
+
+}
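Step 4 of testRanges shows trim as a metadata-only operation: the stream's start offset advances and anything wholly below it (here range 0_0) is removed, which is also what would later let committed objects below the trim point be garbage-collected. A stand-in sketch of that filter, applied to stream objects for illustration:

// Sketch only: trimming drops metadata that ends below the new start offset.
import java.util.List;
import java.util.stream.Collectors;

public final class TrimSketch {
    record StreamObject(long objectId, long startOffset, long endOffset) {}

    static List<StreamObject> trim(List<StreamObject> objects, long newStartOffset) {
        // Keep any object that still holds offsets at or above the new start.
        return objects.stream()
            .filter(o -> o.endOffset() >= newStartOffset)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<StreamObject> objects = List.of(
            new StreamObject(0L, 0L, 100L),
            new StreamObject(1L, 101L, 200L));
        System.out.println(trim(objects, 101L)); // only object 1 remains
    }
}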
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index 1a038b864a..cc5d33a66a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -17,30 +17,7 @@
 
 package org.apache.kafka.image;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-
-import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.kafka.common.metadata.RangeRecord;
-import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
-import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
-import org.apache.kafka.common.metadata.RemoveWALObjectRecord;
-import org.apache.kafka.common.metadata.S3StreamObjectRecord;
-import org.apache.kafka.common.metadata.S3StreamRecord;
-import org.apache.kafka.common.metadata.WALObjectRecord;
-import org.apache.kafka.metadata.stream.RangeMetadata;
-import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
-import org.apache.kafka.metadata.stream.S3ObjectState;
-import org.apache.kafka.metadata.stream.S3ObjectType;
-import org.apache.kafka.metadata.stream.S3StreamObject;
-import org.apache.kafka.metadata.stream.S3WALObject;
-import org.apache.kafka.image.writer.ImageWriterOptions;
-import org.apache.kafka.image.writer.RecordListWriter;
-import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -53,13 +30,6 @@ public class S3StreamsMetadataImageTest {
 
     private static final long MB = 1024 * KB;
 
     private static final long GB = 1024 * MB;
 
-    private static final long WAL_LOOSE_SIZE = 40 * MB;
-
-    private static final long WAL_MINOR_COMPACT_SIZE = 5 * GB;
-
-    private static final long WAL_MAJOR_COMPACT_SIZE = 320 * GB;
-
-    private static final long STREAM_OBJECT_SIZE = 320 * GB;
 
     static final S3StreamsMetadataImage IMAGE1;
@@ -70,365 +40,13 @@ public class S3StreamsMetadataImageTest {
 
     // TODO: complete the test for StreamsMetadataImage
     static {
-        IMAGE1 = null;
-        DELTA1_RECORDS = null;
-        IMAGE2 = null;
+        IMAGE1 = S3StreamsMetadataImage.EMPTY;
+        DELTA1_RECORDS = List.of();
+        IMAGE2 = S3StreamsMetadataImage.EMPTY;
     }
 
     @Test
     public void testBasicChange() {
-        List<S3StreamMetadataImage> s3StreamMetadataImages = new ArrayList<>();
-        Integer brokerId0 = 0;
-        Integer brokerId1 = 1;
-        Integer brokerId2 = 2;
-
-        // 1. empty image
-        S3StreamsMetadataImage image0 = S3StreamsMetadataImage.EMPTY;
-
-        // 2. create stream and create range
-        Long streamId0 = 0L;
-        Long streamId1 = 1L;
-        List<ApiMessageAndVersion> records = new ArrayList<>();
-        S3StreamRecord streamRecord00 = new S3StreamRecord()
-            .setStreamId(streamId0)
-            .setEpoch(1)
-            .setStartOffset(0L);
-        records.add(new ApiMessageAndVersion(streamRecord00, (short) 0));
-        RangeRecord rangeRecord00 = new RangeRecord()
-            .setStreamId(streamId0)
-            .setRangeIndex(0)
-            .setStartOffset(0L)
-            .setBrokerId(brokerId1)
-            .setEpoch(1);
-        records.add(new ApiMessageAndVersion(rangeRecord00, (short) 0));
-        S3StreamRecord streamRecord01 = new S3StreamRecord()
-            .setStreamId(streamId1)
-            .setEpoch(1)
-            .setStartOffset(0L);
-        records.add(new ApiMessageAndVersion(streamRecord01, (short) 0));
-        RangeRecord rangeRecord01 = new RangeRecord()
-            .setStreamId(streamId1)
-            .setRangeIndex(0)
-            .setStartOffset(0L)
-            .setBrokerId(brokerId1)
-            .setEpoch(1);
-        records.add(new ApiMessageAndVersion(rangeRecord01, (short) 0));
-        S3StreamsMetadataDelta delta0 = new S3StreamsMetadataDelta(image0);
-        RecordTestUtils.replayAll(delta0, records);
-        S3StreamsMetadataImage image1 = delta0.apply();
-
-        // check the image1
-        assertEquals(2, image1.getStreamsMetadata().size());
-        S3StreamMetadataImage s3StreamMetadataImage1 = image1.getStreamsMetadata().get(streamId0);
-        assertNotNull(s3StreamMetadataImage1);
-        assertEquals(1, s3StreamMetadataImage1.getRanges().size());
-        assertEquals(1, s3StreamMetadataImage1.getEpoch());
-        assertEquals(0, s3StreamMetadataImage1.getStartOffset());
-        RangeMetadata rangeMetadata1 = s3StreamMetadataImage1.getRanges().get(0);
-        assertNotNull(rangeMetadata1);
-        assertEquals(RangeMetadata.of(rangeRecord00), rangeMetadata1);
-
-        S3StreamMetadataImage s3StreamMetadataImage11 = image1.getStreamsMetadata().get(streamId1);
-        assertNotNull(s3StreamMetadataImage11);
-        assertEquals(1, s3StreamMetadataImage11.getRanges().size());
-        assertEquals(1, s3StreamMetadataImage11.getEpoch());
-        assertEquals(0, s3StreamMetadataImage11.getStartOffset());
-        RangeMetadata rangeMetadata11 = s3StreamMetadataImage11.getRanges().get(0);
-        assertNotNull(rangeMetadata11);
-        assertEquals(RangeMetadata.of(rangeRecord01), rangeMetadata11);
-
-        // 3. apply WALObject0, WALObject1, WALObject2
-        WALObjectRecord walObjectRecord0 = new WALObjectRecord()
-            .setBrokerId(brokerId0)
-            .setObjectId(0L)
-            .setAppliedTimeInMs(System.currentTimeMillis())
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setObjectState((byte) S3ObjectState.APPLIED.ordinal());
-        WALObjectRecord walObjectRecord1 = new WALObjectRecord()
-            .setBrokerId(brokerId1)
-            .setObjectId(1L)
-            .setAppliedTimeInMs(System.currentTimeMillis())
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setObjectState((byte) S3ObjectState.APPLIED.ordinal());
-        WALObjectRecord walObjectRecord2 = new WALObjectRecord()
-            .setBrokerId(brokerId1)
-            .setObjectId(2L)
-            .setAppliedTimeInMs(System.currentTimeMillis())
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setObjectState((byte) S3ObjectState.APPLIED.ordinal());
-        records.clear();
-        records.add(new ApiMessageAndVersion(walObjectRecord0, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord1, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord2, (short) 0));
-        S3StreamsMetadataDelta delta1 = new S3StreamsMetadataDelta(image1);
-        RecordTestUtils.replayAll(delta1, records);
-        S3StreamsMetadataImage image2 = delta1.apply();
-
-        // check the image2
-        assertEquals(2, image2.getBrokerWALMetadata().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage20 = image2.getBrokerWALMetadata().get(brokerId0);
-        assertNotNull(brokerS3WALMetadataImage20);
-        assertEquals(1, brokerS3WALMetadataImage20.getWalObjects().size());
-        S3WALObject s3WalObject0 = brokerS3WALMetadataImage20.getWalObjects().get(0);
-        assertEquals(brokerId0, s3WalObject0.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject0.getObjectType());
-        assertEquals(S3ObjectState.APPLIED, s3WalObject0.getS3ObjectState());
-        assertEquals(0L, s3WalObject0.getObjectId());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage21 = image2.getBrokerWALMetadata().get(brokerId1);
-        assertNotNull(brokerS3WALMetadataImage21);
-        assertEquals(2, brokerS3WALMetadataImage21.getWalObjects().size());
-        S3WALObject s3WalObject1 = brokerS3WALMetadataImage21.getWalObjects().get(0);
-        assertEquals(brokerId1, s3WalObject1.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject1.getObjectType());
-        assertEquals(S3ObjectState.APPLIED, s3WalObject1.getS3ObjectState());
-        assertEquals(1L, s3WalObject1.getObjectId());
-        S3WALObject s3WalObject2 = brokerS3WALMetadataImage21.getWalObjects().get(1);
-        assertEquals(brokerId1, s3WalObject2.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject2.getObjectType());
-        assertEquals(S3ObjectState.APPLIED, s3WalObject2.getS3ObjectState());
-        assertEquals(2L, s3WalObject2.getObjectId());
-
-        // 4. create WALObject1, WALObject2, mark delete WALObject0
-        List<S3ObjectStreamIndex> streamIndicesInWALObject1 = Arrays.asList(
-            new S3ObjectStreamIndex(streamId0, 0L, 100L),
-            new S3ObjectStreamIndex(streamId1, 0L, 200L)
-        );
-        WALObjectRecord walObjectRecord11 = new WALObjectRecord()
-            .setBrokerId(brokerId1)
-            .setObjectId(1L)
-            .setObjectSize(WAL_LOOSE_SIZE)
-            .setCommittedTimeInMs(System.currentTimeMillis())
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setStreamsIndex(streamIndicesInWALObject1.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect(
-                Collectors.toList()))
-            .setObjectState((byte) S3ObjectState.CREATED.ordinal());
-
-        List<S3ObjectStreamIndex> streamIndicesInWALObject2 = Arrays.asList(
-            new S3ObjectStreamIndex(streamId0, 101L, 200L),
-            new S3ObjectStreamIndex(streamId1, 201L, 300L)
-        );
-        WALObjectRecord walObjectRecord21 = new WALObjectRecord()
-            .setBrokerId(brokerId1)
-            .setObjectId(2L)
-            .setObjectSize(WAL_LOOSE_SIZE)
-            .setCommittedTimeInMs(System.currentTimeMillis())
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setStreamsIndex(streamIndicesInWALObject2.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect(
-                Collectors.toList()))
-            .setObjectState((byte) S3ObjectState.CREATED.ordinal());
-        WALObjectRecord walObjectRecord01 = new WALObjectRecord()
-            .setBrokerId(brokerId0)
-            .setObjectId(0L)
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal());
-        records.clear();
-        records.add(new ApiMessageAndVersion(walObjectRecord11, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord21, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord01, (short) 0));
-        S3StreamsMetadataDelta delta2 = new S3StreamsMetadataDelta(image2);
-        RecordTestUtils.replayAll(delta2, records);
-        S3StreamsMetadataImage image3 = delta2.apply();
-
-        // check the image3
-        assertEquals(2, image3.getBrokerWALMetadata().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage30 = image3.getBrokerWALMetadata().get(brokerId0);
-        assertNotNull(brokerS3WALMetadataImage30);
-        assertEquals(1, brokerS3WALMetadataImage30.getWalObjects().size());
-        S3WALObject s3WalObject01 = brokerS3WALMetadataImage30.getWalObjects().get(0);
-        assertEquals(brokerId0, s3WalObject01.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject01.getObjectType());
-        assertEquals(S3ObjectState.MARK_DESTROYED, s3WalObject01.getS3ObjectState());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage31 = image3.getBrokerWALMetadata().get(brokerId1);
-        assertNotNull(brokerS3WALMetadataImage31);
-        assertEquals(2, brokerS3WALMetadataImage31.getWalObjects().size());
-        S3WALObject s3WalObject11 = brokerS3WALMetadataImage31.getWalObjects().get(0);
-        assertEquals(brokerId1, s3WalObject11.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject11.getObjectType());
-        assertEquals(S3ObjectState.CREATED, s3WalObject11.getS3ObjectState());
-        Map<Long, S3ObjectStreamIndex> streamIndexVerify1 = s3WalObject11.getStreamsIndex();
-        assertEquals(2, streamIndexVerify1.size());
-        assertEquals(0L, streamIndexVerify1.get(streamId0).getStartOffset());
-        assertEquals(100L, streamIndexVerify1.get(streamId0).getEndOffset());
-        assertEquals(0L, streamIndexVerify1.get(streamId1).getStartOffset());
-        assertEquals(200L, streamIndexVerify1.get(streamId1).getEndOffset());
-        S3WALObject s3WalObject21 = brokerS3WALMetadataImage31.getWalObjects().get(1);
-        assertEquals(brokerId1, s3WalObject21.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject21.getObjectType());
-        assertEquals(S3ObjectState.CREATED, s3WalObject21.getS3ObjectState());
-        Map<Long, S3ObjectStreamIndex> streamIndexVerify2 = s3WalObject21.getStreamsIndex();
-        assertEquals(2, streamIndexVerify2.size());
-        assertEquals(101L, streamIndexVerify2.get(streamId0).getStartOffset());
-        assertEquals(200L, streamIndexVerify2.get(streamId0).getEndOffset());
-        assertEquals(201L, streamIndexVerify2.get(streamId1).getStartOffset());
-        assertEquals(300L, streamIndexVerify2.get(streamId1).getEndOffset());
-
-        // 5. destroy WALObject0, mark delete WALObject1 and WALObject2, compact these to WALObject3
-        RemoveWALObjectRecord removeWALObjectRecord0 = new RemoveWALObjectRecord()
-            .setObjectId(0L)
-            .setBrokerId(brokerId0);
-        WALObjectRecord walObjectRecord12 = new WALObjectRecord()
-            .setObjectId(1L)
-            .setBrokerId(brokerId1)
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal());
-        WALObjectRecord walObjectRecord22 = new WALObjectRecord()
-            .setObjectId(2L)
-            .setBrokerId(brokerId1)
-            .setObjectType((byte) S3ObjectType.WAL_LOOSE.ordinal())
-            .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal());
-        List<S3ObjectStreamIndex> streamIndicesInWALObject3 = Arrays.asList(
-            new S3ObjectStreamIndex(streamId0, 0L, 200L),
-            new S3ObjectStreamIndex(streamId1, 0L, 300L)
-        );
-        WALObjectRecord walObjectRecord3 = new WALObjectRecord()
-            .setObjectId(3L)
-            .setBrokerId(brokerId1)
-            .setObjectType((byte) S3ObjectType.WAL_MINOR.ordinal())
-            .setCommittedTimeInMs(System.currentTimeMillis())
-            .setObjectState((byte) S3ObjectState.CREATED.ordinal())
-            .setAppliedTimeInMs(System.currentTimeMillis())
-            .setObjectSize(WAL_MINOR_COMPACT_SIZE)
-            .setStreamsIndex(streamIndicesInWALObject3.stream().map(S3ObjectStreamIndex::toRecordStreamIndex).collect(
-                Collectors.toList()));
-        records.clear();
-        records.add(new ApiMessageAndVersion(removeWALObjectRecord0, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord12, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord22, (short) 0));
-        records.add(new ApiMessageAndVersion(walObjectRecord3, (short) 0));
-        S3StreamsMetadataDelta delta3 = new S3StreamsMetadataDelta(image3);
-        RecordTestUtils.replayAll(delta3, records);
-        S3StreamsMetadataImage image4 = delta3.apply();
-
-        // check the image4
-        assertEquals(2, image4.getBrokerWALMetadata().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage40 = image4.getBrokerWALMetadata().get(brokerId0);
-        assertNotNull(brokerS3WALMetadataImage40);
-        assertEquals(0, brokerS3WALMetadataImage40.getWalObjects().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage41 = image4.getBrokerWALMetadata().get(brokerId1);
-        assertNotNull(brokerS3WALMetadataImage41);
-        assertEquals(3, brokerS3WALMetadataImage41.getWalObjects().size());
-        S3WALObject s3WalObject12 = brokerS3WALMetadataImage41.getWalObjects().get(0);
-        assertEquals(brokerId1, s3WalObject12.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject12.getObjectType());
-        assertEquals(S3ObjectState.MARK_DESTROYED, s3WalObject12.getS3ObjectState());
-        S3WALObject s3WalObject22 = brokerS3WALMetadataImage41.getWalObjects().get(1);
-        assertEquals(brokerId1, s3WalObject22.getBrokerId());
-        assertEquals(S3ObjectType.WAL_LOOSE, s3WalObject22.getObjectType());
-        assertEquals(S3ObjectState.MARK_DESTROYED, s3WalObject22.getS3ObjectState());
-        S3WALObject s3WalObject3 = brokerS3WALMetadataImage41.getWalObjects().get(2);
-        assertEquals(brokerId1, s3WalObject3.getBrokerId());
-        assertEquals(S3ObjectType.WAL_MINOR, s3WalObject3.getObjectType());
-        assertEquals(S3ObjectState.CREATED, s3WalObject3.getS3ObjectState());
-        assertEquals(3L, s3WalObject3.getObjectId());
-        Map<Long, S3ObjectStreamIndex> streamIndexVerify3 = s3WalObject3.getStreamsIndex();
-        assertEquals(2, streamIndexVerify3.size());
-        assertEquals(0L, streamIndexVerify3.get(streamId0).getStartOffset());
-        assertEquals(200L, streamIndexVerify3.get(streamId0).getEndOffset());
-        assertEquals(0L, streamIndexVerify3.get(streamId1).getStartOffset());
-        assertEquals(300L, streamIndexVerify3.get(streamId1).getEndOffset());
-
-        // 6. split WALObject3 by streamId to StreamObject4 and StreamObject5
-        S3ObjectStreamIndex s3ObjectStreamIndex4 = new S3ObjectStreamIndex(streamId0, 0L, 200L);
-        S3ObjectStreamIndex s3ObjectStreamIndex5 = new S3ObjectStreamIndex(streamId1, 0L, 300L);
-        S3StreamObjectRecord streamObjectRecord4 = new S3StreamObjectRecord()
-            .setObjectId(4L)
-            .setStreamId(streamId0)
-            .setObjectSize(STREAM_OBJECT_SIZE)
-            .setObjectType((byte) S3ObjectType.STREAM.ordinal())
-            .setCommittedTimeInMs(System.currentTimeMillis())
-            .setStartOffset(s3ObjectStreamIndex4.getStartOffset())
-            .setEndOffset(s3ObjectStreamIndex4.getEndOffset());
-        S3StreamObjectRecord streamObjectRecord5 = new S3StreamObjectRecord()
-            .setObjectId(5L)
-            .setStreamId(streamId1)
-            .setObjectSize(STREAM_OBJECT_SIZE)
-            .setObjectType((byte) S3ObjectType.STREAM.ordinal())
-            .setCommittedTimeInMs(System.currentTimeMillis())
-            .setStartOffset(s3ObjectStreamIndex5.getStartOffset())
-            .setEndOffset(s3ObjectStreamIndex5.getEndOffset());
-        RemoveWALObjectRecord removeWALObjectRecord3 = new RemoveWALObjectRecord()
-            .setObjectId(3L)
-            .setBrokerId(brokerId1);
-        records.clear();
-        records.add(new ApiMessageAndVersion(streamObjectRecord4, (short) 0));
-        records.add(new ApiMessageAndVersion(streamObjectRecord5, (short) 0));
-        records.add(new ApiMessageAndVersion(removeWALObjectRecord3, (short) 0));
-        S3StreamsMetadataDelta delta4 = new S3StreamsMetadataDelta(image4);
-        RecordTestUtils.replayAll(delta4, records);
-        S3StreamsMetadataImage image5 = delta4.apply();
-
-        // check the image5
-        assertEquals(2, image5.getBrokerWALMetadata().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage50 = image5.getBrokerWALMetadata().get(brokerId0);
-        assertNotNull(brokerS3WALMetadataImage50);
-        assertEquals(0, brokerS3WALMetadataImage50.getWalObjects().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage51 = image5.getBrokerWALMetadata().get(brokerId1);
-        assertNotNull(brokerS3WALMetadataImage51);
-        assertEquals(0, brokerS3WALMetadataImage51.getWalObjects().size());
-        assertEquals(2, image5.getStreamsMetadata().size());
-
-        S3StreamMetadataImage s3StreamMetadataImage50 = image5.getStreamsMetadata().get(streamId0);
-        assertNotNull(s3StreamMetadataImage50);
-        assertEquals(1, s3StreamMetadataImage50.getRanges().size());
-        assertEquals(1, s3StreamMetadataImage50.getEpoch());
-        assertEquals(0, s3StreamMetadataImage50.getStartOffset());
-        assertEquals(1, s3StreamMetadataImage50.getStreamObjects());
-        S3StreamObject s3StreamObject4 = s3StreamMetadataImage50.getStreamObjects().get(0);
-        assertEquals(4L, s3StreamObject4.getObjectId());
-        assertEquals(STREAM_OBJECT_SIZE, s3StreamObject4.getObjectSize());
-        assertEquals(S3ObjectType.STREAM, s3StreamObject4.getObjectType());
-        assertEquals(S3ObjectState.CREATED, s3StreamObject4.getS3ObjectState());
-        assertEquals(s3ObjectStreamIndex4, s3StreamObject4.getStreamIndex());
-
-        S3StreamMetadataImage s3StreamMetadataImage51 = image5.getStreamsMetadata().get(streamId1);
-        assertNotNull(s3StreamMetadataImage51);
-        assertEquals(1, s3StreamMetadataImage51.getRanges().size());
-        assertEquals(1, s3StreamMetadataImage51.getEpoch());
-        assertEquals(0, s3StreamMetadataImage51.getStartOffset());
-        assertEquals(1, s3StreamMetadataImage51.getStreamObjects());
-        S3StreamObject s3StreamObject5 = s3StreamMetadataImage51.getStreamObjects().get(0);
-        assertEquals(5L, s3StreamObject5.getObjectId());
-        assertEquals(STREAM_OBJECT_SIZE, s3StreamObject5.getObjectSize());
-        assertEquals(S3ObjectType.STREAM, s3StreamObject5.getObjectType());
-        assertEquals(S3ObjectState.CREATED, s3StreamObject5.getS3ObjectState());
-        assertEquals(s3ObjectStreamIndex5, s3StreamObject5.getStreamIndex());
-
-        // 7. remove streamObject4 and remove stream1
-        RemoveS3StreamObjectRecord removeStreamObjectRecord4 = new RemoveS3StreamObjectRecord()
-            .setObjectId(4L)
-            .setStreamId(streamId0);
-        RemoveS3StreamRecord removeStreamRecord = new RemoveS3StreamRecord()
-            .setStreamId(streamId1);
-        records.clear();
-        records.add(new ApiMessageAndVersion(removeStreamObjectRecord4, (short) 0));
-        records.add(new ApiMessageAndVersion(removeStreamRecord, (short) 0));
-        S3StreamsMetadataDelta delta5 = new S3StreamsMetadataDelta(image5);
-        RecordTestUtils.replayAll(delta5, records);
-        S3StreamsMetadataImage image6 = delta5.apply();
-
-        // check the image6
-        assertEquals(2, image6.getBrokerWALMetadata().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage60 = image6.getBrokerWALMetadata().get(brokerId0);
-        assertNotNull(brokerS3WALMetadataImage60);
-        assertEquals(0, brokerS3WALMetadataImage60.getWalObjects().size());
-        BrokerS3WALMetadataImage brokerS3WALMetadataImage61 = image6.getBrokerWALMetadata().get(brokerId1);
-        assertNotNull(brokerS3WALMetadataImage61);
-        assertEquals(0, brokerS3WALMetadataImage61.getWalObjects().size());
-
-        assertEquals(1, image6.getStreamsMetadata().size());
-        S3StreamMetadataImage s3StreamMetadataImage60 = image6.getStreamsMetadata().get(streamId0);
-        assertNotNull(s3StreamMetadataImage60);
-        assertEquals(1, s3StreamMetadataImage60.getRanges().size());
-        assertEquals(0, s3StreamMetadataImage60.getStreamObjects().size());
-    }
-
-    private void testToImageAndBack(S3StreamsMetadataImage image) {
-        RecordListWriter writer = new RecordListWriter();
-        image.write(writer, new ImageWriterOptions.Builder().build());
-        S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(S3StreamsMetadataImage.EMPTY);
-        RecordTestUtils.replayAll(delta, writer.records());
-        S3StreamsMetadataImage newImage = delta.apply();
-        assertEquals(image, newImage);
-    }
 }
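The diff above leaves S3StreamsMetadataImageTest with empty IMAGE1/IMAGE2 constants and an empty testBasicChange body behind the TODO. A possible first cut at rebuilding it on the simplified schema, in the same style as the new per-stream tests; this is a sketch that assumes S3StreamsMetadataDelta still replays S3StreamRecord into getStreamsMetadata() the way the deleted assertions imply, and it would need the pruned imports (ArrayList, S3StreamRecord, RecordTestUtils, assertEquals) re-added:

// Sketch only: candidate body for the TODO'd testBasicChange.
@Test
public void testBasicChange() {
    List<ApiMessageAndVersion> records = List.of(
        new ApiMessageAndVersion(new S3StreamRecord()
            .setStreamId(0L)
            .setEpoch(1L)
            .setStartOffset(0L), (short) 0));
    S3StreamsMetadataDelta delta = new S3StreamsMetadataDelta(S3StreamsMetadataImage.EMPTY);
    RecordTestUtils.replayAll(delta, records);
    S3StreamsMetadataImage image = delta.apply();
    // one stream should now be tracked in the image
    assertEquals(1, image.getStreamsMetadata().size());
}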