2 changes: 1 addition & 1 deletion checkstyle/checkstyle.xml
@@ -129,7 +129,7 @@

<module name="ClassFanOutComplexity">
<!-- default is 20 -->
<property name="max" value="50"/>
<property name="max" value="100"/>
</module>
<module name="CyclomaticComplexity">
<!-- default is 10-->

@@ -37,6 +37,12 @@
"type": "int64",
"versions": "0+",
"about": "The start offset of the opened stream"
},
{
"name": "NextOffset",
"type": "int64",
"versions": "0+",
"about": "The next offset of the opened stream"
}
]
}

@@ -31,6 +31,7 @@
import java.util.stream.Collectors;
import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.RemoveS3ObjectRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.utils.LogContext;
@@ -44,6 +45,7 @@
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;

/**
@@ -69,7 +71,7 @@ public class S3ObjectControlManager {
/**
* The objectId of the next object to be prepared. (start from 0)
*/
private Long nextAssignedObjectId = 0L;
private TimelineLong nextAssignedObjectId;

private final Queue<Long/*objectId*/> preparedObjects;

@@ -93,6 +95,7 @@ public S3ObjectControlManager(
this.log = logContext.logger(S3ObjectControlManager.class);
this.clusterId = clusterId;
this.config = config;
this.nextAssignedObjectId = new TimelineLong(snapshotRegistry);
this.objectsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.preparedObjects = new LinkedBlockingDeque<>();
this.markDestroyedObjects = new LinkedBlockingDeque<>();
@@ -119,17 +122,23 @@ public void registerListener(S3ObjectLifeCycleListener listener) {
}

public Long nextAssignedObjectId() {
return nextAssignedObjectId;
return nextAssignedObjectId.get();
}

public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3ObjectRequestData request) {
// TODO: support batch prepare objects
// TODO: pre assigned a batch of objectIds in controller
List<ApiMessageAndVersion> records = new ArrayList<>();
PrepareS3ObjectResponseData response = new PrepareS3ObjectResponseData();
int count = request.preparedCount();
List<Long> prepareObjectIds = new ArrayList<>(count);

// update assigned object id
long newAssignedObjectId = nextAssignedObjectId.get() + count - 1;
records.add(new ApiMessageAndVersion(new AssignedS3ObjectIdRecord()
.setAssignedS3ObjectId(newAssignedObjectId), (short) 0));

for (int i = 0; i < count; i++) {
Long objectId = nextAssignedObjectId + i;
Long objectId = nextAssignedObjectId.get() + i;
prepareObjectIds.add(objectId);
long preparedTs = System.currentTimeMillis();
long expiredTs = preparedTs + request.timeToLiveInMs();
@@ -141,7 +150,11 @@ public ControllerResult<PrepareS3ObjectResponseData> prepareObject(PrepareS3Obje
records.add(new ApiMessageAndVersion(record, (short) 0));
}
response.setS3ObjectIds(prepareObjectIds);
return ControllerResult.of(records, response);
return ControllerResult.atomicOf(records, response);
}

public void replay(AssignedS3ObjectIdRecord record) {
nextAssignedObjectId.set(record.assignedS3ObjectId() + 1);
}

public void replay(S3ObjectRecord record) {
@@ -157,7 +170,6 @@ public void replay(S3ObjectRecord record) {
} else if (object.getS3ObjectState() == S3ObjectState.MARK_DESTROYED) {
markDestroyedObjects.add(object.getObjectId());
}
nextAssignedObjectId = Math.max(nextAssignedObjectId, record.objectId() + 1);
}

public void replay(RemoveS3ObjectRecord record) {
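
A minimal, standalone Java sketch (not part of the PR, and not the project's API; the names AssignedIdModel, prepare, and replay are illustrative) of the id-assignment protocol the hunks above introduce: prepareObject emits a single AssignedS3ObjectIdRecord carrying the last objectId of the batch, and replaying that record advances nextAssignedObjectId past it, so the next prepare starts from a fresh id.

// Simplified model of the batch object-id assignment. State only advances
// when the AssignedS3ObjectIdRecord is replayed, mirroring the controller.
public class AssignedIdModel {
    private long nextAssignedObjectId = 0L;

    // Mirrors prepareObject(count): the ids handed out are
    // [nextAssignedObjectId, nextAssignedObjectId + count - 1]; the returned
    // value is what AssignedS3ObjectIdRecord would carry.
    long prepare(int count) {
        return nextAssignedObjectId + count - 1;
    }

    // Mirrors replay(AssignedS3ObjectIdRecord): move past the assigned id.
    void replay(long assignedS3ObjectId) {
        nextAssignedObjectId = assignedS3ObjectId + 1;
    }

    public static void main(String[] args) {
        AssignedIdModel m = new AssignedIdModel();
        long assigned = m.prepare(3);                 // ids 0, 1, 2; record carries 2
        m.replay(assigned);
        System.out.println(m.nextAssignedObjectId);   // 3 -> next prepare starts at 3
    }
}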

@@ -37,6 +37,7 @@
import org.apache.kafka.common.message.DeleteStreamResponseData;
import org.apache.kafka.common.message.OpenStreamRequestData;
import org.apache.kafka.common.message.OpenStreamResponseData;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamRecord;
@@ -51,6 +52,8 @@
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
import org.apache.kafka.timeline.TimelineHashSet;
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineLong;
import org.slf4j.Logger;

/**
@@ -62,32 +65,35 @@ public class StreamControlManager {
// TODO: timeline check
public static class S3StreamMetadata {
// current epoch, when created but not open, use 0 represent
private long currentEpoch;
private TimelineLong currentEpoch;
// rangeIndex, when created but not open, there is no range, use -1 represent
private int currentRangeIndex = -1;
private long startOffset;
private TimelineInteger currentRangeIndex;
private TimelineLong startOffset;
private TimelineHashMap<Integer/*rangeIndex*/, RangeMetadata> ranges;
private TimelineHashSet<S3StreamObject> streamObjects;

public S3StreamMetadata(long currentEpoch, int currentRangeIndex, long startOffset,
SnapshotRegistry registry) {
this.currentEpoch = currentEpoch;
this.currentRangeIndex = currentRangeIndex;
this.startOffset = startOffset;
this.currentEpoch = new TimelineLong(registry);
this.currentEpoch.set(currentEpoch);
this.currentRangeIndex = new TimelineInteger(registry);
this.currentRangeIndex.set(currentRangeIndex);
this.startOffset = new TimelineLong(registry);
this.startOffset.set(startOffset);
this.ranges = new TimelineHashMap<>(registry, 0);
this.streamObjects = new TimelineHashSet<>(registry, 0);
}

public long currentEpoch() {
return currentEpoch;
return currentEpoch.get();
}

public int currentRangeIndex() {
return currentRangeIndex;
return currentRangeIndex.get();
}

public long startOffset() {
return startOffset;
return startOffset.get();
}

public Map<Integer, RangeMetadata> ranges() {
@@ -145,7 +151,7 @@ public String toString() {
/**
* The next stream id to be assigned.
*/
private Long nextAssignedStreamId = 0L;
private final TimelineLong nextAssignedStreamId;

private final TimelineHashMap<Long/*streamId*/, S3StreamMetadata> streamsMetadata;

@@ -158,24 +164,27 @@ public StreamControlManager(
this.snapshotRegistry = snapshotRegistry;
this.log = logContext.logger(StreamControlManager.class);
this.s3ObjectControlManager = s3ObjectControlManager;
this.nextAssignedStreamId = new TimelineLong(snapshotRegistry);
this.streamsMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersMetadata = new TimelineHashMap<>(snapshotRegistry, 0);
}

// TODO: refactor to return next offset of stream in response
// TODO: lazy update range's end offset
// TODO: controller allocate the stream id
public ControllerResult<CreateStreamResponseData> createStream(CreateStreamRequestData data) {
// TODO: pre assigned a batch of stream id in controller
CreateStreamResponseData resp = new CreateStreamResponseData();
long streamId = nextAssignedStreamId;
long streamId = nextAssignedStreamId.get();
// update assigned id
ApiMessageAndVersion record0 = new ApiMessageAndVersion(new AssignedStreamIdRecord()
.setAssignedStreamId(streamId), (short) 0);
// create stream
ApiMessageAndVersion record = new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(streamId)
.setEpoch(0)
.setStartOffset(0L)
.setRangeIndex(-1), (short) 0);
resp.setStreamId(streamId);
return ControllerResult.of(Arrays.asList(record), resp);
return ControllerResult.atomicOf(Arrays.asList(record0, record), resp);
}

public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData data) {
@@ -190,37 +199,40 @@ public ControllerResult<OpenStreamResponseData> openStream(OpenStreamRequestData
}
// verify epoch match
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
if (streamMetadata.currentEpoch > epoch) {
if (streamMetadata.currentEpoch.get() > epoch) {
resp.setErrorCode(Errors.STREAM_FENCED.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
if (streamMetadata.currentEpoch == epoch) {
if (streamMetadata.currentEpoch.get() == epoch) {
// epoch equals, verify broker
RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
if (rangeMetadata == null || rangeMetadata.brokerId() != brokerId) {
resp.setErrorCode(Errors.STREAM_FENCED.code());
RangeMetadata rangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
if (rangeMetadata != null) {
if (rangeMetadata.brokerId() != brokerId) {
resp.setErrorCode(Errors.STREAM_FENCED.code());
return ControllerResult.of(Collections.emptyList(), resp);
}
// epoch equals, broker equals, regard it as redundant open operation, just return success
resp.setStartOffset(streamMetadata.startOffset.get());
resp.setNextOffset(rangeMetadata.endOffset());
return ControllerResult.of(Collections.emptyList(), resp);
}
// epoch equals, broker equals, regard it as redundant open operation, just return success
resp.setStartOffset(streamMetadata.startOffset);
return ControllerResult.of(Collections.emptyList(), resp);
}
// now the request is valid, update the stream's epoch and create a new range for this broker
List<ApiMessageAndVersion> records = new ArrayList<>();
long newEpoch = epoch;
int newRangeIndex = streamMetadata.currentRangeIndex + 1;
int newRangeIndex = streamMetadata.currentRangeIndex.get() + 1;
// stream update record
records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(streamId)
.setEpoch(newEpoch)
.setRangeIndex(newRangeIndex)
.setStartOffset(streamMetadata.startOffset), (short) 0));
.setStartOffset(streamMetadata.startOffset.get()), (short) 0));
// get new range's start offset
// default regard this range is the first range in stream, use 0 as start offset
long startOffset = 0;
if (newRangeIndex > 0) {
// means that the new range is not the first range in stream, get the last range's end offset
RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex);
RangeMetadata lastRangeMetadata = streamMetadata.ranges.get(streamMetadata.currentRangeIndex.get());
startOffset = lastRangeMetadata.endOffset();
}
// range create record
@@ -232,7 +244,8 @@
.setEpoch(newEpoch)
.setRangeIndex(newRangeIndex), (short) 0));
resp.setStartOffset(startOffset);
return ControllerResult.of(records, resp);
resp.setNextOffset(startOffset);
return ControllerResult.atomicOf(records, resp);
}

public ControllerResult<CloseStreamResponseData> closeStream(CloseStreamRequestData data) {
@@ -255,22 +268,24 @@ public ControllerResult<CommitStreamObjectResponseData> commitStreamObject(Commi
throw new UnsupportedOperationException();
}

public void replay(AssignedStreamIdRecord record) {
this.nextAssignedStreamId.set(record.assignedStreamId() + 1);
}

public void replay(S3StreamRecord record) {
long streamId = record.streamId();
// already exist, update the stream's self metadata
if (this.streamsMetadata.containsKey(streamId)) {
S3StreamMetadata streamMetadata = this.streamsMetadata.get(streamId);
streamMetadata.startOffset = record.startOffset();
streamMetadata.currentEpoch = record.epoch();
streamMetadata.currentRangeIndex = record.rangeIndex();
streamMetadata.startOffset.set(record.startOffset());
streamMetadata.currentEpoch.set(record.epoch());
streamMetadata.currentRangeIndex.set(record.rangeIndex());
return;
}
// not exist, create a new stream
S3StreamMetadata streamMetadata = new S3StreamMetadata(record.epoch(), record.rangeIndex(),
record.startOffset(), this.snapshotRegistry);
this.streamsMetadata.put(streamId, streamMetadata);
this.nextAssignedStreamId = Math.max(this.nextAssignedStreamId, streamId + 1);
}

public void replay(RemoveS3StreamRecord record) {
@@ -310,7 +325,7 @@ public Map<Integer, BrokerS3WALMetadata> brokersMetadata() {
}

public Long nextAssignedStreamId() {
return nextAssignedStreamId;
return nextAssignedStreamId.get();
}

@Override
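
Another standalone sketch (not the project's code; OpenOutcome and OpenStreamDecision are illustrative names) of the open-stream decision the rewritten openStream hunk encodes: a request with a stale epoch is fenced, an equal epoch from the broker that owns the current range is treated as a redundant open (the response now also carries the range's end offset as the next offset), and everything else proceeds to create a new range.

// Models only the fencing decision; offsets and record emission are omitted.
enum OpenOutcome { FENCED, REDUNDANT_OPEN, NEW_RANGE }

final class OpenStreamDecision {
    // currentRangeBroker is null when the stream has no range yet.
    static OpenOutcome decide(long currentEpoch, long requestEpoch,
                              Integer currentRangeBroker, int requestBroker) {
        if (currentEpoch > requestEpoch) {
            return OpenOutcome.FENCED;            // stale epoch
        }
        if (currentEpoch == requestEpoch && currentRangeBroker != null) {
            if (currentRangeBroker != requestBroker) {
                return OpenOutcome.FENCED;        // same epoch, different broker
            }
            return OpenOutcome.REDUNDANT_OPEN;    // same epoch, same broker
        }
        return OpenOutcome.NEW_RANGE;             // higher epoch or no current range
    }

    public static void main(String[] args) {
        System.out.println(OpenStreamDecision.decide(2, 1, 0, 0)); // FENCED
        System.out.println(OpenStreamDecision.decide(2, 2, 0, 0)); // REDUNDANT_OPEN
        System.out.println(OpenStreamDecision.decide(2, 3, 0, 1)); // NEW_RANGE
    }
}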

@@ -22,6 +22,7 @@

import org.apache.kafka.common.message.PrepareS3ObjectRequestData;
import org.apache.kafka.common.message.PrepareS3ObjectResponseData;
import org.apache.kafka.common.metadata.AssignedS3ObjectIdRecord;
import org.apache.kafka.common.metadata.S3ObjectRecord;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.Errors;
@@ -40,6 +41,7 @@
@Timeout(40)
@Tag("S3Unit")
public class S3ObjectControlManagerTest {

private static final int BROKER0 = 0;
private static final int BROKER1 = 1;
private static final String CLUSTER = "kafka-on-S3_cluster";
@@ -67,11 +69,16 @@ public void testBasicPrepareObject() {
.setPreparedCount(3)
.setTimeToLiveInMs(1000));
assertEquals(Errors.NONE.code(), result0.response().errorCode());
assertEquals(3, result0.records().size());
assertEquals(4, result0.records().size());
ApiMessage message = result0.records().get(0).message();
assertInstanceOf(AssignedS3ObjectIdRecord.class, message);
AssignedS3ObjectIdRecord assignedRecord = (AssignedS3ObjectIdRecord) message;
assertEquals(2, assignedRecord.assignedS3ObjectId());
for (int i = 0; i < 3; i++) {
verifyPrepareObjectRecord(result0.records().get(i), i, 1000);
verifyPrepareObjectRecord(result0.records().get(i + 1), i, 1000);
}
result0.records().stream().map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record));
manager.replay(assignedRecord);
result0.records().stream().skip(1).map(ApiMessageAndVersion::message).forEach(record -> manager.replay((S3ObjectRecord) record));

// verify the 3 objects are prepared
assertEquals(3, manager.objectsMetadata().size());