diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
index cf243f49a2..7aea30a045 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java
@@ -18,7 +18,9 @@
 package org.apache.kafka.controller.stream;
 
 import com.automq.stream.s3.Config;
+import com.automq.stream.s3.compact.CompactOperations;
 import com.automq.stream.s3.metadata.ObjectUtils;
+import com.automq.stream.s3.objects.ObjectAttributes;
 import com.automq.stream.s3.operator.S3Operator;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -219,27 +221,60 @@ public ControllerResult<Errors> commitObject(long objectId, long objectSize, lon
     }
 
     public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
+        return markDestroyObjects(objects, Collections.nCopies(objects.size(), CompactOperations.DELETE));
+    }
+
+    public ControllerResult<Boolean> markDestroyObjects(List<Long> objects, List<CompactOperations> operations) {
         AutoMQVersion version = this.version.get();
         short objectRecordVersion = version.objectRecordVersion();
         List<ApiMessageAndVersion> records = new ArrayList<>();
-        for (Long objectId : objects) {
+        for (int i = 0; i < objects.size(); i++) {
+            Long objectId = objects.get(i);
+            CompactOperations operation = operations.get(i);
             S3Object object = this.objectsMetadata.get(objectId);
             if (object == null || object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
                 log.error("object {} not exist when mark destroy object", objectId);
                 return ControllerResult.of(Collections.emptyList(), false);
             }
-            S3ObjectRecord record = new S3ObjectRecord()
-                .setObjectId(objectId)
-                .setObjectSize(object.getObjectSize())
-                .setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
-                .setPreparedTimeInMs(object.getPreparedTimeInMs())
-                .setExpiredTimeInMs(object.getExpiredTimeInMs())
-                .setCommittedTimeInMs(object.getCommittedTimeInMs())
-                .setMarkDestroyedTimeInMs(System.currentTimeMillis());
-            if (version.isCompositeObjectSupported()) {
-                record.setAttributes(object.getAttributes());
+            switch (operation) {
+                case DELETE: {
+                    S3ObjectRecord record = new S3ObjectRecord()
+                        .setObjectId(objectId)
+                        .setObjectSize(object.getObjectSize())
+                        .setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
+                        .setPreparedTimeInMs(object.getPreparedTimeInMs())
+                        .setExpiredTimeInMs(object.getExpiredTimeInMs())
+                        .setCommittedTimeInMs(object.getCommittedTimeInMs())
+                        .setMarkDestroyedTimeInMs(System.currentTimeMillis());
+                    if (version.isCompositeObjectSupported()) {
+                        record.setAttributes(object.getAttributes());
+                    }
+                    records.add(new ApiMessageAndVersion(record, objectRecordVersion));
+                    break;
+                }
+                case KEEP_DATA: {
+                    records.add(new ApiMessageAndVersion(new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
+                    break;
+                }
+                case DEEP_DELETE: {
+                    S3ObjectRecord record = new S3ObjectRecord()
+                        .setObjectId(objectId)
+                        .setObjectSize(object.getObjectSize())
+                        .setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
+                        .setPreparedTimeInMs(object.getPreparedTimeInMs())
+                        .setExpiredTimeInMs(object.getExpiredTimeInMs())
+                        .setCommittedTimeInMs(object.getCommittedTimeInMs())
+                        .setMarkDestroyedTimeInMs(System.currentTimeMillis());
+                    if (version.isCompositeObjectSupported()) {
+                        int attributes = ObjectAttributes.builder(object.getAttributes()).deepDelete().build().attributes();
+                        record.setAttributes(attributes);
+                    }
+                    records.add(new ApiMessageAndVersion(record, objectRecordVersion));
+                    break;
+                }
+                default:
+                    throw new IllegalArgumentException("Unknown operation: " + operation);
             }
-            records.add(new ApiMessageAndVersion(record, objectRecordVersion));
         }
         return ControllerResult.atomicOf(records, true);
     }
@@ -313,7 +348,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
             log.info("objects TTL is reached, objects={}", ttlReachedObjects);
         }
         // check the mark destroyed objects
-        List<String> requiredDeleteKeys = new LinkedList<>();
+        List<S3Object> requiredDeleteKeys = new LinkedList<>();
         while (true) {
             Long objectId = this.markDestroyedObjects.peek();
             if (objectId == null) {
@@ -328,7 +363,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
             if (object.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis()) {
                 this.markDestroyedObjects.poll();
                 // exceed delete retention time, trigger the truly deletion
-                requiredDeleteKeys.add(object.getObjectKey());
+                requiredDeleteKeys.add(object);
             } else {
                 // the following objects' mark destroyed time is not expired, so break the loop
                 break;
@@ -382,7 +417,9 @@ class ObjectCleaner {
 
     public static final int MAX_BATCH_DELETE_SIZE = 800;
 
-    CompletableFuture<Void> clean(List<String> objectKeys) {
+    CompletableFuture<Void> clean(List<S3Object> objects) {
+        // TODO: replace with object storage
+        List<String> objectKeys = objects.stream().map(S3Object::getObjectKey).collect(Collectors.toList());
         List<CompletableFuture<Void>> cfList = new LinkedList<>();
         for (int i = 0; i < objectKeys.size() / MAX_BATCH_DELETE_SIZE; i++) {
             List<String> batch = objectKeys.subList(i * MAX_BATCH_DELETE_SIZE, (i + 1) * MAX_BATCH_DELETE_SIZE);
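The new overload is the core of this patch: callers choose one CompactOperations value per object, while the single-argument form keeps existing callers on DELETE semantics. A minimal sketch of the three operations from a caller's point of view; the object IDs are made up, and the actual manager call is commented out since constructing a controller is out of scope here:

```java
import com.automq.stream.s3.compact.CompactOperations;
import java.util.List;

public class MarkDestroyExample {
    public static void main(String[] args) {
        // Hypothetical object IDs; in the controller these come from stream metadata.
        List<Long> objectIds = List.of(1L, 2L, 3L);
        List<CompactOperations> operations = List.of(
            CompactOperations.DELETE,      // mark destroyed; physically removed after retention
            CompactOperations.KEEP_DATA,   // drop only the KRaft metadata; data stays in S3
            CompactOperations.DEEP_DELETE  // also flag the object so linked objects get removed
        );
        // With a real S3ObjectControlManager instance:
        // ControllerResult<Boolean> rst = manager.markDestroyObjects(objectIds, operations);
        System.out.println(objectIds + " -> " + operations);
    }
}
```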
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 83f518c09a..919b4a365f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller.stream;
 
+import com.automq.stream.s3.compact.CompactOperations;
 import com.automq.stream.s3.metadata.S3StreamConstant;
 import com.automq.stream.s3.metadata.StreamOffsetRange;
 import com.automq.stream.s3.metadata.StreamState;
@@ -538,7 +539,8 @@ public ControllerResult<DeleteStreamResponse> deleteStream(DeleteStreamRequest r
                 .setStreamId(streamId), (short) 0));
         // generate stream objects destroy records
         List<Long> streamObjectIds = new ArrayList<>(streamMetadata.streamObjects().keySet());
-        ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(streamObjectIds);
+        // deep delete the composite object: delete the composite object and its linked objects
+        ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(streamObjectIds, Collections.nCopies(streamObjectIds.size(), CompactOperations.DEEP_DELETE));
         if (!markDestroyResult.response()) {
             log.error("[DELETE_STREAM],[FAIL]: failed to mark destroy stream objects. streamId={}, objects={}", streamId, streamObjectIds);
             resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
@@ -761,7 +763,13 @@ public ControllerResult<CommitStreamObjectResponse> commitStreamObject(Commi
         long dataTs = committedTs;
         // mark destroy compacted object
        if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
-            ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds);
+            List<CompactOperations> operations;
+            if (data.operations().isEmpty()) {
+                operations = Collections.nCopies(sourceObjectIds.size(), CompactOperations.DELETE);
+            } else {
+                operations = data.operations().stream().map(v -> CompactOperations.fromValue(v)).collect(Collectors.toList());
+            }
+            ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds, operations);
             if (!destroyResult.response()) {
                 log.error("[CommitStreamObject]: failed to mark destroy compacted objects. compactedObjects={}, req={}",
                     sourceObjectIds, data);
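The commitStreamObject change has to stay compatible with requests from older nodes that carry no operations list, so an empty list falls back to DELETE for every source object. A sketch of that decoding, extracted into a standalone helper (the helper and its names are mine, not the patch's):

```java
import com.automq.stream.s3.compact.CompactOperations;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

public class OperationsFallbackExample {
    // Mirrors the commitStreamObject fallback: old clients send no operations,
    // so every source object defaults to DELETE; new clients send one byte per object.
    static List<CompactOperations> resolve(List<Byte> wireOperations, int sourceObjectCount) {
        if (wireOperations.isEmpty()) {
            return Collections.nCopies(sourceObjectCount, CompactOperations.DELETE);
        }
        return wireOperations.stream().map(CompactOperations::fromValue).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(resolve(List.of(), 2));                   // [DELETE, DELETE]
        System.out.println(resolve(List.of((byte) 2, (byte) 1), 2)); // [DEEP_DELETE, KEEP_DATA]
    }
}
```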
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
index 3ca79b8079..afef92e069 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3ObjectState.java
@@ -18,21 +18,36 @@
 package org.apache.kafka.metadata.stream;
 
 public enum S3ObjectState {
-    UNINITIALIZED,
-    PREPARED,
-    COMMITTED,
-    MARK_DESTROYED,
-    DESTROYED;
+    UNINITIALIZED((byte) 0),
+    PREPARED((byte) 1),
+    COMMITTED((byte) 2),
+    MARK_DESTROYED((byte) 3),
+    DESTROYED((byte) 4);
+
+    private final byte value;
+
+    S3ObjectState(byte value) {
+        this.value = value;
+    }
 
     public byte toByte() {
-        return (byte) ordinal();
+        return value;
     }
 
     public static S3ObjectState fromByte(Byte b) {
-        int ordinal = b.intValue();
-        if (ordinal < 0 || ordinal >= values().length) {
-            throw new IllegalArgumentException("Invalid ObjectState ordinal " + ordinal);
+        switch (b) {
+            case 0:
+                return UNINITIALIZED;
+            case 1:
+                return PREPARED;
+            case 2:
+                return COMMITTED;
+            case 3:
+                return MARK_DESTROYED;
+            case 4:
+                return DESTROYED;
+            default:
+                throw new IllegalArgumentException("Unknown value: " + b);
         }
-        return values()[ordinal];
     }
 }
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index 48ee0656c6..621438c01a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -904,7 +904,7 @@ public void testCommitStreamObjectForFencedStream() {
     public void testCommitStreamObject() {
         Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong(), anyInt()))
             .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
-        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.markDestroyObjects(anyList(), anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
         registerAlwaysSuccessEpoch(BROKER0);
 
         // 1. create and open stream_0 and stream_1
@@ -987,7 +987,7 @@ public void testCommitStreamObject() {
         assertEquals(400L, response.streamMetadataList().get(1).endOffset());
 
         // 6. compact a stream object from invalid source object
-        Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
+        Mockito.when(objectControlManager.markDestroyObjects(anyList(), anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
         streamObjectRequest = new CommitStreamObjectRequestData()
             .setObjectId(5L)
             .setStreamId(STREAM1)
@@ -1012,6 +1012,7 @@ private void mockData0() {
         Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong(), anyInt()))
             .thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
         Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
+        Mockito.when(objectControlManager.markDestroyObjects(anyList(), anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
         registerAlwaysSuccessEpoch(BROKER0);
         registerAlwaysSuccessEpoch(BROKER1);
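The S3ObjectState rework above pins each state to an explicit wire byte instead of relying on ordinal(), so reordering or inserting enum constants can no longer silently corrupt persisted records. A quick round-trip check, assuming the enum is on the classpath:

```java
import org.apache.kafka.metadata.stream.S3ObjectState;

public class S3ObjectStateRoundTrip {
    public static void main(String[] args) {
        // Every state must survive a toByte()/fromByte() round trip; with explicit
        // values this holds even if the declaration order changes later.
        for (S3ObjectState state : S3ObjectState.values()) {
            if (S3ObjectState.fromByte(state.toByte()) != state) {
                throw new IllegalStateException("byte mapping broken for " + state);
            }
        }
        System.out.println("all states round-trip");
    }
}
```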

diff --git a/s3stream/src/main/java/com/automq/stream/s3/CompositeObject.java b/s3stream/src/main/java/com/automq/stream/s3/CompositeObject.java
index 11f773ad72..8f32d1a9b2 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/CompositeObject.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/CompositeObject.java
@@ -12,40 +12,44 @@
 package com.automq.stream.s3;
 
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.objects.ObjectAttributes;
 import com.automq.stream.s3.operator.ObjectStorage;
 import com.automq.stream.s3.operator.Writer;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * CompositeObject is a logic object which soft links multiple objects together.
  * <p>
  * v0 format:
  * objects
- * object_count u32
- * objects (
- * object_id u64
- * block_start_index u32
- * bucket_index u16
- * )*
+ *   object_count u32
+ *   objects (
+ *     object_id u64
+ *     block_start_index u32
+ *     bucket_index u16
+ *   )*
  * indexes
- * index_count u32
- * (
- * stream_id u64
- * start_offset u64
- * end_offset_delta u32
- * record_count u32
- * block_start_position u64
- * block_size u32
- * )*
+ *   index_count u32
+ *   (
+ *     stream_id u64
+ *     start_offset u64
+ *     end_offset_delta u32
+ *     record_count u32
+ *     block_start_position u64
+ *     block_size u32
+ *   )*
  * index_handle
- * position u64
- * length u32
+ *   position u64
+ *   length u32
 * padding 40byte - 8 - 8 - 4
 * magic u64
- *
- *
 */
 public class CompositeObject {
+    private static final Logger LOGGER = LoggerFactory.getLogger(CompositeObject.class);
     public static final byte OBJECTS_BLOCK_MAGIC = 0x52;
     public static final int OBJECT_BLOCK_HEADER_SIZE = 1 /* magic */ + 4 /* objects count */;
     public static final int OBJECT_UNIT_SIZE = 8 /* objectId */ + 4 /* blockStartIndex */ + 2 /* bucketId */;
@@ -57,14 +61,39 @@ public static CompositeObjectReader reader(S3ObjectMetadata objectMetadata, Obje
         return new CompositeObjectReader(objectMetadata, rangeReader);
     }
 
+    public static CompositeObjectReader reader(S3ObjectMetadata objectMetadata, ObjectStorage objectStorage) {
+        return new CompositeObjectReader(objectMetadata, (metadata, startOffset, endOffset) -> objectStorage.rangeRead(ObjectStorage.ReadOptions.DEFAULT, metadata, startOffset, endOffset));
+    }
+
     public static CompositeObjectWriter writer(Writer writer) {
         return new CompositeObjectWriter(writer);
    }
 
     public static CompletableFuture<Void> delete(S3ObjectMetadata objectMetadata, ObjectStorage objectStorage) {
+        @SuppressWarnings("resource")
+        CompositeObjectReader reader = reader(objectMetadata, objectStorage);
         // 1. use reader to get all linked object
-        // 2. delete linked object
-        // 3. delete composite object
-        throw new UnsupportedOperationException();
+        return reader.basicObjectInfo().thenCompose(info -> {
+            // 2. delete linked object
+            List<ObjectIndex> objectIndexes = ((CompositeObjectReader.BasicObjectInfoExt) info).objectsBlock().indexes();
+            List<S3ObjectMetadata> metadataList = objectIndexes
+                .stream()
+                .map(o -> new S3ObjectMetadata(o.objectId(), ObjectAttributes.builder().bucket(o.bucketId()).build().attributes()))
+                .collect(Collectors.toList());
+            return objectStorage.delete(metadataList)
+                .thenApply(rst -> objectIndexes.stream().map(o -> o.bucketId() + "/" + o.objectId()).collect(Collectors.toList()));
+        }).thenCompose(linkedObjects -> {
+            // 3. delete composite object
+            return objectStorage.delete(List.of(objectMetadata)).thenAccept(rst ->
+                LOGGER.info("Delete composite object {}/{} success, linked objects: {}",
+                    ObjectAttributes.from(objectMetadata.attributes()).bucket(), objectMetadata.objectId(), linkedObjects)
+            );
+        }).thenAccept(rst -> {
+        }).whenComplete((rst, ex) -> {
+            reader.release();
+            if (ex != null) {
+                LOGGER.error("Delete composite object {} fail", objectMetadata, ex);
+            }
+        });
     }
 }
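CompositeObject.delete now walks the objects block before touching storage: it reads the linked-object indexes, batch-deletes the linked objects, then deletes the composite object itself, releasing the reader either way. A usage sketch; the objectStorage instance, object ID, and bucket value are placeholders:

```java
import com.automq.stream.s3.CompositeObject;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.objects.ObjectAttributes;
import com.automq.stream.s3.operator.ObjectStorage;

public class CompositeObjectDeleteExample {
    // Deep-deletes one composite object and blocks until it is gone.
    static void deleteComposite(ObjectStorage objectStorage, long objectId, short bucket) {
        int attributes = ObjectAttributes.builder().bucket(bucket).build().attributes();
        S3ObjectMetadata metadata = new S3ObjectMetadata(objectId, attributes);
        // Linked objects are deleted first; on failure the reader is still
        // released and the error is logged by CompositeObject.delete itself.
        CompositeObject.delete(metadata, objectStorage).join();
    }
}
```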
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactOperations.java b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactOperations.java
index 1711798cc1..5bea07688c 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/CompactOperations.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/CompactOperations.java
@@ -12,8 +12,14 @@
 package com.automq.stream.s3.compact;
 
 public enum CompactOperations {
+    // - normal object: delete the object
+    // - composite object: delete the composite object
     DELETE((byte) 0),
-    KEEP_DATA((byte) 1);
+    // only delete the metadata in KRaft
+    KEEP_DATA((byte) 1),
+    // - normal object: delete the object
+    // - composite object: delete the composite object and all its linked objects
+    DEEP_DELETE((byte) 2);
 
     private final byte value;
 
@@ -31,6 +37,8 @@ public static CompactOperations fromValue(byte value) {
                 return DELETE;
             case 1:
                 return KEEP_DATA;
+            case 2:
+                return DEEP_DELETE;
             default:
                 throw new IllegalArgumentException("Unknown value: " + value);
         }
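Because CompactOperations travels as a byte list on the commit request, the enum's byte values are a wire contract, and fromValue must reject unknown bytes rather than guess. A decoding sketch using only the API visible in this patch:

```java
import com.automq.stream.s3.compact.CompactOperations;
import java.util.List;
import java.util.stream.Collectors;

public class CompactOperationsWireExample {
    public static void main(String[] args) {
        // Operations arrive as one byte per source object on the commit request;
        // fromValue must map them back exactly and reject anything unknown.
        List<Byte> wire = List.of((byte) 0, (byte) 1, (byte) 2);
        List<CompactOperations> decoded = wire.stream()
            .map(CompactOperations::fromValue)
            .collect(Collectors.toList());
        System.out.println(decoded); // [DELETE, KEEP_DATA, DEEP_DELETE]
        try {
            CompactOperations.fromValue((byte) 9);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage()); // Unknown value: 9
        }
    }
}
```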
diff --git a/s3stream/src/main/java/com/automq/stream/s3/compact/StreamObjectCompactor.java b/s3stream/src/main/java/com/automq/stream/s3/compact/StreamObjectCompactor.java
index e95e229314..f2078e0ea2 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/compact/StreamObjectCompactor.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/compact/StreamObjectCompactor.java
@@ -192,7 +192,7 @@ private void cleanupExpiredObject(
             int start = i;
             int end = Math.min(i + EXPIRED_OBJECTS_CLEAN_UP_STEP, expiredObjectCount);
             List<Long> subCompactedObjectIds = new ArrayList<>(compactedObjectIds.subList(start, end));
-            List<CompactOperations> operations = subCompactedObjectIds.stream().map(id -> CompactOperations.DELETE).collect(Collectors.toList());
+            List<CompactOperations> operations = subCompactedObjectIds.stream().map(id -> CompactOperations.DEEP_DELETE).collect(Collectors.toList());
             request = new CompactStreamObjectRequest(ObjectUtils.NOOP_OBJECT_ID, 0, stream.streamId(), stream.streamEpoch(),
                 NOOP_OFFSET, NOOP_OFFSET, subCompactedObjectIds, operations, ObjectAttributes.DEFAULT.attributes());
             objectManager.compactStreamObject(request).get();
@@ -373,6 +373,7 @@ private void compactCompositeObject(S3ObjectMetadata objectMetadata, ObjectReade
         List<DataBlockIndex> dataBlockIndexes = info.indexBlock().indexes();
         for (ObjectIndex linkedObjectIndex : linkedObjectIndexes) {
             boolean hasLiveBlocks = false;
+            S3ObjectMetadata linkedObjectMetadata = new S3ObjectMetadata(linkedObjectIndex.objectId(), ObjectAttributes.builder().bucket(linkedObjectIndex.bucketId()).build().attributes());
             for (int j = linkedObjectIndex.blockStartIndex(); j < linkedObjectIndex.blockEndIndex(); j++) {
                 DataBlockIndex dataBlockIndex = dataBlockIndexes.get(j);
                 if (dataBlockIndex.endOffset() <= startOffset) {
@@ -382,14 +383,14 @@ private void compactCompositeObject(S3ObjectMetadata objectMetadata, ObjectReade
                 break;
             }
             if (!hasLiveBlocks) {
-                // The linked object is fully expired, so we could delete the object from object storage.
-                compactedObjectIds.add(linkedObjectIndex.objectId());
-                operations.add(CompactOperations.DELETE);
+                // The linked object is fully expired, and there won't be any access to it.
+                // So we could directly delete the object from object storage.
+                objectStorage.delete(List.of(linkedObjectMetadata)).get();
             } else {
                 // Keep all blocks in the linked object even part of them are expired.
                 // So we could get more precise composite object retained size.
                 objectWriter.addComponent(
-                    new S3ObjectMetadata(linkedObjectIndex.objectId(), ObjectAttributes.builder().bucket(linkedObjectIndex.bucketId()).build().attributes()),
+                    linkedObjectMetadata,
                     dataBlockIndexes.subList(linkedObjectIndex.blockStartIndex(), linkedObjectIndex.blockEndIndex())
                 );
                 // The linked object's metadata is already deleted from KRaft after the first time become a part of composite object.
@@ -402,6 +403,7 @@ private void compactCompositeObject(S3ObjectMetadata objectMetadata, ObjectReade
     }
 
     static List<List<S3ObjectMetadata>> group0(List<S3ObjectMetadata> objects, long maxStreamObjectSize) {
+        // TODO: switch to include/exclude composite object
         List<List<S3ObjectMetadata>> objectGroups = new LinkedList<>();
         long groupSize = 0;
         long groupNextOffset = -1L;
diff --git a/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectAttributes.java b/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectAttributes.java
index 93165468f4..4b7df928c1 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectAttributes.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/objects/ObjectAttributes.java
@@ -17,10 +17,12 @@
  * 0: normal object
  * 1: composite object
  * 2~17: bucket index
- * 18~31 unused
+ * 18: deep delete mark
+ * 19~31 unused
  */
 public class ObjectAttributes {
     public static final ObjectAttributes DEFAULT = new ObjectAttributes(0);
+    private static final int DEEP_DELETE_MASK = 1 << 18;
     private final int attributes;
 
     private ObjectAttributes(int attributes) {
@@ -35,6 +37,10 @@ public short bucket() {
         return (short) ((attributes >> 2) & 0xFFFF);
     }
 
+    public boolean deepDelete() {
+        return (attributes & DEEP_DELETE_MASK) != 0;
+    }
+
     public int attributes() {
         return attributes;
     }
@@ -44,11 +50,19 @@ public static ObjectAttributes from(int attributes) {
     }
 
     public static Builder builder() {
-        return new Builder();
+        return builder(0);
+    }
+
+    public static Builder builder(int attributes) {
+        return new Builder(attributes);
     }
 
     public static class Builder {
-        private int attributes = 0;
+        private int attributes;
+
+        public Builder(int attributes) {
+            this.attributes = attributes;
+        }
 
         public Builder type(Type type) {
             switch (type) {
@@ -68,6 +82,11 @@ public Builder bucket(short bucketIndex) {
             return this;
         }
 
+        public Builder deepDelete() {
+            attributes |= DEEP_DELETE_MASK;
+            return this;
+        }
+
         public ObjectAttributes build() {
             return new ObjectAttributes(attributes);
         }
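Bit 18 of the attributes int now carries the deep-delete mark, and the new builder(int) overload makes it possible to set that bit on an existing attributes value without clobbering the type bits (0~1) or the bucket index (2~17). For instance:

```java
import com.automq.stream.s3.objects.ObjectAttributes;

public class DeepDeleteAttributeExample {
    public static void main(String[] args) {
        // Start from an existing attributes value (here: bucket index 3) and
        // set the deep-delete mark at bit 18 without disturbing other bits.
        int existing = ObjectAttributes.builder().bucket((short) 3).build().attributes();
        int marked = ObjectAttributes.builder(existing).deepDelete().build().attributes();

        ObjectAttributes attrs = ObjectAttributes.from(marked);
        System.out.println(attrs.bucket());     // 3 (bits 2~17 untouched)
        System.out.println(attrs.deepDelete()); // true (bit 18)
        System.out.println(marked == (existing | (1 << 18))); // true
    }
}
```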
diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
index 6e38d0ec68..189a7ee342 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/operator/ObjectStorage.java
@@ -14,6 +14,7 @@
 import com.automq.stream.s3.metadata.S3ObjectMetadata;
 import com.automq.stream.s3.network.ThrottleStrategy;
 import io.netty.buffer.ByteBuf;
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 
 public interface ObjectStorage {
@@ -30,6 +31,10 @@ public interface ObjectStorage {
 
     CompletableFuture<ByteBuf> rangeRead(S3ObjectMetadata objectMetadata, long start, long end);
 
+    default CompletableFuture<Void> delete(List<S3ObjectMetadata> objectMetadataList) {
+        throw new UnsupportedOperationException();
+    }
+
     class WriteOptions {
         public static final WriteOptions DEFAULT = new WriteOptions();
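Declaring delete as a default method keeps every existing ObjectStorage implementation source-compatible; only backends that actually support batch removal need to override it. A hypothetical in-memory stand-in for tests, matching the default method's reconstructed CompletableFuture<Void> return type (the class is invented here and does not implement the full interface):

```java
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical test double; not part of the patch.
public class InMemoryObjectStorage {
    private final Map<Long, byte[]> objects = new ConcurrentHashMap<>();

    public CompletableFuture<Void> delete(List<S3ObjectMetadata> objectMetadataList) {
        // Batch removal mirroring the new interface method; a real implementation
        // would issue one DeleteObjects request per batch against the backing store.
        objectMetadataList.forEach(metadata -> objects.remove(metadata.objectId()));
        return CompletableFuture.completedFuture(null);
    }
}
```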