diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index f418cce951..218b220893 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -17,11 +17,21 @@ package org.apache.kafka.controller.stream; -import java.util.Arrays; - import com.automq.stream.s3.Config; import com.automq.stream.s3.metadata.ObjectUtils; import com.automq.stream.s3.operator.S3Operator; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.OptionalLong; +import java.util.Queue; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import org.apache.kafka.common.message.PrepareS3ObjectRequestData; import org.apache.kafka.common.message.PrepareS3ObjectResponseData; import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord; @@ -42,18 +52,6 @@ import org.apache.kafka.timeline.TimelineLong; import org.slf4j.Logger; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; -import java.util.Map; -import java.util.OptionalLong; -import java.util.Queue; -import java.util.concurrent.Executors; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - import static com.automq.stream.s3.metadata.ObjectUtils.NOOP_OBJECT_ID; @@ -95,12 +93,12 @@ public class S3ObjectControlManager { private final ObjectCleaner objectCleaner; public S3ObjectControlManager( - QuorumController quorumController, - SnapshotRegistry snapshotRegistry, - LogContext logContext, - String clusterId, - Config config, - S3Operator operator) { + QuorumController quorumController, + SnapshotRegistry snapshotRegistry, + LogContext logContext, + String clusterId, + Config config, + S3Operator operator) { this.quorumController = quorumController; this.log = logContext.logger(S3ObjectControlManager.class); this.clusterId = clusterId; @@ -114,7 +112,7 @@ public S3ObjectControlManager( this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor( ThreadUtils.createThreadFactory("s3-object-lifecycle-check-timer", true)); this.lifecycleCheckTimer.scheduleWithFixedDelay(this::triggerCheckEvent, - DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); + DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); this.objectCleaner = new ObjectCleaner(); } @@ -123,7 +121,7 @@ private void triggerCheckEvent() { return; } ControllerRequestContext ctx = new ControllerRequestContext( - null, null, OptionalLong.empty()); + null, null, OptionalLong.empty()); this.quorumController.checkS3ObjectsLifecycle(ctx).whenComplete((ignore, exp) -> { if (exp != null) { log.error("Failed to check the S3Object's lifecycle", exp); @@ -148,7 +146,7 @@ public ControllerResult prepareObject(PrepareS3Obje // update assigned stream id long newAssignedObjectId = nextAssignedObjectId.get() + count - 1; records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord() - .setAssignedS3ObjectId(newAssignedObjectId), (short) 0)); + .setAssignedS3ObjectId(newAssignedObjectId), (short) 0)); long firstAssignedObjectId = nextAssignedObjectId.get(); for (int i = 0; i < count; i++) { @@ -156,10 +154,10 @@ public ControllerResult prepareObject(PrepareS3Obje long preparedTs = System.currentTimeMillis(); long expiredTs = preparedTs + request.timeToLiveInMs(); S3ObjectRecord record = new S3ObjectRecord() - .setObjectId(objectId) - .setObjectState(S3ObjectState.PREPARED.toByte()) - .setPreparedTimeInMs(preparedTs) - .setExpiredTimeInMs(expiredTs); + .setObjectId(objectId) + .setObjectState(S3ObjectState.PREPARED.toByte()) + .setPreparedTimeInMs(preparedTs) + .setExpiredTimeInMs(expiredTs); records.add(new ApiMessageAndVersion(record, (short) 0)); } response.setFirstS3ObjectId(firstAssignedObjectId); @@ -185,14 +183,14 @@ public ControllerResult commitObject(long objectId, long objectSize, lon return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST); } S3ObjectRecord record = new S3ObjectRecord() - .setObjectId(objectId) - .setObjectSize(objectSize) - .setObjectState(S3ObjectState.COMMITTED.toByte()) - .setPreparedTimeInMs(object.getPreparedTimeInMs()) - .setExpiredTimeInMs(object.getExpiredTimeInMs()) - .setCommittedTimeInMs(committedTs); + .setObjectId(objectId) + .setObjectSize(objectSize) + .setObjectState(S3ObjectState.COMMITTED.toByte()) + .setPreparedTimeInMs(object.getPreparedTimeInMs()) + .setExpiredTimeInMs(object.getExpiredTimeInMs()) + .setCommittedTimeInMs(committedTs); return ControllerResult.of(List.of( - new ApiMessageAndVersion(record, (short) 0)), Errors.NONE); + new ApiMessageAndVersion(record, (short) 0)), Errors.NONE); } public ControllerResult markDestroyObjects(List objects) { @@ -204,12 +202,13 @@ public ControllerResult markDestroyObjects(List objects) { return ControllerResult.of(Collections.emptyList(), false); } S3ObjectRecord record = new S3ObjectRecord() - .setObjectId(objectId) - .setObjectState(S3ObjectState.MARK_DESTROYED.toByte()) - .setPreparedTimeInMs(object.getPreparedTimeInMs()) - .setExpiredTimeInMs(object.getExpiredTimeInMs()) - .setCommittedTimeInMs(object.getCommittedTimeInMs()) - .setMarkDestroyedTimeInMs(System.currentTimeMillis()); + .setObjectId(objectId) + .setObjectSize(object.getObjectSize()) + .setObjectState(S3ObjectState.MARK_DESTROYED.toByte()) + .setPreparedTimeInMs(object.getPreparedTimeInMs()) + .setExpiredTimeInMs(object.getExpiredTimeInMs()) + .setCommittedTimeInMs(object.getCommittedTimeInMs()) + .setMarkDestroyedTimeInMs(System.currentTimeMillis()); records.add(new ApiMessageAndVersion(record, (short) 0)); } return ControllerResult.atomicOf(records, true); @@ -222,8 +221,8 @@ public void replay(AssignedS3ObjectIdRecord record) { public void replay(S3ObjectRecord record) { String objectKey = ObjectUtils.genKey(0, record.objectId()); S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey, - record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(), - S3ObjectState.fromByte(record.objectState())); + record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(), + S3ObjectState.fromByte(record.objectState())); objectsMetadata.put(record.objectId(), object); if (object.getS3ObjectState() == S3ObjectState.PREPARED) { preparedObjects.add(object.getObjectId()); @@ -250,31 +249,31 @@ public ControllerResult checkS3ObjectsLifecycle() { List records = new ArrayList<>(); // check the expired objects this.preparedObjects.stream(). - map(objectsMetadata::get). - filter(S3Object::isExpired). - forEach(obj -> { - S3ObjectRecord record = new S3ObjectRecord() - .setObjectId(obj.getObjectId()) - .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()) - .setObjectSize(obj.getObjectSize()) - .setPreparedTimeInMs(obj.getPreparedTimeInMs()) - .setExpiredTimeInMs(obj.getExpiredTimeInMs()) - .setCommittedTimeInMs(obj.getCommittedTimeInMs()) - .setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs()); - // generate the records which mark the expired objects as destroyed - records.add(new ApiMessageAndVersion(record, (short) 0)); - // generate the records which listener reply for the object-destroy events - lifecycleListeners.forEach(listener -> { - ControllerResult result = listener.onDestroy(obj.getObjectId()); - records.addAll(result.records()); + map(objectsMetadata::get). + filter(S3Object::isExpired). + forEach(obj -> { + S3ObjectRecord record = new S3ObjectRecord() + .setObjectId(obj.getObjectId()) + .setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal()) + .setObjectSize(obj.getObjectSize()) + .setPreparedTimeInMs(obj.getPreparedTimeInMs()) + .setExpiredTimeInMs(obj.getExpiredTimeInMs()) + .setCommittedTimeInMs(obj.getCommittedTimeInMs()) + .setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs()); + // generate the records which mark the expired objects as destroyed + records.add(new ApiMessageAndVersion(record, (short) 0)); + // generate the records which listener reply for the object-destroy events + lifecycleListeners.forEach(listener -> { + ControllerResult result = listener.onDestroy(obj.getObjectId()); + records.addAll(result.records()); + }); }); - }); // check the mark destroyed objects List destroyedObjectKeys = this.markDestroyedObjects.stream() - .map(this.objectsMetadata::get) - .filter(obj -> obj.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis()) - .map(S3Object::getObjectKey) - .collect(Collectors.toList()); + .map(this.objectsMetadata::get) + .filter(obj -> obj.getMarkDestroyedTimeInMs() + (this.config.objectRetentionTimeInSecond() * 1000L) < System.currentTimeMillis()) + .map(S3Object::getObjectKey) + .collect(Collectors.toList()); if (destroyedObjectKeys.isEmpty()) { return ControllerResult.of(records, null); } @@ -290,8 +289,8 @@ public ControllerResult checkS3ObjectsLifecycle() { */ public ControllerResult notifyS3ObjectDeleted(List deletedObjectIds) { List records = deletedObjectIds.stream().filter(markDestroyedObjects::contains) - .map(objectId -> new ApiMessageAndVersion(new RemoveS3ObjectRecord() - .setObjectId(objectId), (short) 0)).collect(Collectors.toList()); + .map(objectId -> new ApiMessageAndVersion(new RemoveS3ObjectRecord() + .setObjectId(objectId), (short) 0)).collect(Collectors.toList()); return ControllerResult.of(records, null); } @@ -336,18 +335,18 @@ private void clean0(List objectKeys) { operator.delete(objectKeys).whenCompleteAsync((resp, e) -> { if (e != null) { log.error("Failed to delete the S3Object from S3, objectKeys: {}", - String.join(",", objectKeys), e); + String.join(",", objectKeys), e); return; } if (resp != null && !resp.isEmpty()) { List deletedObjectIds = resp.stream().map(key -> ObjectUtils.parseObjectId(0, key)).collect(Collectors.toList()); // notify the controller an objects deletion event to drive the removal of the objects ControllerRequestContext ctx = new ControllerRequestContext( - null, null, OptionalLong.empty()); + null, null, OptionalLong.empty()); quorumController.notifyS3ObjectDeleted(ctx, deletedObjectIds).whenComplete((ignore, exp) -> { if (exp != null) { log.error("Failed to notify the controller the S3Object deletion event, objectIds: {}", - Arrays.toString(deletedObjectIds.toArray()), exp); + Arrays.toString(deletedObjectIds.toArray()), exp); } }); }