diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java index 31a82013cb..a7185fdfe3 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamMetadata.java @@ -24,10 +24,13 @@ import org.apache.kafka.timeline.TimelineInteger; import org.apache.kafka.timeline.TimelineLong; import org.apache.kafka.timeline.TimelineObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; public class S3StreamMetadata { + private static final Logger LOGGER = LoggerFactory.getLogger(S3StreamMetadata.class); // current epoch, when created but not open, use -1 represent private final TimelineLong currentEpoch; @@ -94,6 +97,24 @@ public RangeMetadata currentRangeMetadata() { return ranges.get(currentRangeIndex.get()); } + public void updateEndOffset(long newEndOffset) { + RangeMetadata rangeMetadata = ranges.get(currentRangeIndex.get()); + if (rangeMetadata == null) { + LOGGER.error("[UNEXPECTED] cannot find range={}", currentRangeIndex.get()); + return; + } + if (rangeMetadata.endOffset() < newEndOffset) { + ranges.put(rangeMetadata.rangeIndex(), new RangeMetadata( + rangeMetadata.streamId(), + rangeMetadata.epoch(), + rangeMetadata.rangeIndex(), + rangeMetadata.startOffset(), + newEndOffset, + rangeMetadata.nodeId() + )); + } + } + public Map streamObjects() { return streamObjects; } diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 0564234eda..eb0833250b 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -1046,17 +1046,9 @@ public void replay(S3StreamSetObjectRecord record) { // ignore it, the stream may be deleted return; } - RangeMetadata rangeMetadata = metadata.currentRangeMetadata(); - if (rangeMetadata == null) { - // ignore it - LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} stream range metadata", streamId); - return; - } - if (rangeMetadata.endOffset() < index.endOffset()) { - // the offset continuous is ensured by the process layer - // when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset. - rangeMetadata.setEndOffset(index.endOffset()); - } + // the offset continuous is ensured by the process layer + // when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset. + metadata.updateEndOffset(index.endOffset()); }); } @@ -1085,17 +1077,9 @@ public void replay(S3StreamObjectRecord record) { return; } streamMetadata.streamObjects().put(objectId, new S3StreamObject(objectId, streamId, startOffset, endOffset, dataTs)); - // update range - RangeMetadata rangeMetadata = streamMetadata.currentRangeMetadata(); - if (rangeMetadata == null) { - LOGGER.error("[REPLAY_STREAM_SET_OBJECT_FAIL] cannot find streamId={} stream range metadata", streamId); - return; - } - if (rangeMetadata.endOffset() < endOffset) { - // the offset continuous is ensured by the process layer - // when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset. - rangeMetadata.setEndOffset(endOffset); - } + // the offset continuous is ensured by the process layer + // when replay from checkpoint, the record may be out of order, so we need to update the end offset to the largest end offset. + streamMetadata.updateEndOffset(endOffset); } public void replay(RemoveS3StreamObjectRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java index 54620d8e8f..8fad239459 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/RangeMetadata.java @@ -67,6 +67,10 @@ public int compareTo(RangeMetadata o) { return this.rangeIndex - o.rangeIndex; } + public long streamId() { + return streamId; + } + public long epoch() { return epoch; } @@ -87,10 +91,6 @@ public int nodeId() { return nodeId; } - public void setEndOffset(long endOffset) { - this.endOffset = endOffset; - } - public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new RangeRecord() .setStreamId(streamId)