Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,13 @@
import org.apache.kafka.timeline.TimelineInteger;
import org.apache.kafka.timeline.TimelineLong;
import org.apache.kafka.timeline.TimelineObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;

public class S3StreamMetadata {
private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamMetadata.class);

// Current epoch; -1 represents a stream that has been created but not yet opened.
private final TimelineLong currentEpoch;
Expand Down Expand Up @@ -94,6 +97,24 @@ public RangeMetadata currentRangeMetadata() {
return ranges.get(currentRangeIndex.get());
}

/**
 * Advances the end offset of the stream's current range to {@code newEndOffset},
 * but only if it moves the offset forward.
 * <p>
 * Records replayed from a checkpoint may arrive out of order, so the offset is
 * monotonically increased: a smaller or equal {@code newEndOffset} is a no-op.
 *
 * @param newEndOffset candidate end offset for the current range
 */
public void updateEndOffset(long newEndOffset) {
    // Reuse the existing accessor instead of duplicating the index lookup
    // (currentRangeMetadata() returns ranges.get(currentRangeIndex.get())).
    RangeMetadata rangeMetadata = currentRangeMetadata();
    if (rangeMetadata == null) {
        // The current range should always exist; log loudly but do not throw
        // so that replay can continue.
        LOGGER.error("[UNEXPECTED] cannot find range={}", currentRangeIndex.get());
        return;
    }
    if (rangeMetadata.endOffset() < newEndOffset) {
        // RangeMetadata no longer exposes a setter, so replace the map entry
        // with an updated copy carrying the larger end offset.
        ranges.put(rangeMetadata.rangeIndex(), new RangeMetadata(
            rangeMetadata.streamId(),
            rangeMetadata.epoch(),
            rangeMetadata.rangeIndex(),
            rangeMetadata.startOffset(),
            newEndOffset,
            rangeMetadata.nodeId()
        ));
    }
}

/**
 * Returns the stream objects of this stream, keyed by object id.
 *
 * @return the object-id to {@code S3StreamObject} mapping
 */
public Map<Long, S3StreamObject> streamObjects() {
    return this.streamObjects;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1046,17 +1046,9 @@ public void replay(S3StreamSetObjectRecord record) {
// ignore it, the stream may be deleted
return;
}
RangeMetadata rangeMetadata = metadata.currentRangeMetadata();
if (rangeMetadata == null) {
// ignore it
LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} stream range metadata", streamId);
return;
}
if (rangeMetadata.endOffset() < index.endOffset()) {
// the offset continuous is ensured by the process layer
// when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset.
rangeMetadata.setEndOffset(index.endOffset());
}
// the offset continuous is ensured by the process layer
// when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset.
metadata.updateEndOffset(index.endOffset());
});
}

Expand Down Expand Up @@ -1085,17 +1077,9 @@ public void replay(S3StreamObjectRecord record) {
return;
}
streamMetadata.streamObjects().put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset, dataTs));
// update range
RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata();
if (rangeMetadata == null) {
LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} stream range metadata", streamId);
return;
}
if (rangeMetadata.endOffset() < endOffset) {
// the offset continuous is ensured by the process layer
// when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset.
rangeMetadata.setEndOffset(endOffset);
}
// the offset continuous is ensured by the process layer
// when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset.
streamMetadata.updateEndOffset(endOffset);
}

public void replay(RemoveS3StreamObjectRecord record) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ public int compareTo(RangeMetadata o) {
return this.rangeIndex - o.rangeIndex;
}

/**
 * Returns the id of the stream this range belongs to.
 *
 * @return the stream id
 */
public long streamId() {
    return this.streamId;
}

/**
 * Returns the epoch under which this range was created.
 *
 * @return the range epoch
 */
public long epoch() {
    return this.epoch;
}
Expand All @@ -87,10 +91,6 @@ public int nodeId() {
return nodeId;
}

public void setEndOffset(long endOffset) {
this.endOffset = endOffset;
}

public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new RangeRecord()
.setStreamId(streamId)
Expand Down