From 886f19a7deba39c858ed1ee95fe3bc47769321d5 Mon Sep 17 00:00:00 2001 From: TheR1sing3un Date: Fri, 1 Sep 2023 15:10:27 +0800 Subject: [PATCH] feat(s3): replay new added state in StreamImage 1. replay new added state in StreamImage Signed-off-by: TheR1sing3un --- checkstyle/import-control.xml | 1 + .../java/kafka/log/s3/S3StreamMemoryTest.java | 2 +- .../controller/stream/S3StreamConstant.java | 4 +++ .../kafka/image/S3StreamMetadataDelta.java | 27 +++++++-------- .../kafka/image/S3StreamMetadataImage.java | 29 +++++++++++++--- .../kafka/image/S3StreamsMetadataDelta.java | 3 ++ .../image/S3StreamMetadataImageTest.java | 33 ++++++++++++------- .../image/S3StreamsMetadataImageTest.java | 3 +- 8 files changed, 72 insertions(+), 30 deletions(-) diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 79735be60d..47dc05b6e9 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -263,6 +263,7 @@ + diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java index f3e117fcf1..bd9d8a6163 100644 --- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java +++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java @@ -100,7 +100,7 @@ public ByteBuffer rawPayload() { StreamManager streamManager; ObjectManager objectManager; S3StreamClient streamClient; - private final static long MAX_APPENDED_OFFSET = 1000; + private final static long MAX_APPENDED_OFFSET = 200; Random random = new Random(); diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java index a760fef07b..e7e3ce4e0a 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java +++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java @@ -27,4 +27,8 @@ public class S3StreamConstant { public static final long INIT_END_OFFSET = 0L; + public static final long INVALID_STREAM_ID = -1L; + + public static final long INVALID_OBJECT_ID = -1L; + } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java index 1527b8c3df..b360886a71 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java @@ -17,7 +17,6 @@ package org.apache.kafka.image; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -30,15 +29,17 @@ import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.StreamState; public class S3StreamMetadataDelta { + private final S3StreamMetadataImage image; private long streamId; private long newStartOffset; private long newEpoch; - - private int activeRangeIndex = -1; + private StreamState currentState; + private int currentRangeIndex; private final Map changedRanges = new HashMap<>(); private final Set removedRanges = new HashSet<>(); @@ -50,28 +51,28 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) { this.newEpoch = image.getEpoch(); this.streamId = image.getStreamId(); this.newStartOffset = image.getStartOffset(); - this.activeRangeIndex = image.getRanges().keySet().stream().sorted(Comparator.reverseOrder()).findFirst().orElse(-1); + this.currentState = image.state(); + this.currentRangeIndex = image.rangeIndex(); } + public void replay(S3StreamRecord record) { this.streamId = record.streamId(); this.newEpoch = record.epoch(); this.newStartOffset = record.startOffset(); + this.currentState = StreamState.fromByte(record.streamState()); + this.currentRangeIndex = record.rangeIndex(); } public void replay(RangeRecord record) { changedRanges.put(record.rangeIndex(), RangeMetadata.of(record)); // new add or update, so remove from removedRanges removedRanges.remove(record.rangeIndex()); - this.activeRangeIndex = record.rangeIndex(); } public void replay(RemoveRangeRecord record) { removedRanges.add(record.rangeIndex()); // new remove, so remove from changedRanges changedRanges.remove(record.rangeIndex()); - if (record.rangeIndex() == activeRangeIndex) { - activeRangeIndex = -1; - } } public void replay(S3StreamObjectRecord record) { @@ -89,10 +90,10 @@ public void replay(RemoveS3StreamObjectRecord record) { public void replay(AdvanceRangeRecord record) { long startOffset = record.startOffset(); long newEndOffset = record.endOffset(); - // check active range - RangeMetadata metadata = this.changedRanges.get(activeRangeIndex); + // check current range + RangeMetadata metadata = this.changedRanges.get(currentRangeIndex); if (metadata == null) { - metadata = this.image.getRanges().get(activeRangeIndex); + metadata = this.image.getRanges().get(currentRangeIndex); } if (metadata == null) { // ignore it @@ -103,7 +104,7 @@ public void replay(AdvanceRangeRecord record) { return; } // update the endOffset - this.changedRanges.put(activeRangeIndex, new RangeMetadata( + this.changedRanges.put(currentRangeIndex, new RangeMetadata( streamId, metadata.epoch(), metadata.rangeIndex(), metadata.startOffset(), newEndOffset, metadata.brokerId() )); } @@ -119,7 +120,7 @@ public S3StreamMetadataImage apply() { newS3StreamObjects.putAll(changedS3StreamObjects); // remove all removed stream-objects removedS3StreamObjectIds.forEach(newS3StreamObjects::remove); - return new S3StreamMetadataImage(streamId, newEpoch, newStartOffset, newRanges, newS3StreamObjects); + return new S3StreamMetadataImage(streamId, newEpoch, currentState, currentRangeIndex, newStartOffset, newRanges, newS3StreamObjects); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java index 54d99e5707..10bc4b8cbf 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java @@ -20,34 +20,43 @@ import java.util.Map; import java.util.Objects; import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.controller.stream.S3StreamConstant; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; +import org.apache.kafka.metadata.stream.StreamState; public class S3StreamMetadataImage { public static final S3StreamMetadataImage EMPTY = - new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), Map.of()); + new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED, + S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of()); private final long streamId; private final long epoch; + private final int rangeIndex; + private final long startOffset; + private final StreamState state; + private final Map ranges; private final Map streamObjects; public S3StreamMetadataImage( - long streamId, - long epoch, + long streamId, long epoch, StreamState state, + int rangeIndex, long startOffset, Map ranges, Map streamObjects) { this.streamId = streamId; this.epoch = epoch; + this.state = state; + this.rangeIndex = rangeIndex; this.startOffset = startOffset; this.ranges = ranges; this.streamObjects = streamObjects; @@ -56,6 +65,8 @@ public S3StreamMetadataImage( public void write(ImageWriter writer, ImageWriterOptions options) { writer.write(0, new S3StreamRecord() .setStreamId(streamId) + .setRangeIndex(rangeIndex) + .setStreamState(state.toByte()) .setEpoch(epoch) .setStartOffset(startOffset)); ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord())); @@ -87,6 +98,14 @@ public long startOffset() { return startOffset; } + public int rangeIndex() { + return rangeIndex; + } + + public StreamState state() { + return state; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -98,6 +117,8 @@ public boolean equals(Object o) { S3StreamMetadataImage that = (S3StreamMetadataImage) o; return this.streamId == that.streamId && this.epoch == that.epoch && + this.state == that.state && + this.rangeIndex == that.rangeIndex && this.startOffset == that.startOffset && this.ranges.equals(that.ranges) && this.streamObjects.equals(that.streamObjects); @@ -105,6 +126,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return Objects.hash(streamId, epoch, startOffset, ranges, streamObjects); + return Objects.hash(streamId, epoch, state, rangeIndex, startOffset, ranges, streamObjects); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index 9b01d63e1c..ad4ad855b9 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -96,6 +96,9 @@ public void replay(RemoveRangeRecord record) { public void replay(S3StreamObjectRecord record) { getOrCreateStreamMetadataDelta(record.streamId()).replay(record); + getOrCreateStreamMetadataDelta(record.streamId()).replay(new AdvanceRangeRecord() + .setStartOffset(record.startOffset()) + .setEndOffset(record.endOffset())); } public void replay(RemoveS3StreamObjectRecord record) { diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java index 91d4155ff6..f0ba6e929a 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java @@ -28,11 +28,13 @@ import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; +import org.apache.kafka.controller.stream.S3StreamConstant; import org.apache.kafka.image.writer.ImageWriterOptions; import org.apache.kafka.image.writer.RecordListWriter; import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; +import org.apache.kafka.metadata.stream.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -56,12 +58,15 @@ public void testRanges() { // 1. create stream0 delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(STREAM0) + .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) .setEpoch(0L) .setStartOffset(0L), (short) 0)); RecordTestUtils.replayAll(delta0, delta0Records); // verify delta and check image's write S3StreamMetadataImage image1 = new S3StreamMetadataImage( - STREAM0, 0L, 0L, Map.of(), Map.of()); + STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of()); assertEquals(image1, delta0.apply()); testToImageAndBack(image1); @@ -71,8 +76,10 @@ public void testRanges() { S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1); delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(STREAM0) - .setEpoch(1L) - .setStartOffset(0L), (short) 0)); + .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) + .setEpoch(1L), (short) 0)); delta1Records.add(new ApiMessageAndVersion(new RangeRecord() .setStreamId(STREAM0) .setRangeIndex(0) @@ -82,7 +89,7 @@ public void testRanges() { RecordTestUtils.replayAll(delta1, delta1Records); // verify delta and check image's write S3StreamMetadataImage image2 = new S3StreamMetadataImage( - STREAM0, 1L, 0L, + STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of()); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); @@ -92,8 +99,10 @@ public void testRanges() { S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2); delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(STREAM0) - .setEpoch(2L) - .setStartOffset(0L), (short) 0)); + .setRangeIndex(0) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) + .setEpoch(2L), (short) 0)); delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord() .setStartOffset(0L) .setEndOffset(100L), (short) 0)); @@ -107,7 +116,7 @@ public void testRanges() { RecordTestUtils.replayAll(delta2, delta2Records); // verify delta and check image's write S3StreamMetadataImage image3 = new S3StreamMetadataImage( - STREAM0, 2L, 0L, Map.of( + STREAM0, 2L, StreamState.OPENED, 0, 0L, Map.of( 0, new RangeMetadata(STREAM0, 1L, 0, 0, 100, BROKER0), 1, new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Map.of()); assertEquals(image3, delta2.apply()); @@ -119,6 +128,8 @@ public void testRanges() { delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord() .setStreamId(STREAM0) .setEpoch(2L) + .setRangeIndex(0) + .setStreamState(StreamState.OPENED.toByte()) .setStartOffset(100L), (short) 0)); delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord() .setStreamId(STREAM0) @@ -126,7 +137,7 @@ public void testRanges() { RecordTestUtils.replayAll(delta3, delta3Records); // verify delta and check image's write S3StreamMetadataImage image4 = new S3StreamMetadataImage( - STREAM0, 2L, 100L, Map.of( + STREAM0, 2L, StreamState.OPENED, 0, 100L, Map.of( 1, new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Map.of()); assertEquals(image4, delta3.apply()); } @@ -134,7 +145,7 @@ public void testRanges() { @Test public void testStreamObjects() { S3StreamMetadataImage image0 = new S3StreamMetadataImage( - STREAM0, 0L, 0L, Map.of(), Map.of()); + STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of()); List delta0Records = new ArrayList<>(); S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0); // 1. create streamObject0 and streamObject1 @@ -151,7 +162,7 @@ public void testStreamObjects() { RecordTestUtils.replayAll(delta0, delta0Records); // verify delta and check image's write S3StreamMetadataImage image1 = new S3StreamMetadataImage( - STREAM0, 0L, 0L, Map.of(), Map.of( + STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of( 0L, new S3StreamObject(0L, 999, STREAM0, 0L, 100L), 1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L))); assertEquals(image1, delta0.apply()); @@ -165,7 +176,7 @@ public void testStreamObjects() { RecordTestUtils.replayAll(delta1, delta1Records); // verify delta and check image's write S3StreamMetadataImage image2 = new S3StreamMetadataImage( - STREAM0, 0L, 0L, Map.of(), Map.of( + STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of( 1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L))); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 19b58030ce..3896c2396d 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.metadata.stream.S3ObjectStreamIndex; import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.metadata.stream.S3WALObject; +import org.apache.kafka.metadata.stream.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; @@ -120,7 +121,7 @@ public void testGetObjects() { 8L, new S3StreamObject(8, GB, STREAM0, 10L, 100L), 9L, new S3StreamObject(9, GB, STREAM0, 200L, 300L), 10L, new S3StreamObject(10, GB, STREAM0, 300L, 400L)); - S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, 10, ranges, streamObjects); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects); S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage), Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));