diff --git a/build.gradle b/build.gradle
index dd5f84de0c..c6f228c816 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1268,6 +1268,7 @@ project(':metadata') {
implementation libs.jacksonJDK8Datatypes
implementation libs.metrics
implementation libs.nettyCommon
+ implementation libs.zstd
compileOnly libs.log4j
testImplementation libs.junitJupiter
testImplementation libs.jqwik
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index b5f885958f..9721750616 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -40,6 +40,7 @@
+    <allow pkg="com.github.luben.zstd" />
diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java
index 132f65973d..900b987bc3 100644
--- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java
+++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java
@@ -107,7 +107,7 @@ public CompletableFuture<List<S3ObjectMetadata>> getStreamSetObjects() {
.map(object -> {
S3Object s3Object = this.objectsImage.getObjectMetadata(object.objectId());
return new S3ObjectMetadata(object.objectId(), object.objectType(),
- new ArrayList<>(object.offsetRanges().values()), object.dataTimeInMs(),
+ object.offsetRangeList(), object.dataTimeInMs(),
s3Object.getCommittedTimeInMs(), s3Object.getObjectSize(),
object.orderId());
})
diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java
index b8f83f67d7..3dcfeabdcb 100644
--- a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java
+++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java
@@ -17,16 +17,19 @@
package kafka.log.stream.s3;
-import kafka.log.stream.s3.metadata.StreamMetadataManager;
+import com.automq.stream.s3.metadata.S3StreamConstant;
+import com.automq.stream.s3.metadata.StreamOffsetRange;
+import com.automq.stream.s3.metadata.StreamState;
import kafka.log.stream.s3.metadata.MetadataListener;
+import kafka.log.stream.s3.metadata.StreamMetadataManager;
import kafka.server.BrokerServer;
import kafka.server.KafkaConfig;
import kafka.server.metadata.BrokerMetadataListener;
import kafka.server.metadata.KRaftMetadataCache;
import org.apache.kafka.image.DeltaMap;
-import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage;
import org.apache.kafka.image.MetadataImage;
import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.NodeS3StreamSetObjectMetadataImage;
import org.apache.kafka.image.S3ObjectsImage;
import org.apache.kafka.image.S3StreamMetadataImage;
import org.apache.kafka.image.S3StreamsMetadataImage;
@@ -34,11 +37,8 @@
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3Object;
import org.apache.kafka.metadata.stream.S3ObjectState;
-import com.automq.stream.s3.metadata.S3StreamConstant;
import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.metadata.stream.S3StreamSetObject;
-import com.automq.stream.s3.metadata.StreamOffsetRange;
-import com.automq.stream.s3.metadata.StreamState;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
@@ -47,6 +47,7 @@
import org.mockito.Mockito;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -94,7 +95,7 @@ public void setUp() {
private static MetadataImage image2;
static {
- DeltaMap<Long, S3Object> map = new DeltaMap<>(new int[] {});
+ DeltaMap<Long, S3Object> map = new DeltaMap<>(new int[]{});
map.putAll(Map.of(
0L, new S3Object(0L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED),
1L, new S3Object(1L, 128, null, -1, -1, -1, -1, S3ObjectState.COMMITTED),
@@ -110,10 +111,10 @@ public void setUp() {
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects);
NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, Map.of(
- 1L, new S3StreamSetObject(1L, BROKER0, Map.of(
- STREAM1, new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
- 2L, new S3StreamSetObject(2L, BROKER0, Map.of(
- STREAM2, new StreamOffsetRange(STREAM2, 0L, 100L)), 2L)));
+ 1L, new S3StreamSetObject(1L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM1, 0L, 100L)), 1L),
+ 2L, new S3StreamSetObject(2L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM2, 0L, 100L)), 2L)));
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, Map.of(STREAM0, streamImage),
Map.of(BROKER0, walMetadataImage0));
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 7ac3ef0f7f..9f40ca3785 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -68,7 +68,6 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -445,41 +444,7 @@ public ControllerResult<TrimStreamResponse> trimStream(int nodeId, long nodeEpoc
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
- // remove stream set object or remove stream range in stream set object
- // TODO: optimize
- this.nodesMetadata.values()
- .stream()
- .flatMap(entry -> entry.streamSetObjects().values().stream())
- .filter(streamSetObject -> streamSetObject.offsetRanges().containsKey(streamId))
- .filter(streamSetObject -> streamSetObject.offsetRanges().get(streamId).getEndOffset() <= newStartOffset)
- .forEach(streamSetObj -> {
- if (streamSetObj.offsetRanges().size() == 1) {
- // only this range, but we will remove this range, so now we can remove this stream set object
- records.add(new ApiMessageAndVersion(
- new RemoveStreamSetObjectRecord()
- .setNodeId(streamSetObj.nodeId())
- .setObjectId(streamSetObj.objectId()), (short) 0
- ));
- ControllerResult<Boolean> markDestroyResult = this.s3ObjectControlManager.markDestroyObjects(
- List.of(streamSetObj.objectId()));
- if (!markDestroyResult.response()) {
- log.error("[TrimStream] Mark destroy stream set object: {} failed", streamSetObj.objectId());
- resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
- return;
- }
- records.addAll(markDestroyResult.records());
- return;
- }
- Map<Long, StreamOffsetRange> newOffsetRange = new HashMap<>(streamSetObj.offsetRanges());
- // remove offset range
- newOffsetRange.remove(streamId);
- records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord()
- .setObjectId(streamSetObj.objectId())
- .setNodeId(streamSetObj.nodeId())
- .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList()))
- .setDataTimeInMs(streamSetObj.dataTimeInMs())
- .setOrderId(streamSetObj.orderId()), (short) 0));
- });
+ // the data in the stream set objects will be removed by compaction
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
@@ -523,39 +488,7 @@ public ControllerResult<DeleteStreamResponse> deleteStream(int nodeId, long node
return ControllerResult.of(Collections.emptyList(), resp);
}
records.addAll(markDestroyResult.records());
- // remove stream set object or remove stream-offset-range in stream set object
- this.nodesMetadata.values()
- .stream()
- .flatMap(entry -> entry.streamSetObjects().values().stream())
- .filter(streamsSetObject -> streamsSetObject.offsetRanges().containsKey(streamId))
- .forEach(streamSetObj -> {
- if (streamSetObj.offsetRanges().size() == 1) {
- // only this range, but we will remove this range, so now we can remove this stream set object
- records.add(new ApiMessageAndVersion(
- new RemoveStreamSetObjectRecord()
- .setNodeId(streamSetObj.nodeId())
- .setObjectId(streamSetObj.objectId()), (short) 0
- ));
- ControllerResult<Boolean> result = this.s3ObjectControlManager.markDestroyObjects(
- List.of(streamSetObj.objectId()));
- if (!result.response()) {
- log.error("[DeleteStream]: Mark destroy stream set object: {} failed", streamSetObj.objectId());
- resp.setErrorCode(Errors.STREAM_INNER_ERROR.code());
- return;
- }
- records.addAll(result.records());
- return;
- }
- Map<Long, StreamOffsetRange> newOffsetRange = new HashMap<>(streamSetObj.offsetRanges());
- // remove offset range
- newOffsetRange.remove(streamId);
- records.add(new ApiMessageAndVersion(new S3StreamSetObjectRecord()
- .setObjectId(streamSetObj.objectId())
- .setNodeId(streamSetObj.nodeId())
- .setStreamsIndex(newOffsetRange.values().stream().map(Convertor::to).collect(Collectors.toList()))
- .setDataTimeInMs(streamSetObj.dataTimeInMs())
- .setOrderId(streamSetObj.orderId()), (short) 0));
- });
+ // the data in the stream set objects will be removed by compaction
if (resp.errorCode() != Errors.NONE.code()) {
return ControllerResult.of(Collections.emptyList(), resp);
}
@@ -1043,10 +976,8 @@ public void replay(S3StreamSetObjectRecord record) {
}
// create stream set object
- Map<Long, StreamOffsetRange> indexMap = streamIndexes
- .stream()
- .collect(Collectors.toMap(StreamIndex::streamId, Convertor::to));
- nodeMetadata.streamSetObjects().put(objectId, new S3StreamSetObject(objectId, nodeId, indexMap, orderId, dataTs));
+ List<StreamOffsetRange> ranges = streamIndexes.stream().map(Convertor::to).collect(Collectors.toList());
+ nodeMetadata.streamSetObjects().put(objectId, new S3StreamSetObject(objectId, nodeId, ranges, orderId, dataTs));
// update range
record.streamsIndex().forEach(index -> {
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 24fef1bc26..196537b31c 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -19,6 +19,8 @@
import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.StreamOffsetRange;
+import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
+import com.automq.stream.utils.biniarysearch.ComparableItem;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
@@ -176,25 +178,29 @@ public RangeSearcher(long startOffset, long endOffset, long streamId, int nodeId
private Queue<S3ObjectMetadataWrapper> rangeOfStreamSetObjects() {
NodeS3StreamSetObjectMetadataImage streamSetObjectImage = nodeStreamSetObjectMetadata.get(nodeId);
- return streamSetObjectImage.orderList().stream()
- .filter(obj -> obj.offsetRanges().containsKey(streamId))
- .filter(obj -> {
- StreamOffsetRange offsetRange = obj.offsetRanges().get(streamId);
- long objectStartOffset = offsetRange.getStartOffset();
- long objectEndOffset = offsetRange.getEndOffset();
- return objectStartOffset < endOffset && objectEndOffset > startOffset;
- }).map(obj -> {
- StreamOffsetRange offsetRange = obj.offsetRanges().get(streamId);
- long startOffset = offsetRange.getStartOffset();
- long endOffset = offsetRange.getEndOffset();
- List<StreamOffsetRange> offsetRanges = obj.offsetRanges().values().stream().sorted()
- .collect(Collectors.toList());
- S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
- obj.objectId(), obj.objectType(), offsetRanges, obj.dataTimeInMs(),
+ List<S3StreamSetObject> streamSetObjects = streamSetObjectImage.orderList();
+ Queue<S3ObjectMetadataWrapper> s3ObjectMetadataList = new LinkedList<>();
+ for (S3StreamSetObject obj : streamSetObjects) {
+ // TODO: cache the stream offset ranges to accelerate the search
+ // TODO: cache the last search index, to accelerate the search
+ List<StreamOffsetRange> ranges = obj.offsetRangeList();
+ int index = new StreamOffsetRanges(ranges).search(streamId);
+ if (index < 0) {
+ continue;
+ }
+ StreamOffsetRange range = ranges.get(index);
+ if (range.getStartOffset() >= endOffset || range.getEndOffset() < startOffset) {
+ continue;
+ }
+ S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
+ obj.objectId(), obj.objectType(), ranges, obj.dataTimeInMs(),
obj.orderId());
- return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset);
- })
- .collect(Collectors.toCollection(LinkedList::new));
+ s3ObjectMetadataList.add(new S3ObjectMetadataWrapper(s3ObjectMetadata, range.getStartOffset(), range.getEndOffset()));
+ if (range.getEndOffset() >= endOffset) {
+ break;
+ }
+ }
+ return s3ObjectMetadataList;
}
private Queue<S3ObjectMetadataWrapper> rangeOfStreamObjects() {
@@ -340,4 +346,34 @@ public String toString() {
map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
'}';
}
+
+ static class StreamOffsetRanges extends AbstractOrderedCollection<Long> {
+ private final List<StreamOffsetRange> ranges;
+
+ public StreamOffsetRanges(List<StreamOffsetRange> ranges) {
+ this.ranges = ranges;
+ }
+
+ @Override
+ protected int size() {
+ return ranges.size();
+ }
+
+ @Override
+ protected ComparableItem<Long> get(int index) {
+ StreamOffsetRange range = ranges.get(index);
+ return new ComparableItem<>() {
+ @Override
+ public boolean isLessThan(Long o) {
+ return range.getStreamId() < o;
+ }
+
+ @Override
+ public boolean isGreaterThan(Long o) {
+ return range.getStreamId() > o;
+ }
+ };
+ }
+
+ }
}
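Note: StreamOffsetRanges above delegates to AbstractOrderedCollection.search (from the AutoMQ utils package, not shown in this patch) to locate a stream's range via binary search over the streamId-sorted list. The standalone sketch below is a hypothetical illustration of that lookup, assuming the usual negative-index convention of java.util.Collections.binarySearch; it is not the actual utility class.

    import com.automq.stream.s3.metadata.StreamOffsetRange;
    import java.util.List;

    // Hypothetical sketch of the lookup StreamOffsetRanges.search performs:
    // binary search over a list sorted by streamId, returning the matching
    // index, or a negative value when the stream is absent.
    final class RangeLookupSketch {
        static int findRange(List<StreamOffsetRange> ranges, long streamId) {
            int low = 0;
            int high = ranges.size() - 1;
            while (low <= high) {
                int mid = (low + high) >>> 1;       // unsigned shift avoids overflow
                long midId = ranges.get(mid).getStreamId();
                if (midId < streamId) {
                    low = mid + 1;
                } else if (midId > streamId) {
                    high = mid - 1;
                } else {
                    return mid;                     // found this stream's range
                }
            }
            return -(low + 1);                      // not present
        }
    }

With a lookup of this shape, rangeOfStreamSetObjects() skips an object when the search comes back negative, drops ranges that do not overlap [startOffset, endOffset), and stops early once a range covers the requested end offset.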
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java
index a8a02d7013..a9e275da7d 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java
@@ -17,21 +17,29 @@
package org.apache.kafka.metadata.stream;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.metadata.S3StreamConstant;
import com.automq.stream.s3.metadata.StreamOffsetRange;
+import com.github.luben.zstd.Zstd;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
public class S3StreamSetObject implements Comparable<S3StreamSetObject> {
+ public static final byte MAGIC = 0x01;
+ public static final byte ZSTD_COMPRESSED = 1 << 1;
+ private static final int COMPRESSION_THRESHOLD = 50;
private final long objectId;
private final int nodeId;
- private final Map<Long, StreamOffsetRange> streamOffsetRanges;
+ private final byte[] ranges;
/**
* The order id of the object. Sort by this field to get the order of the objects which contains logically increasing streams.
@@ -43,51 +51,44 @@ public class S3StreamSetObject implements Comparable<S3StreamSetObject> {
private final long dataTimeInMs;
// Only used for testing
- public S3StreamSetObject(long objectId, int nodeId, final Map<Long, StreamOffsetRange> streamOffsetRanges, long orderId) {
+ public S3StreamSetObject(long objectId, int nodeId, final List<StreamOffsetRange> streamOffsetRanges, long orderId) {
this(objectId, nodeId, streamOffsetRanges, orderId, S3StreamConstant.INVALID_TS);
}
- public S3StreamSetObject(long objectId, int nodeId, final Map<Long, StreamOffsetRange> streamOffsetRanges, long orderId, long dataTimeInMs) {
+ public S3StreamSetObject(long objectId, int nodeId, List<StreamOffsetRange> streamOffsetRanges, long orderId, long dataTimeInMs) {
this.orderId = orderId;
this.objectId = objectId;
this.nodeId = nodeId;
- this.streamOffsetRanges = streamOffsetRanges;
+ streamOffsetRanges = new ArrayList<>(streamOffsetRanges);
+ streamOffsetRanges.sort(Comparator.comparingLong(StreamOffsetRange::getStreamId));
+ this.ranges = encode(streamOffsetRanges);
this.dataTimeInMs = dataTimeInMs;
}
- public boolean intersect(long streamId, long startOffset, long endOffset) {
- StreamOffsetRange offsetRange = streamOffsetRanges.get(streamId);
- if (offsetRange == null) {
- return false;
- }
- return startOffset >= offsetRange.getStartOffset() && startOffset <= offsetRange.getEndOffset();
- }
-
- public Map<Long, StreamOffsetRange> offsetRanges() {
- return streamOffsetRanges;
+ public List<StreamOffsetRange> offsetRangeList() {
+ return decode(ranges);
}
public ApiMessageAndVersion toRecord() {
+ // TODO: have S3StreamSetObjectRecord carry the raw encoded data as well
+ List<StreamOffsetRange> rangesList = decode(ranges);
return new ApiMessageAndVersion(new S3StreamSetObjectRecord()
- .setObjectId(objectId)
- .setNodeId(nodeId)
- .setOrderId(orderId)
- .setDataTimeInMs(dataTimeInMs)
- .setStreamsIndex(
- streamOffsetRanges
- .values()
- .stream()
- .map(Convertor::to)
- .collect(Collectors.toList())), (short) 0);
+ .setObjectId(objectId)
+ .setNodeId(nodeId)
+ .setOrderId(orderId)
+ .setDataTimeInMs(dataTimeInMs)
+ .setStreamsIndex(
+ rangesList
+ .stream()
+ .map(Convertor::to)
+ .collect(Collectors.toList())), (short) 0);
}
public static S3StreamSetObject of(S3StreamSetObjectRecord record) {
- Map<Long, StreamOffsetRange> offsetRanges = record.streamsIndex()
- .stream()
- .collect(Collectors.toMap(S3StreamSetObjectRecord.StreamIndex::streamId,
- index -> new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset())));
+ List<StreamOffsetRange> offsetRanges = record.streamsIndex()
+ .stream().map(index -> new StreamOffsetRange(index.streamId(), index.startOffset(), index.endOffset())).collect(Collectors.toList());
return new S3StreamSetObject(record.objectId(), record.nodeId(),
- offsetRanges, record.orderId(), record.dataTimeInMs());
+ offsetRanges, record.orderId(), record.dataTimeInMs());
}
public Integer nodeId() {
@@ -130,16 +131,69 @@ public int hashCode() {
@Override
public String toString() {
return "S3StreamSetObject{" +
- "objectId=" + objectId +
- ", orderId=" + orderId +
- ", nodeId=" + nodeId +
- ", streamOffsetRanges=" + streamOffsetRanges +
- ", dataTimeInMs=" + dataTimeInMs +
- '}';
+ "objectId=" + objectId +
+ ", orderId=" + orderId +
+ ", nodeId=" + nodeId +
+ ", dataTimeInMs=" + dataTimeInMs +
+ '}';
}
@Override
public int compareTo(S3StreamSetObject o) {
return Long.compare(this.orderId, o.orderId);
}
+
+ public static byte[] encode(List<StreamOffsetRange> streamOffsetRanges) {
+ boolean compressed = streamOffsetRanges.size() > COMPRESSION_THRESHOLD;
+ int flag = 0;
+ if (compressed) {
+ flag = flag | ZSTD_COMPRESSED;
+ }
+ ByteBuf rangesBuf = Unpooled.buffer(streamOffsetRanges.size() * 20);
+ streamOffsetRanges.forEach(r -> {
+ rangesBuf.writeLong(r.getStreamId());
+ rangesBuf.writeLong(r.getStartOffset());
+ rangesBuf.writeInt((int) (r.getEndOffset() - r.getStartOffset()));
+ });
+ byte[] compressedBytes;
+ if (compressed) {
+ compressedBytes = Zstd.compress(rangesBuf.array());
+ } else {
+ compressedBytes = rangesBuf.array();
+ }
+ ByteBuf buf = Unpooled.buffer(1 /* magic */ + 1 /* flag */ + 4 /* origin size */ + compressedBytes.length);
+ buf.writeByte(MAGIC);
+ buf.writeByte(flag);
+ buf.writeInt(rangesBuf.readableBytes());
+ buf.writeBytes(compressedBytes);
+ return buf.array();
+ }
+
+ public static List<StreamOffsetRange> decode(byte[] bytes) {
+ ByteBuf buf = Unpooled.wrappedBuffer(bytes);
+ byte magic = buf.readByte();
+ if (magic != MAGIC) {
+ throw new IllegalArgumentException("Invalid magic byte: " + magic);
+ }
+ byte flag = buf.readByte();
+ int rangeBytesSize = buf.readInt();
+ byte[] rangesBytes;
+ boolean compressed = (flag & ZSTD_COMPRESSED) != 0;
+ byte[] compressedBytes = new byte[buf.readableBytes()];
+ buf.readBytes(compressedBytes);
+ if (compressed) {
+ rangesBytes = Zstd.decompress(compressedBytes, rangeBytesSize);
+ } else {
+ rangesBytes = compressedBytes;
+ }
+ List<StreamOffsetRange> ranges = new ArrayList<>(rangeBytesSize / 20);
+ ByteBuf rangesBuf = Unpooled.wrappedBuffer(rangesBytes);
+ while (rangesBuf.readableBytes() > 0) {
+ long streamId = rangesBuf.readLong();
+ long startOffset = rangesBuf.readLong();
+ int count = rangesBuf.readInt();
+ ranges.add(new StreamOffsetRange(streamId, startOffset, startOffset + count));
+ }
+ return ranges;
+ }
}
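For context, the layout produced by encode() is [magic:1][flag:1][originSize:4][payload], where the payload packs each range as streamId (8 bytes), startOffset (8 bytes), and the range length (endOffset minus startOffset) as a 4-byte int, i.e. 20 bytes per range; lists longer than COMPRESSION_THRESHOLD (50 ranges) are zstd-compressed and flagged with ZSTD_COMPRESSED. Below is a minimal round-trip sketch using the patch's own public helpers; the wrapper class and printout are illustrative only.

    import com.automq.stream.s3.metadata.StreamOffsetRange;
    import java.util.List;
    import org.apache.kafka.metadata.stream.S3StreamSetObject;

    // Illustrative round trip over the 20-bytes-per-range encoding.
    public class RangeCodecExample {
        public static void main(String[] args) {
            List<StreamOffsetRange> ranges = List.of(
                    new StreamOffsetRange(1L, 0L, 100L),    // stream 1, offsets [0, 100)
                    new StreamOffsetRange(2L, 50L, 200L));  // stream 2, offsets [50, 200)
            byte[] encoded = S3StreamSetObject.encode(ranges);  // 6-byte header + 2 * 20 bytes
            List<StreamOffsetRange> decoded = S3StreamSetObject.decode(encoded);
            System.out.println(decoded.size());                 // 2, in streamId order
            // Above 50 ranges, encode() zstd-compresses the payload and sets
            // the ZSTD_COMPRESSED flag bit; decode() uses the stored original
            // size to decompress. Storing the length as a 4-byte int assumes
            // no single range spans 2^31 or more offsets.
        }
    }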
diff --git a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
index 4df6bea222..95883a4674 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/StreamControlManagerTest.java
@@ -19,7 +19,6 @@
import com.automq.stream.s3.metadata.ObjectUtils;
import com.automq.stream.s3.metadata.S3StreamConstant;
-import com.automq.stream.s3.metadata.StreamOffsetRange;
import org.apache.kafka.common.message.CloseStreamsRequestData.CloseStreamRequest;
import org.apache.kafka.common.message.CloseStreamsResponseData.CloseStreamResponse;
import org.apache.kafka.common.message.CommitStreamObjectRequestData;
@@ -39,8 +38,8 @@
import org.apache.kafka.common.message.TrimStreamsRequestData.TrimStreamRequest;
import org.apache.kafka.common.message.TrimStreamsResponseData.TrimStreamResponse;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
-import org.apache.kafka.common.metadata.NodeWALMetadataRecord;
import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.NodeWALMetadataRecord;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveNodeWALMetadataRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
@@ -54,11 +53,9 @@
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.controller.stream.S3ObjectControlManager;
-import org.apache.kafka.controller.stream.StreamControlManager;
-import org.apache.kafka.controller.stream.NodeMetadata;
import org.apache.kafka.controller.stream.S3StreamMetadata;
+import org.apache.kafka.controller.stream.StreamControlManager;
import org.apache.kafka.metadata.stream.RangeMetadata;
-import org.apache.kafka.metadata.stream.S3StreamSetObject;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.junit.jupiter.api.BeforeEach;
@@ -826,20 +823,6 @@ public void testTrim() {
assertEquals(60, rangeMetadata.startOffset());
assertEquals(70, rangeMetadata.endOffset());
assertEquals(0, streamMetadata.streamObjects().size());
- NodeMetadata node0Metadata = manager.nodesMetadata().get(BROKER0);
- assertEquals(1, node0Metadata.streamSetObjects().size());
- S3StreamSetObject s3StreamSetObject = node0Metadata.streamSetObjects().get(1L);
- assertEquals(1, s3StreamSetObject.offsetRanges().size());
- StreamOffsetRange range = s3StreamSetObject.offsetRanges().get(STREAM0);
- assertNull(range);
- NodeMetadata node1Metadata = manager.nodesMetadata().get(BROKER1);
- assertEquals(1, node1Metadata.streamSetObjects().size());
- s3StreamSetObject = node1Metadata.streamSetObjects().get(3L);
- assertEquals(1, s3StreamSetObject.offsetRanges().size());
- range = s3StreamSetObject.offsetRanges().get(STREAM0);
- assertNotNull(range);
- assertEquals(40, range.getStartOffset());
- assertEquals(70, range.getEndOffset());
// 3. trim stream0 to [100, ..)
trimRequest = new TrimStreamRequest()
@@ -859,10 +842,6 @@ public void testTrim() {
assertEquals(70, rangeMetadata.startOffset());
assertEquals(70, rangeMetadata.endOffset());
assertEquals(0, streamMetadata.streamObjects().size());
- node0Metadata = manager.nodesMetadata().get(BROKER0);
- assertEquals(1, node0Metadata.streamSetObjects().size());
- node1Metadata = manager.nodesMetadata().get(BROKER1);
- assertEquals(0, node1Metadata.streamSetObjects().size());
// 5. commit stream set object with stream0-[70, 100)
CommitStreamSetObjectRequestData requestData = new CommitStreamSetObjectRequestData()
@@ -919,12 +898,7 @@ public void testDelete() {
// 4. verify
assertNull(manager.streamsMetadata().get(STREAM0));
- assertEquals(1, manager.nodesMetadata().get(BROKER0).streamSetObjects().size());
- S3StreamSetObject streamSetObject = manager.nodesMetadata().get(BROKER0).streamSetObjects().get(1L);
- assertEquals(1, streamSetObject.offsetRanges().size());
- StreamOffsetRange offsetRange = streamSetObject.offsetRanges().get(STREAM1);
- assertNotNull(offsetRange);
- assertEquals(0, manager.nodesMetadata().get(BROKER1).streamSetObjects().size());
+ assertEquals(2, manager.nodesMetadata().get(BROKER0).streamSetObjects().size());
// 5. delete again
req = new DeleteStreamRequest()
diff --git a/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java
index 0b274ad7bb..8f4c1acc7d 100644
--- a/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/NodeMetadataImageTest.java
@@ -83,11 +83,11 @@ public void testS3Objects() {
// verify delta and check image's write
NodeS3StreamSetObjectMetadataImage image1 = new NodeS3StreamSetObjectMetadataImage(BROKER0, 1,
Map.of(
- 0L, new S3StreamSetObject(0L, BROKER0, Map.of(
- STREAM0, new StreamOffsetRange(STREAM0, 0L, 100L),
- STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
- 1L, new S3StreamSetObject(1L, BROKER0, Map.of(
- STREAM0, new StreamOffsetRange(STREAM0, 101L, 200L)), 1L)));
+ 0L, new S3StreamSetObject(0L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM0, 0L, 100L),
+ new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
+ 1L, new S3StreamSetObject(1L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM0, 101L, 200L)), 1L)));
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);
@@ -110,10 +110,10 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
// verify delta and check image's write
NodeS3StreamSetObjectMetadataImage image2 = new NodeS3StreamSetObjectMetadataImage(BROKER0, 2,
Map.of(
- 0L, new S3StreamSetObject(0L, BROKER0, Map.of(
- STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
- 1L, new S3StreamSetObject(1L, BROKER0, Map.of(
- STREAM0, new StreamOffsetRange(STREAM0, 101L, 200L)), 1L)));
+ 0L, new S3StreamSetObject(0L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
+ 1L, new S3StreamSetObject(1L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM0, 101L, 200L)), 1L)));
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
@@ -126,8 +126,8 @@ STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L),
// verify delta and check image's write
NodeS3StreamSetObjectMetadataImage image3 = new NodeS3StreamSetObjectMetadataImage(BROKER0, 2,
Map.of(
- 0L, new S3StreamSetObject(0L, BROKER0, Map.of(
- STREAM1, new StreamOffsetRange(STREAM1, 0L, 200L)), 0L)));
+ 0L, new S3StreamSetObject(0L, BROKER0, List.of(
+ new StreamOffsetRange(STREAM1, 0L, 200L)), 0L)));
assertEquals(image3, delta2.apply());
testToImageAndBack(image3);
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index f7cd5c5cc1..e8131ec1ec 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -102,16 +102,16 @@ private void testToImageAndBack(S3StreamsMetadataImage image) {
@Test
public void testGetObjects() {
Map<Long, S3StreamSetObject> broker0Objects = Map.of(
- 0L, new S3StreamSetObject(0, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 100L, 120L)), 0L),
- 1L, new S3StreamSetObject(1, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 120L, 140L)), 1L),
- 2L, new S3StreamSetObject(2, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 180L, 200L)), 2L),
- 3L, new S3StreamSetObject(3, BROKER0, Map.of(STREAM0,
+ 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 100L, 120L)), 0L),
+ 1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 120L, 140L)), 1L),
+ 2L, new S3StreamSetObject(2, BROKER0, List.of(new StreamOffsetRange(STREAM0, 180L, 200L)), 2L),
+ 3L, new S3StreamSetObject(3, BROKER0, List.of(
new StreamOffsetRange(STREAM0, 400L, 420L)), 3L),
- 4L, new S3StreamSetObject(4, BROKER0, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 520L, 600L)), 4L));
+ 4L, new S3StreamSetObject(4, BROKER0, List.of(new StreamOffsetRange(STREAM0, 520L, 600L)), 4L));
Map<Long, S3StreamSetObject> broker1Objects = Map.of(
- 5L, new S3StreamSetObject(5, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 140L, 160L)), 0L),
- 6L, new S3StreamSetObject(6, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 160L, 180L)), 1L),
- 7L, new S3StreamSetObject(7, BROKER1, Map.of(STREAM0, new StreamOffsetRange(STREAM0, 420L, 520L)), 2L));
+ 5L, new S3StreamSetObject(5, BROKER1, List.of(new StreamOffsetRange(STREAM0, 140L, 160L)), 0L),
+ 6L, new S3StreamSetObject(6, BROKER1, List.of(new StreamOffsetRange(STREAM0, 160L, 180L)), 1L),
+ 7L, new S3StreamSetObject(7, BROKER1, List.of(new StreamOffsetRange(STREAM0, 420L, 520L)), 2L));
NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH,
new HashMap<>(broker0Objects));
NodeS3StreamSetObjectMetadataImage broker1WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH,
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedStreamSetObjectsListTest.java b/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedStreamSetObjectsListTest.java
index d9e3e60d2f..05c02076a2 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedStreamSetObjectsListTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/stream/SortedStreamSetObjectsListTest.java
@@ -19,6 +19,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.api.Tag;
@@ -31,11 +32,11 @@ public class SortedStreamSetObjectsListTest {
@Test
public void testSorted() {
SortedStreamSetObjects objects = new SortedStreamSetObjectsList();
- objects.add(new S3StreamSetObject(0, -1, null, 2));
- objects.add(new S3StreamSetObject(1, -1, null, 1));
- objects.add(new S3StreamSetObject(2, -1, null, 3));
- objects.add(new S3StreamSetObject(3, -1, null, 0));
- objects.add(new S3StreamSetObject(4, -1, null, 4));
+ objects.add(new S3StreamSetObject(0, -1, Collections.emptyList(), 2));
+ objects.add(new S3StreamSetObject(1, -1, Collections.emptyList(), 1));
+ objects.add(new S3StreamSetObject(2, -1, Collections.emptyList(), 3));
+ objects.add(new S3StreamSetObject(3, -1, Collections.emptyList(), 0));
+ objects.add(new S3StreamSetObject(4, -1, Collections.emptyList(), 4));
assertEquals(5, objects.size());
List<Long> expectedOrderIds = List.of(0L, 1L, 2L, 3L, 4L);