@@ -18,7 +18,9 @@
package org.apache.kafka.controller.stream;

import com.automq.stream.s3.Config;
import com.automq.stream.s3.compact.CompactOperations;
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.objects.ObjectAttributes;
import com.automq.stream.s3.operator.S3Operator;
import java.util.ArrayList;
import java.util.Arrays;
@@ -219,27 +221,60 @@ public ControllerResult<Errors> commitObject(long objectId, long objectSize, lon
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
return markDestroyObjects(objects, Collections.nCopies(objects.size(), CompactOperations.DELETE));
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects, List<CompactOperations> operations) {
AutoMQVersion version = this.version.get();
short objectRecordVersion = version.objectRecordVersion();
List<ApiMessageAndVersion> records = new ArrayList<>();
for (Long objectId : objects) {
for (int i = 0; i < objects.size(); i++) {
Long objectId = objects.get(i);
CompactOperations operation = operations.get(i);
S3Object object = this.objectsMetadata.get(objectId);
if (object == null || object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
log.error("object {} not exist when mark destroy object", objectId);
return ControllerResult.of(Collections.emptyList(), false);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(object.getObjectSize())
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
if (version.isCompositeObjectSupported()) {
record.setAttributes(object.getAttributes());
switch (operation) {
case DELETE: {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(object.getObjectSize())
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
if (version.isCompositeObjectSupported()) {
record.setAttributes(object.getAttributes());
}
records.add(new ApiMessageAndVersion(record, objectRecordVersion));
break;
}
case KEEP_DATA: {
records.add(new ApiMessageAndVersion(new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
break;
}
case DEEP_DELETE: {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(object.getObjectSize())
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
if (version.isCompositeObjectSupported()) {
int attributes = ObjectAttributes.builder(object.getAttributes()).deepDelete().build().attributes();
record.setAttributes(attributes);
}
records.add(new ApiMessageAndVersion(record, objectRecordVersion));
break;
}
default:
throw new IllegalArgumentException("Unknown operation: " + operation);
}
records.add(new ApiMessageAndVersion(record, objectRecordVersion));
}
return ControllerResult.atomicOf(records, true);
}
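
The new overload lets callers pick a per-object cleanup strategy. A minimal caller sketch, illustrative only: the manager instance and object IDs below are assumptions, not code from this PR.

```java
// Hypothetical caller: objectControlManager is an S3ObjectControlManager.
List<Long> objectIds = List.of(10L, 11L, 12L);
List<CompactOperations> ops = List.of(
    CompactOperations.DELETE,      // plain object: mark destroyed, data removed later
    CompactOperations.KEEP_DATA,   // data now lives in a composite object; drop only the KRaft record
    CompactOperations.DEEP_DELETE  // composite object: also delete its linked objects
);
ControllerResult<Boolean> result = objectControlManager.markDestroyObjects(objectIds, ops);
// The one-argument overload is equivalent to passing DELETE for every object.
```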
@@ -313,7 +348,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
log.info("objects TTL is reached, objects={}", ttlReachedObjects);
}
// check the mark destroyed objects
List<String> requiredDeleteKeys = new LinkedList<>();
List<S3Object> requiredDeleteKeys = new LinkedList<>();
while (true) {
Long objectId = this.markDestroyedObjects.peek();
if (objectId == null) {
@@ -328,7 +363,7 @@
if (object.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis()) {
this.markDestroyedObjects.poll();
// exceed delete retention time, trigger the truly deletion
requiredDeleteKeys.add(object.getObjectKey());
requiredDeleteKeys.add(object);
} else {
// the following objects' mark destroyed time is not expired, so break the loop
break;
@@ -382,7 +417,9 @@ class ObjectCleaner {

public static final int MAX_BATCH_DELETE_SIZE = 800;

CompletableFuture<Void> clean(List<String> objectKeys) {
CompletableFuture<Void> clean(List<S3Object> objects) {
// TODO: replace with object storage
List<String> objectKeys = objects.stream().map(S3Object::getObjectKey).collect(Collectors.toList());
List<CompletableFuture<Void>> cfList = new LinkedList<>();
for (int i = 0; i < objectKeys.size() / MAX_BATCH_DELETE_SIZE; i++) {
List<String> batch = objectKeys.subList(i * MAX_BATCH_DELETE_SIZE, (i + 1) * MAX_BATCH_DELETE_SIZE);
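The loop above only slices out the full batches of `MAX_BATCH_DELETE_SIZE`; a final partial batch is presumably handled in the lines cut off below. A self-contained sketch of the same slicing rule with the remainder included, illustrative and not code from this PR:

```java
import java.util.ArrayList;
import java.util.List;

class BatchSlicing {
    // Partition keys into batches of at most batchSize, matching the
    // subList slicing in clean() and adding the final partial batch.
    static <T> List<List<T>> partition(List<T> keys, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < keys.size(); i += batchSize) {
            batches.add(keys.subList(i, Math.min(i + batchSize, keys.size())));
        }
        return batches;
    }
}
```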
@@ -17,6 +17,7 @@

package org.apache.kafka.controller.stream;

import com.automq.stream.s3.compact.CompactOperations;
import com.automq.stream.s3.metadata.S3StreamConstant;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.metadata.StreamState;
@@ -538,7 +539,8 @@ public ControllerResult<DeleteStreamResponse> deleteStream(DeleteStreamRequest r
.setStreamId(streamId), (short) 0));
// generate stream objects destroy records
List<Long> streamObjectIds = new ArrayList<>(streamMetadata.streamObjects().keySet());
ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(streamObjectIds);
// deep delete the composite object: delete the composite object and its linked objects
ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(streamObjectIds, Collections.nCopies(streamObjectIds.size(), CompactOperations.DEEP_DELETE));
if (!markDestroyResult.response()) {
log.error("[DELETE_STREAM],[FAIL]: failed to mark destroy stream objects. streamId={}, objects={}", streamId, streamObjectIds);
resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
@@ -761,7 +763,13 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
long dataTs = committedTs;
// mark destroy compacted object
if (sourceObjectIds != null && !sourceObjectIds.isEmpty()) {
ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds);
List<CompactOperations> operations;
if (data.operations().isEmpty()) {
operations = Collections.nCopies(sourceObjectIds.size(), CompactOperations.DELETE);
} else {
operations = data.operations().stream().map(v -> CompactOperations.fromValue(v)).collect(Collectors.toList());
}
ControllerResult<Boolean> destroyResult = this.s3ObjectControlManager.markDestroyObjects(sourceObjectIds, operations);
if (!destroyResult.response()) {
log.error("[CommitStreamObject]: failed to mark destroy compacted objects. compactedObjects={}, req={}",
sourceObjectIds, data);
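The empty-operations fallback keeps older clients working: a request that carries no operations is treated as plain deletes. A hypothetical helper capturing that rule, with names assumed for illustration (assumes `java.util.Collections` and `java.util.stream.Collectors` imports):

```java
// Hypothetical helper, not part of the PR: resolve per-object operations for
// a commit request, defaulting to DELETE when the client sent none.
static List<CompactOperations> resolveOperations(List<Long> sourceObjectIds, List<Byte> rawOperations) {
    if (rawOperations.isEmpty()) {
        return Collections.nCopies(sourceObjectIds.size(), CompactOperations.DELETE);
    }
    return rawOperations.stream()
        .map(CompactOperations::fromValue)
        .collect(Collectors.toList());
}
```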
@@ -18,21 +18,36 @@
package org.apache.kafka.metadata.stream;

public enum S3ObjectState {
UNINITIALIZED,
PREPARED,
COMMITTED,
MARK_DESTROYED,
DESTROYED;
UNINITIALIZED((byte) 0),
PREPARED((byte) 1),
COMMITTED((byte) 2),
MARK_DESTROYED((byte) 3),
DESTROYED((byte) 4);

private final byte value;

S3ObjectState(byte value) {
this.value = value;
}

public byte toByte() {
return (byte) ordinal();
return value;
}

public static S3ObjectState fromByte(Byte b) {
int ordinal = b.intValue();
if (ordinal < 0 || ordinal >= values().length) {
throw new IllegalArgumentException("Invalid ObjectState ordinal " + ordinal);
switch (b) {
case 0:
return UNINITIALIZED;
case 1:
return PREPARED;
case 2:
return COMMITTED;
case 3:
return MARK_DESTROYED;
case 4:
return DESTROYED;
default:
throw new IllegalArgumentException("Unknown value: " + b);
}
return values()[ordinal];
}
}
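
Pinning explicit byte values rather than relying on `ordinal()` keeps the persisted encoding stable even if constants are reordered or inserted later. A quick round-trip check, illustrative and not part of the PR:

```java
// Illustrative check: every state must survive a toByte/fromByte round trip,
// and the wire values stay fixed regardless of declaration order.
for (S3ObjectState state : S3ObjectState.values()) {
    assert S3ObjectState.fromByte(state.toByte()) == state;
}
assert S3ObjectState.MARK_DESTROYED.toByte() == (byte) 3;
```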
@@ -904,7 +904,7 @@ public void testCommitStreamObjectForFencedStream() {
public void testCommitStreamObject() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong(), anyInt()))
.thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
Mockito.when(objectControlManager.markDestroyObjects(anyList(), anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
registerAlwaysSuccessEpoch(BROKER0);

// 1. create and open stream_0 and stream_1
@@ -987,7 +987,7 @@ public void testCommitStreamObject() {
assertEquals(400L, response.streamMetadataList().get(1).endOffset());

// 6. compact a stream object from invalid source object
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
Mockito.when(objectControlManager.markDestroyObjects(anyList(), anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), false));
streamObjectRequest = new CommitStreamObjectRequestData()
.setObjectId(5L)
.setStreamId(STREAM1)
@@ -1012,6 +1012,7 @@ private void mockData0() {
Mockito.when(objectControlManager.commitObject(anyLong(), anyLong(), anyLong(), anyInt()))
.thenReturn(ControllerResult.of(Collections.emptyList(), Errors.NONE));
Mockito.when(objectControlManager.markDestroyObjects(anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
Mockito.when(objectControlManager.markDestroyObjects(anyList(), anyList())).thenReturn(ControllerResult.of(Collections.emptyList(), true));
registerAlwaysSuccessEpoch(BROKER0);
registerAlwaysSuccessEpoch(BROKER1);

73 changes: 51 additions & 22 deletions s3stream/src/main/java/com/automq/stream/s3/CompositeObject.java
@@ -12,40 +12,44 @@
package com.automq.stream.s3;

import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.objects.ObjectAttributes;
import com.automq.stream.s3.operator.ObjectStorage;
import com.automq.stream.s3.operator.Writer;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * CompositeObject is a logical object which soft links multiple objects together.
 * <p>
 * v0 format:
 * objects
 *   object_count u32
 *   objects (
 *     object_id u64
 *     block_start_index u32
 *     bucket_index u16
 *   )*
 * indexes
 *   index_count u32
 *   (
 *     stream_id u64
 *     start_offset u64
 *     end_offset_delta u32
 *     record_count u32
 *     block_start_position u64
 *     block_size u32
 *   )*
 * index_handle
 *   position u64
 *   length u32
 * padding 40byte - 8 - 8 - 4
 * magic u64
 */
public class CompositeObject {
private static final Logger LOGGER = LoggerFactory.getLogger(CompositeObject.class);
public static final byte OBJECTS_BLOCK_MAGIC = 0x52;
public static final int OBJECT_BLOCK_HEADER_SIZE = 1 /* magic */ + 4 /* objects count */;
public static final int OBJECT_UNIT_SIZE = 8 /* objectId */ + 4 /* blockStartIndex */ + 2 /* bucketId */;
@@ -57,14 +61,39 @@ public static CompositeObjectReader reader(S3ObjectMetadata objectMetadata, Obje
return new CompositeObjectReader(objectMetadata, rangeReader);
}

public static CompositeObjectReader reader(S3ObjectMetadata objectMetadata, ObjectStorage objectStorage) {
return new CompositeObjectReader(objectMetadata, (metadata, startOffset, endOffset) -> objectStorage.rangeRead(ObjectStorage.ReadOptions.DEFAULT, metadata, startOffset, endOffset));
}

public static CompositeObjectWriter writer(Writer writer) {
return new CompositeObjectWriter(writer);
}

public static CompletableFuture<Void> delete(S3ObjectMetadata objectMetadata, ObjectStorage objectStorage) {
@SuppressWarnings("resource")
CompositeObjectReader reader = reader(objectMetadata, objectStorage);
// 1. use reader to get all linked objects
// 2. delete linked object
// 3. delete composite object
throw new UnsupportedOperationException();
return reader.basicObjectInfo().thenCompose(info -> {
// 2. delete linked object
List<CompositeObjectReader.ObjectIndex> objectIndexes = ((CompositeObjectReader.BasicObjectInfoExt) info).objectsBlock().indexes();
List<S3ObjectMetadata> metadataList = objectIndexes
.stream()
.map(o -> new S3ObjectMetadata(o.objectId(), ObjectAttributes.builder().bucket(o.bucketId()).build().attributes()))
.collect(Collectors.toList());
return objectStorage.delete(metadataList)
.thenApply(rst -> objectIndexes.stream().map(o -> o.bucketId() + "/" + o.objectId()).collect(Collectors.toList()));
}).thenCompose(linkedObjects -> {
// 3. delete composite object
return objectStorage.delete(List.of(objectMetadata)).thenAccept(rst ->
LOGGER.info("Delete composite object {}/{} success, linked objects: {}",
ObjectAttributes.from(objectMetadata.attributes()).bucket(), objectMetadata.objectId(), linkedObjects)
);
}).thenAccept(rst -> {
}).whenComplete((rst, ex) -> {
reader.release();
if (ex != null) {
LOGGER.error("Delete composite object {} fail", objectMetadata, ex);
}
});
}
}
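
A hedged usage sketch for the now-implemented delete path; the metadata values and surrounding setup below are assumptions for illustration, not code from this PR:

```java
// Hypothetical caller: deep-delete one composite object and wait for the result.
// delete() reads the objects block, deletes the linked objects first, then the
// composite object itself, and releases the reader either way.
long objectId = 42L; // assumed value
int attributes = ObjectAttributes.builder().bucket((short) 0).build().attributes(); // assumed bucket
S3ObjectMetadata metadata = new S3ObjectMetadata(objectId, attributes);
CompositeObject.delete(metadata, objectStorage) // objectStorage: an ObjectStorage impl
    .join(); // failures are logged inside delete() and rethrown here as CompletionException
```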
@@ -12,8 +12,14 @@
package com.automq.stream.s3.compact;

public enum CompactOperations {
// - normal object: delete the object
// - composite object: delete the composite object
DELETE((byte) 0),
KEEP_DATA((byte) 1);
// only delete the metadata in KRaft
KEEP_DATA((byte) 1),
// - normal object: delete the object
// - composite object: delete the composite object and all its linked objects
DEEP_DELETE((byte) 2);

private final byte value;

@@ -31,6 +37,8 @@ public static CompactOperations fromValue(byte value) {
return DELETE;
case 1:
return KEEP_DATA;
case 2:
return DEEP_DELETE;
default:
throw new IllegalArgumentException("Unknown value: " + value);
}
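A one-line illustrative check of the new wire value (not part of the PR):

```java
// Illustrative: the new wire value decodes to DEEP_DELETE; anything outside
// {0, 1, 2} still fails fast with IllegalArgumentException.
assert CompactOperations.fromValue((byte) 2) == CompactOperations.DEEP_DELETE;
```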
@@ -192,7 +192,7 @@ private void cleanupExpiredObject(
int start = i;
int end = Math.min(i + EXPIRED_OBJECTS_CLEAN_UP_STEP, expiredObjectCount);
List<Long> subCompactedObjectIds = new ArrayList<>(compactedObjectIds.subList(start, end));
List<CompactOperations> operations = subCompactedObjectIds.stream().map(id -> CompactOperations.DELETE).collect(Collectors.toList());
List<CompactOperations> operations = subCompactedObjectIds.stream().map(id -> CompactOperations.DEEP_DELETE).collect(Collectors.toList());
request = new CompactStreamObjectRequest(ObjectUtils.NOOP_OBJECT_ID, 0,
stream.streamId(), stream.streamEpoch(), NOOP_OFFSET, NOOP_OFFSET, subCompactedObjectIds, operations, ObjectAttributes.DEFAULT.attributes());
objectManager.compactStreamObject(request).get();
@@ -373,6 +373,7 @@ private void compactCompositeObject(S3ObjectMetadata objectMetadata, ObjectReade
List<DataBlockIndex> dataBlockIndexes = info.indexBlock().indexes();
for (ObjectIndex linkedObjectIndex : linkedObjectIndexes) {
boolean hasLiveBlocks = false;
S3ObjectMetadata linkedObjectMetadata = new S3ObjectMetadata(linkedObjectIndex.objectId(), ObjectAttributes.builder().bucket(linkedObjectIndex.bucketId()).build().attributes());
for (int j = linkedObjectIndex.blockStartIndex(); j < linkedObjectIndex.blockEndIndex(); j++) {
DataBlockIndex dataBlockIndex = dataBlockIndexes.get(j);
if (dataBlockIndex.endOffset() <= startOffset) {
@@ -382,14 +383,14 @@
break;
}
if (!hasLiveBlocks) {
// The linked object is fully expired, so we could delete the object from object storage.
compactedObjectIds.add(linkedObjectIndex.objectId());
operations.add(CompactOperations.DELETE);
// The linked object is fully expired, and there won't be any access to it.
// So we could directly delete the object from object storage.
objectStorage.delete(List.of(linkedObjectMetadata)).get();
} else {
// Keep all blocks in the linked object even part of them are expired.
// So we could get more precise composite object retained size.
objectWriter.addComponent(
new S3ObjectMetadata(linkedObjectIndex.objectId(), ObjectAttributes.builder().bucket(linkedObjectIndex.bucketId()).build().attributes()),
linkedObjectMetadata,
dataBlockIndexes.subList(linkedObjectIndex.blockStartIndex(), linkedObjectIndex.blockEndIndex())
);
// The linked object's metadata is already deleted from KRaft after it first becomes part of a composite object.
@@ -402,6 +403,7 @@
}

static List<List<S3ObjectMetadata>> group0(List<S3ObjectMetadata> objects, long maxStreamObjectSize) {
// TODO: switch to include/exclude composite object
List<List<S3ObjectMetadata>> objectGroups = new LinkedList<>();
long groupSize = 0;
long groupNextOffset = -1L;
Expand Down