diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9721750616..7ae0ce07e0 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -41,6 +41,7 @@
+
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 109203e177..85425d0d6a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -306,9 +306,9 @@
+ files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager|S3StreamsMetadataImage).java"/>
+ files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|S3StreamsMetadataImage).java"/>
>> pendingGetObjectsTasks;
+ private final List pendingGetObjectsTasks;
private final ExecutorService pendingExecutorService;
// TODO: we just need the version of streams metadata, not the whole image
private volatile OffsetAndEpoch version;
@@ -74,8 +67,7 @@ public StreamMetadataManager(BrokerServer broker, KafkaConfig config) {
this.objectsImage = currentImage.objectsMetadata();
this.version = currentImage.highestOffsetAndEpoch();
this.broker.metadataListener().registerMetadataListener(this::onImageChanged);
- // TODO: optimize by more suitable data structure for pending tasks
- this.pendingGetObjectsTasks = new HashMap<>();
+ this.pendingGetObjectsTasks = new LinkedList<>();
this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor"));
}
@@ -89,15 +81,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) {
// update image
this.streamsImage = newImage.streamsMetadata();
this.objectsImage = newImage.objectsMetadata();
- // remove all catch up pending tasks
- List retryTasks = removePendingTasks();
+
// retry all pending tasks
- if (retryTasks.isEmpty()) {
- return;
- }
- this.pendingExecutorService.submit(() -> {
- retryPendingTasks(retryTasks);
- });
+ retryPendingTasks();
}
}
@@ -116,77 +102,42 @@ public CompletableFuture> getStreamSetObjects() {
}
}
- // must access thread safe
- private List removePendingTasks() {
- if (this.pendingGetObjectsTasks == null || this.pendingGetObjectsTasks.isEmpty()) {
- return Collections.emptyList();
- }
- Set pendingStreams = pendingGetObjectsTasks.keySet();
- List pendingStreamsOffsetRange = pendingStreams
- .stream()
- .map(streamsImage::offsetRange)
- .filter(offset -> offset != StreamOffsetRange.INVALID)
- .collect(Collectors.toList());
- if (pendingStreamsOffsetRange.isEmpty()) {
- return Collections.emptyList();
- }
- List retryTasks = new ArrayList<>();
- pendingStreamsOffsetRange.forEach(offsetRange -> {
- long streamId = offsetRange.streamId();
- long endOffset = offsetRange.endOffset();
- Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId);
- if (tasks == null || tasks.isEmpty()) {
- return;
- }
- Iterator>> iterator =
- tasks.entrySet().iterator();
- while (iterator.hasNext()) {
- Entry> entry = iterator.next();
- long pendingEndOffset = entry.getKey();
- if (pendingEndOffset > endOffset) {
- break;
+ @Override
+ public synchronized CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { // resolves the objects for the requested stream range via fetch0, then fills in each object's size and committed timestamp from objectsImage
+ // TODO: cache the object list for next search
+ return exec(() -> fetch0(streamId, startOffset, endOffset, limit), LOGGER, "fetchObjects").thenApply(rst -> { // NOTE(review): exec presumably wraps the call with error logging under the given name - confirm against its definition
+ rst.objects().forEach(object -> {
+ S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId()); // every returned object is expected to have an entry in objectsImage
+ if (objectMetadata == null) {
+ // should not happen
+ LOGGER.error(
+ "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
+ streamId, startOffset, endOffset, limit);
+ throw new IllegalStateException("cannt find object metadata for object: " + object.objectId()); // thrown inside thenApply, so it completes the returned future exceptionally
}
- iterator.remove();
- List getObjectsTasks = entry.getValue();
- retryTasks.addAll(getObjectsTasks);
- }
- if (tasks.isEmpty()) {
- StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId);
+ object.setObjectSize(objectMetadata.getObjectSize()); // copy the size from the object metadata onto the result entry
+ object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs()); // copy the committed time from the object metadata onto the result entry
+
+ });
+ if (LOGGER.isTraceEnabled()) { // guard so the trace arguments are only handed to the logger when trace is enabled
+ LOGGER.trace(
+ "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}",
+ streamId, startOffset, endOffset, limit, rst);
}
+ return rst;
});
- return retryTasks;
}
- @Override
- public synchronized CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) {
- S3StreamMetadataImage streamImage = streamsImage.streamsMetadata().get(streamId);
- if (streamImage == null) {
- LOGGER.warn(
- "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and streamImage is null",
- streamId, startOffset, endOffset, limit);
- return CompletableFuture.completedFuture(InRangeObjects.INVALID);
- }
- StreamOffsetRange offsetRange = streamImage.offsetRange();
- if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) {
- return CompletableFuture.completedFuture(InRangeObjects.INVALID);
- }
- long streamStartOffset = offsetRange.startOffset();
- long streamEndOffset = offsetRange.endOffset();
- if (startOffset < streamStartOffset) {
- LOGGER.warn(
- "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}",
- streamId, startOffset, endOffset, limit, streamStartOffset);
- return CompletableFuture.completedFuture(InRangeObjects.INVALID);
- }
- endOffset = endOffset == NOOP_OFFSET ? streamEndOffset : endOffset;
- if (endOffset > streamEndOffset) {
- // lag behind, need to wait for cache catch up
- LOGGER.warn("[FetchObjects]: pending request, stream: {}, startOffset: {}, endOffset: {}, streamEndOffset: {}, limit: {}",
- streamId, startOffset, endOffset, streamEndOffset, limit);
- return pendingFetch(streamId, startOffset, endOffset, limit);
- }
- long finalEndOffset = endOffset;
- return FutureUtil.exec(() -> fetch0(streamId, startOffset, finalEndOffset, limit), LOGGER, "fetch");
+ private synchronized CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) { // looks the range up in the current streamsImage; if the result is incomplete, parks on a pending future and looks up again once it completes
+ InRangeObjects rst = streamsImage.getObjects(streamId, startOffset, endOffset, limit);
+ if (rst.objects().size() >= limit || rst.endOffset() >= endOffset || rst == InRangeObjects.INVALID) { // finished: limit reached, requested endOffset covered, or the lookup returned INVALID
+ return CompletableFuture.completedFuture(rst);
+ }
+ LOGGER.info("[FetchObjects],[PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit);
+ CompletableFuture pendingCf = pendingFetch(); // registers a task whose future is completed by retryPendingTasks()
+ CompletableFuture rstCf = new CompletableFuture<>();
+ FutureUtil.propagate(pendingCf.thenCompose(nil -> fetch0(streamId, startOffset, endOffset, limit)), rstCf); // re-runs the same lookup after the pending future completes; propagate() presumably forwards that outcome into rstCf - confirm against FutureUtil
+ return rstCf.whenComplete((r, ex) -> LOGGER.info("[FetchObjects],[COMPLETE_PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit)); // logged on both normal and exceptional completion
}
public CompletableFuture> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) {
@@ -220,7 +171,12 @@ public List getStreamMetadataList(List streamIds) {
continue;
}
StreamMetadata streamMetadata = new StreamMetadata(streamId, streamImage.getEpoch(),
- streamImage.getStartOffset(), streamImage.getEndOffset(), streamImage.state());
+ streamImage.getStartOffset(), -1L, streamImage.state()) {
+ @Override
+ public long endOffset() {
+ throw new UnsupportedOperationException();
+ }
+ };
streamMetadataList.add(streamMetadata);
}
return streamMetadataList;
@@ -228,77 +184,31 @@ public List getStreamMetadataList(List streamIds) {
}
// must access thread safe
- private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) {
- GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit);
- Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId,
- k -> new TreeMap<>());
- List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>());
- getObjectsTasks.add(task);
+ private CompletableFuture pendingFetch() { // queues a new pending task and returns its future, which stays incomplete until retryPendingTasks() completes it
+ GetObjectsTask task = new GetObjectsTask();
+ synchronized (pendingGetObjectsTasks) { // the list's own monitor guards it; retryPendingTasks() locks on the same object
+ pendingGetObjectsTasks.add(task);
+ }
return task.cf;
}
- // must access thread safe
- private synchronized CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) {
- InRangeObjects cachedInRangeObjects = streamsImage.getObjects(streamId, startOffset, endOffset, limit);
- if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) {
- LOGGER.warn(
- "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
- streamId, startOffset, endOffset, limit);
- return CompletableFuture.completedFuture(InRangeObjects.INVALID);
- }
- // fill the objects' size and committed-timestamp
- for (S3ObjectMetadata object : cachedInRangeObjects.objects()) {
- S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId());
- if (objectMetadata == null) {
- // should not happen
- LOGGER.error(
- "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result",
- streamId, startOffset, endOffset, limit);
- return CompletableFuture.completedFuture(InRangeObjects.INVALID);
+ void retryPendingTasks() { // completes every queued pending future and empties the queue; invoked from onImageChanged after the images are swapped
+ synchronized (pendingGetObjectsTasks) { // same monitor as pendingFetch()
+ if (pendingGetObjectsTasks.isEmpty()) {
+ return;
}
- object.setObjectSize(objectMetadata.getObjectSize());
- object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs());
+ LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", pendingGetObjectsTasks.size());
+ pendingGetObjectsTasks.forEach(t -> t.cf.completeAsync(() -> null, pendingExecutorService)); // each future is completed with null by a task submitted to pendingExecutorService, not inline on this thread
+ pendingGetObjectsTasks.clear(); // all tasks were signalled; a lookup that is still incomplete queues a fresh task via pendingFetch()
}
- LOGGER.trace(
- "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}",
- streamId, startOffset, endOffset, limit, cachedInRangeObjects);
- return CompletableFuture.completedFuture(cachedInRangeObjects);
- }
-
- void retryPendingTasks(List tasks) {
- if (tasks == null || tasks.isEmpty()) {
- return;
- }
- LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size());
- tasks.forEach(task -> {
- long streamId = task.streamId;
- long startOffset = task.startOffset;
- long endOffset = task.endOffset;
- int limit = task.limit;
- CompletableFuture newCf = this.fetch(streamId, startOffset, endOffset, limit);
- FutureUtil.propagate(newCf, task.cf);
- });
}
static class GetObjectsTask {
- private final CompletableFuture cf;
- private final long streamId;
- private final long startOffset;
- private final long endOffset;
- private final int limit;
-
- public static GetObjectsTask of(long streamId, long startOffset, long endOffset, int limit) {
- CompletableFuture cf = new CompletableFuture<>();
- return new GetObjectsTask(cf, streamId, startOffset, endOffset, limit);
- }
+ private final CompletableFuture cf; // the task's only state: a wake-up future with no request parameters attached
- private GetObjectsTask(CompletableFuture cf, long streamId, long startOffset, long endOffset, int limit) {
- this.cf = cf;
- this.streamId = streamId;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- this.limit = limit;
+ public GetObjectsTask() { // creates a task with a fresh, incomplete future
+ this.cf = new CompletableFuture<>();
}
}
diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java
index b3423d3b6f..d9d45e5d0e 100644
--- a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java
+++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java
@@ -46,7 +46,7 @@
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
-import java.util.HashMap;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -103,12 +103,9 @@ public void setUp() {
));
S3ObjectsImage objectsImage = new S3ObjectsImage(2L, map);
- Map ranges = Map.of(
- 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0)
- );
- Map streamObjects = Map.of(
- 0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS));
- S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects);
+ List ranges = List.of(new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0));
+ List streamObjects = List.of(new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS));
+ S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 10L, ranges, streamObjects);
NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, DeltaMap.of(
1L, new S3StreamSetObject(1L, BROKER0, List.of(
@@ -120,20 +117,20 @@ public void setUp() {
DeltaMap.of(BROKER0, walMetadataImage0));
image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null);
- ranges = new HashMap<>(ranges);
- ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0));
- streamObjects = new HashMap<>(streamObjects);
- streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS));
- streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects);
+ ranges = new ArrayList<>(ranges);
+ ranges.add(new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0));
+ streamObjects = new ArrayList<>(streamObjects);
+ streamObjects.add(new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS));
+ streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 10L, ranges, streamObjects);
streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null);
- ranges = new HashMap<>(ranges);
- ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0));
- streamObjects = new HashMap<>(streamObjects);
- streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS));
- streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects);
+ ranges = new ArrayList<>(ranges);
+ ranges.add(new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0));
+ streamObjects = new ArrayList<>(streamObjects);
+ streamObjects.add(new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS));
+ streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 10L, ranges, streamObjects);
streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY));
image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null);
@@ -162,7 +159,7 @@ public void testFetch() throws Exception {
result = this.manager.fetch(STREAM0, 20L, 100L, 5);
inRangeObjects = result.get();
assertEquals(STREAM0, inRangeObjects.streamId());
- assertEquals(20L, inRangeObjects.startOffset());
+ assertEquals(10L, inRangeObjects.startOffset());
assertEquals(100L, inRangeObjects.endOffset());
assertEquals(1, inRangeObjects.objects().size());
assertEquals(0L, inRangeObjects.objects().get(0).objectId());
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
index 7e9179dd02..048f6054bc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java
@@ -161,7 +161,6 @@ void replay(ApiMessage message) {
case ASSIGNED_S3_OBJECT_ID_RECORD:
case NODE_WALMETADATA_RECORD:
case REMOVE_NODE_WALMETADATA_RECORD:
- case ADVANCE_RANGE_RECORD:
case KVRECORD:
case REMOVE_KVRECORD:
case UPDATE_NEXT_NODE_ID_RECORD:
diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
index 2b43a03805..1b35166232 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java
@@ -249,6 +249,14 @@ public ControllerResult openStream(int nodeId, long nodeEpoc
// means that the new range is not the first range in stream, get the last range's end offset
RangeMetadata lastRangeMetadata = streamMetadata.ranges().get(streamMetadata.currentRangeIndex());
startOffset = lastRangeMetadata.endOffset();
+ // the RangeMetadata in S3StreamMetadataImage is only updated on create, rollToNext and trim
+ records.add(new ApiMessageAndVersion(new RangeRecord()
+ .setStreamId(streamId)
+ .setNodeId(lastRangeMetadata.nodeId())
+ .setStartOffset(lastRangeMetadata.startOffset())
+ .setEndOffset(lastRangeMetadata.endOffset())
+ .setEpoch(lastRangeMetadata.epoch())
+ .setRangeIndex(lastRangeMetadata.rangeIndex()), (short) 0));
}
// range create record
records.add(new ApiMessageAndVersion(new RangeRecord()
diff --git a/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java b/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java
index c41c76e1db..feb80db955 100644
--- a/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java
+++ b/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java
@@ -209,6 +209,6 @@ public boolean equals(Object o) {
@Override
public int hashCode() {
- return super.hashCode();
+ return compact().hashCode(); // hash of the compacted form instead of Object's identity hash; NOTE(review): presumably chosen to agree with equals() - confirm against compact()
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
index 9ebe3aeedc..3314dfb59e 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java
@@ -17,11 +17,7 @@
package org.apache.kafka.image;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.common.metadata.AdvanceRangeRecord;
+import com.automq.stream.s3.metadata.StreamState;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
@@ -29,7 +25,15 @@
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
-import com.automq.stream.s3.metadata.StreamState;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Set;
+import java.util.TreeMap;
public class S3StreamMetadataDelta {
@@ -39,7 +43,6 @@ public class S3StreamMetadataDelta {
private long newStartOffset;
private long newEpoch;
private StreamState currentState;
- private int currentRangeIndex;
private final Map changedRanges = new HashMap<>();
private final Set removedRanges = new HashSet<>();
@@ -52,7 +55,6 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) {
this.streamId = image.getStreamId();
this.newStartOffset = image.getStartOffset();
this.currentState = image.state();
- this.currentRangeIndex = image.rangeIndex();
}
public void replay(S3StreamRecord record) {
@@ -60,7 +62,6 @@ public void replay(S3StreamRecord record) {
this.newEpoch = record.epoch();
this.newStartOffset = record.startOffset();
this.currentState = StreamState.fromByte(record.streamState());
- this.currentRangeIndex = record.rangeIndex();
}
public void replay(RangeRecord record) {
@@ -87,40 +88,29 @@ public void replay(RemoveS3StreamObjectRecord record) {
changedS3StreamObjects.remove(record.objectId());
}
- public void replay(AdvanceRangeRecord record) {
- long startOffset = record.startOffset();
- long newEndOffset = record.endOffset();
- // check current range
- RangeMetadata metadata = this.changedRanges.get(currentRangeIndex);
- if (metadata == null) {
- metadata = this.image.getRanges().get(currentRangeIndex);
- }
- if (metadata == null) {
- // ignore it
- return;
- }
- if (startOffset != metadata.endOffset()) {
- // ignore it
- return;
+ public S3StreamMetadataImage apply() { // builds the next image from the base image plus the range and stream-object changes replayed into this delta
+ List newRanges;
+ if (changedRanges.isEmpty() && removedRanges.isEmpty()) { // no range changes: reuse the base image's list instance as-is
+ newRanges = image.getRanges();
+ } else {
+ NavigableMap ranges = new TreeMap<>(); // keyed by rangeIndex, so the rebuilt list comes out ordered by range index
+ image.getRanges().forEach(range -> ranges.put(range.rangeIndex(), range));
+ // add all new changed ranges
+ ranges.putAll(changedRanges);
+ // remove all removed ranges
+ removedRanges.forEach(ranges::remove);
+ newRanges = new ArrayList<>(ranges.values());
}
- // update the endOffset
- this.changedRanges.put(currentRangeIndex, new RangeMetadata(
- streamId, metadata.epoch(), metadata.rangeIndex(), metadata.startOffset(), newEndOffset, metadata.nodeId()
- ));
- }
- public S3StreamMetadataImage apply() {
- Map newRanges = new HashMap<>(image.getRanges());
- // add all new changed ranges
- newRanges.putAll(changedRanges);
- // remove all removed ranges
- removedRanges.forEach(newRanges::remove);
- Map newS3StreamObjects = new HashMap<>(image.getStreamObjects());
- // add all changed stream-objects
- newS3StreamObjects.putAll(changedS3StreamObjects);
- // remove all removed stream-objects
- removedS3StreamObjectIds.forEach(newS3StreamObjects::remove);
- return new S3StreamMetadataImage(streamId, newEpoch, currentState, currentRangeIndex, newStartOffset, newRanges, newS3StreamObjects);
+ DeltaMap newS3StreamObjects;
+ if (changedS3StreamObjects.isEmpty() && removedS3StreamObjectIds.isEmpty()) { // no stream-object changes: reuse the base image's map instance
+ newS3StreamObjects = image.streamObjectsMap;
+ } else {
+ newS3StreamObjects = image.streamObjectsMap.copy(); // NOTE(review): copy() presumably leaves the base image's map untouched - confirm against DeltaMap
+ newS3StreamObjects.putAll(changedS3StreamObjects); // additions and updates are applied before removals
+ newS3StreamObjects.removeAll(removedS3StreamObjectIds);
+ }
+ return new S3StreamMetadataImage(streamId, newEpoch, currentState, newStartOffset, newRanges, newS3StreamObjects); // DeltaMap constructor: the sorted object list is left to be built lazily
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
index 54b551f5e0..b721365428 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java
@@ -17,74 +17,160 @@
package org.apache.kafka.image;
-import java.util.Map;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
import com.automq.stream.s3.metadata.S3StreamConstant;
-import com.automq.stream.s3.metadata.StreamOffsetRange;
+import com.automq.stream.s3.metadata.StreamState;
import org.apache.kafka.common.metadata.S3StreamRecord;
-import org.apache.kafka.metadata.stream.RangeMetadata;
-import org.apache.kafka.metadata.stream.S3StreamObject;
import org.apache.kafka.image.writer.ImageWriter;
import org.apache.kafka.image.writer.ImageWriterOptions;
-import com.automq.stream.s3.metadata.StreamState;
+import org.apache.kafka.metadata.stream.RangeMetadata;
+import org.apache.kafka.metadata.stream.S3StreamObject;
-public class S3StreamMetadataImage {
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+public class S3StreamMetadataImage {
public static final S3StreamMetadataImage EMPTY =
- new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED,
- S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of());
+ new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED, S3StreamConstant.INIT_START_OFFSET, Collections.emptyList(), Collections.emptyList());
private final long streamId;
private final long epoch;
- private final int rangeIndex;
-
private final long startOffset;
private final StreamState state;
- private final Map ranges;
+ private final List ranges;
- private final Map streamObjects;
+ final DeltaMap streamObjectsMap;
+
+ private List sortedStreamObjects;
+
+ private NavigableMap streamObjectOffsets;
public S3StreamMetadataImage(
- long streamId, long epoch, StreamState state,
- int rangeIndex,
- long startOffset,
- Map ranges,
- Map streamObjects) {
+ long streamId, long epoch, StreamState state,
+ long startOffset,
+ List ranges,
+ List sortedStreamObjects) { // the list is stored as given and never sorted here; NOTE(review): callers presumably pass it ordered by startOffset - confirm
this.streamId = streamId;
this.epoch = epoch;
this.state = state;
- this.rangeIndex = rangeIndex;
this.startOffset = startOffset;
this.ranges = ranges;
- this.streamObjects = streamObjects;
+ DeltaMap streamObjectsMap = new DeltaMap<>(new int[]{10}); // NOTE(review): meaning of the int[]{10} argument is defined by DeltaMap - confirm there
+ sortedStreamObjects.forEach(streamObject -> streamObjectsMap.put(streamObject.objectId(), streamObject)); // index the objects by objectId
+ this.streamObjectsMap = streamObjectsMap;
+ this.sortedStreamObjects = sortedStreamObjects; // pre-populates the cache that getStreamObjects() returns
+ }
+
+ public S3StreamMetadataImage( // variant taking an already-built object map; used by S3StreamMetadataDelta.apply()
+ long streamId, long epoch, StreamState state,
+ long startOffset,
+ List ranges,
+ DeltaMap streamObjectsMap) {
+ this.streamId = streamId;
+ this.epoch = epoch;
+ this.state = state;
+ this.startOffset = startOffset;
+ this.ranges = ranges;
+ this.streamObjectsMap = streamObjectsMap; // sortedStreamObjects stays null here and is built on the first getStreamObjects() call
}
public void write(ImageWriter writer, ImageWriterOptions options) {
writer.write(0, new S3StreamRecord()
- .setStreamId(streamId)
- .setRangeIndex(rangeIndex)
- .setStreamState(state.toByte())
- .setEpoch(epoch)
- .setStartOffset(startOffset));
- ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord()));
- streamObjects.values().forEach(streamObject -> writer.write(streamObject.toRecord()));
+ .setStreamId(streamId)
+ .setRangeIndex(currentRangeIndex()) // derived from the last range; the image no longer stores a rangeIndex field
+ .setStreamState(state.toByte())
+ .setEpoch(epoch)
+ .setStartOffset(startOffset));
+ ranges.forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord())); // one record per range, in list order
+ streamObjectsMap.forEach((id, obj) -> writer.write(obj.toRecord())); // one record per stream object, in the map's iteration order
}
- public Map getRanges() {
+ public List getRanges() { // returns the internal list itself, not a copy
return ranges;
}
- public Map getStreamObjects() {
+ public RangeMetadata lastRange() { // last element of ranges, or null when the stream has no ranges
+ if (ranges.isEmpty()) {
+ return null;
+ }
+ return ranges.get(ranges.size() - 1);
+ }
+
+ public int currentRangeIndex() { // rangeIndex of the last range, or S3StreamConstant.INIT_RANGE_INDEX when there are no ranges
+ if (ranges.isEmpty()) {
+ return S3StreamConstant.INIT_RANGE_INDEX;
+ }
+ return lastRange().rangeIndex(); // safe: the empty case returned above, so lastRange() is non-null
+ }
+
+ public int getRangeContainsOffset(long offset) { // returns the position in ranges (a list index, not the rangeIndex) of the range containing offset, or a negative value per Collections.binarySearch when none does
+ int currentRangeIndex = currentRangeIndex();
+ return Collections.binarySearch(ranges, offset, (o1, o2) -> { // assumes ranges is ordered by offset (binarySearch needs a sorted list); the comparator gets one RangeMetadata and one Long, in either order
+ long startOffset;
+ long endOffset;
+ long offset1;
+ int revert = -1; // sign factor: -1 while the offset is assumed to be o1, flipped to 1 below when the range is o1
+ RangeMetadata range;
+ if (o1 instanceof RangeMetadata) {
+ range = (RangeMetadata) o1;
+ offset1 = (Long) o2;
+ revert = 1;
+ } else {
+ range = (RangeMetadata) o2;
+ offset1 = (Long) o1;
+ }
+ startOffset = range.startOffset();
+ endOffset = range.rangeIndex() == currentRangeIndex ? Long.MAX_VALUE : range.endOffset(); // the current (last) range is treated as open-ended
+
+ if (endOffset <= offset1) { // endOffset is exclusive: the range lies entirely before the offset
+ return -1 * revert;
+ } else if (startOffset > offset1) { // the range lies entirely after the offset
+ return revert;
+ } else {
+ return 0; // startOffset <= offset < endOffset: match
+ }
+ });
+ }
+
+ public List getStreamObjects() { // stream objects as a list; built from streamObjectsMap, sorted by startOffset and cached on first use unless the constructor supplied the list
+ if (sortedStreamObjects != null) {
+ return sortedStreamObjects;
+ }
+ List streamObjects = new ArrayList<>();
+ streamObjectsMap.forEach((objectId, streamObject) -> streamObjects.add(streamObject));
+ streamObjects.sort(Comparator.comparingLong(S3StreamObject::startOffset));
+ this.sortedStreamObjects = streamObjects; // NOTE(review): cached without synchronization - confirm concurrent callers are acceptable
return streamObjects;
}
+ public int floorStreamObjectIndex(long offset) { // index into getStreamObjects() of the object with the greatest startOffset <= offset, or -1 when there is none
+ List sortedStreamObjects = getStreamObjects();
+ if (streamObjectOffsets == null) { // startOffset -> list index lookup, built on first use; NOTE(review): unsynchronized lazy init - confirm concurrent callers are acceptable
+ // TODO: optimize, get the floor index without constructing a sorted map
+ NavigableMap streamObjectOffsets = new TreeMap<>();
+ for (int i = 0; i < sortedStreamObjects.size(); i++) {
+ S3StreamObject streamObject = sortedStreamObjects.get(i);
+ streamObjectOffsets.put(streamObject.streamOffsetRange().startOffset(), i); // objects sharing a startOffset overwrite each other; the later index wins
+ }
+ this.streamObjectOffsets = streamObjectOffsets;
+ }
+ Map.Entry entry = streamObjectOffsets.floorEntry(offset);
+ if (entry == null) { // every object starts after offset, or there are no objects
+ return -1;
+ }
+ return entry.getValue();
+ }
+
public long getEpoch() {
return epoch;
}
@@ -93,11 +179,6 @@ public long getStartOffset() {
return startOffset;
}
- public long getEndOffset() {
- RangeMetadata range = ranges.get(rangeIndex);
- return range == null ? startOffset : range.endOffset();
- }
-
public long getStreamId() {
return streamId;
}
@@ -106,18 +187,10 @@ public long startOffset() {
return startOffset;
}
- public int rangeIndex() {
- return rangeIndex;
- }
-
public StreamState state() {
return state;
}
- public StreamOffsetRange offsetRange() {
- return new StreamOffsetRange(streamId, startOffset, getEndOffset());
- }
-
@Override
public boolean equals(Object o) {
if (this == o) {
@@ -128,30 +201,27 @@ public boolean equals(Object o) {
}
S3StreamMetadataImage that = (S3StreamMetadataImage) o;
return this.streamId == that.streamId &&
- this.epoch == that.epoch &&
- this.state == that.state &&
- this.rangeIndex == that.rangeIndex &&
- this.startOffset == that.startOffset &&
- this.ranges.equals(that.ranges) &&
- this.streamObjects.equals(that.streamObjects);
+ this.epoch == that.epoch &&
+ this.state == that.state &&
+ this.startOffset == that.startOffset &&
+ this.ranges.equals(that.ranges) &&
+ this.streamObjectsMap.equals(that.streamObjectsMap);
}
@Override
public int hashCode() {
- return Objects.hash(streamId, epoch, state, rangeIndex, startOffset, ranges, streamObjects);
+ return Objects.hash(streamId, epoch, state, startOffset, ranges); // streamObjectsMap is not hashed although equals() compares it: equal images still hash equally, but images differing only in stream objects share a hash
}
@Override
public String toString() {
return "S3StreamMetadataImage{" +
- "streamId=" + streamId +
- ", epoch=" + epoch +
- ", rangeIndex=" + rangeIndex +
- ", startOffset=" + startOffset +
- ", state=" + state +
- ", ranges=" + ranges +
- ", streamObjects=" + streamObjects.entrySet().stream().
- map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) +
- '}';
+ "streamId=" + streamId +
+ ", epoch=" + epoch +
+ ", startOffset=" + startOffset +
+ ", state=" + state +
+ ", ranges=" + ranges +
+ ", streamObjects=" + streamObjectsMap + // rendered through the map's own toString() rather than a hand-built key:value join
+ '}';
}
}
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
index 02e0ca8e4a..e22992997a 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java
@@ -17,7 +17,6 @@
package org.apache.kafka.image;
-import org.apache.kafka.common.metadata.AdvanceRangeRecord;
import org.apache.kafka.common.metadata.AssignedStreamIdRecord;
import org.apache.kafka.common.metadata.NodeWALMetadataRecord;
import org.apache.kafka.common.metadata.RangeRecord;
@@ -29,7 +28,6 @@
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
import org.apache.kafka.common.metadata.S3StreamRecord;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
-import org.apache.kafka.metadata.stream.S3StreamSetObject;
import java.util.HashMap;
import java.util.HashSet;
@@ -94,9 +92,6 @@ public void replay(RemoveRangeRecord record) {
public void replay(S3StreamObjectRecord record) {
getOrCreateStreamMetadataDelta(record.streamId()).replay(record);
- getOrCreateStreamMetadataDelta(record.streamId()).replay(new AdvanceRangeRecord()
- .setStartOffset(record.startOffset())
- .setEndOffset(record.endOffset()));
}
public void replay(RemoveS3StreamObjectRecord record) {
@@ -105,12 +100,6 @@ public void replay(RemoveS3StreamObjectRecord record) {
public void replay(S3StreamSetObjectRecord record) {
getOrCreateNodeStreamMetadataDelta(record.nodeId()).replay(record);
- S3StreamSetObject.decode(record.ranges()).forEach(index ->
- getOrCreateStreamMetadataDelta(index.streamId())
- .replay(
- new AdvanceRangeRecord().setStartOffset(index.startOffset()).setEndOffset(index.endOffset())
- )
- );
}
public void replay(RemoveStreamSetObjectRecord record) {
diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
index 64784dbc26..2d8336ee04 100644
--- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
+++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java
@@ -18,6 +18,7 @@
package org.apache.kafka.image;
import com.automq.stream.s3.metadata.S3ObjectMetadata;
+import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection;
import com.automq.stream.utils.biniarysearch.ComparableItem;
@@ -35,9 +36,7 @@
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
import java.util.Objects;
-import java.util.Queue;
import java.util.stream.Collectors;
public final class S3StreamsMetadataImage {
@@ -74,34 +73,71 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
}
public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) {
- S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
- if (streamMetadata == null) {
- return InRangeObjects.INVALID;
- }
- if (startOffset < streamMetadata.startOffset()) {
- // start offset mismatch
+ S3StreamMetadataImage stream = streamsMetadata.get(streamId);
+ if (stream == null || startOffset < stream.startOffset()) {
return InRangeObjects.INVALID;
}
- List objects = new ArrayList<>();
- long realEndOffset = startOffset;
- List rangeSearchers = rangeSearchers(streamId, startOffset, endOffset);
- // TODO: if one stream object in multiple ranges, we may get duplicate objects
- for (RangeSearcher rangeSearcher : rangeSearchers) {
- InRangeObjects inRangeObjects = rangeSearcher.getObjects(limit);
- if (inRangeObjects == InRangeObjects.INVALID) {
- break;
+ List objects = new LinkedList<>();
+ long nextStartOffset = startOffset;
+
+ int streamObjectIndex = stream.floorStreamObjectIndex(startOffset);
+ List streamObjects = stream.getStreamObjects();
+
+ int lastRangeIndex = -1;
+ List streamSetObjects = null;
+ int streamSetObjectIndex = 0;
+ for (; ; ) {
+ int roundStartObjectSize = objects.size();
+ for (; streamObjectIndex != -1 && streamObjectIndex < streamObjects.size(); streamObjectIndex++) {
+ S3StreamObject streamObject = streamObjects.get(streamObjectIndex);
+ if (streamObject.startOffset() != nextStartOffset) {
+ if (!(objects.isEmpty() && streamObject.endOffset() > nextStartOffset)) {
+ // only the first object may be the stream object that contains (rather than starts at) the startOffset; otherwise stop
+ break;
+ }
+ }
+ objects.add(streamObject.toMetadata());
+ nextStartOffset = streamObject.endOffset();
+ if (objects.size() >= limit || nextStartOffset >= endOffset) {
+ return new InRangeObjects(streamId, objects);
+ }
}
- if (inRangeObjects.objects().isEmpty()) {
- throw new IllegalStateException("[BUG] expect getObjects return objects from " + rangeSearcher);
+ if (streamSetObjects == null) {
+ int rangeIndex = stream.getRangeContainsOffset(nextStartOffset);
+ if (rangeIndex < 0 || lastRangeIndex == rangeIndex) {
+ break;
+ }
+ lastRangeIndex = rangeIndex;
+ RangeMetadata range = stream.getRanges().get(rangeIndex);
+ NodeS3StreamSetObjectMetadataImage node = nodeStreamSetObjectMetadata.get(range.nodeId());
+ streamSetObjects = node == null ? Collections.emptyList() : node.orderList();
+ streamSetObjectIndex = 0;
+ }
+
+ for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) {
+ S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex);
+ StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId);
+ if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) {
+ continue;
+ }
+ if ((streamOffsetRange.startOffset() == nextStartOffset)
+ || (objects.isEmpty() && streamOffsetRange.startOffset() < nextStartOffset)) {
+ objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange),
+ streamSetObject.dataTimeInMs()));
+ nextStartOffset = streamOffsetRange.endOffset();
+ if (objects.size() >= limit || nextStartOffset >= endOffset) {
+ return new InRangeObjects(streamId, objects);
+ }
+ } else {
+ break;
+ }
}
- realEndOffset = inRangeObjects.endOffset();
- objects.addAll(inRangeObjects.objects());
- limit -= inRangeObjects.objects().size();
- if (limit <= 0 || realEndOffset >= endOffset) {
- break;
+ if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) {
+ // move to the next range
+ streamSetObjects = null;
}
}
- return new InRangeObjects(streamId, startOffset, realEndOffset, objects);
+ return new InRangeObjects(streamId, objects);
}
/**
@@ -121,11 +157,11 @@ public List getStreamObjects(long streamId, long startOffset, lo
if (stream == null) {
throw new IllegalArgumentException("stream not found");
}
- Map streamObjectsMetadata = stream.getStreamObjects();
+ List streamObjectsMetadata = stream.getStreamObjects();
if (streamObjectsMetadata == null || streamObjectsMetadata.isEmpty()) {
return Collections.emptyList();
}
- return streamObjectsMetadata.values().stream().filter(obj -> {
+ return streamObjectsMetadata.stream().filter(obj -> {
long objectStartOffset = obj.streamOffsetRange().startOffset();
long objectEndOffset = obj.streamOffsetRange().endOffset();
return objectStartOffset < endOffset && objectEndOffset > startOffset;
@@ -140,162 +176,6 @@ public List getStreamSetObjects(int nodeId) {
return wal.orderList();
}
- private List rangeSearchers(long streamId, long startOffset, long endOffset) {
- S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
- List rangeSearchers = new ArrayList<>();
- // TODO: refactor to make ranges in order
- List ranges = streamMetadata.getRanges().values().stream().sorted(Comparator.comparingInt(RangeMetadata::rangeIndex)).collect(Collectors.toList());
- for (RangeMetadata range : ranges) {
- if (range.endOffset() <= startOffset) {
- continue;
- }
- if (range.startOffset() >= endOffset) {
- break;
- }
- long searchEndOffset = Math.min(range.endOffset(), endOffset);
- long searchStartOffset = Math.max(range.startOffset(), startOffset);
- if (searchStartOffset == searchEndOffset) {
- continue;
- }
- rangeSearchers.add(new RangeSearcher(searchStartOffset, searchEndOffset, streamId, range.nodeId()));
- }
- return rangeSearchers;
- }
-
- class RangeSearcher {
-
- private final long startOffset;
- private final long endOffset;
- private final long streamId;
- private final int nodeId;
-
- public RangeSearcher(long startOffset, long endOffset, long streamId, int nodeId) {
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- this.streamId = streamId;
- this.nodeId = nodeId;
- }
-
- private Queue rangeOfStreamSetObjects() {
- NodeS3StreamSetObjectMetadataImage streamSetObjectImage = nodeStreamSetObjectMetadata.get(nodeId);
- List streamSetObjects = streamSetObjectImage.orderList();
- Queue s3ObjectMetadataList = new LinkedList<>();
- for (S3StreamSetObject obj : streamSetObjects) {
- // TODO: cache the stream offset ranges to accelerate the search
- // TODO: cache the last search index, to accelerate the search
- List ranges = obj.offsetRangeList();
- int index = new StreamOffsetRanges(ranges).search(streamId);
- if (index < 0) {
- continue;
- }
- StreamOffsetRange range = ranges.get(index);
- if (range.startOffset() >= endOffset || range.endOffset() < startOffset) {
- continue;
- }
- S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
- obj.objectId(), obj.objectType(), ranges, obj.dataTimeInMs(),
- obj.orderId());
- s3ObjectMetadataList.add(new S3ObjectMetadataWrapper(s3ObjectMetadata, range.startOffset(), range.endOffset()));
- if (range.endOffset() >= endOffset) {
- break;
- }
- }
- return s3ObjectMetadataList;
- }
-
- private Queue rangeOfStreamObjects() {
- S3StreamMetadataImage stream = streamsMetadata.get(streamId);
- Map streamObjectsMetadata = stream.getStreamObjects();
- // TODO: refactor to make stream objects in order
- if (streamObjectsMetadata != null && !streamObjectsMetadata.isEmpty()) {
- return streamObjectsMetadata.values().stream().filter(obj -> {
- long objectStartOffset = obj.streamOffsetRange().startOffset();
- long objectEndOffset = obj.streamOffsetRange().endOffset();
- return objectStartOffset < endOffset && objectEndOffset > startOffset;
- }).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).map(obj -> {
- long startOffset = obj.streamOffsetRange().startOffset();
- long endOffset = obj.streamOffsetRange().endOffset();
- S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata(
- obj.objectId(), obj.objectType(), List.of(obj.streamOffsetRange()), obj.dataTimeInMs());
- return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset);
- }).collect(Collectors.toCollection(LinkedList::new));
- }
- return new LinkedList<>();
- }
-
- public InRangeObjects getObjects(int limit) {
- if (limit <= 0) {
- return InRangeObjects.INVALID;
- }
- if (!nodeStreamSetObjectMetadata.containsKey(nodeId) || !streamsMetadata.containsKey(streamId)) {
- return InRangeObjects.INVALID;
- }
-
- Queue streamObjects = rangeOfStreamObjects();
- Queue streamSetObjects = rangeOfStreamSetObjects();
- List inRangeObjects = new ArrayList<>();
- long nextStartOffset = startOffset;
-
- while (limit > 0
- && nextStartOffset < endOffset
- && (!streamObjects.isEmpty() || !streamSetObjects.isEmpty())) {
- S3ObjectMetadataWrapper streamRange = null;
- if (streamSetObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < streamSetObjects.peek().startOffset())) {
- streamRange = streamObjects.poll();
- } else {
- streamRange = streamSetObjects.poll();
- }
- long objectStartOffset = streamRange.startOffset();
- long objectEndOffset = streamRange.endOffset();
- if (objectStartOffset > nextStartOffset) {
- break;
- }
- if (objectEndOffset <= nextStartOffset) {
- continue;
- }
- inRangeObjects.add(streamRange.metadata);
- limit--;
- nextStartOffset = objectEndOffset;
- }
- return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects);
- }
-
- @Override
- public String toString() {
- return "RangeSearcher{" +
- "startOffset=" + startOffset +
- ", endOffset=" + endOffset +
- ", streamId=" + streamId +
- ", nodeId=" + nodeId +
- '}';
- }
- }
-
- static class S3ObjectMetadataWrapper {
-
- private final S3ObjectMetadata metadata;
- private final long startOffset;
- private final long endOffset;
-
- public S3ObjectMetadataWrapper(S3ObjectMetadata metadata, long startOffset, long endOffset) {
- this.metadata = metadata;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
- }
-
- public S3ObjectMetadata metadata() {
- return metadata;
- }
-
- public long startOffset() {
- return startOffset;
- }
-
- public long endOffset() {
- return endOffset;
- }
- }
-
@Override
public boolean equals(Object obj) {
if (this == obj) {
@@ -323,15 +203,6 @@ public DeltaMap streamsMetadata() {
return streamsMetadata;
}
- public StreamOffsetRange offsetRange(long streamId) {
- S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId);
- if (streamMetadata == null) {
- return StreamOffsetRange.INVALID;
- }
- return streamMetadata.offsetRange();
- }
-
-
public long nextAssignedStreamId() {
return nextAssignedStreamId;
}
@@ -341,6 +212,14 @@ public String toString() {
return "S3StreamsMetadataImage{nextAssignedStreamId=" + nextAssignedStreamId + '}';
}
+ public static StreamOffsetRange search(List ranges, long streamId) {
+ int index = new StreamOffsetRanges(ranges).search(streamId);
+ if (index < 0) {
+ return null;
+ }
+ return ranges.get(index);
+ }
+
static class StreamOffsetRanges extends AbstractOrderedCollection {
private final List ranges;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
index 973e94df60..4629a1529f 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java
@@ -24,18 +24,18 @@
public class InRangeObjects {
- public static final InRangeObjects INVALID = new InRangeObjects(-1, -1, -1, List.of());
+ public static final InRangeObjects INVALID = new InRangeObjects(-1, List.of());
private final long streamId;
private final long startOffset;
private final long endOffset;
private final List objects;
- public InRangeObjects(long streamId, long startOffset, long endOffset, List objects) {
+ public InRangeObjects(long streamId, List objects) {
this.streamId = streamId;
- this.startOffset = startOffset;
- this.endOffset = endOffset;
this.objects = objects;
+ this.startOffset = objects.isEmpty() ? -1L : objects.get(0).startOffset();
+ this.endOffset = objects.isEmpty() ? -1L : objects.get(objects.size() - 1).endOffset();
}
public long streamId() {
@@ -57,11 +57,11 @@ public List objects() {
@Override
public String toString() {
return "InRangeObjects{" +
- "streamId=" + streamId +
- ", startOffset=" + startOffset +
- ", endOffset=" + endOffset +
- ", objects=" + objects +
- '}';
+ "streamId=" + streamId +
+ ", startOffset=" + startOffset +
+ ", endOffset=" + endOffset +
+ ", objects=" + objects +
+ '}';
}
@Override
@@ -74,9 +74,9 @@ public boolean equals(Object o) {
}
InRangeObjects that = (InRangeObjects) o;
return streamId == that.streamId
- && startOffset == that.startOffset
- && endOffset == that.endOffset
- && objects.equals(that.objects);
+ && startOffset == that.startOffset
+ && endOffset == that.endOffset
+ && objects.equals(that.objects);
}
@Override
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
index 012c43d000..b4902c04de 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java
@@ -17,8 +17,10 @@
package org.apache.kafka.metadata.stream;
+import java.util.List;
import java.util.Objects;
+import com.automq.stream.s3.metadata.S3ObjectMetadata;
import com.automq.stream.s3.metadata.S3ObjectType;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import org.apache.kafka.common.metadata.S3StreamObjectRecord;
@@ -28,16 +30,32 @@ public class S3StreamObject {
private final long objectId;
private final long dataTimeInMs;
- private final StreamOffsetRange streamOffsetRange;
+ private final long streamId;
+ private final long startOffset;
+ private final long endOffset;
public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset, long dataTimeInMs) {
this.objectId = objectId;
- this.streamOffsetRange = new StreamOffsetRange(streamId, startOffset, endOffset);
+ this.streamId = streamId;
+ this.startOffset = startOffset;
+ this.endOffset = endOffset;
this.dataTimeInMs = dataTimeInMs;
}
public StreamOffsetRange streamOffsetRange() {
- return streamOffsetRange;
+ return new StreamOffsetRange(streamId, startOffset, endOffset);
+ }
+
+ public long streamId() {
+ return streamId;
+ }
+
+ public long startOffset() {
+ return startOffset;
+ }
+
+ public long endOffset() {
+ return endOffset;
}
public long objectId() {
@@ -55,12 +73,16 @@ public long dataTimeInMs() {
public ApiMessageAndVersion toRecord() {
return new ApiMessageAndVersion(new S3StreamObjectRecord()
.setObjectId(objectId)
- .setStreamId(streamOffsetRange.streamId())
- .setStartOffset(streamOffsetRange.startOffset())
- .setEndOffset(streamOffsetRange.endOffset())
+ .setStreamId(streamId)
+ .setStartOffset(startOffset)
+ .setEndOffset(endOffset)
.setDataTimeInMs(dataTimeInMs), (short) 0);
}
+ public S3ObjectMetadata toMetadata() {
+ return new S3ObjectMetadata(objectId, S3ObjectType.STREAM, List.of(streamOffsetRange()), dataTimeInMs);
+ }
+
public static S3StreamObject of(S3StreamObjectRecord record) {
S3StreamObject s3StreamObject = new S3StreamObject(
record.objectId(), record.streamId(),
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java
index a3dba0547b..655bd2baae 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java
@@ -21,17 +21,27 @@
import com.automq.stream.s3.metadata.S3StreamConstant;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.github.luben.zstd.Zstd;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.Weigher;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.server.common.ApiMessageAndVersion;
+import java.time.Duration;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.ExecutionException;
public class S3StreamSetObject implements Comparable {
+ private static final Cache> RANGES_CACHE = CacheBuilder.newBuilder()
+ .expireAfterAccess(Duration.ofMinutes(1))
+ .maximumWeight(500000) // weight is the total number of cached ranges; expected max heap occupied size is 15MiB
+ .weigher((Weigher>) (key, value) -> value.size())
+ .build();
public static final byte MAGIC = 0x01;
public static final byte ZSTD_COMPRESSED = 1 << 1;
private static final int COMPRESSION_THRESHOLD = 50;
@@ -51,11 +61,11 @@ public class S3StreamSetObject implements Comparable {
// Only used for testing
public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId) {
- this(objectId, nodeId, sortAndEncode(streamOffsetRanges), orderId, S3StreamConstant.INVALID_TS);
+ this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, S3StreamConstant.INVALID_TS);
}
public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId, long dateTimeInMs) {
- this(objectId, nodeId, sortAndEncode(streamOffsetRanges), orderId, dateTimeInMs);
+ this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, dateTimeInMs);
}
public S3StreamSetObject(long objectId, int nodeId, byte[] ranges, long orderId, long dataTimeInMs) {
@@ -67,7 +77,11 @@ public S3StreamSetObject(long objectId, int nodeId, byte[] ranges, long orderId,
}
public List offsetRangeList() {
- return decode(ranges);
+ try {
+ return RANGES_CACHE.get(objectId, () -> decode(ranges));
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
}
public ApiMessageAndVersion toRecord() {
@@ -136,9 +150,10 @@ public int compareTo(S3StreamSetObject o) {
return Long.compare(this.orderId, o.orderId);
}
- public static byte[] sortAndEncode(List streamOffsetRanges) {
+ public static byte[] sortAndEncode(long objectId, List streamOffsetRanges) {
streamOffsetRanges = new ArrayList<>(streamOffsetRanges);
streamOffsetRanges.sort(Comparator.comparingLong(StreamOffsetRange::streamId));
+ RANGES_CACHE.put(objectId, streamOffsetRanges);
return encode(streamOffsetRanges);
}
diff --git a/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json b/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json
deleted file mode 100644
index f9e50cd59a..0000000000
--- a/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json
+++ /dev/null
@@ -1,36 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-{
- "apiKey": 515,
- "type": "metadata",
- "name": "AdvanceRangeRecord",
- "validVersions": "0",
- "flexibleVersions": "0+",
- "fields": [
- {
- "name": "StartOffset",
- "type": "int64",
- "versions": "0+",
- "about": "The start offset of the range"
- },
- {
- "name": "EndOffset",
- "type": "int64",
- "versions": "0+",
- "about": "The end offset of the range"
- }
- ]
-}
\ No newline at end of file
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
index a6ec2f9e94..008978e821 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java
@@ -17,14 +17,8 @@
package org.apache.kafka.image;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
import com.automq.stream.s3.metadata.S3StreamConstant;
-import org.apache.kafka.common.metadata.AdvanceRangeRecord;
+import com.automq.stream.s3.metadata.StreamState;
import org.apache.kafka.common.metadata.RangeRecord;
import org.apache.kafka.common.metadata.RemoveRangeRecord;
import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord;
@@ -35,12 +29,17 @@
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.metadata.stream.RangeMetadata;
import org.apache.kafka.metadata.stream.S3StreamObject;
-import com.automq.stream.s3.metadata.StreamState;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
@Timeout(value = 40)
@Tag("S3Unit")
public class S3StreamMetadataImageTest {
@@ -58,16 +57,16 @@ public void testRanges() {
S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
// 1. create stream0
delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord()
- .setStreamId(STREAM0)
- .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
- .setStreamState(StreamState.OPENED.toByte())
- .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
- .setEpoch(0L)
- .setStartOffset(0L), (short) 0));
+ .setStreamId(STREAM0)
+ .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
+ .setEpoch(0L)
+ .setStartOffset(0L), (short) 0));
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
- STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of());
+ STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_START_OFFSET, Collections.emptyList(), Collections.emptyList());
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);
@@ -76,22 +75,22 @@ public void testRanges() {
List delta1Records = new ArrayList<>();
S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord()
- .setStreamId(STREAM0)
- .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
- .setStreamState(StreamState.OPENED.toByte())
- .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
- .setEpoch(1L), (short) 0));
+ .setStreamId(STREAM0)
+ .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
+ .setEpoch(1L), (short) 0));
delta1Records.add(new ApiMessageAndVersion(new RangeRecord()
- .setStreamId(STREAM0)
- .setRangeIndex(0)
- .setEpoch(1L)
- .setNodeId(BROKER0)
- .setStartOffset(0L), (short) 0));
+ .setStreamId(STREAM0)
+ .setRangeIndex(0)
+ .setEpoch(1L)
+ .setNodeId(BROKER0)
+ .setStartOffset(0L), (short) 0));
RecordTestUtils.replayAll(delta1, delta1Records);
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
- STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET,
- Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of());
+ STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_START_OFFSET,
+ List.of(new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Collections.emptyList());
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
@@ -99,27 +98,24 @@ public void testRanges() {
List delta2Records = new ArrayList<>();
S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2);
delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord()
- .setStreamId(STREAM0)
- .setRangeIndex(0)
- .setStreamState(StreamState.OPENED.toByte())
- .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
- .setEpoch(2L), (short) 0));
- delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord()
- .setStartOffset(0L)
- .setEndOffset(100L), (short) 0));
+ .setStreamId(STREAM0)
+ .setRangeIndex(0)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(S3StreamConstant.INIT_START_OFFSET)
+ .setEpoch(2L), (short) 0));
delta2Records.add(new ApiMessageAndVersion(new RangeRecord()
- .setStreamId(STREAM0)
- .setRangeIndex(1)
- .setEpoch(2L)
- .setNodeId(BROKER1)
- .setStartOffset(100L)
- .setEndOffset(100L), (short) 0));
+ .setStreamId(STREAM0)
+ .setRangeIndex(1)
+ .setEpoch(2L)
+ .setNodeId(BROKER1)
+ .setStartOffset(100L)
+ .setEndOffset(100L), (short) 0));
RecordTestUtils.replayAll(delta2, delta2Records);
// verify delta and check image's write
S3StreamMetadataImage image3 = new S3StreamMetadataImage(
- STREAM0, 2L, StreamState.OPENED, 0, 0L, Map.of(
- 0, new RangeMetadata(STREAM0, 1L, 0, 0, 100, BROKER0),
- 1, new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Map.of());
+ STREAM0, 2L, StreamState.OPENED, 0L, List.of(
+ new RangeMetadata(STREAM0, 1L, 0, 0, 0, BROKER0),
+ new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Collections.emptyList());
assertEquals(image3, delta2.apply());
testToImageAndBack(image3);
@@ -127,45 +123,45 @@ public void testRanges() {
List delta3Records = new ArrayList<>();
S3StreamMetadataDelta delta3 = new S3StreamMetadataDelta(image3);
delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord()
- .setStreamId(STREAM0)
- .setEpoch(2L)
- .setRangeIndex(0)
- .setStreamState(StreamState.OPENED.toByte())
- .setStartOffset(100L), (short) 0));
+ .setStreamId(STREAM0)
+ .setEpoch(2L)
+ .setRangeIndex(0)
+ .setStreamState(StreamState.OPENED.toByte())
+ .setStartOffset(100L), (short) 0));
delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord()
- .setStreamId(STREAM0)
- .setRangeIndex(0), (short) 0));
+ .setStreamId(STREAM0)
+ .setRangeIndex(0), (short) 0));
RecordTestUtils.replayAll(delta3, delta3Records);
// verify delta and check image's write
S3StreamMetadataImage image4 = new S3StreamMetadataImage(
- STREAM0, 2L, StreamState.OPENED, 0, 100L, Map.of(
- 1, new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Map.of());
+ STREAM0, 2L, StreamState.OPENED, 100L, List.of(
+ new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Collections.emptyList());
assertEquals(image4, delta3.apply());
}
@Test
public void testStreamObjects() {
S3StreamMetadataImage image0 = new S3StreamMetadataImage(
- STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of());
+ STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), Collections.emptyList());
List delta0Records = new ArrayList<>();
S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0);
// 1. create streamObject0 and streamObject1
delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
- .setObjectId(0L)
- .setStreamId(STREAM0)
- .setStartOffset(0L)
- .setEndOffset(100L), (short) 0));
+ .setObjectId(0L)
+ .setStreamId(STREAM0)
+ .setStartOffset(0L)
+ .setEndOffset(100L), (short) 0));
delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord()
- .setObjectId(1L)
- .setStreamId(STREAM0)
- .setStartOffset(100L)
- .setEndOffset(200L), (short) 0));
+ .setObjectId(1L)
+ .setStreamId(STREAM0)
+ .setStartOffset(100L)
+ .setEndOffset(200L), (short) 0));
RecordTestUtils.replayAll(delta0, delta0Records);
// verify delta and check image's write
S3StreamMetadataImage image1 = new S3StreamMetadataImage(
- STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of(
- 0L, new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
- 1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
+ STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of(
+ new S3StreamObject(0L, 999, STREAM0, 0L, 100L),
+ new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
assertEquals(image1, delta0.apply());
testToImageAndBack(image1);
@@ -173,12 +169,12 @@ public void testStreamObjects() {
List delta1Records = new ArrayList<>();
S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1);
delta1Records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord()
- .setObjectId(0L), (short) 0));
+ .setObjectId(0L), (short) 0));
RecordTestUtils.replayAll(delta1, delta1Records);
// verify delta and check image's write
S3StreamMetadataImage image2 = new S3StreamMetadataImage(
- STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of(
- 1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
+ STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of(
+ new S3StreamObject(1L, 999, STREAM0, 100L, 200L)));
assertEquals(image2, delta1.apply());
testToImageAndBack(image2);
}
diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
index 74e656d729..d7845830d4 100644
--- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java
@@ -35,7 +35,6 @@
import org.junit.jupiter.api.Timeout;
import java.util.List;
-import java.util.Map;
import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -114,17 +113,17 @@ public void testGetObjects() {
broker0Objects);
NodeS3StreamSetObjectMetadataImage broker1WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH,
broker1Objects);
- Map ranges = Map.of(
- 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0),
- 1, new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1),
- 2, new RangeMetadata(STREAM0, 2L, 2, 180L, 420L, BROKER0),
- 3, new RangeMetadata(STREAM0, 3L, 3, 420L, 520L, BROKER1),
- 4, new RangeMetadata(STREAM0, 4L, 4, 520L, 600L, BROKER0));
- Map streamObjects = Map.of(
- 8L, new S3StreamObject(8, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS),
- 9L, new S3StreamObject(9, STREAM0, 200L, 300L, S3StreamConstant.INVALID_TS),
- 10L, new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS));
- S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects);
+ List ranges = List.of(
+ new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0),
+ new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1),
+ new RangeMetadata(STREAM0, 2L, 2, 180L, 420L, BROKER0),
+ new RangeMetadata(STREAM0, 3L, 3, 420L, 520L, BROKER1),
+ new RangeMetadata(STREAM0, 4L, 4, 520L, 600L, BROKER0));
+ List streamObjects = List.of(
+ new S3StreamObject(8, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS),
+ new S3StreamObject(9, STREAM0, 200L, 300L, S3StreamConstant.INVALID_TS),
+ new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS));
+ S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage));
@@ -148,14 +147,14 @@ public void testGetObjects() {
// 4. search stream_0 in [20, 550)
objects = streamsImage.getObjects(STREAM0, 20, 550, Integer.MAX_VALUE);
- assertEquals(20, objects.startOffset());
+ assertEquals(10, objects.startOffset());
assertEquals(600, objects.endOffset());
assertEquals(11, objects.objects().size());
assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
// 5. search stream_0 in [20, 550) with limit 5
objects = streamsImage.getObjects(STREAM0, 20, 550, 5);
- assertEquals(20, objects.startOffset());
+ assertEquals(10, objects.startOffset());
assertEquals(180, objects.endOffset());
assertEquals(5, objects.objects().size());
assertEquals(expectedObjectIds.subList(0, 5), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
@@ -169,14 +168,21 @@ public void testGetObjects() {
// 7. search stream_0 in [401, 519)
objects = streamsImage.getObjects(STREAM0, 401, 519, Integer.MAX_VALUE);
- assertEquals(401, objects.startOffset());
+ assertEquals(400, objects.startOffset());
assertEquals(520, objects.endOffset());
assertEquals(2, objects.objects().size());
assertEquals(expectedObjectIds.subList(8, 10), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
// 8. search stream_0 in [399, 521)
objects = streamsImage.getObjects(STREAM0, 399, 521, Integer.MAX_VALUE);
- assertEquals(399, objects.startOffset());
+ assertEquals(300, objects.startOffset());
+ assertEquals(600, objects.endOffset());
+ assertEquals(4, objects.objects().size());
+ assertEquals(expectedObjectIds.subList(7, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
+
+ // 9. search stream_0 in [399, 1000)
+ objects = streamsImage.getObjects(STREAM0, 399, 1000, Integer.MAX_VALUE);
+ assertEquals(300, objects.startOffset());
assertEquals(600, objects.endOffset());
assertEquals(4, objects.objects().size());
assertEquals(expectedObjectIds.subList(7, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));