core/src/test/java/kafka/test/MockController.java (7 additions, 0 deletions)
@@ -454,4 +454,11 @@ public int curClaimEpoch() {
public void close() {
beginShutdown();
}

// Kafka on S3 inject start
@Override
public CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext context) {
throw new UnsupportedOperationException();
}
// Kafka on S3 inject end
}
Controller.java (org.apache.kafka.controller)
@@ -353,4 +353,13 @@ default boolean isActive() {
* Blocks until we have shut down and freed all resources.
*/
void close() throws InterruptedException;

// Kafka on S3 inject start

/**
 * Check the lifecycle of the S3 objects, e.g. destroy the expired ones.
 *
 * @param context the controller request context
 * @return a future which completes once the lifecycle check has been applied
 */
CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext context);

// Kafka on S3 inject end
}
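For orientation, a caller drives the new method as below. A minimal sketch, not part of the diff; it mirrors the triggerCheckEvent() call site added to S3ObjectControlManager further down:

    // Trigger a lifecycle check with no principal and no deadline (illustrative call site).
    ControllerRequestContext ctx = new ControllerRequestContext(null, null, OptionalLong.empty());
    controller.checkS3ObjectsLifecycle(ctx).whenComplete((ignored, t) -> {
        if (t != null) {
            log.error("Failed to check the S3Objects' lifecycle", t);
        }
    });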
ControllerResult.java (org.apache.kafka.controller)
@@ -25,7 +25,7 @@
import java.util.stream.Collectors;


-class ControllerResult<T> {
+public class ControllerResult<T> {
private final List<ApiMessageAndVersion> records;
private final T response;
private final boolean isAtomic;
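ControllerResult becomes public because the new org.apache.kafka.controller.stream package added below (S3ObjectControlManager and its S3ObjectLifeCycleListener) constructs and returns ControllerResult values; Java treats org.apache.kafka.controller and org.apache.kafka.controller.stream as unrelated packages, so a package-private class would not be visible there.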
QuorumController.java (org.apache.kafka.controller)
@@ -1785,7 +1785,7 @@ private QuorumController(

// Kafka on S3 inject start
this.s3Config = s3Config;
-this.s3ObjectControlManager = new S3ObjectControlManager(snapshotRegistry, logContext, clusterId, s3Config);
+this.s3ObjectControlManager = new S3ObjectControlManager(this, snapshotRegistry, logContext, clusterId, s3Config);
this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager);
// Kafka on S3 inject end
updateWriteOffset(-1);
@@ -2151,6 +2151,14 @@ public void close() throws InterruptedException {
controllerMetrics.close();
}

// Kafka on S3 inject start
@Override
public CompletableFuture<Void> checkS3ObjectsLifecycle(ControllerRequestContext context) {
return appendWriteEvent("checkS3ObjectsLifecycle", context.deadlineNs(),
() -> s3ObjectControlManager.checkS3ObjectsLifecycle());
}
// Kafka on S3 inject end

// VisibleForTesting
CountDownLatch pause() {
final CountDownLatch latch = new CountDownLatch(1);
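The override wraps the manager call in a controller write event: appendWriteEvent runs the supplied operation on the controller's event thread, replicates the records in the returned ControllerResult through the metadata log, and completes the returned future once they are committed. Roughly (an illustrative sketch of the flow, not the actual QuorumController internals):

    // Illustrative: how the ControllerResult from the manager becomes committed metadata.
    CompletableFuture<Void> future = new CompletableFuture<>();
    eventQueue.append(() -> {
        ControllerResult<Void> result = s3ObjectControlManager.checkS3ObjectsLifecycle();
        raftClient.scheduleAppend(epoch, result.records()); // replicate the generated records
        // simplified: the real event completes the future only after the records commit
        future.complete(result.response());
    });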
DefaultS3Operator.java (org.apache.kafka.controller.stream, new file)
@@ -0,0 +1,31 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.stream;

import java.util.concurrent.CompletableFuture;

public class DefaultS3Operator implements S3Operator {

    @Override
    public CompletableFuture<Boolean> delete(String objectKey) {
        return this.delete(new String[]{objectKey});
    }

    @Override
    public CompletableFuture<Boolean> delete(String[] objectKeys) {
        // placeholder: no real S3 client is wired in yet, so deletion always reports success
        return CompletableFuture.completedFuture(true);
    }
}
MockS3Operator.java (org.apache.kafka.controller.stream, new file)
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.stream;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.metadata.stream.S3Object;

public class MockS3Operator implements S3Operator {

    // reserved for tracking objects in tests; not consulted by delete() yet
    private final Map<String/*objectKey*/, S3Object> objects = new HashMap<>();

    @Override
    public CompletableFuture<Boolean> delete(String objectKey) {
        return delete(new String[]{objectKey});
    }

    @Override
    public CompletableFuture<Boolean> delete(String[] objectKeys) {
        // the mock reports failure by default
        return CompletableFuture.completedFuture(false);
    }
}
S3ObjectControlManager.java (org.apache.kafka.controller.stream)
@@ -17,15 +17,26 @@

package org.apache.kafka.controller.stream;

-import java.util.LinkedList;
+import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.ControllerRequestContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.controller.QuorumController;
import org.apache.kafka.controller.stream.S3ObjectKeyGeneratorManager.GenerateContextV0;
import org.apache.kafka.metadata.stream.S3Config;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.slf4j.Logger;
@@ -34,9 +45,16 @@
* The S3ObjectControlManager manages the lifecycle of all S3Objects, such as apply, create, and destroy.
*/
public class S3ObjectControlManager {

// TODO: make these intervals configurable via properties
private static final long DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS = 3000L;

private static final long DEFAULT_INITIAL_DELAY_MS = 5000L;

private final QuorumController quorumController;
private final SnapshotRegistry snapshotRegistry;
private final Logger log;

private final TimelineHashMap<Long/*objectId*/, S3Object> objectsMetadata;

private final String clusterId;
@@ -47,23 +65,52 @@ public class S3ObjectControlManager {
* The objectId of the next object to be prepared. (start from 0)
*/
private Long nextAssignedObjectId = 0L;

// TODO: add timer task to periodically check if there are objects to be destroyed or expired

private final Queue<Long/*objectId*/> preparedObjects;

// TODO: support different deletion policies, based on time dimension or space dimension?
private final Queue<Long/*objectId*/> markDestroyedObjects;


private final S3Operator operator;

private final List<S3ObjectLifeCycleListener> lifecycleListeners;

private final ScheduledExecutorService lifecycleCheckTimer;

public S3ObjectControlManager(
QuorumController quorumController,
SnapshotRegistry snapshotRegistry,
LogContext logContext,
String clusterId,
S3Config config) {
this.quorumController = quorumController;
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
this.config = config;
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
-this.preparedObjects = new LinkedList<>();
-this.markDestroyedObjects = new LinkedList<>();
+this.preparedObjects = new LinkedBlockingDeque<>();
+this.markDestroyedObjects = new LinkedBlockingDeque<>();
this.operator = new DefaultS3Operator();
this.lifecycleListeners = new ArrayList<>();
this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor();
this.lifecycleCheckTimer.scheduleWithFixedDelay(this::triggerCheckEvent,
    DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
}

// Runs on the timer thread: hands the actual check off to the controller thread
// as a write event, so all metadata mutations stay on the single controller thread.
private void triggerCheckEvent() {
    ControllerRequestContext ctx = new ControllerRequestContext(
        null, null, OptionalLong.empty());
    this.quorumController.checkS3ObjectsLifecycle(ctx).whenComplete((ignore, exp) -> {
        if (exp != null) {
            log.error("Failed to check the S3Objects' lifecycle", exp);
        }
    });
}

public void registerListener(S3ObjectLifeCycleListener listener) {
this.lifecycleListeners.add(listener);
}

public Long nextAssignedObjectId() {
@@ -74,14 +121,91 @@ public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
-record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(), S3ObjectState.fromByte(record.objectState()));
+record.appliedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.destroyedTimeInMs(),
+S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
// TODO: recover the prepared objects and mark destroyed objects when restart the controller
if (object.getS3ObjectState() == S3ObjectState.PREPARED) {
preparedObjects.add(object.getObjectId());
} else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
markDestroyedObjects.add(object.getObjectId());
}
nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1);
}

public void replay(RemoveS3ObjectRecord record) {
objectsMetadata.remove(record.objectId());
markDestroyedObjects.remove(record.objectId());
}

/**
 * Check each S3Object's lifecycle and take the corresponding actions.
 *
 * @return the result of the check, containing the records that should be applied to the Raft log.
 */
public ControllerResult<Void> checkS3ObjectsLifecycle() {
List<ApiMessageAndVersion> records = new ArrayList<>();
// mark any expired prepared objects as destroyed
this.preparedObjects.stream()
    .map(objectsMetadata::get)
    .filter(S3Object::isExpired)
    .forEach(obj -> {
        S3ObjectRecord record = new S3ObjectRecord()
            .setObjectId(obj.getObjectId())
            .setObjectState((byte) S3ObjectState.DESTROYED.ordinal())
            .setObjectSize(obj.getObjectSize().get())
            .setAppliedTimeInMs(obj.getAppliedTimeInMs().get())
            .setExpiredTimeInMs(obj.getExpiredTimeInMs().get())
            .setCommittedTimeInMs(obj.getCommittedTimeInMs().get())
            .setDestroyedTimeInMs(obj.getDestroyedTimeInMs().get());
        // generate the records which mark the expired objects as destroyed
        records.add(new ApiMessageAndVersion(record, (short) 0));
        // let each listener append the records it needs for the object-destroy event
        lifecycleListeners.forEach(listener -> {
            ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
            records.addAll(result.records());
        });
    });
// delete the mark-destroyed objects from S3, then emit records removing them from metadata
String[] destroyedObjectKeys = this.markDestroyedObjects.stream()
    .map(objectsMetadata::get)
    .map(S3Object::getObjectKey)
    .toArray(String[]::new);
// TODO: optimize this ugly implementation, now the thread will be blocked until the deletion request is finished
CompletableFuture<Void> future = this.operator.delete(destroyedObjectKeys).exceptionally(exp -> {
    log.error("Failed to delete the S3Objects from S3, object keys: {}",
        String.join(",", destroyedObjectKeys), exp);
    return false;
}).thenAccept(success -> {
    if (success) {
        // generate the records which remove the mark-destroyed objects
        this.markDestroyedObjects.forEach(objectId -> {
            records.add(new ApiMessageAndVersion(
                new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
        });
    }
});
try {
    future.get();
} catch (Exception e) {
    log.error("Failed to get the S3Object deletion result, object keys: {}",
        String.join(",", destroyedObjectKeys), e);
}
return ControllerResult.of(records, null);
}

/**
 * Listener for S3Object lifecycle events.
 */
public interface S3ObjectLifeCycleListener {

    /**
     * Notify the listener that an S3Object has been destroyed.
     *
     * @param objectId the destroyed S3Object's id
     * @return the result of the listener, containing the records that should be applied to the Raft log.
     */
    ControllerResult<Void> onDestroy(Long objectId);
}
}
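To show how the listener hook is meant to be consumed, here is a hypothetical registration (the StreamControlManager wiring is an assumption suggested by the QuorumController constructor change above; the listener class and its cleanup records are illustrative):

    // Hypothetical listener that cleans up metadata still referencing a destroyed object.
    public class StreamObjectCleanupListener implements S3ObjectControlManager.S3ObjectLifeCycleListener {
        @Override
        public ControllerResult<Void> onDestroy(Long objectId) {
            List<ApiMessageAndVersion> records = new ArrayList<>();
            // ... append records that unlink objectId from the stream metadata ...
            return ControllerResult.of(records, null);
        }
    }

    // e.g. inside StreamControlManager's constructor:
    s3ObjectControlManager.registerListener(new StreamObjectCleanupListener());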
S3Operator.java (org.apache.kafka.controller.stream, new file)
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.controller.stream;

import java.util.concurrent.CompletableFuture;

public interface S3Operator {

    /** Delete a single object from S3; the future's value reports whether it succeeded. */
    CompletableFuture<Boolean> delete(String objectKey);

    /** Delete a batch of objects from S3; the future's value reports whether it succeeded. */
    CompletableFuture<Boolean> delete(String[] objectKeys);
}
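For reference, a production implementation of this interface might look roughly like the sketch below, using the AWS SDK v2 async client (the class name, bucket sourcing, and success criterion are assumptions; nothing in this PR wires up a real client yet):

    import java.util.Arrays;
    import java.util.concurrent.CompletableFuture;
    import java.util.stream.Collectors;
    import software.amazon.awssdk.services.s3.S3AsyncClient;
    import software.amazon.awssdk.services.s3.model.Delete;
    import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
    import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

    public class AwsS3Operator implements S3Operator {

        private final S3AsyncClient client = S3AsyncClient.create();
        private final String bucket; // assumption: supplied via S3Config

        public AwsS3Operator(String bucket) {
            this.bucket = bucket;
        }

        @Override
        public CompletableFuture<Boolean> delete(String objectKey) {
            return delete(new String[]{objectKey});
        }

        @Override
        public CompletableFuture<Boolean> delete(String[] objectKeys) {
            // note: DeleteObjects accepts at most 1000 keys per request
            DeleteObjectsRequest request = DeleteObjectsRequest.builder()
                .bucket(bucket)
                .delete(Delete.builder()
                    .objects(Arrays.stream(objectKeys)
                        .map(key -> ObjectIdentifier.builder().key(key).build())
                        .collect(Collectors.toList()))
                    .build())
                .build();
            return client.deleteObjects(request)
                .thenApply(response -> response.errors().isEmpty());
        }
    }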
S3Object.java (org.apache.kafka.metadata.stream)
@@ -186,7 +186,15 @@ public Optional<Long> getDestroyedTimeInMs() {
return destroyedTimeInMs;
}

public Optional<Long> getExpiredTimeInMs() {
return expiredTimeInMs;
}

public S3ObjectState getS3ObjectState() {
return s3ObjectState;
}

public boolean isExpired() {
    // an object with no expiration time recorded is treated as not expired
    return expiredTimeInMs.map(expired -> System.currentTimeMillis() > expired).orElse(false);
}
}