From 7a3e5972256ab355b008fb094e3154b25d5fb0e2 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Wed, 23 Aug 2023 10:16:57 +0800 Subject: [PATCH] feat(s3): synchronous triggered object delete 1. synchronous triggered object delete Signed-off-by: TheR1sing3un --- .../test/java/kafka/test/MockController.java | 5 ++ .../apache/kafka/controller/Controller.java | 10 +++ .../kafka/controller/QuorumController.java | 10 +++ .../stream/S3ObjectControlManager.java | 89 ++++++++++++++----- 4 files changed, 90 insertions(+), 24 deletions(-) diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 3264676726..6c1024c299 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -477,6 +477,11 @@ public CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext throw new UnsupportedOperationException(); } + public CompletableFuture notifyS3ObjectDeleted(ControllerRequestContext context, + Set deletedObjectIds) { + throw new UnsupportedOperationException(); + } + @Override public CompletableFuture createStream(ControllerRequestContext context, CreateStreamRequestData request) { throw new UnsupportedOperationException(); diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index 91312817e3..356574b786 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -382,6 +382,16 @@ default boolean isActive() { */ CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext context); + + /** + * Notify the S3Object is really deleted. Call when S3 object deletion is confirmed. 
+ */ + CompletableFuture notifyS3ObjectDeleted( + ControllerRequestContext context, + Set deletedObjectIds + ); + + /** * Create a stream */ diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index c82c80a43a..fba468dbb7 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -2174,6 +2174,16 @@ public CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext () -> s3ObjectControlManager.checkS3ObjectsLifecycle()); } + + @Override + public CompletableFuture notifyS3ObjectDeleted(ControllerRequestContext context, + Set deletedObjectIds) { + return appendWriteEvent("notifyS3ObjectDeleted", context.deadlineNs(), + () -> s3ObjectControlManager.notifyS3ObjectDeleted(deletedObjectIds)); + } + + + @Override public CompletableFuture createStream(ControllerRequestContext context, CreateStreamRequestData request) { return null; diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index f4d7552ddd..06b78a4cf8 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -18,14 +18,16 @@ package org.apache.kafka.controller.stream; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.OptionalLong; import java.util.Queue; -import java.util.concurrent.CompletableFuture; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import 
org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.utils.LogContext; @@ -139,7 +141,7 @@ public void replay(RemoveS3ObjectRecord record) { } /** - * Check the S3Object's lifecycle, and do the corresponding actions. + * Check the S3Object's lifecycle: mark expired objects as destroyed, and trigger the actual deletion of objects that are already marked as destroyed. * * @return the result of the check, contains the records which should be applied to the raft. */ @@ -167,30 +169,43 @@ public ControllerResult checkS3ObjectsLifecycle() { }); }); // check the mark destroyed objects - String[] destroyedObjectIds = this.markDestroyedObjects.stream() - .map(objectsMetadata::get) - .map(S3Object::getObjectKey) - .toArray(String[]::new); - // TODO: optimize this ugly implementation, now the thread will be blocked until the deletion request is finished - CompletableFuture future = this.operator.delele(destroyedObjectIds).exceptionally(exp -> { - log.error("Failed to delete the S3Object from S3, objectIds: {}", - String.join(",", destroyedObjectIds), exp); - return false; - }).thenAccept(success -> { - if (success) { - // generate the records which remove the mark destroyed objects - this.markDestroyedObjects.forEach(objectId -> { - records.add(new ApiMessageAndVersion( - new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0)); - }); + ObjectPair[] destroyedObjects = this.markDestroyedObjects.stream() + // must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata + .map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey().get())) + .toArray(ObjectPair[]::new); + String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new); + Set destroyedObjectIds = Arrays.stream(destroyedObjects).map(ObjectPair::objectId).collect(Collectors.toSet()); + // TODO: deal with failed objects in batch object deletion request + 
this.operator.delele(destroyedObjectKeys).whenCompleteAsync((success, e) -> { + if (e != null || !success) { + log.error("Failed to delete the S3Object from S3, objectKeys: {}", + String.join(",", destroyedObjectKeys), e); + return; } + // notify the controller of the object deletion event to drive the removal of the objects + ControllerRequestContext ctx = new ControllerRequestContext( + null, null, OptionalLong.empty()); + this.quorumController.notifyS3ObjectDeleted(ctx, destroyedObjectIds).whenComplete((ignore, exp) -> { + if (exp != null) { + log.error("Failed to notify the controller the S3Object deletion event, objectIds: {}", + destroyedObjectIds, exp); + } + }); + }); + return ControllerResult.of(records, null); + } + + /** + * Generate RemoveS3ObjectRecord for the deleted S3Objects. + * @param deletedObjectIds the deleted S3Objects' ids + * @return the result of the generation, contains the records which should be applied to the raft. + */ + public ControllerResult notifyS3ObjectDeleted(Set deletedObjectIds) { + List records = new ArrayList<>(); + deletedObjectIds.stream().filter(markDestroyedObjects::contains).forEach(objectId -> { + records.add(new ApiMessageAndVersion( + new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0)); }); - try { - future.get(); - } catch (Exception e) { - log.error("Failed to get S3Object deletion operation result, objectIds: {}", - String.join(",", destroyedObjectIds), e); - } return ControllerResult.of(records, null); } @@ -208,4 +223,30 @@ public interface S3ObjectLifeCycleListener { ControllerResult onDestroy(Long objectId); } + static class ObjectPair { + private final Long objectId; + private final String objectKey; + + public ObjectPair(Long objectId, String objectKey) { + this.objectId = objectId; + this.objectKey = objectKey; + } + + public Long objectId() { + return objectId; + } + + public String objectKey() { + return objectKey; + } + + @Override + public String toString() { + return "ObjectPair{" + + 
"objectId=" + objectId + + ", objectKey='" + objectKey + '\'' + + '}'; + } + } + }