diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 79735be60d..47dc05b6e9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -263,6 +263,7 @@
+
diff --git a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
index f3e117fcf1..bd9d8a6163 100644
--- a/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
+++ b/core/src/test/java/kafka/log/s3/S3StreamMemoryTest.java
@@ -100,7 +100,7 @@ public ByteBuffer rawPayload() {
StreamManager streamManager;
ObjectManager objectManager;
S3StreamClient streamClient;
- private final static long MAX_APPENDED_OFFSET = 1000;
+ private final static long MAX_APPENDED_OFFSET = 200;
Random random = new Random();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java
index a760fef07b..e7e3ce4e0a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/S3StreamConstant.java
@@ -27,4 +27,8 @@ public class S3StreamConstant {
public static final long INIT_END_OFFSET = 0L;
+ public static final long INVALID_STREAM_ID = -1L;
+
+ public static final long INVALID_OBJECT_ID = -1L;
+
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
index 1527b8c3df..b360886a71 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
@@ -17,7 +17,6 @@
package org.apache.kafka.image;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -30,15 +29,17 @@
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
+import org.apache.kafka.metadata.stream.StreamState;
public class S3StreamMetadataDelta {
+
private final S3StreamMetadataImage image;
private long streamId;
private long newStartOffset;
private long newEpoch;
-
- private int activeRangeIndex = -1;
+ private StreamState currentState;
+ private int currentRangeIndex;
private final Map changedRanges = new HashMap<>();
private final Set removedRanges = new HashSet<>();
@@ -50,28 +51,28 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) {
this.newEpoch = image.getEpoch();
this.streamId = image.getStreamId();
this.newStartOffset = image.getStartOffset();
- this.activeRangeIndex = image.getRanges().keySet().stream().sorted(Comparator.reverseOrder()).findFirst().orElse(-1);
+ this.currentState = image.state();
+ this.currentRangeIndex = image.rangeIndex();
}
+
public void replay(S3StreamRecord record) {
this.streamId = record.streamId();
this.newEpoch = record.epoch();
this.newStartOffset = record.startOffset();
+ this.currentState = StreamState.fromByte(record.streamState());
+ this.currentRangeIndex = record.rangeIndex();
}
public void replay(RangeRecord record) {
changedRanges.put(record.rangeIndex(), RangeMetadata.of(record));
// new add or update, so remove from removedRanges
removedRanges.remove(record.rangeIndex());
- this.activeRangeIndex = record.rangeIndex();
}
public void replay(RemoveRangeRecord record) {
removedRanges.add(record.rangeIndex());
// new remove, so remove from changedRanges
changedRanges.remove(record.rangeIndex());
- if (record.rangeIndex() == activeRangeIndex) {
- activeRangeIndex = -1;
- }
}
public void replay(S3StreamObjectRecord record) {
@@ -89,10 +90,10 @@ public void replay(RemoveS3StreamObjectRecord record) {
public void replay(AdvanceRangeRecord record) {
long startOffset = record.startOffset();
long newEndOffset = record.endOffset();
- // check active range
- RangeMetadata metadata = this.changedRanges.get(activeRangeIndex);
+ // check current range
+ RangeMetadata metadata = this.changedRanges.get(currentRangeIndex);
if (metadata == null) {
- metadata = this.image.getRanges().get(activeRangeIndex);
+ metadata = this.image.getRanges().get(currentRangeIndex);
}
if (metadata == null) {
// ignore it
@@ -103,7 +104,7 @@ public void replay(AdvanceRangeRecord record) {
return;
}
// update the endOffset
- this.changedRanges.put(activeRangeIndex, new RangeMetadata(
+ this.changedRanges.put(currentRangeIndex, new RangeMetadata(
streamId, metadata.epoch(), metadata.rangeIndex(), metadata.startOffset(), newEndOffset, metadata.brokerId()
));
}
@@ -119,7 +120,7 @@ public S3StreamMetadataImage apply() {
newS3StreamObjects.putAll(changedS3StreamObjects);
// remove all removed stream-objects
removedS3StreamObjectIds.forEach(newS3StreamObjects::remove);
- return new S3StreamMetadataImage(streamId, newEpoch, newStartOffset, newRanges, newS3StreamObjects);
+ return new S3StreamMetadataImage(streamId, newEpoch, currentState, currentRangeIndex, newStartOffset, newRanges, newS3StreamObjects);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index 54d99e5707..10bc4b8cbf 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -20,34 +20,43 @@
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.common.metadata.S3StreamRecord;
+import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.metadata.stream.StreamState;
public class S3StreamMetadataImage {
public static final S3StreamMetadataImage EMPTY =
- new S3StreamMetadataImage(-1L, -1L, -1L, Map.of(), Map.of());
+ new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED,
+ S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of());
private final long streamId;
private final long epoch;
+ private final int rangeIndex;
+
private final long startOffset;
+ private final StreamState state;
+
private final Map ranges;
private final Map streamObjects;
public S3StreamMetadataImage(
- long streamId,
- long epoch,
+ long streamId, long epoch, StreamState state,
+ int rangeIndex,
long startOffset,
Map ranges,
Map streamObjects) {
this.streamId = streamId;
this.epoch = epoch;
+ this.state = state;
+ this.rangeIndex = rangeIndex;
this.startOffset = startOffset;
this.ranges = ranges;
this.streamObjects = streamObjects;
@@ -56,6 +65,8 @@ public S3StreamMetadataImage(
public void write(ImageWriter writer, ImageWriterOptions options) {
writer.write(0, new S3StreamRecord()
.setStreamId(streamId)
+ .setRangeIndex(rangeIndex)
+ .setStreamState(state.toByte())
.setEpoch(epoch)
.setStartOffset(startOffset));
ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord()));
@@ -87,6 +98,14 @@ public long startOffset() {
return startOffset;
}
+ public int rangeIndex() {
+ return rangeIndex;
+ }
+
+ public StreamState state() {
+ return state;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -98,6 +117,8 @@ public boolean equals(Object o) {
S3StreamMetadataImage that = (S3StreamMetadataImage) o;
return this.streamId == that.streamId &&
this.epoch == that.epoch &&
+ this.state == that.state &&
+ this.rangeIndex == that.rangeIndex &&
this.startOffset == that.startOffset &&
this.ranges.equals(that.ranges) &&
this.streamObjects.equals(that.streamObjects);
@@ -105,6 +126,6 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return Objects.hash(streamId, epoch, startOffset, ranges, streamObjects);
+ return Objects.hash(streamId, epoch, state, rangeIndex, startOffset, ranges, streamObjects);
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
index 9b01d63e1c..ad4ad855b9 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
@@ -96,6 +96,9 @@ public void replay(RemoveRangeRecord record) {
public void replay(S3StreamObjectRecord record) {
getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
+ getOrCreateStreamMetadataDelta(record.streamId()).replay(new AdvanceRangeRecord()
+ .setStartOffset(record.startOffset())
+ .setEndOffset(record.endOffset()));
}
public void replay(RemoveS3StreamObjectRecord record) {
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
index 91d4155ff6..f0ba6e929a 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
@@ -28,11 +28,13 @@
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
+import org.apache.kafka.controller.stream.S3StreamConstant;
import org.apache.kafka.image.writer.ImageWriterOptions;
import org.apache.kafka.image.writer.RecordListWriter;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
+import org.apache.kafka.metadata.stream.StreamState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -56,12 +58,15 @@ public void testRanges() {
// 1. create stream0
delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
+ .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
.setEpoch(0L)
.setStartOffset(0L), (short) 0));
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
- STREAM0, 0L, 0L, Map.of(), Map.of());
+ STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of());
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);
@@ -71,8 +76,10 @@ public void testRanges() {
S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
- .setEpoch(1L)
- .setStartOffset(0L), (short) 0));
+ .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
+ .setEpoch(1L), (short) 0));
delta1Records.add(new ApiMessageAndVersion(new RangeRecord()
.setStreamId(STREAM0)
.setRangeIndex(0)
@@ -82,7 +89,7 @@ public void testRanges() {
RecordTestUtils.replayAll(delta1, delta1Records);
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
- STREAM0, 1L, 0L,
+ STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET,
Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of());
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
@@ -92,8 +99,10 @@ public void testRanges() {
S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2);
delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
- .setEpoch(2L)
- .setStartOffset(0L), (short) 0));
+ .setRangeIndex(0)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
+ .setEpoch(2L), (short) 0));
delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord()
.setStartOffset(0L)
.setEndOffset(100L), (short) 0));
@@ -107,7 +116,7 @@ public void testRanges() {
RecordTestUtils.replayAll(delta2, delta2Records);
// verify delta and check image's write
S3StreamMetadataImage image3 = new S3StreamMetadataImage(
- STREAM0, 2L, 0L, Map.of(
+ STREAM0, 2L, StreamState.OPENED, 0, 0L, Map.of(
0, new RangeMetadata(STREAM0, 1L, 0, 0, 100, BROKER0),
1, new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Map.of());
assertEquals(image3, delta2.apply());
@@ -119,6 +128,8 @@ public void testRanges() {
delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord()
.setStreamId(STREAM0)
.setEpoch(2L)
+ .setRangeIndex(0)
+ .setStreamState(StreamState.OPENED.toByte())
.setStartOffset(100L), (short) 0));
delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord()
.setStreamId(STREAM0)
@@ -126,7 +137,7 @@ public void testRanges() {
RecordTestUtils.replayAll(delta3, delta3Records);
// verify delta and check image's write
S3StreamMetadataImage image4 = new S3StreamMetadataImage(
- STREAM0, 2L, 100L, Map.of(
+ STREAM0, 2L, StreamState.OPENED, 0, 100L, Map.of(
1, new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Map.of());
assertEquals(image4, delta3.apply());
}
@@ -134,7 +145,7 @@ public void testRanges() {
@Test
public void testStreamObjects() {
S3StreamMetadataImage image0 = new S3StreamMetadataImage(
- STREAM0, 0L, 0L, Map.of(), Map.of());
+ STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of());
List delta0Records = new ArrayList<>();
S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
// 1. create streamObject0 and streamObject1
@@ -151,7 +162,7 @@ public void testStreamObjects() {
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
- STREAM0, 0L, 0L, Map.of(), Map.of(
+ STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of(
0L, new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
assertEquals(image1, delta0.apply());
@@ -165,7 +176,7 @@ public void testStreamObjects() {
RecordTestUtils.replayAll(delta1, delta1Records);
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
- STREAM0, 0L, 0L, Map.of(), Map.of(
+ STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of(
1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index 19b58030ce..3896c2396d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -33,6 +33,7 @@
import org.apache.kafka.metadata.stream.S3ObjectStreamIndex;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3WALObject;
+import org.apache.kafka.metadata.stream.StreamState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -120,7 +121,7 @@ public void testGetObjects() {
8L, new S3StreamObject(8, GB, STREAM0, 10L, 100L),
9L, new S3StreamObject(9, GB, STREAM0, 200L, 300L),
10L, new S3StreamObject(10, GB, STREAM0, 300L, 400L));
- S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, 10, ranges, streamObjects);
+ S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));