Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,21 @@

package org.apache.kafka.controller.stream;

import java.util.Arrays;

import com.automq.stream.s3.Config;
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.operator.S3Operator;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
Expand All @@ -42,18 +52,6 @@
import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID;


Expand Down Expand Up @@ -95,12 +93,12 @@ public class S3ObjectControlManager {
private final ObjectCleaner objectCleaner;

public S3ObjectControlManager(
QuorumController quorumController,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
String clusterId,
Config config,
S3Operator operator) {
QuorumController quorumController,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
String clusterId,
Config config,
S3Operator operator) {
this.quorumController = quorumController;
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
Expand All @@ -114,7 +112,7 @@ public S3ObjectControlManager(
this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor(
ThreadUtils.createThreadFactory("s3-object-lifecycle-check-timer", true));
this.lifecycleCheckTimer.scheduleWithFixedDelay(this::triggerCheckEvent,
DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
this.objectCleaner = new ObjectCleaner();
}

Expand All @@ -123,7 +121,7 @@ private void triggerCheckEvent() {
return;
}
ControllerRequestContext ctx = new ControllerRequestContext(
null, null, OptionalLong.empty());
null, null, OptionalLong.empty());
this.quorumController.checkS3ObjectsLifecycle(ctx).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to check the S3Object's lifecycle", exp);
Expand All @@ -148,18 +146,18 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
// update assigned stream id
long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));
.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));

long firstAssignedObjectId = nextAssignedObjectId.get();
for (int i = 0; i < count; i++) {
Long objectId = nextAssignedObjectId.get() + i;
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectState(S3ObjectState.PREPARED.toByte())
.setPreparedTimeInMs(preparedTs)
.setExpiredTimeInMs(expiredTs);
.setObjectId(objectId)
.setObjectState(S3ObjectState.PREPARED.toByte())
.setPreparedTimeInMs(preparedTs)
.setExpiredTimeInMs(expiredTs);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setFirstS3ObjectId(firstAssignedObjectId);
Expand All @@ -185,14 +183,14 @@ public ControllerResult<Errors> commitObject(long objectId, long objectSize, lon
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState(S3ObjectState.COMMITTED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(committedTs);
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState(S3ObjectState.COMMITTED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(committedTs);
return ControllerResult.of(List.of(
new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
Expand All @@ -204,12 +202,13 @@ public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
return ControllerResult.of(Collections.emptyList(), false);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
.setObjectId(objectId)
.setObjectSize(object.getObjectSize())
.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(object.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(System.currentTimeMillis());
records.add(new ApiMessageAndVersion(record, (short) 0));
}
return ControllerResult.atomicOf(records, true);
Expand All @@ -222,8 +221,8 @@ public void replay(AssignedS3ObjectIdRecord record) {
public void replay(S3ObjectRecord record) {
String objectKey = ObjectUtils.genKey(0, record.objectId());
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
if (object.getS3ObjectState() == S3ObjectState.PREPARED) {
preparedObjects.add(object.getObjectId());
Expand All @@ -250,31 +249,31 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
List<ApiMessageAndVersion> records = new ArrayList<>();
// check the expired objects
this.preparedObjects.stream().
map(objectsMetadata::get).
filter(S3Object::isExpired).
forEach(obj -> {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(obj.getObjectId())
.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
.setObjectSize(obj.getObjectSize())
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
.setCommittedTimeInMs(obj.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
// generate the records which mark the expired objects as destroyed
records.add(new ApiMessageAndVersion(record, (short) 0));
// generate the records which listener reply for the object-destroy events
lifecycleListeners.forEach(listener -> {
ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
records.addAll(result.records());
map(objectsMetadata::get).
filter(S3Object::isExpired).
forEach(obj -> {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(obj.getObjectId())
.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
.setObjectSize(obj.getObjectSize())
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
.setCommittedTimeInMs(obj.getCommittedTimeInMs())
.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
// generate the records which mark the expired objects as destroyed
records.add(new ApiMessageAndVersion(record, (short) 0));
// generate the records which listener reply for the object-destroy events
lifecycleListeners.forEach(listener -> {
ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
records.addAll(result.records());
});
});
});
// check the mark destroyed objects
List<String> destroyedObjectKeys = this.markDestroyedObjects.stream()
.map(this.objectsMetadata::get)
.filter(obj -> obj.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis())
.map(S3Object::getObjectKey)
.collect(Collectors.toList());
.map(this.objectsMetadata::get)
.filter(obj -> obj.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis())
.map(S3Object::getObjectKey)
.collect(Collectors.toList());
if (destroyedObjectKeys.isEmpty()) {
return ControllerResult.of(records, null);
}
Expand All @@ -290,8 +289,8 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
*/
public ControllerResult<Void> notifyS3ObjectDeleted(List<Long> deletedObjectIds) {
List<ApiMessageAndVersion> records = deletedObjectIds.stream().filter(markDestroyedObjects::contains)
.map(objectId -> new ApiMessageAndVersion(new RemoveS3ObjectRecord()
.setObjectId(objectId), (short) 0)).collect(Collectors.toList());
.map(objectId -> new ApiMessageAndVersion(new RemoveS3ObjectRecord()
.setObjectId(objectId), (short) 0)).collect(Collectors.toList());
return ControllerResult.of(records, null);
}

Expand Down Expand Up @@ -336,18 +335,18 @@ private void clean0(List<String> objectKeys) {
operator.delete(objectKeys).whenCompleteAsync((resp, e) -> {
if (e != null) {
log.error("Failed to delete the S3Object from S3, objectKeys: {}",
String.join(",", objectKeys), e);
String.join(",", objectKeys), e);
return;
}
if (resp != null && !resp.isEmpty()) {
List<Long> deletedObjectIds = resp.stream().map(key -> ObjectUtils.parseObjectId(0, key)).collect(Collectors.toList());
// notify the controller an objects deletion event to drive the removal of the objects
ControllerRequestContext ctx = new ControllerRequestContext(
null, null, OptionalLong.empty());
null, null, OptionalLong.empty());
quorumController.notifyS3ObjectDeleted(ctx, deletedObjectIds).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to notify the controller the S3Object deletion event, objectIds: {}",
Arrays.toString(deletedObjectIds.toArray()), exp);
Arrays.toString(deletedObjectIds.toArray()), exp);
}
});
}
Expand Down