@@ -0,0 +1,28 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.kafka.common.errors.s3;

import org.apache.kafka.common.errors.ApiException;

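/**
 * Thrown when an S3 object referenced by a request does not exist in the
 * controller's metadata; mapped to error code 504 (OBJECT_NOT_EXIST) below.
 */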
public class ObjectNotExistException extends ApiException {

public ObjectNotExistException(String message) {
super(message);
}

}
@@ -127,6 +127,7 @@
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.errors.s3.ObjectNotExistException;
import org.apache.kafka.common.errors.s3.StreamExistException;
import org.apache.kafka.common.errors.s3.StreamFencedException;
import org.apache.kafka.common.errors.s3.StreamNotExistException;
@@ -379,8 +380,8 @@ public enum Errors {

STREAM_EXIST(501, "The stream already exists.", StreamExistException::new),
STREAM_NOT_EXIST(502, "The stream does not exist.", StreamNotExistException::new),
STREAM_FENCED(503, "The stream is fenced.", StreamFencedException::new);

STREAM_FENCED(503, "The stream is fenced.", StreamFencedException::new),
OBJECT_NOT_EXIST(504, "The object does not exist.", ObjectNotExistException::new);

// Kafka on S3 inject end

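The new constant rides on Kafka's standard Errors plumbing, so the code/exception mapping works in both directions. A minimal sketch of that round trip (the wrapper class is illustrative, not part of this change):

import org.apache.kafka.common.errors.s3.ObjectNotExistException;
import org.apache.kafka.common.protocol.Errors;

public class ObjectNotExistMappingExample {
    public static void main(String[] args) {
        // Wire error code -> enum constant.
        Errors error = Errors.forCode((short) 504);
        System.out.println(error); // OBJECT_NOT_EXIST
        // Enum constant -> exception instance returned to callers.
        System.out.println(error.exception("object 42 does not exist")
                .getClass().getSimpleName()); // ObjectNotExistException
        // Exception -> enum constant, used when filling in a response.
        System.out.println(Errors.forException(new ObjectNotExistException("gone")).code()); // 504
    }
}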
@@ -51,7 +51,7 @@
"fields": [
{
"name": "StreamId",
"type": "int32",
"type": "int64",
"versions": "0+",
"about": "The ID of the stream"
},
@@ -51,7 +51,7 @@
"fields": [
{
"name": "StreamId",
"type": "int32",
"type": "int64",
"versions": "0+",
"about": "The ID of the stream"
},
@@ -89,6 +89,7 @@
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.controller.stream.DefaultS3Operator;
import org.apache.kafka.controller.stream.S3ObjectControlManager;
import org.apache.kafka.controller.stream.StreamControlManager;
import org.apache.kafka.metadata.BrokerHeartbeatReply;
@@ -1801,7 +1802,8 @@ private QuorumController(

// Kafka on S3 inject start
this.s3Config = s3Config;
this.s3ObjectControlManager = new S3ObjectControlManager(this, snapshotRegistry, logContext, clusterId, s3Config);
this.s3ObjectControlManager = new S3ObjectControlManager(
this, snapshotRegistry, logContext, clusterId, s3Config, new DefaultS3Operator());
this.streamControlManager = new StreamControlManager(snapshotRegistry, logContext, this.s3ObjectControlManager);
// Kafka on S3 inject end
updateWriteOffset(-1);
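This is the companion to the constructor change below: S3ObjectControlManager no longer instantiates DefaultS3Operator itself, so the controller injects the real operator here and unit tests can inject a fake. The S3Operator interface is not shown in this diff; the stub below is a hypothetical sketch that assumes only the delete call visible at its call sites.

import java.util.concurrent.CompletableFuture;
import org.apache.kafka.controller.stream.S3Operator;

// Hypothetical test stub; assumes S3Operator declares delete(String[]) returning
// a CompletableFuture, the only usage visible in this diff.
public class NoopS3Operator implements S3Operator {
    @Override
    public CompletableFuture<Boolean> delete(String[] objectKeys) {
        // Report success without touching real object storage.
        return CompletableFuture.completedFuture(true);
    }
}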
@@ -19,6 +19,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
@@ -89,7 +90,8 @@ public S3ObjectControlManager(
SnapshotRegistry snapshotRegistry,
LogContext logContext,
String clusterId,
S3Config config) {
S3Config config,
S3Operator operator) {
this.quorumController = quorumController;
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(S3ObjectControlManager.class);
@@ -99,7 +101,7 @@
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.preparedObjects = new LinkedBlockingDeque<>();
this.markDestroyedObjects = new LinkedBlockingDeque<>();
this.operator = new DefaultS3Operator();
this.operator = operator;
this.lifecycleListeners = new ArrayList<>();
this.lifecycleCheckTimer = Executors.newSingleThreadScheduledExecutor();
this.lifecycleCheckTimer.scheduleWithFixedDelay(() -> {
@@ -153,6 +155,23 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
return ControllerResult.atomicOf(records, response);
}

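/**
 * Transition a PREPARED object to COMMITTED: the prepared/expired timestamps are
 * carried over, the commit time is stamped, and a false result signals that the
 * object id is unknown.
 */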
public ControllerResult<Boolean> commitObject(long objectId, long objectSize) {
S3Object object = this.objectsMetadata.get(objectId);
if (object == null) {
log.error("object {} not exist when commit wal object", objectId);
return ControllerResult.of(Collections.emptyList(), false);
}
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(objectId)
.setObjectSize(objectSize)
.setObjectState(S3ObjectState.COMMITTED.toByte())
.setPreparedTimeInMs(object.getPreparedTimeInMs())
.setExpiredTimeInMs(object.getExpiredTimeInMs())
.setCommittedTimeInMs(System.currentTimeMillis());
return ControllerResult.of(List.of(
new ApiMessageAndVersion(record, (short) 0)), true);
}

public void replay(AssignedS3ObjectIdRecord record) {
nextAssignedObjectId.set(record.assignedS3ObjectId() + 1);
}
@@ -191,7 +210,7 @@ public ControllerResult<Void> checkS3ObjectsLifecycle() {
forEach(obj -> {
S3ObjectRecord record = new S3ObjectRecord()
.setObjectId(obj.getObjectId())
.setObjectState((byte) S3ObjectState.DESTROYED.ordinal())
.setObjectState((byte) S3ObjectState.MARK_DESTROYED.ordinal())
.setObjectSize(obj.getObjectSize())
.setPreparedTimeInMs(obj.getPreparedTimeInMs())
.setExpiredTimeInMs(obj.getExpiredTimeInMs())
@@ -211,6 +230,9 @@
.map(id -> new ObjectPair(id, objectsMetadata.get(id).getObjectKey()))
.toArray(ObjectPair[]::new);
String[] destroyedObjectKeys = Arrays.stream(destroyedObjects).map(ObjectPair::objectKey).toArray(String[]::new);
if (destroyedObjectKeys.length == 0) {
return ControllerResult.of(records, null);
}
Set<Long> destroyedObjectIds = Arrays.stream(destroyedObjects).map(ObjectPair::objectId).collect(Collectors.toSet());
// TODO: deal with failed objects in batch object deletion request
this.operator.delete(destroyedObjectKeys).whenCompleteAsync((success, e) -> {
@@ -255,6 +277,10 @@ public Map<Long, S3Object> objectsMetadata() {
return objectsMetadata;
}

public S3Object getObject(Long objectId) {
return this.objectsMetadata.get(objectId);
}

/**
* S3Object's lifecycle listener.
*/
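Taken together, the manager's changes sketch a small per-object state machine: prepareObject creates a PREPARED object, commitObject moves it to COMMITTED, and the lifecycle check now only marks expired objects (MARK_DESTROYED, the ordinal fix above) before the batch delete physically removes them. A condensed, hypothetical view of those transitions (the real S3ObjectState enum lives elsewhere in the codebase):

public class S3ObjectLifecycleSketch {
    enum State { PREPARED, COMMITTED, MARK_DESTROYED, DESTROYED }

    static State onCommit(State s) {
        // commitObject: only a known PREPARED object can be committed.
        return s == State.PREPARED ? State.COMMITTED : s;
    }

    static State onLifecycleCheck(State s, boolean expired) {
        // checkS3ObjectsLifecycle: an expired PREPARED object is only marked;
        // physical deletion happens later in a batch delete of object keys.
        return s == State.PREPARED && expired ? State.MARK_DESTROYED : s;
    }
}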
@@ -23,13 +23,15 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.kafka.common.message.CloseStreamRequestData;
import org.apache.kafka.common.message.CloseStreamResponseData;
import org.apache.kafka.common.message.CommitCompactObjectRequestData;
import org.apache.kafka.common.message.CommitCompactObjectResponseData;
import org.apache.kafka.common.message.CommitStreamObjectRequestData;
import org.apache.kafka.common.message.CommitStreamObjectResponseData;
import org.apache.kafka.common.message.CommitWALObjectRequestData;
import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange;
import org.apache.kafka.common.message.CommitWALObjectResponseData;
import org.apache.kafka.common.message.CreateStreamRequestData;
import org.apache.kafka.common.message.CreateStreamResponseData;
@@ -38,14 +40,18 @@
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.BrokerWALMetadataRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.common.metadata.WALObjectRecord;
import org.apache.kafka.common.metadata.WALObjectRecord.StreamIndex;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.ControllerResult;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -61,9 +67,8 @@
*/
public class StreamControlManager {

// TODO: assigned record
// TODO: timeline check
public static class S3StreamMetadata {

// current epoch; 0 when the stream is created but not yet opened
private TimelineLong currentEpoch;
// rangeIndex; -1 when the stream is created but no range exists yet
@@ -117,6 +122,7 @@ public String toString() {
}

public static class BrokerS3WALMetadata {

private int brokerId;
private TimelineHashSet<S3WALObject> walObjects;

@@ -129,7 +135,7 @@ public int getBrokerId() {
return brokerId;
}

public TimelineHashSet<S3WALObject> getWalObjects() {
public TimelineHashSet<S3WALObject> walObjects() {
return walObjects;
}

@@ -243,7 +249,7 @@ public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData
.setEndOffset(startOffset)
.setEpoch(newEpoch)
.setRangeIndex(newRangeIndex), (short) 0));
resp.setStartOffset(startOffset);
resp.setStartOffset(streamMetadata.startOffset());
resp.setNextOffset(startOffset);
return ControllerResult.atomicOf(records, resp);
}
@@ -257,7 +263,65 @@ public ControllerResult<DeleteStreamResponseData> deleteStream(DeleteStreamReque
}

public ControllerResult<CommitWALObjectResponseData> commitWALObject(CommitWALObjectRequestData data) {
throw new UnsupportedOperationException();
CommitWALObjectResponseData resp = new CommitWALObjectResponseData();
List<ApiMessageAndVersion> records = new ArrayList<>();
List<Long> failedStreamIds = new ArrayList<>();
resp.setFailedStreamIds(failedStreamIds);
long objectId = data.objectId();
int brokerId = data.brokerId();
long objectSize = data.objectSize();
List<ObjectStreamRange> streamRanges = data.objectStreamRanges();
// verify stream epoch
streamRanges.stream().filter(range -> !verifyWalStreamRanges(range, brokerId))
.mapToLong(ObjectStreamRange::streamId).forEach(failedStreamIds::add);
if (!failedStreamIds.isEmpty()) {
log.error("stream is invalid when commit wal object, failed stream ids [{}]",
String.join(",", failedStreamIds.stream().map(String::valueOf).collect(Collectors.toList())));
}
// commit object
ControllerResult<Boolean> commitResult = this.s3ObjectControlManager.commitObject(objectId, objectSize);
if (!commitResult.response()) {
log.error("object {} not exist when commit wal object", objectId);
resp.setErrorCode(Errors.OBJECT_NOT_EXIST.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(commitResult.records());
List<S3ObjectStreamIndex> indexes = new ArrayList<>(streamRanges.size());
streamRanges.stream().filter(range -> !failedStreamIds.contains(range.streamId())).forEach(range -> {
// build WAL object
long streamId = range.streamId();
long startOffset = range.startOffset();
long endOffset = range.endOffset();
indexes.add(new S3ObjectStreamIndex(objectId, startOffset, endOffset));
// TODO: support lazily flushing the range's end offset
// update range's offset
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
RangeMetadata oldRange = streamMetadata.ranges.get(streamMetadata.currentRangeIndex());
RangeRecord record = new RangeRecord()
.setStreamId(streamId)
.setBrokerId(brokerId)
.setEpoch(oldRange.epoch())
.setRangeIndex(oldRange.rangeIndex())
.setStartOffset(oldRange.startOffset())
.setEndOffset(endOffset);
records.add(new ApiMessageAndVersion(record, (short) 0));
});
// update broker's wal object
BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
if (brokerMetadata == null) {
// first WAL object commit from this broker, create the broker's metadata
records.add(new ApiMessageAndVersion(new BrokerWALMetadataRecord()
.setBrokerId(brokerId), (short) 0));
}
// record the new WAL object under the broker
records.add(new ApiMessageAndVersion(new WALObjectRecord()
.setObjectId(objectId)
.setBrokerId(brokerId)
.setStreamsIndex(
indexes.stream()
.map(S3ObjectStreamIndex::toRecordStreamIndex)
.collect(Collectors.toList())), (short) 0));
return ControllerResult.atomicOf(records, resp);
}

public ControllerResult<CommitCompactObjectResponseData> commitCompactObject(CommitCompactObjectRequestData data) {
@@ -315,6 +379,28 @@ public void replay(RemoveRangeRecord record) {
streamMetadata.ranges.remove(record.rangeIndex());
}

public void replay(BrokerWALMetadataRecord record) {
int brokerId = record.brokerId();
this.brokersMetadata.computeIfAbsent(brokerId, id -> new BrokerS3WALMetadata(id, this.snapshotRegistry));
}

public void replay(WALObjectRecord record) {
long objectId = record.objectId();
int brokerId = record.brokerId();
List<StreamIndex> streamIndexes = record.streamsIndex();
BrokerS3WALMetadata brokerMetadata = this.brokersMetadata.get(brokerId);
if (brokerMetadata == null) {
// should not happen
log.error("broker {} not exist when replay wal object record {}", brokerId, record);
return;
}
Map<Long, List<S3ObjectStreamIndex>> indexMap = streamIndexes
.stream()
.map(S3ObjectStreamIndex::of)
.collect(Collectors.groupingBy(S3ObjectStreamIndex::getStreamId));
brokerMetadata.walObjects.add(new S3WALObject(objectId, brokerId, indexMap));
}


public Map<Long, S3StreamMetadata> streamsMetadata() {
return streamsMetadata;
@@ -328,6 +414,37 @@ public Long nextAssignedStreamId() {
return nextAssignedStreamId.get();
}

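/**
 * A WAL range is accepted only when the stream exists, the request's epoch equals
 * the stream's current epoch, the stream's current range is owned by the committing
 * broker, and the new range starts exactly at the current range's end offset.
 */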
private boolean verifyWalStreamRanges(ObjectStreamRange range, long brokerId) {
long streamId = range.streamId();
long epoch = range.streamEpoch();
// verify
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (streamMetadata == null) {
return false;
}
// compare epoch: must match the stream's current epoch exactly
if (streamMetadata.currentEpoch() != epoch) {
return false;
}
RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
if (rangeMetadata == null) {
return false;
}
// compare broker
if (rangeMetadata.brokerId() != brokerId) {
return false;
}
// compare offset
if (rangeMetadata.endOffset() != range.startOffset()) {
return false;
}
return true;
}


@Override
public String toString() {
return "StreamControlManager{" +
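For reference, a request that passes the checks above must name an already-PREPARED object and ranges that continue each stream's current range. A construction sketch (setter names are assumed to be the generated counterparts of the accessors used above, and all values are illustrative):

import java.util.List;
import org.apache.kafka.common.message.CommitWALObjectRequestData;
import org.apache.kafka.common.message.CommitWALObjectRequestData.ObjectStreamRange;

public class CommitWALObjectRequestExample {
    public static void main(String[] args) {
        CommitWALObjectRequestData request = new CommitWALObjectRequestData()
                .setObjectId(42L)                 // must be a PREPARED object
                .setBrokerId(1)                   // must own the stream's current range
                .setObjectSize(64L * 1024 * 1024)
                .setObjectStreamRanges(List.of(
                        new ObjectStreamRange()
                                .setStreamId(7L)
                                .setStreamEpoch(3L)    // must equal the stream's current epoch
                                .setStartOffset(1000L) // must equal the current range's end offset
                                .setEndOffset(2000L)));
        System.out.println(request);
    }
}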
@@ -150,6 +150,6 @@ public S3ObjectState getS3ObjectState() {
}

public boolean isExpired() {
return System.currentTimeMillis() > expiredTimeInMs;
return this.s3ObjectState == S3ObjectState.PREPARED && System.currentTimeMillis() > expiredTimeInMs;
}
}
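The added state check matters because a committed object keeps its original prepare deadline in expiredTimeInMs; without it, every committed object would eventually report itself as expired and be queued for destruction by the lifecycle check. A self-contained illustration of the guard (not the real S3Object class):

public class ExpiryGuardSketch {
    enum State { PREPARED, COMMITTED }

    static boolean isExpired(State state, long expiredTimeInMs) {
        return state == State.PREPARED && System.currentTimeMillis() > expiredTimeInMs;
    }

    public static void main(String[] args) {
        long pastDeadline = System.currentTimeMillis() - 1;
        System.out.println(isExpired(State.PREPARED, pastDeadline));  // true: reclaim it
        System.out.println(isExpired(State.COMMITTED, pastDeadline)); // false: keep it
    }
}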