fix: fix controller check prepare object NPE (#124)
Signed-off-by: Robin Han <hanxvdovehx@gmail.com>
superhx committed Sep 13, 2023
1 parent 25e87e5 commit ec85544
Showing 1 changed file with 81 additions and 80 deletions.
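
The diff below replaces the controller's plain in-memory queue of prepared object ids with a snapshot-aware TimelineHashSet and keeps that set in sync during replay, so the periodic lifecycle check no longer looks up ids whose metadata entry is already gone. As a rough illustration of the failure mode being fixed (not the project's code: the field names mirror the diff, while the standalone class and simplified types are hypothetical), consider:

import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;

public class PreparedObjectsNpeSketch {

    // Simplified stand-in for the controller's S3Object metadata entry.
    record S3Object(long objectId, boolean expired) {
        boolean isExpired() {
            return expired;
        }
    }

    public static void main(String[] args) {
        // Before the fix: a plain queue of "prepared" ids, tracked separately
        // from the metadata map and not covered by the snapshot registry.
        Queue<Long> preparedObjects = new LinkedList<>();
        Map<Long, S3Object> objectsMetadata = new HashMap<>();

        // An id can linger in the queue after its metadata entry is gone,
        // for example once the object has been committed and removed.
        preparedObjects.add(42L);

        // The lifecycle check maps each prepared id through the metadata map;
        // get() returns null for the stale id, and invoking the filter's
        // method reference on that null element throws a NullPointerException.
        preparedObjects.stream()
                .map(objectsMetadata::get)
                .filter(S3Object::isExpired) // NPE here for the stale id
                .forEach(obj -> System.out.println("expired object: " + obj.objectId()));
    }
}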
@@ -17,19 +17,6 @@

package org.apache.kafka.controller.stream;

-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.OptionalLong;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
@@ -47,9 +34,24 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineHashSet;
import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;

+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalLong;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;

import static org.apache.kafka.metadata.stream.ObjectUtils.NOOP_OBJECT_ID;

/**
@@ -75,9 +77,9 @@ public class S3ObjectControlManager {
/**
* The objectId of the next object to be prepared. (start from 0)
*/
-private TimelineLong nextAssignedObjectId;
+private final TimelineLong nextAssignedObjectId;

-private final Queue<Long/*objectId*/> preparedObjects;
+private final TimelineHashSet<Long /* objectId */> preparedObjects;

// TODO: support different deletion policies, based on time dimension or space dimension?
private final Queue<Long/*objectId*/> markDestroyedObjects;
@@ -89,32 +91,31 @@ public class S3ObjectControlManager {
private final ScheduledExecutorService lifecycleCheckTimer;

public S3ObjectControlManager(
-QuorumController quorumController,
-SnapshotRegistry snapshotRegistry,
-LogContext logContext,
-String clusterId,
-S3Config config,
-S3Operator operator) {
+QuorumController quorumController,
+SnapshotRegistry snapshotRegistry,
+LogContext logContext,
+String clusterId,
+S3Config config,
+S3Operator operator) {
this.quorumController = quorumController;
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
this.config = config;
this.nextAssignedObjectId = new TimelineLong(snapshotRegistry);
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
-this.preparedObjects = new LinkedBlockingDeque<>();
+this.preparedObjects = new TimelineHashSet<>(snapshotRegistry, 0);
this.markDestroyedObjects = new LinkedBlockingDeque<>();
this.operator = operator;
this.lifecycleListeners = new ArrayList<>();
this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor();
-this.lifecycleCheckTimer.scheduleWithFixedDelay(() -> {
-triggerCheckEvent();
-}, DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
+this.lifecycleCheckTimer.scheduleWithFixedDelay(this::triggerCheckEvent,
+DEFAULT_INITIAL_DELAY_MS, DEFAULT_LIFECYCLE_CHECK_INTERVAL_MS, TimeUnit.MILLISECONDS);
}

private void triggerCheckEvent() {
ControllerRequestContext ctx = new ControllerRequestContext(
-null, null, OptionalLong.empty());
+null, null, OptionalLong.empty());
this.quorumController.checkS3ObjectsLifecycle(ctx).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to check the S3Object's lifecycle", exp);
@@ -139,18 +140,18 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
// update assigned stream id
long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
-.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));
+.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));

long firstAssignedObjectId = nextAssignedObjectId.get();
for (int i = 0; i < count; i++) {
Long objectId = nextAssignedObjectId.get() + i;
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
S3ObjectRecord record = new S3ObjectRecord()
-.setObjectId(objectId)
-.setObjectState(S3ObjectState.PREPARED.toByte())
-.setPreparedTimeInMs(preparedTs)
-.setExpiredTimeInMs(expiredTs);
+.setObjectId(objectId)
+.setObjectState(S3ObjectState.PREPARED.toByte())
+.setPreparedTimeInMs(preparedTs)
+.setExpiredTimeInMs(expiredTs);
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setFirstS3ObjectId(firstAssignedObjectId);
@@ -176,14 +177,14 @@ public ControllerResult<Errors> commitObject(long objectId, long objectSize, lon
return ControllerResult.of(Collections.emptyList(), Errors.OBJECT_NOT_EXIST);
}
S3ObjectRecord record = new S3ObjectRecord()
-.setObjectId(objectId)
-.setObjectSize(objectSize)
-.setObjectState(S3ObjectState.COMMITTED.toByte())
-.setPreparedTimeInMs(object.getPreparedTimeInMs())
-.setExpiredTimeInMs(object.getExpiredTimeInMs())
-.setCommittedTimeInMs(committedTs);
+.setObjectId(objectId)
+.setObjectSize(objectSize)
+.setObjectState(S3ObjectState.COMMITTED.toByte())
+.setPreparedTimeInMs(object.getPreparedTimeInMs())
+.setExpiredTimeInMs(object.getExpiredTimeInMs())
+.setCommittedTimeInMs(committedTs);
return ControllerResult.of(List.of(
-new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
+new ApiMessageAndVersion(record, (short) 0)), Errors.NONE);
}

public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
@@ -195,12 +196,12 @@ public ControllerResult<Boolean> markDestroyObjects(List<Long> objects) {
return ControllerResult.of(Collections.emptyList(), false);
}
S3ObjectRecord record = new S3ObjectRecord()
-.setObjectId(objectId)
-.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
-.setPreparedTimeInMs(object.getPreparedTimeInMs())
-.setExpiredTimeInMs(object.getExpiredTimeInMs())
-.setCommittedTimeInMs(object.getCommittedTimeInMs())
-.setMarkDestroyedTimeInMs(System.currentTimeMillis());
+.setObjectId(objectId)
+.setObjectState(S3ObjectState.MARK_DESTROYED.toByte())
+.setPreparedTimeInMs(object.getPreparedTimeInMs())
+.setExpiredTimeInMs(object.getExpiredTimeInMs())
+.setCommittedTimeInMs(object.getCommittedTimeInMs())
+.setMarkDestroyedTimeInMs(System.currentTimeMillis());
records.add(new ApiMessageAndVersion(record, (short) 0));
}
return ControllerResult.atomicOf(records, true);
@@ -214,20 +215,24 @@ public void replay(S3ObjectRecord record) {
GenerateContextV0 ctx = new GenerateContextV0(clusterId, record.objectId());
String objectKey = S3ObjectKeyGeneratorManager.getByVersion(0).generate(ctx);
S3Object object = new S3Object(record.objectId(), record.objectSize(), objectKey,
-record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
-S3ObjectState.fromByte(record.objectState()));
+record.preparedTimeInMs(), record.expiredTimeInMs(), record.committedTimeInMs(), record.markDestroyedTimeInMs(),
+S3ObjectState.fromByte(record.objectState()));
objectsMetadata.put(record.objectId(), object);
// TODO: recover the prepared objects and mark destroyed objects when restart the controller
if (object.getS3ObjectState() == S3ObjectState.PREPARED) {
preparedObjects.add(object.getObjectId());
-} else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
-markDestroyedObjects.add(object.getObjectId());
+} else {
+preparedObjects.remove(object.getObjectId());
+if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
+markDestroyedObjects.add(object.getObjectId());
+}
}
}

public void replay(RemoveS3ObjectRecord record) {
objectsMetadata.remove(record.objectId());
markDestroyedObjects.remove(record.objectId());
+preparedObjects.remove(record.objectId());
}

/**
@@ -239,30 +244,30 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
List<ApiMessageAndVersion> records = new ArrayList<>();
// check the expired objects
this.preparedObjects.stream().
-map(objectsMetadata::get).
-filter(S3Object::isExpired).
-forEach(obj -> {
-S3ObjectRecord record = new S3ObjectRecord()
-.setObjectId(obj.getObjectId())
-.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
-.setObjectSize(obj.getObjectSize())
-.setPreparedTimeInMs(obj.getPreparedTimeInMs())
-.setExpiredTimeInMs(obj.getExpiredTimeInMs())
-.setCommittedTimeInMs(obj.getCommittedTimeInMs())
-.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
-// generate the records which mark the expired objects as destroyed
-records.add(new ApiMessageAndVersion(record, (short) 0));
-// generate the records which listener reply for the object-destroy events
-lifecycleListeners.forEach(listener -> {
-ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
-records.addAll(result.records());
+map(objectsMetadata::get).
+filter(S3Object::isExpired).
+forEach(obj -> {
+S3ObjectRecord record = new S3ObjectRecord()
+.setObjectId(obj.getObjectId())
+.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
+.setObjectSize(obj.getObjectSize())
+.setPreparedTimeInMs(obj.getPreparedTimeInMs())
+.setExpiredTimeInMs(obj.getExpiredTimeInMs())
+.setCommittedTimeInMs(obj.getCommittedTimeInMs())
+.setMarkDestroyedTimeInMs(obj.getMarkDestroyedTimeInMs());
+// generate the records which mark the expired objects as destroyed
+records.add(new ApiMessageAndVersion(record, (short) 0));
+// generate the records which listener reply for the object-destroy events
+lifecycleListeners.forEach(listener -> {
+ControllerResult<Void> result = listener.onDestroy(obj.getObjectId());
+records.addAll(result.records());
});
});
});
// check the mark destroyed objects
ObjectPair[] destroyedObjects = this.markDestroyedObjects.stream()
-// must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata
-.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey()))
-.toArray(ObjectPair[]::new);
+// must guarantee that the objects in markDestroyedObjects also exist in objectsMetadata
+.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey()))
+.toArray(ObjectPair[]::new);
String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new);
if (destroyedObjectKeys == null || destroyedObjectKeys.length == 0) {
return ControllerResult.of(records, null);
@@ -272,16 +277,16 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
this.operator.delele(destroyedObjectKeys).whenCompleteAsync((success, e) -> {
if (e != null || !success) {
log.error("Failed to delete the S3Object from S3, objectKeys: {}",
-String.join(",", destroyedObjectKeys), e);
+String.join(",", destroyedObjectKeys), e);
return;
}
// notify the controller an objects deletion event to drive the removal of the objects
ControllerRequestContext ctx = new ControllerRequestContext(
-null, null, OptionalLong.empty());
+null, null, OptionalLong.empty());
this.quorumController.notifyS3ObjectDeleted(ctx, destroyedObjectIds).whenComplete((ignore, exp) -> {
if (exp != null) {
log.error("Failed to notify the controller the S3Object deletion event, objectIds: {}",
-destroyedObjectIds, exp);
+destroyedObjectIds, exp);
}
});
});
@@ -298,15 +303,11 @@ public ControllerResult<Void> notifyS3ObjectDeleted(Set<Long> deletedObjectIds)
List<ApiMessageAndVersion> records = new ArrayList<>();
deletedObjectIds.stream().filter(markDestroyedObjects::contains).forEach(objectId -> {
records.add(new ApiMessageAndVersion(
-new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
+new RemoveS3ObjectRecord().setObjectId(objectId), (short) 0));
});
return ControllerResult.of(records, null);
}

-public Queue<Long> preparedObjects() {
-return preparedObjects;
-}

public Map<Long, S3Object> objectsMetadata() {
return objectsMetadata;
}
@@ -350,9 +351,9 @@ public String objectKey() {
@Override
public String toString() {
return "ObjectPair{" +
"objectId=" + objectId +
", objectKey='" + objectKey + '\'' +
'}';
"objectId=" + objectId +
", objectKey='" + objectKey + '\'' +
'}';
}
}
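
Read back as plain Java, the reworked replay paths keep the prepared-id set consistent with the metadata map: an id stays in the set only while its object is in the PREPARED state and is dropped again on commit, mark-destroy, or removal. Below is a condensed sketch of that bookkeeping, with ordinary collections standing in for the snapshot-aware TimelineHashMap/TimelineHashSet and the surrounding controller machinery omitted (the enclosing class and record type are illustrative, not the project's):

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class PreparedSetBookkeepingSketch {

    enum S3ObjectState { PREPARED, COMMITTED, MARK_DESTROYED }

    record S3Object(long objectId, S3ObjectState state) { }

    // Stand-ins for the timeline structures; in the controller these are
    // registered with the SnapshotRegistry so the bookkeeping also survives
    // snapshot restores and controller failover.
    private final Map<Long, S3Object> objectsMetadata = new HashMap<>();
    private final Set<Long> preparedObjects = new HashSet<>();
    private final Set<Long> markDestroyedObjects = new HashSet<>();

    // Mirrors the reworked replay(S3ObjectRecord): membership in preparedObjects
    // is derived from the replayed state instead of only ever growing.
    void replay(long objectId, S3ObjectState state) {
        objectsMetadata.put(objectId, new S3Object(objectId, state));
        if (state == S3ObjectState.PREPARED) {
            preparedObjects.add(objectId);
        } else {
            preparedObjects.remove(objectId);
            if (state == S3ObjectState.MARK_DESTROYED) {
                markDestroyedObjects.add(objectId);
            }
        }
    }

    // Mirrors the reworked replay(RemoveS3ObjectRecord): a removed object
    // disappears from every index, so later lifecycle checks cannot map a
    // stale id to a missing metadata entry.
    void replayRemove(long objectId) {
        objectsMetadata.remove(objectId);
        markDestroyedObjects.remove(objectId);
        preparedObjects.remove(objectId);
    }
}

With this bookkeeping, the stream in checkS3ObjectsLifecycle only ever sees ids that still have a live metadata entry, closing the NullPointerException path named in the commit title.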
