Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions core/src/test/java/kafka/test/MockController.java
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,11 @@ public CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext
throw new UnsupportedOperationException();
}

public CompletableFuture<Void> notifyS3ObjectDeleted(ControllerRequestContext context,
Set<Long/*objectId*/> deletedObjectIds) {
throw new UnsupportedOperationException();
}

@Override
public CompletableFuture<CreateStreamResponseData> createStream(ControllerRequestContext context, CreateStreamRequestData request) {
throw new UnsupportedOperationException();
Expand Down
10 changes: 10 additions & 0 deletions metadata/src/main/java/org/apache/kafka/controller/Controller.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,16 @@ default boolean isActive() {
*/
CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext context);


/**
* Notify the S3Object is really deleted. Call when S3 object deletion is confirmed.
*/
CompletableFuture<Void> notifyS3ObjectDeleted(
ControllerRequestContext context,
Set<Long/*objectId*/> deletedObjectIds
);


/**
* Create a stream
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2174,6 +2174,16 @@ public CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext
() -> s3ObjectControlManager.checkS3ObjectsLifecycle());
}


@Override
public CompletableFuture<Void> notifyS3ObjectDeleted(ControllerRequestContext context,
Set<Long/*objectId*/> deletedObjectIds) {
return appendWriteEvent("notifyS3ObjectDeleted", context.deadlineNs(),
() -> s3ObjectControlManager.notifyS3ObjectDeleted(deletedObjectIds));
}



@Override
public CompletableFuture<CreateStreamResponseData> createStream(ControllerRequestContext context, CreateStreamRequestData request) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,16 @@
package org.apache.kafka.controller.stream;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.utils.LogContext;
Expand Down Expand Up @@ -139,7 +141,7 @@ public void replay(RemoveS3ObjectRecord record) {
}

/**
* Check the S3Object's lifecycle, and do the corresponding actions.
* Check the S3Object's lifecycle, mark destroy the expired objects and trigger truly deletion for the mark destroyed objects.
*
* @return the result of the check, contains the records which should be applied to the raft.
*/
Expand Down Expand Up @@ -167,30 +169,43 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
});
});
// check the mark destroyed objects
String[] destroyedObjectIds = this.markDestroyedObjects.stream()
.map(objectsMetadata::get)
.map(S3Object::getObjectKey)
.toArray(String[]::new);
// TODO: optimize this ugly implementation, now the thread will be blocked until the deletion request is finished
CompletableFuture<Void> future = this.operator.delele(destroyedObjectIds).exceptionally(exp -> {
log.error("Failed to delete the S3Object from S3, objectIds: {}",
String.join(",", destroyedObjectIds), exp);
return false;
}).thenAccept(success -> {
if (success) {
// generate the records which remove the mark destroyed objects
this.markDestroyedObjects.forEach(objectId -> {
records.add(new ApiMessageAndVersion(
new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
});
ObjectPair[] destroyedObjects = this.markDestroyedObjects.stream()
// must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata
.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey().get()))
.toArray(ObjectPair[]::new);
String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new);
Set<Long> destroyedObjectIds = Arrays.stream(destroyedObjects).map(ObjectPair::objectId).collect(Collectors.toSet());
// TODO: deal with failed objects in batch object deletion request
this.operator.delele(destroyedObjectKeys).whenCompleteAsync((success, e) -> {
if (e != null || !success) {
log.error("Failed to delete the S3Object from S3, objectKeys: {}",
String.join(",", destroyedObjectKeys), e);
return;
}
// notify the controller an objects deletion event to drive the removal of the objects
ControllerRequestContext ctx = new ControllerRequestContext(
null, null, OptionalLong.empty());
this.quorumController.notifyS3ObjectDeleted(ctx, destroyedObjectIds).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to notify the controller the S3Object deletion event, objectIds: {}",
destroyedObjectIds, exp);
}
});
});
return ControllerResult.of(records, null);
}

/**
* Generate RemoveS3ObjectRecord for the deleted S3Objects.
* @param deletedObjectIds the deleted S3Objects' ids
* @return the result of the generation, contains the records which should be applied to the raft.
*/
public ControllerResult<Void> notifyS3ObjectDeleted(Set<Long> deletedObjectIds) {
List<ApiMessageAndVersion> records = new ArrayList<>();
deletedObjectIds.stream().filter(markDestroyedObjects::contains).forEach(objectId -> {
records.add(new ApiMessageAndVersion(
new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
});
try {
future.get();
} catch (Exception e) {
log.error("Failed to get S3Object deletion operation result, objectIds: {}",
String.join(",", destroyedObjectIds), e);
}
return ControllerResult.of(records, null);
}

Expand All @@ -208,4 +223,30 @@ public interface S3ObjectLifeCycleListener {
ControllerResult<Void> onDestroy(Long objectId);
}

static class ObjectPair {
private final Long objectId;
private final String objectKey;

public ObjectPair(Long objectId, String objectKey) {
this.objectId = objectId;
this.objectKey = objectKey;
}

public Long objectId() {
return objectId;
}

public String objectKey() {
return objectKey;
}

@Override
public String toString() {
return "ObjectPair{" +
"objectId=" + objectId +
", objectKey='" + objectKey + '\'' +
'}';
}
}

}