diff --git a/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json b/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json
index 4da2272990..1cdab6226a 100644
--- a/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json
+++ b/clients/src/main/resources/common/message/PrepareS3ObjectResponse.json
@@ -33,10 +33,10 @@
       "about": "Duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota."
     },
     {
-      "name": "S3ObjectIds",
-      "type": "[]int64",
+      "name": "FirstS3ObjectId",
+      "type": "int64",
       "versions": "0+",
-      "about": "The prepared S3 objects' id"
+      "about": "The first id of the prepared S3 objects"
     }
   ]
}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
index 006ff50c89..4d0eb081ec 100644
--- a/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
+++ b/core/src/main/scala/kafka/log/s3/objects/ControllerObjectManager.java
@@ -65,8 +65,7 @@ public CompletableFuture<Long> prepareObject(int count, long ttl) {
             Errors code = Errors.forCode(resp.errorCode());
             switch (code) {
                 case NONE:
-                    // TODO: simply response's data structure, only return first object id is enough
-                    return resp.s3ObjectIds().stream().findFirst().get();
+                    return resp.firstS3ObjectId();
                 default:
                     LOGGER.error("Error while preparing {} object, code: {}", count, code);
                     throw code.exception();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index 1ce2e6823c..0ba64bb1cf 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -132,16 +132,15 @@ public ControllerResult prepareObject(PrepareS3Obje
         List<ApiMessageAndVersion> records = new ArrayList<>();
         PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData();
         int count = request.preparedCount();
-        List<Long> prepareObjectIds = new ArrayList<>(count);
 
         // update assigned stream id
         long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
         records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
             .setAssignedS3ObjectId(newAssignedObjectId), (short) 0));
 
+        long firstAssignedObjectId = nextAssignedObjectId.get();
         for (int i = 0; i < count; i++) {
             Long objectId = nextAssignedObjectId.get() + i;
-            prepareObjectIds.add(objectId);
             long preparedTs = System.currentTimeMillis();
             long expiredTs = preparedTs + request.timeToLiveInMs();
             S3ObjectRecord record = new S3ObjectRecord()
@@ -151,7 +150,7 @@ public ControllerResult prepareObject(PrepareS3Obje
                 .setExpiredTimeInMs(expiredTs);
             records.add(new ApiMessageAndVersion(record, (short) 0));
         }
-        response.setS3ObjectIds(prepareObjectIds);
+        response.setFirstS3ObjectId(firstAssignedObjectId);
         return ControllerResult.atomicOf(records, response);
     }
 
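The protocol change above works because the controller assigns each batch of object ids contiguously, starting from nextAssignedObjectId, so returning only the first id loses no information. Below is a minimal sketch of how a caller that requested `count` objects can rebuild the full id list; the class and method names are hypothetical and not part of this patch:

    import java.util.List;
    import java.util.stream.Collectors;
    import java.util.stream.LongStream;

    public class ObjectIdRangeSketch {
        // Rebuild the contiguous batch [first, first + count) that PrepareS3Object assigned.
        static List<Long> objectIds(long firstS3ObjectId, int count) {
            return LongStream.range(firstS3ObjectId, firstS3ObjectId + count)
                    .boxed()
                    .collect(Collectors.toList());
        }

        public static void main(String[] args) {
            System.out.println(objectIds(3L, 5)); // prints [3, 4, 5, 6, 7]
        }
    }
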
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 3ee287fdab..cadf060aaa 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -355,8 +355,6 @@ public ControllerResult deleteStream(DeleteStreamReque
     }
 
     public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALObjectRequestData data) {
-        // TODO: deal with compacted objects, mark delete compacted object
-        // TODO: deal with stream objects, replay streamObjectRecord to advance stream's end offset
         CommitWALObjectResponseData resp = new CommitWALObjectResponseData();
         List<ApiMessageAndVersion> records = new ArrayList<>();
         long objectId = data.objectId();
@@ -402,11 +400,7 @@ public ControllerResult commitWALObject(CommitWALOb
             indexes.stream()
                 .map(S3ObjectStreamIndex::toRecordStreamIndex)
                 .collect(Collectors.toList())), (short) 0));
-        // generate compacted objects' remove record
-        data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
-            .setObjectId(id), (short) 0)));
         // create stream object records
-        // TODO: deal with the lifecycle of stream object's source objects, when and how to delete them ?
         List<StreamObject> streamObjects = data.streamObjects();
         streamObjects.stream().forEach(obj -> {
             long streamId = obj.streamId();
@@ -414,6 +408,9 @@ public ControllerResult commitWALObject(CommitWALOb
             long endOffset = obj.endOffset();
             records.add(new S3StreamObject(obj.objectId(), obj.objectSize(), streamId, startOffset, endOffset).toRecord());
         });
+        // generate compacted objects' remove record
+        data.compactedObjectIds().forEach(id -> records.add(new ApiMessageAndVersion(new RemoveWALObjectRecord()
+            .setObjectId(id), (short) 0)));
         log.info("[CommitWALObject]: broker: {} commit wal object {} success", brokerId, objectId);
         return ControllerResult.atomicOf(records, resp);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index a509d73cf0..055a04da38 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -283,7 +283,6 @@ public long endOffset() {
     }
 
     public S3ObjectMetadata toS3ObjectMetadata() {
-        // TODO: fill object size later
         return new S3ObjectMetadata(objectId, -1, objectType);
     }
 }
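The commitWALObject hunks above move the compacted objects' remove records after the stream-object records within the same atomic batch. One plausible reading of why the order matters: even inside an atomic batch, replayers apply records sequentially, so the data superseding the compacted WAL objects should become visible before those objects are dropped. A toy model of that replay, using simplified stand-in types that are purely illustrative, not the real metadata records:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class ReplayOrderSketch {
        record AddStreamObject(long objectId) {}
        record RemoveWalObject(long objectId) {}

        public static void main(String[] args) {
            List<Object> batch = new ArrayList<>();
            batch.add(new AddStreamObject(10L));   // replacement data first
            batch.add(new RemoveWalObject(3L));    // then drop the compacted source
            Map<Long, String> image = new HashMap<>();
            image.put(3L, "wal-object");
            for (Object r : batch) {               // sequential replay of the batch
                if (r instanceof AddStreamObject a) {
                    image.put(a.objectId(), "stream-object");
                } else if (r instanceof RemoveWalObject d) {
                    image.remove(d.objectId());
                }
            }
            System.out.println(image);             // {10=stream-object}: no moment without the data
        }
    }
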
diff --git a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
index 369769781a..062153e0a8 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/S3ObjectControlManagerTest.java
@@ -27,9 +27,11 @@
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
 import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
 import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
 import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
 import org.apache.kafka.common.metadata.S3ObjectRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
@@ -80,6 +82,7 @@ public void testPrepareObject() {
                 .setPreparedCount(3)
                 .setTimeToLiveInMs(60 * 1000));
         assertEquals(Errors.NONE.code(), result0.response().errorCode());
+        assertEquals(0L, result0.response().firstS3ObjectId());
         assertEquals(4, result0.records().size());
         ApiMessage message = result0.records().get(0).message();
         assertInstanceOf(AssignedS3ObjectIdRecord.class, message);
@@ -88,8 +91,7 @@
         for (int i = 0; i < 3; i++) {
             verifyPrepareObjectRecord(result0.records().get(i + 1), i, 60 * 1000);
         }
-        manager.replay(assignedRecord);
-        result0.records().stream().skip(1).map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record));
+        replay(manager, result0.records());
 
         // verify the 3 objects are prepared
         assertEquals(3, manager.objectsMetadata().size());
@@ -99,6 +101,46 @@
             assertEquals(60 * 1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs());
         });
         assertEquals(3, manager.nextAssignedObjectId());
+
+        // 2. prepare 5 objects
+        ControllerResult<PrepareS3ObjectResponseData> result1 = manager.prepareObject(new PrepareS3ObjectRequestData()
+            .setBrokerId(BROKER1)
+            .setPreparedCount(5)
+            .setTimeToLiveInMs(60 * 1000));
+        assertEquals(Errors.NONE.code(), result1.response().errorCode());
+        assertEquals(3L, result1.response().firstS3ObjectId());
+        assertEquals(6, result1.records().size());
+        replay(manager, result1.records());
+
+        // verify the 5 objects are prepared
+        assertEquals(8, manager.objectsMetadata().size());
+        manager.objectsMetadata().forEach((id, s3Object) -> {
+            assertEquals(S3ObjectState.PREPARED, s3Object.getS3ObjectState());
+            assertEquals(id, s3Object.getObjectId());
+            assertEquals(60 * 1000, s3Object.getExpiredTimeInMs() - s3Object.getPreparedTimeInMs());
+        });
+        assertEquals(8, manager.nextAssignedObjectId());
+    }
+
+    private void replay(S3ObjectControlManager manager, List<ApiMessageAndVersion> records) {
+        List<ApiMessage> messages = records.stream().map(x -> x.message())
+            .collect(Collectors.toList());
+        for (ApiMessage message : messages) {
+            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+            switch (type) {
+                case ASSIGNED_S3_OBJECT_ID_RECORD:
+                    manager.replay((AssignedS3ObjectIdRecord) message);
+                    break;
+                case S3_OBJECT_RECORD:
+                    manager.replay((S3ObjectRecord) message);
+                    break;
+                case REMOVE_S3_OBJECT_RECORD:
+                    manager.replay((RemoveS3ObjectRecord) message);
+                    break;
+                default:
+                    throw new IllegalStateException("Unknown metadata record type " + type);
+            }
+        }
     }
 
     private void verifyPrepareObjectRecord(ApiMessageAndVersion result, long expectedObjectId, long expectedTimeToLiveInMs) {
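The new assertions encode a simple invariant of prepareObject: each call emits one AssignedS3ObjectIdRecord plus one S3ObjectRecord per prepared object, and ids are assigned densely, so a batch's firstS3ObjectId equals the number of ids handed out before it. A standalone sketch of that arithmetic, mirroring the assertions above; the class and helper names are hypothetical:

    public class PrepareObjectExpectations {
        // One AssignedS3ObjectIdRecord plus one S3ObjectRecord per object.
        static int expectedRecordCount(int preparedCount) {
            return 1 + preparedCount;
        }

        // Ids are dense: the next batch starts where the previous one ended.
        static long expectedFirstObjectId(long idsAssignedSoFar) {
            return idsAssignedSoFar;
        }

        public static void main(String[] args) {
            // first batch of 3: 4 records, ids 0..2
            System.out.println(expectedRecordCount(3) + " records, first id " + expectedFirstObjectId(0));
            // second batch of 5: 6 records, ids 3..7
            System.out.println(expectedRecordCount(5) + " records, first id " + expectedFirstObjectId(3));
        }
    }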