From 814f0d710be85cffabccfd966bddb57a211ad6ed Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 17:06:35 +0800 Subject: [PATCH 1/4] ci(s3): add Kafka on S3 related ut ci workflow 1. add Kafka on S3 related ut ci workflow Signed-off-by: TheR1sing3un --- .github/workflows/es_unit_tests.yml | 2 +- build.gradle | 30 +++++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/.github/workflows/es_unit_tests.yml b/.github/workflows/es_unit_tests.yml index 5fd32fb44e..c94fbe2cb6 100644 --- a/.github/workflows/es_unit_tests.yml +++ b/.github/workflows/es_unit_tests.yml @@ -32,4 +32,4 @@ jobs: - name: Setup Gradle uses: gradle/gradle-build-action@v2 - name: Execute Gradle build - run: ./gradlew metadata:esUnitTest core:esUnitTest + run: ./gradlew metadata:esUnitTest core:esUnitTest metadata:S3UnitTest core:S3UnitTest diff --git a/build.gradle b/build.gradle index e07c7812a1..c4311a0ff1 100644 --- a/build.gradle +++ b/build.gradle @@ -498,6 +498,36 @@ subprojects { } } + tasks.register('S3UnitTest', Test) { + description = 'Runs unit tests for Kafka on S3.' + dependsOn compileJava + + maxParallelForks = maxTestForks + ignoreFailures = userIgnoreFailures + + maxHeapSize = defaultMaxHeapSize + jvmArgs = defaultJvmArgs + + testLogging { + events = userTestLoggingEvents ?: testLoggingEvents + showStandardStreams = userShowStandardStreams ?: testShowStandardStreams + exceptionFormat = testExceptionFormat + displayGranularity = 0 + } + logTestStdout.rehydrate(delegate, owner, this)() + + exclude testsToExclude + + useJUnitPlatform { + includeTags 'S3Unit' + } + + retry { + maxRetries = userMaxTestRetries + maxFailures = userMaxTestRetryFailures + } + } + tasks.register('esIntegrationTest', Test) { description = 'Runs integration tests for elastic stream storage.' dependsOn compileJava From c54b0e97da9febf6594169cb28ae2201164e643c Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 17:47:09 +0800 Subject: [PATCH 2/4] feat(s3): support prepare object logic in controller 1. support prepare object logic in controller Signed-off-by: TheR1sing3un --- .../stream/S3ObjectControlManager.java | 37 ++++++-- .../kafka/metadata/stream/S3Object.java | 90 +++++++++---------- .../common/metadata/S3ObjectRecord.json | 4 +- 3 files changed, 75 insertions(+), 56 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 6f228d55b8..8360139af9 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -121,15 +121,32 @@ public Long nextAssignedObjectId() { return nextAssignedObjectId; } - public ControllerResult prepareObject(PrepareS3ObjectRequestData data) { - throw new UnsupportedOperationException(); + public ControllerResult prepareObject(PrepareS3ObjectRequestData request) { + List records = new ArrayList<>(); + PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData(); + int count = request.preparedCount(); + List prepareObjectIds = new ArrayList<>(count); + for (int i = 0; i < count; i++) { + Long objectId = nextAssignedObjectId + i; + prepareObjectIds.add(objectId); + long preparedTs = System.currentTimeMillis(); + long expiredTs = preparedTs + request.timeToLiveInMs(); + S3ObjectRecord record = new S3ObjectRecord() + .setObjectId(objectId) + .setObjectState((byte) S3ObjectState.PREPARED.ordinal()) + .setPreparedTimeInMs(preparedTs) + .setExpiredTimeInMs(expiredTs); + records.add(new ApiMessageAndVersion(record, (short) 0)); + } + response.setS3ObjectIds(prepareObjectIds); + return ControllerResult.of(records, response); } public void replay(S3ObjectRecord record) { GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId()); String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx); S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey, - record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), + record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState())); objectsMetadata.put(record.objectId(), object); // TODO: recover the prepared objects and mark destroyed objects when restart the controller @@ -161,11 +178,11 @@ public ControllerResult checkS3ObjectsLifecycle() { S3ObjectRecord record = new S3ObjectRecord() .setObjectId(obj.getObjectId()) .setObjectState((byte) S3ObjectState.DESTROYED.ordinal()) - .setObjectSize(obj.getObjectSize().get()) - .setAppliedTimeInMs(obj.getAppliedTimeInMs().get()) - .setExpiredTimeInMs(obj.getExpiredTimeInMs().get()) - .setCommittedTimeInMs(obj.getCommittedTimeInMs().get()) - .setDestroyedTimeInMs(obj.getDestroyedTimeInMs().get()); + .setObjectSize(obj.getObjectSize()) + .setPreparedTimeInMs(obj.getPreparedTimeInMs()) + .setExpiredTimeInMs(obj.getExpiredTimeInMs()) + .setCommittedTimeInMs(obj.getCommittedTimeInMs()) + .setDestroyedTimeInMs(obj.getDestroyedTimeInMs()); // generate the records which mark the expired objects as destroyed records.add(new ApiMessageAndVersion(record, (short) 0)); // generate the records which listener reply for the object-destroy events @@ -177,7 +194,7 @@ public ControllerResult checkS3ObjectsLifecycle() { // check the mark destroyed objects ObjectPair[] destroyedObjects = this.markDestroyedObjects.stream() // must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata - .map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey().get())) + .map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey())) .toArray(ObjectPair[]::new); String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new); Set destroyedObjectIds = Arrays.stream(destroyedObjects).map(ObjectPair::objectId).collect(Collectors.toSet()); @@ -203,6 +220,7 @@ public ControllerResult checkS3ObjectsLifecycle() { /** * Generate RemoveS3ObjectRecord for the deleted S3Objects. + * * @param deletedObjectIds the deleted S3Objects' ids * @return the result of the generation, contains the records which should be applied to the raft. */ @@ -230,6 +248,7 @@ public interface S3ObjectLifeCycleListener { } static class ObjectPair { + private final Long objectId; private final String objectKey; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index 1829458473..8f163f9f8a 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -27,54 +27,54 @@ */ public class S3Object implements Comparable { - protected final Long objectId; + private final long objectId; - protected Optional objectSize = Optional.empty(); + private long objectSize = -1; - protected Optional objectKey = Optional.empty(); + private String objectKey; /** - * The time when broker apply the object + * The time when broker ask for preparing the object */ - protected Optional appliedTimeInMs = Optional.empty(); + private long preparedTimeInMs; /** * The time when the object will be expired if it is not be committed */ - protected Optional expiredTimeInMs = Optional.empty(); + private long expiredTimeInMs; /** * The time when the object is committed */ - protected Optional committedTimeInMs = Optional.empty(); + private long committedTimeInMs; /** * The time when the object is destroyed */ - protected Optional destroyedTimeInMs = Optional.empty(); + private long destroyedTimeInMs; - protected S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED; + private S3ObjectState s3ObjectState = S3ObjectState.UNINITIALIZED; - protected S3Object(final Long objectId) { + private S3Object(final long objectId) { this.objectId = objectId; } public S3Object( - final Long objectId, - final Long objectSize, + final long objectId, + final long objectSize, final String objectKey, - final Long appliedTimeInMs, - final Long expiredTimeInMs, - final Long committedTimeInMs, - final Long destroyedTimeInMs, + final long preparedTimeInMs, + final long expiredTimeInMs, + final long committedTimeInMs, + final long destroyedTimeInMs, final S3ObjectState s3ObjectState) { this.objectId = objectId; - this.objectSize = Optional.of(objectSize); - this.objectKey = Optional.of(objectKey); - this.appliedTimeInMs = Optional.of(appliedTimeInMs); - this.expiredTimeInMs = Optional.of(expiredTimeInMs); - this.committedTimeInMs = Optional.of(committedTimeInMs); - this.destroyedTimeInMs = Optional.of(destroyedTimeInMs); + this.objectSize = objectSize; + this.objectKey = objectKey; + this.preparedTimeInMs = preparedTimeInMs; + this.expiredTimeInMs = expiredTimeInMs; + this.committedTimeInMs = committedTimeInMs; + this.destroyedTimeInMs = destroyedTimeInMs; this.s3ObjectState = s3ObjectState; } @@ -83,7 +83,7 @@ public void onApply() { throw new IllegalStateException("Object is not in UNINITIALIZED state"); } this.s3ObjectState = S3ObjectState.PREPARED; - this.appliedTimeInMs = Optional.of(System.currentTimeMillis()); + this.preparedTimeInMs = System.currentTimeMillis(); } public void onCreate(S3ObjectCommitContext createContext) { @@ -92,9 +92,9 @@ public void onCreate(S3ObjectCommitContext createContext) { throw new IllegalStateException("Object is not in APPLIED state"); } this.s3ObjectState = S3ObjectState.COMMITTED; - this.committedTimeInMs = Optional.of(createContext.committedTimeInMs); - this.objectSize = Optional.of(createContext.objectSize); - this.objectKey = Optional.of(createContext.objectAddress); + this.committedTimeInMs = createContext.committedTimeInMs; + this.objectSize = createContext.objectSize; + this.objectKey = createContext.objectAddress; } public void onMarkDestroy() { @@ -115,23 +115,23 @@ public void onDestroy() { public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new S3ObjectRecord() .setObjectId(objectId) - .setObjectSize(objectSize.orElse(null)) + .setObjectSize(objectSize) .setObjectState((byte) s3ObjectState.ordinal()) - .setAppliedTimeInMs(appliedTimeInMs.orElse(null)) - .setExpiredTimeInMs(expiredTimeInMs.orElse(null)) - .setCommittedTimeInMs(committedTimeInMs.orElse(null)) - .setDestroyedTimeInMs(destroyedTimeInMs.orElse(null)), (short) 0); + .setPreparedTimeInMs(preparedTimeInMs) + .setExpiredTimeInMs(expiredTimeInMs) + .setCommittedTimeInMs(committedTimeInMs) + .setDestroyedTimeInMs(destroyedTimeInMs), (short) 0); } static public class S3ObjectCommitContext { - private final Long committedTimeInMs; - private final Long objectSize; + private final long committedTimeInMs; + private final long objectSize; private final String objectAddress; public S3ObjectCommitContext( - final Long committedTimeInMs, - final Long objectSize, + final long committedTimeInMs, + final long objectSize, final String objectAddress) { this.committedTimeInMs = committedTimeInMs; this.objectSize = objectSize; @@ -141,7 +141,7 @@ public S3ObjectCommitContext( @Override public int compareTo(S3Object o) { - return this.objectId.compareTo(o.objectId); + return Long.compare(this.objectId, o.objectId); } @Override @@ -161,31 +161,31 @@ public int hashCode() { return Objects.hash(objectId); } - public Long getObjectId() { + public long getObjectId() { return objectId; } - public Optional getObjectSize() { + public long getObjectSize() { return objectSize; } - public Optional getObjectKey() { + public String getObjectKey() { return objectKey; } - public Optional getAppliedTimeInMs() { - return appliedTimeInMs; + public long getPreparedTimeInMs() { + return preparedTimeInMs; } - public Optional getCommittedTimeInMs() { + public long getCommittedTimeInMs() { return committedTimeInMs; } - public Optional getDestroyedTimeInMs() { + public long getDestroyedTimeInMs() { return destroyedTimeInMs; } - public Optional getExpiredTimeInMs() { + public long getExpiredTimeInMs() { return expiredTimeInMs; } @@ -194,6 +194,6 @@ public S3ObjectState getS3ObjectState() { } public boolean isExpired() { - return System.currentTimeMillis() > expiredTimeInMs.get(); + return System.currentTimeMillis() > expiredTimeInMs; } } diff --git a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json index 1d620125b3..006af11595 100644 --- a/metadata/src/main/resources/common/metadata/S3ObjectRecord.json +++ b/metadata/src/main/resources/common/metadata/S3ObjectRecord.json @@ -33,10 +33,10 @@ "about": "The object size of the S3 object" }, { - "name": "AppliedTimeInMs", + "name": "PreparedTimeInMs", "type": "int64", "versions": "0+", - "about": "The object be applied timestamp" + "about": "The object be prepared timestamp" }, { "name": "ExpiredTimeInMs", From f6ecc23572d30413065afa4bb19d3e755b5be4b7 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 17:58:22 +0800 Subject: [PATCH 3/4] test(s3): add S3ObjectControlManagerTest 1. add S3ObjectControlManagerTest Signed-off-by: TheR1sing3un --- .../S3ObjectControlManagerTest.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) create mode 100644 metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java new file mode 100644 index 0000000000..c97a66c513 --- /dev/null +++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller; + +import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.stream.S3ObjectControlManager; +import org.apache.kafka.metadata.stream.S3Config; +import org.apache.kafka.timeline.SnapshotRegistry; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Timeout; +import org.mockito.Mockito; + +@Timeout(40) +public class S3ObjectControlManagerTest { + + private static final String CLUSTER = "kafka-on-S3_cluster"; + private static final String S3_REGION = "us-east-1"; + private static final String S3_BUCKET = "kafka-on-S3-bucket"; + + private static final S3Config S3_CONFIG = new S3Config(S3_REGION, S3_BUCKET); + private S3ObjectControlManager manager; + + private QuorumController controller; + + @BeforeEach + private void setUp() { + controller = Mockito.mock(QuorumController.class); + LogContext logContext = new LogContext(); + SnapshotRegistry registry = new SnapshotRegistry(logContext); + manager = new S3ObjectControlManager(controller, registry, logContext, CLUSTER, S3_CONFIG); + + } + +} From 680693c48e91aa48925968cfb2dc249bbeead2e7 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 20:01:30 +0800 Subject: [PATCH 4/4] test(s3): complete S3ObjectControlManger::prepare test 1. complete S3ObjectControlManger::prepare test Signed-off-by: TheR1sing3un --- .../stream/S3ObjectControlManager.java | 10 ++++ .../kafka/metadata/stream/S3Object.java | 1 - .../S3ObjectControlManagerTest.java | 48 ++++++++++++++++++- 3 files changed, 56 insertions(+), 3 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index 8360139af9..d1710bc414 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Map; import java.util.OptionalLong; import java.util.Queue; import java.util.Set; @@ -122,6 +123,7 @@ public Long nextAssignedObjectId() { } public ControllerResult prepareObject(PrepareS3ObjectRequestData request) { + // TODO: support batch prepare objects List records = new ArrayList<>(); PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData(); int count = request.preparedCount(); @@ -233,6 +235,14 @@ public ControllerResult notifyS3ObjectDeleted(Set deletedObjectIds) return ControllerResult.of(records, null); } + public Queue preparedObjects() { + return preparedObjects; + } + + public Map objectsMetadata() { + return objectsMetadata; + } + /** * S3Object's lifecycle listener. */ diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index 8f163f9f8a..dafcfc4f90 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -18,7 +18,6 @@ package org.apache.kafka.metadata.stream; import java.util.Objects; -import java.util.Optional; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.server.common.ApiMessageAndVersion; diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java index c97a66c513..4a32f62492 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java @@ -17,17 +17,29 @@ package org.apache.kafka.controller; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertInstanceOf; + +import org.apache.kafka.common.message.PrepareS3ObjectRequestData; +import org.apache.kafka.common.message.PrepareS3ObjectResponseData; +import org.apache.kafka.common.metadata.S3ObjectRecord; +import org.apache.kafka.common.protocol.ApiMessage; +import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.controller.stream.S3ObjectControlManager; import org.apache.kafka.metadata.stream.S3Config; +import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.mockito.Mockito; @Timeout(40) public class S3ObjectControlManagerTest { - + private static final int BROKER0 = 0; + private static final int BROKER1 = 1; private static final String CLUSTER = "kafka-on-S3_cluster"; private static final String S3_REGION = "us-east-1"; private static final String S3_BUCKET = "kafka-on-S3-bucket"; @@ -38,12 +50,44 @@ public class S3ObjectControlManagerTest { private QuorumController controller; @BeforeEach - private void setUp() { + public void setUp() { controller = Mockito.mock(QuorumController.class); LogContext logContext = new LogContext(); SnapshotRegistry registry = new SnapshotRegistry(logContext); manager = new S3ObjectControlManager(controller, registry, logContext, CLUSTER, S3_CONFIG); + } + + @Test + public void testBasicPrepareObject() { + // 1. prepare 3 objects + ControllerResult result0 = manager.prepareObject(new PrepareS3ObjectRequestData() + .setBrokerId(BROKER0) + .setPreparedCount(3) + .setTimeToLiveInMs(1000)); + assertEquals(Errors.NONE.code(), result0.response().errorCode()); + assertEquals(3, result0.records().size()); + for (int i = 0; i < 3; i++) { + verifyPrepareObjectRecord(result0.records().get(i), i, 1000); + } + result0.records().stream().map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record)); + + // verify the 3 objects are prepared + assertEquals(3, manager.objectsMetadata().size()); + manager.objectsMetadata().forEach((id, s3Object) -> { + assertEquals(S3ObjectState.PREPARED, s3Object.getS3ObjectState()); + assertEquals(id, s3Object.getObjectId()); + assertEquals(1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs()); + }); + assertEquals(3, manager.nextAssignedObjectId()); + } + private void verifyPrepareObjectRecord(ApiMessageAndVersion result, long expectedObjectId, long expectedTimeToLiveInMs) { + ApiMessage message = result.message(); + assertInstanceOf(S3ObjectRecord.class, message); + S3ObjectRecord record = (S3ObjectRecord) message; + assertEquals(expectedObjectId, record.objectId()); + assertEquals(expectedTimeToLiveInMs, record.expiredTimeInMs() - record.preparedTimeInMs()); + assertEquals((byte) S3ObjectState.PREPARED.ordinal(), record.objectState()); } }