diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java index 061e19213e..c42f83a0f6 100644 --- a/core/src/test/java/kafka/test/MockController.java +++ b/core/src/test/java/kafka/test/MockController.java @@ -454,4 +454,11 @@ public int curClaimEpoch() { public void close() { beginShutdown(); } + + // Kafka on S3 inject start + @Override + public CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext context) { + throw new UnsupportedOperationException(); + } + // Kafka on S3 inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java index ed6c523753..e1a650a13a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java +++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java @@ -353,4 +353,13 @@ default boolean isActive() { * Blocks until we have shut down and freed all resources. */ void close() throws InterruptedException; + + // Kafka on S3 inject start + + /** + * Check the lifecycle of the S3 objects. + */ + CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext context); + + // Kafka on S3 inject end } diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java index d130de5928..b28059e432 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerResult.java @@ -25,7 +25,7 @@ import java.util.stream.Collectors; -class ControllerResult { +public class ControllerResult { private final List records; private final T response; private final boolean isAtomic; diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java index dc8941eb03..b54c8cd4cd 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java +++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java @@ -1785,7 +1785,7 @@ private QuorumController( // Kafka on S3 inject start this.s3Config = s3Config; - this.s3ObjectControlManager = new S3ObjectControlManager(snapshotRegistry, logContext, clusterId, s3Config); + this.s3ObjectControlManager = new S3ObjectControlManager(this, snapshotRegistry, logContext, clusterId, s3Config); this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager); // Kafka on S3 inject end updateWriteOffset(-1); @@ -2151,6 +2151,14 @@ public void close() throws InterruptedException { controllerMetrics.close(); } + // Kafka on S3 inject start + @Override + public CompletableFuture checkS3ObjectsLifecycle(ControllerRequestContext context) { + return appendWriteEvent("checkS3ObjectsLifecycle", context.deadlineNs(), + () -> s3ObjectControlManager.checkS3ObjectsLifecycle()); + } + // Kafka on S3 inject end + // VisibleForTesting CountDownLatch pause() { final CountDownLatch latch = new CountDownLatch(1); diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultS3Operator.java b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultS3Operator.java new file mode 100644 index 0000000000..2aa5af0940 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/DefaultS3Operator.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.stream; + +import java.util.concurrent.CompletableFuture; + +public class DefaultS3Operator implements S3Operator { + public CompletableFuture delete(String objectKey) { + return this.delele(new String[]{objectKey}); + } + + @Override + public CompletableFuture delele(String[] objectKeys) { + return CompletableFuture.completedFuture(true); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java b/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java new file mode 100644 index 0000000000..e607cbe1df --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/MockS3Operator.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.stream; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import org.apache.kafka.metadata.stream.S3Object; + +public class MockS3Operator implements S3Operator { + + private final Map objects = new HashMap<>(); + + @Override + public CompletableFuture delete(String objectKey) { + return delele(new String[]{objectKey}); + } + + @Override + public CompletableFuture delele(String[] objectKeys) { + return CompletableFuture.completedFuture(false); + } +} diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java index de07c2cb59..f4d7552ddd 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3ObjectControlManager.java @@ -17,15 +17,26 @@ package org.apache.kafka.controller.stream; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.List; +import java.util.OptionalLong; import java.util.Queue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.apache.kafka.common.metadata.RemoveS3ObjectRecord; import org.apache.kafka.common.metadata.S3ObjectRecord; import org.apache.kafka.common.utils.LogContext; +import org.apache.kafka.controller.ControllerRequestContext; +import org.apache.kafka.controller.ControllerResult; +import org.apache.kafka.controller.QuorumController; import org.apache.kafka.controller.stream.S3ObjectKeyGeneratorManager.GenerateContextV0; import org.apache.kafka.metadata.stream.S3Config; import org.apache.kafka.metadata.stream.S3Object; import org.apache.kafka.metadata.stream.S3ObjectState; +import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.timeline.SnapshotRegistry; import org.apache.kafka.timeline.TimelineHashMap; import org.slf4j.Logger; @@ -34,9 +45,16 @@ * The S3ObjectControlManager manages all S3Object's lifecycle, such as apply, create, destroy, etc. */ public class S3ObjectControlManager { + + // TODO: config it in properties + private static final long DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS = 3000L; + + private static final long DEFAULT_INITIAL_DELAY_MS = 5000L; + + private final QuorumController quorumController; private final SnapshotRegistry snapshotRegistry; private final Logger log; - + private final TimelineHashMap objectsMetadata; private final String clusterId; @@ -47,23 +65,52 @@ public class S3ObjectControlManager { * The objectId of the next object to be prepared. (start from 0) */ private Long nextAssignedObjectId = 0L; - - // TODO: add timer task to periodically check if there are objects to be destroyed or expired + private final Queue preparedObjects; + + // TODO: support different deletion policies, based on time dimension or space dimension? private final Queue markDestroyedObjects; - + + private final S3Operator operator; + + private final List lifecycleListeners; + + private final ScheduledExecutorService lifecycleCheckTimer; + public S3ObjectControlManager( + QuorumController quorumController, SnapshotRegistry snapshotRegistry, LogContext logContext, String clusterId, S3Config config) { + this.quorumController = quorumController; this.snapshotRegistry = snapshotRegistry; this.log = logContext.logger(S3ObjectControlManager.class); this.clusterId = clusterId; this.config = config; this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0); - this.preparedObjects = new LinkedList<>(); - this.markDestroyedObjects = new LinkedList<>(); + this.preparedObjects = new LinkedBlockingDeque<>(); + this.markDestroyedObjects = new LinkedBlockingDeque<>(); + this.operator = new DefaultS3Operator(); + this.lifecycleListeners = new ArrayList<>(); + this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor(); + this.lifecycleCheckTimer.scheduleWithFixedDelay(() -> { + triggerCheckEvent(); + }, DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS); + } + + private void triggerCheckEvent() { + ControllerRequestContext ctx = new ControllerRequestContext( + null, null, OptionalLong.empty()); + this.quorumController.checkS3ObjectsLifecycle(ctx).whenComplete((ignore, exp) -> { + if (exp != null) { + log.error("Failed to check the S3Object's lifecycle", exp); + } + }); + } + + public void registerListener(S3ObjectLifeCycleListener listener) { + this.lifecycleListeners.add(listener); } public Long nextAssignedObjectId() { @@ -74,14 +121,91 @@ public void replay(S3ObjectRecord record) { GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId()); String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx); S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey, - record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState())); + record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), + S3ObjectState.fromByte(record.objectState())); objectsMetadata.put(record.objectId(), object); + // TODO: recover the prepared objects and mark destroyed objects when restart the controller + if (object.getS3ObjectState() == S3ObjectState.PREPARED) { + preparedObjects.add(object.getObjectId()); + } else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) { + markDestroyedObjects.add(object.getObjectId()); + } nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1); } public void replay(RemoveS3ObjectRecord record) { objectsMetadata.remove(record.objectId()); + markDestroyedObjects.remove(record.objectId()); + } + + /** + * Check the S3Object's lifecycle, and do the corresponding actions. + * + * @return the result of the check, contains the records which should be applied to the raft. + */ + public ControllerResult checkS3ObjectsLifecycle() { + List records = new ArrayList<>(); + // check the expired objects + this.preparedObjects.stream(). + map(objectsMetadata::get). + filter(S3Object::isExpired). + forEach(obj -> { + S3ObjectRecord record = new S3ObjectRecord() + .setObjectId(obj.getObjectId()) + .setObjectState((byte) S3ObjectState.DESTROYED.ordinal()) + .setObjectSize(obj.getObjectSize().get()) + .setAppliedTimeInMs(obj.getAppliedTimeInMs().get()) + .setExpiredTimeInMs(obj.getExpiredTimeInMs().get()) + .setCommittedTimeInMs(obj.getCommittedTimeInMs().get()) + .setDestroyedTimeInMs(obj.getDestroyedTimeInMs().get()); + // generate the records which mark the expired objects as destroyed + records.add(new ApiMessageAndVersion(record, (short) 0)); + // generate the records which listener reply for the object-destroy events + lifecycleListeners.forEach(listener -> { + ControllerResult result = listener.onDestroy(obj.getObjectId()); + records.addAll(result.records()); + }); + }); + // check the mark destroyed objects + String[] destroyedObjectIds = this.markDestroyedObjects.stream() + .map(objectsMetadata::get) + .map(S3Object::getObjectKey) + .toArray(String[]::new); + // TODO: optimize this ugly implementation, now the thread will be blocked until the deletion request is finished + CompletableFuture future = this.operator.delele(destroyedObjectIds).exceptionally(exp -> { + log.error("Failed to delete the S3Object from S3, objectIds: {}", + String.join(",", destroyedObjectIds), exp); + return false; + }).thenAccept(success -> { + if (success) { + // generate the records which remove the mark destroyed objects + this.markDestroyedObjects.forEach(objectId -> { + records.add(new ApiMessageAndVersion( + new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0)); + }); + } + }); + try { + future.get(); + } catch (Exception e) { + log.error("Failed to get S3Object deletion operation result, objectIds: {}", + String.join(",", destroyedObjectIds), e); + } + return ControllerResult.of(records, null); + } + + /** + * S3Object's lifecycle listener. + */ + public interface S3ObjectLifeCycleListener { + + /** + * Notify the listener that the S3Object has been destroyed. + * + * @param objectId the destroyed S3Object's id + * @return the result of the listener, contains the records which should be applied to the raft. + */ + ControllerResult onDestroy(Long objectId); } - } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3Operator.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3Operator.java new file mode 100644 index 0000000000..d0ab8ca764 --- /dev/null +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3Operator.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.controller.stream; + +import java.util.concurrent.CompletableFuture; + +public interface S3Operator { + CompletableFuture delete(String objectKey); + + CompletableFuture delele(String[] objectKeys); +} diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java index fd91dd6939..96ea91ca9c 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3Object.java @@ -186,7 +186,15 @@ public Optional getDestroyedTimeInMs() { return destroyedTimeInMs; } + public Optional getExpiredTimeInMs() { + return expiredTimeInMs; + } + public S3ObjectState getS3ObjectState() { return s3ObjectState; } + + public boolean isExpired() { + return System.currentTimeMillis() > expiredTimeInMs.get(); + } }