From d85daa5426239318dc47599666c8dcd8e571779e Mon Sep 17 00:00:00 2001 From: Robin Han Date: Wed, 10 Jan 2024 17:57:43 +0800 Subject: [PATCH 1/2] feat(issues647): optimize s3stream metadata image memory usage Signed-off-by: Robin Han --- checkstyle/import-control.xml | 1 + checkstyle/suppressions.xml | 4 +- .../s3/metadata/StreamMetadataManager.java | 213 +++++--------- .../stream/s3/StreamMetadataManagerTest.java | 33 +-- .../controller/ControllerMetricsManager.java | 1 - .../stream/StreamControlManager.java | 8 + .../java/org/apache/kafka/image/DeltaMap.java | 2 +- .../kafka/image/S3StreamMetadataDelta.java | 72 +++-- .../kafka/image/S3StreamMetadataImage.java | 189 +++++++++---- .../kafka/image/S3StreamsMetadataDelta.java | 11 - .../kafka/image/S3StreamsMetadataImage.java | 261 +++++------------- .../kafka/metadata/stream/InRangeObjects.java | 24 +- .../kafka/metadata/stream/S3StreamObject.java | 34 ++- .../metadata/stream/S3StreamSetObject.java | 23 +- .../common/metadata/AdvanceRangeRecord.json | 36 --- .../image/S3StreamMetadataImageTest.java | 132 +++++---- .../image/S3StreamsMetadataImageTest.java | 38 +-- 17 files changed, 465 insertions(+), 617 deletions(-) delete mode 100644 metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 9721750616..7ae0ce07e0 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -41,6 +41,7 @@ + diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index 109203e177..85425d0d6a 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -306,9 +306,9 @@ + files="(ClientQuotasImage|MetadataDelta|QuorumController|ReplicationControlManager|S3StreamsMetadataImage).java"/> + files="(ClientQuotasImage|KafkaEventQueue|ReplicationControlManager|FeatureControlManager|S3StreamsMetadataImage).java"/> >> pendingGetObjectsTasks; + private final List pendingGetObjectsTasks; private final 
ExecutorService pendingExecutorService; // TODO: we just need the version of streams metadata, not the whole image private volatile OffsetAndEpoch version; @@ -74,8 +67,7 @@ public StreamMetadataManager(BrokerServer broker, KafkaConfig config) { this.objectsImage = currentImage.objectsMetadata(); this.version = currentImage.highestOffsetAndEpoch(); this.broker.metadataListener().registerMetadataListener(this::onImageChanged); - // TODO: optimize by more suitable data structure for pending tasks - this.pendingGetObjectsTasks = new HashMap<>(); + this.pendingGetObjectsTasks = new LinkedList<>(); this.pendingExecutorService = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pending-get-objects-task-executor")); } @@ -89,15 +81,9 @@ private void onImageChanged(MetadataDelta delta, MetadataImage newImage) { // update image this.streamsImage = newImage.streamsMetadata(); this.objectsImage = newImage.objectsMetadata(); - // remove all catch up pending tasks - List retryTasks = removePendingTasks(); + // retry all pending tasks - if (retryTasks.isEmpty()) { - return; - } - this.pendingExecutorService.submit(() -> { - retryPendingTasks(retryTasks); - }); + retryPendingTasks(); } } @@ -116,77 +102,45 @@ public CompletableFuture> getStreamSetObjects() { } } - // must access thread safe - private List removePendingTasks() { - if (this.pendingGetObjectsTasks == null || this.pendingGetObjectsTasks.isEmpty()) { - return Collections.emptyList(); - } - Set pendingStreams = pendingGetObjectsTasks.keySet(); - List pendingStreamsOffsetRange = pendingStreams - .stream() - .map(streamsImage::offsetRange) - .filter(offset -> offset != StreamOffsetRange.INVALID) - .collect(Collectors.toList()); - if (pendingStreamsOffsetRange.isEmpty()) { - return Collections.emptyList(); - } - List retryTasks = new ArrayList<>(); - pendingStreamsOffsetRange.forEach(offsetRange -> { - long streamId = offsetRange.streamId(); - long endOffset = offsetRange.endOffset(); - Map> tasks = 
StreamMetadataManager.this.pendingGetObjectsTasks.get(streamId); - if (tasks == null || tasks.isEmpty()) { - return; - } - Iterator>> iterator = - tasks.entrySet().iterator(); - while (iterator.hasNext()) { - Entry> entry = iterator.next(); - long pendingEndOffset = entry.getKey(); - if (pendingEndOffset > endOffset) { - break; + @Override + public synchronized CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { + // TODO: cache the object list for next search + return exec(() -> fetch0(streamId, startOffset, endOffset, limit), LOGGER, "fetchObjects").thenApply(rst -> { + rst.objects().forEach(object -> { + S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId()); + if (objectMetadata == null) { + // should not happen + LOGGER.error( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", + streamId, startOffset, endOffset, limit); + throw new IllegalStateException("cannt find object metadata for object: " + object.objectId()); } - iterator.remove(); - List getObjectsTasks = entry.getValue(); - retryTasks.addAll(getObjectsTasks); - } - if (tasks.isEmpty()) { - StreamMetadataManager.this.pendingGetObjectsTasks.remove(streamId); + object.setObjectSize(objectMetadata.getObjectSize()); + object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs()); + + }); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace( + "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", + streamId, startOffset, endOffset, limit, rst); } + return rst; }); - return retryTasks; } - @Override - public synchronized CompletableFuture fetch(long streamId, long startOffset, long endOffset, int limit) { - S3StreamMetadataImage streamImage = streamsImage.streamsMetadata().get(streamId); - if (streamImage == null) { - LOGGER.warn( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, 
and streamImage is null", - streamId, startOffset, endOffset, limit); - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - StreamOffsetRange offsetRange = streamImage.offsetRange(); - if (offsetRange == null || offsetRange == StreamOffsetRange.INVALID) { - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - long streamStartOffset = offsetRange.startOffset(); - long streamEndOffset = offsetRange.endOffset(); - if (startOffset < streamStartOffset) { - LOGGER.warn( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and startOffset < streamStartOffset: {}", - streamId, startOffset, endOffset, limit, streamStartOffset); - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - endOffset = endOffset == NOOP_OFFSET ? streamEndOffset : endOffset; - if (endOffset > streamEndOffset) { - // lag behind, need to wait for cache catch up - LOGGER.warn("[FetchObjects]: pending request, stream: {}, startOffset: {}, endOffset: {}, streamEndOffset: {}, limit: {}", - streamId, startOffset, endOffset, streamEndOffset, limit); - return pendingFetch(streamId, startOffset, endOffset, limit); - } - long finalEndOffset = endOffset; - return FutureUtil.exec(() -> fetch0(streamId, startOffset, finalEndOffset, limit), LOGGER, "fetch"); + private synchronized CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) { + InRangeObjects rst = streamsImage.getObjects(streamId, startOffset, endOffset, limit); + if (rst.objects().size() >= limit || rst.endOffset() >= endOffset || rst == InRangeObjects.INVALID) { + return CompletableFuture.completedFuture(rst); + } + if (rst.endOffset() >= endOffset || rst.objects().size() >= limit) { + return CompletableFuture.completedFuture(rst); + } + LOGGER.info("[FetchObjects],[PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit); + CompletableFuture pendingCf = pendingFetch(); + CompletableFuture rstCf = new 
CompletableFuture<>(); + FutureUtil.propagate(pendingCf.thenCompose(nil -> fetch0(streamId, startOffset, endOffset, limit)), rstCf); + return rstCf.whenComplete((r, ex) -> LOGGER.info("[FetchObjects],[COMPLETE_PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId, startOffset, endOffset, limit)); } public CompletableFuture> getStreamObjects(long streamId, long startOffset, long endOffset, int limit) { @@ -220,7 +174,12 @@ public List getStreamMetadataList(List streamIds) { continue; } StreamMetadata streamMetadata = new StreamMetadata(streamId, streamImage.getEpoch(), - streamImage.getStartOffset(), streamImage.getEndOffset(), streamImage.state()); + streamImage.getStartOffset(), -1L, streamImage.state()) { + @Override + public long endOffset() { + throw new UnsupportedOperationException(); + } + }; streamMetadataList.add(streamMetadata); } return streamMetadataList; @@ -228,77 +187,31 @@ public List getStreamMetadataList(List streamIds) { } // must access thread safe - private CompletableFuture pendingFetch(long streamId, long startOffset, long endOffset, int limit) { - GetObjectsTask task = GetObjectsTask.of(streamId, startOffset, endOffset, limit); - Map> tasks = StreamMetadataManager.this.pendingGetObjectsTasks.computeIfAbsent(task.streamId, - k -> new TreeMap<>()); - List getObjectsTasks = tasks.computeIfAbsent(task.endOffset, k -> new ArrayList<>()); - getObjectsTasks.add(task); + private CompletableFuture pendingFetch() { + GetObjectsTask task = new GetObjectsTask(); + synchronized (pendingGetObjectsTasks) { + pendingGetObjectsTasks.add(task); + } return task.cf; } - // must access thread safe - private synchronized CompletableFuture fetch0(long streamId, long startOffset, long endOffset, int limit) { - InRangeObjects cachedInRangeObjects = streamsImage.getObjects(streamId, startOffset, endOffset, limit); - if (cachedInRangeObjects == null || cachedInRangeObjects == InRangeObjects.INVALID) { - LOGGER.warn( - "[FetchObjects]: stream: {}, 
startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); - return CompletableFuture.completedFuture(InRangeObjects.INVALID); - } - // fill the objects' size and committed-timestamp - for (S3ObjectMetadata object : cachedInRangeObjects.objects()) { - S3Object objectMetadata = objectsImage.getObjectMetadata(object.objectId()); - if (objectMetadata == null) { - // should not happen - LOGGER.error( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache failed with empty result", - streamId, startOffset, endOffset, limit); - return CompletableFuture.completedFuture(InRangeObjects.INVALID); + void retryPendingTasks() { + synchronized (pendingGetObjectsTasks) { + if (pendingGetObjectsTasks.isEmpty()) { + return; } - object.setObjectSize(objectMetadata.getObjectSize()); - object.setCommittedTimestamp(objectMetadata.getCommittedTimeInMs()); + LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", pendingGetObjectsTasks.size()); + pendingGetObjectsTasks.forEach(t -> t.cf.completeAsync(() -> null, pendingExecutorService)); + pendingGetObjectsTasks.clear(); } - LOGGER.trace( - "[FetchObjects]: stream: {}, startOffset: {}, endOffset: {}, limit: {}, and search in metadataCache success with result: {}", - streamId, startOffset, endOffset, limit, cachedInRangeObjects); - return CompletableFuture.completedFuture(cachedInRangeObjects); - } - - void retryPendingTasks(List tasks) { - if (tasks == null || tasks.isEmpty()) { - return; - } - LOGGER.info("[RetryPendingTasks]: retry tasks count: {}", tasks.size()); - tasks.forEach(task -> { - long streamId = task.streamId; - long startOffset = task.startOffset; - long endOffset = task.endOffset; - int limit = task.limit; - CompletableFuture newCf = this.fetch(streamId, startOffset, endOffset, limit); - FutureUtil.propagate(newCf, task.cf); - }); } static class GetObjectsTask { - private final 
CompletableFuture cf; - private final long streamId; - private final long startOffset; - private final long endOffset; - private final int limit; - - public static GetObjectsTask of(long streamId, long startOffset, long endOffset, int limit) { - CompletableFuture cf = new CompletableFuture<>(); - return new GetObjectsTask(cf, streamId, startOffset, endOffset, limit); - } + private final CompletableFuture cf; - private GetObjectsTask(CompletableFuture cf, long streamId, long startOffset, long endOffset, int limit) { - this.cf = cf; - this.streamId = streamId; - this.startOffset = startOffset; - this.endOffset = endOffset; - this.limit = limit; + public GetObjectsTask() { + this.cf = new CompletableFuture<>(); } } diff --git a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java index b3423d3b6f..d9d45e5d0e 100644 --- a/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java +++ b/core/src/test/java/kafka/log/stream/s3/StreamMetadataManagerTest.java @@ -46,7 +46,7 @@ import org.mockito.ArgumentMatchers; import org.mockito.Mockito; -import java.util.HashMap; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -103,12 +103,9 @@ public void setUp() { )); S3ObjectsImage objectsImage = new S3ObjectsImage(2L, map); - Map ranges = Map.of( - 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0) - ); - Map streamObjects = Map.of( - 0L, new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS)); - S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, StreamState.OPENED, 0, 10L, ranges, streamObjects); + List ranges = List.of(new RangeMetadata(STREAM0, 0L, 0, 10L, 100L, BROKER0)); + List streamObjects = List.of(new S3StreamObject(0L, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 1L, 
StreamState.OPENED, 10L, ranges, streamObjects); NodeS3StreamSetObjectMetadataImage walMetadataImage0 = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, DeltaMap.of( 1L, new S3StreamSetObject(1L, BROKER0, List.of( @@ -120,20 +117,20 @@ public void setUp() { DeltaMap.of(BROKER0, walMetadataImage0)); image0 = new MetadataImage(new MetadataProvenance(0, 0, 0), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); - ranges = new HashMap<>(ranges); - ranges.put(1, new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0)); - streamObjects = new HashMap<>(streamObjects); - streamObjects.put(1L, new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS)); - streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 1, 10L, ranges, streamObjects); + ranges = new ArrayList<>(ranges); + ranges.add(new RangeMetadata(STREAM0, 1L, 1, 100L, 150L, BROKER0)); + streamObjects = new ArrayList<>(streamObjects); + streamObjects.add(new S3StreamObject(1L, STREAM0, 100L, 150L, S3StreamConstant.INVALID_TS)); + streamImage = new S3StreamMetadataImage(STREAM0, 2L, StreamState.OPENED, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); image1 = new MetadataImage(new MetadataProvenance(1, 1, 1), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); - ranges = new HashMap<>(ranges); - ranges.put(2, new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0)); - streamObjects = new HashMap<>(streamObjects); - streamObjects.put(2L, new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS)); - streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 2, 10L, ranges, streamObjects); + ranges = new ArrayList<>(ranges); + ranges.add(new RangeMetadata(STREAM0, 2L, 2, 150L, 200L, BROKER0)); + streamObjects = new 
ArrayList<>(streamObjects); + streamObjects.add(new S3StreamObject(2L, STREAM0, 150L, 200L, S3StreamConstant.INVALID_TS)); + streamImage = new S3StreamMetadataImage(STREAM0, 3L, StreamState.OPENED, 10L, ranges, streamObjects); streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), DeltaMap.of(BROKER0, NodeS3StreamSetObjectMetadataImage.EMPTY)); image2 = new MetadataImage(new MetadataProvenance(2, 2, 2), null, null, null, null, null, null, null, streamsImage, objectsImage, null, null); @@ -162,7 +159,7 @@ public void testFetch() throws Exception { result = this.manager.fetch(STREAM0, 20L, 100L, 5); inRangeObjects = result.get(); assertEquals(STREAM0, inRangeObjects.streamId()); - assertEquals(20L, inRangeObjects.startOffset()); + assertEquals(10L, inRangeObjects.startOffset()); assertEquals(100L, inRangeObjects.endOffset()); assertEquals(1, inRangeObjects.objects().size()); assertEquals(0L, inRangeObjects.objects().get(0).objectId()); diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java index 7e9179dd02..048f6054bc 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java +++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java @@ -161,7 +161,6 @@ void replay(ApiMessage message) { case ASSIGNED_S3_OBJECT_ID_RECORD: case NODE_WALMETADATA_RECORD: case REMOVE_NODE_WALMETADATA_RECORD: - case ADVANCE_RANGE_RECORD: case KVRECORD: case REMOVE_KVRECORD: case UPDATE_NEXT_NODE_ID_RECORD: diff --git a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java index 2b43a03805..1b35166232 100644 --- a/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java +++ 
b/metadata/src/main/java/org/apache/kafka/controller/stream/StreamControlManager.java @@ -249,6 +249,14 @@ public ControllerResult openStream(int nodeId, long nodeEpoc // means that the new range is not the first range in stream, get the last range's end offset RangeMetadata lastRangeMetadata = streamMetadata.ranges().get(streamMetadata.currentRangeIndex()); startOffset = lastRangeMetadata.endOffset(); + // the RangeMetadata in S3StreamMetadataImage is only updated on create, rollToNext and trim + records.add(new ApiMessageAndVersion(new RangeRecord() + .setStreamId(streamId) + .setNodeId(lastRangeMetadata.nodeId()) + .setStartOffset(lastRangeMetadata.startOffset()) + .setEndOffset(lastRangeMetadata.endOffset()) + .setEpoch(lastRangeMetadata.epoch()) + .setRangeIndex(lastRangeMetadata.rangeIndex()), (short) 0)); } // range create record records.add(new ApiMessageAndVersion(new RangeRecord() diff --git a/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java b/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java index c41c76e1db..feb80db955 100644 --- a/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java +++ b/metadata/src/main/java/org/apache/kafka/image/DeltaMap.java @@ -209,6 +209,6 @@ public boolean equals(Object o) { @Override public int hashCode() { - return super.hashCode(); + return compact().hashCode(); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java index 9ebe3aeedc..3314dfb59e 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataDelta.java @@ -17,11 +17,7 @@ package org.apache.kafka.image; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; -import org.apache.kafka.common.metadata.AdvanceRangeRecord; +import com.automq.stream.s3.metadata.StreamState; import 
org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; @@ -29,7 +25,15 @@ import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; -import com.automq.stream.s3.metadata.StreamState; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Set; +import java.util.TreeMap; public class S3StreamMetadataDelta { @@ -39,7 +43,6 @@ public class S3StreamMetadataDelta { private long newStartOffset; private long newEpoch; private StreamState currentState; - private int currentRangeIndex; private final Map changedRanges = new HashMap<>(); private final Set removedRanges = new HashSet<>(); @@ -52,7 +55,6 @@ public S3StreamMetadataDelta(S3StreamMetadataImage image) { this.streamId = image.getStreamId(); this.newStartOffset = image.getStartOffset(); this.currentState = image.state(); - this.currentRangeIndex = image.rangeIndex(); } public void replay(S3StreamRecord record) { @@ -60,7 +62,6 @@ public void replay(S3StreamRecord record) { this.newEpoch = record.epoch(); this.newStartOffset = record.startOffset(); this.currentState = StreamState.fromByte(record.streamState()); - this.currentRangeIndex = record.rangeIndex(); } public void replay(RangeRecord record) { @@ -87,40 +88,29 @@ public void replay(RemoveS3StreamObjectRecord record) { changedS3StreamObjects.remove(record.objectId()); } - public void replay(AdvanceRangeRecord record) { - long startOffset = record.startOffset(); - long newEndOffset = record.endOffset(); - // check current range - RangeMetadata metadata = this.changedRanges.get(currentRangeIndex); - if (metadata == null) { - metadata = this.image.getRanges().get(currentRangeIndex); - } - if (metadata == null) { - // 
ignore it - return; - } - if (startOffset != metadata.endOffset()) { - // ignore it - return; + public S3StreamMetadataImage apply() { + List newRanges; + if (changedRanges.isEmpty() && removedRanges.isEmpty()) { + newRanges = image.getRanges(); + } else { + NavigableMap ranges = new TreeMap<>(); + image.getRanges().forEach(range -> ranges.put(range.rangeIndex(), range)); + // add all new changed ranges + ranges.putAll(changedRanges); + // remove all removed ranges + removedRanges.forEach(ranges::remove); + newRanges = new ArrayList<>(ranges.values()); } - // update the endOffset - this.changedRanges.put(currentRangeIndex, new RangeMetadata( - streamId, metadata.epoch(), metadata.rangeIndex(), metadata.startOffset(), newEndOffset, metadata.nodeId() - )); - } - public S3StreamMetadataImage apply() { - Map newRanges = new HashMap<>(image.getRanges()); - // add all new changed ranges - newRanges.putAll(changedRanges); - // remove all removed ranges - removedRanges.forEach(newRanges::remove); - Map newS3StreamObjects = new HashMap<>(image.getStreamObjects()); - // add all changed stream-objects - newS3StreamObjects.putAll(changedS3StreamObjects); - // remove all removed stream-objects - removedS3StreamObjectIds.forEach(newS3StreamObjects::remove); - return new S3StreamMetadataImage(streamId, newEpoch, currentState, currentRangeIndex, newStartOffset, newRanges, newS3StreamObjects); + DeltaMap newS3StreamObjects; + if (changedS3StreamObjects.isEmpty() && removedS3StreamObjectIds.isEmpty()) { + newS3StreamObjects = image.streamObjectsMap; + } else { + newS3StreamObjects = image.streamObjectsMap.copy(); + newS3StreamObjects.putAll(changedS3StreamObjects); + newS3StreamObjects.removeAll(removedS3StreamObjectIds); + } + return new S3StreamMetadataImage(streamId, newEpoch, currentState, newStartOffset, newRanges, newS3StreamObjects); } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java 
b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java index 54b551f5e0..3e10828f72 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java @@ -17,74 +17,159 @@ package org.apache.kafka.image; -import java.util.Map; -import java.util.Objects; -import java.util.stream.Collectors; - import com.automq.stream.s3.metadata.S3StreamConstant; -import com.automq.stream.s3.metadata.StreamOffsetRange; +import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.common.metadata.S3StreamRecord; -import org.apache.kafka.metadata.stream.RangeMetadata; -import org.apache.kafka.metadata.stream.S3StreamObject; import org.apache.kafka.image.writer.ImageWriter; import org.apache.kafka.image.writer.ImageWriterOptions; -import com.automq.stream.s3.metadata.StreamState; +import org.apache.kafka.metadata.stream.RangeMetadata; +import org.apache.kafka.metadata.stream.S3StreamObject; -public class S3StreamMetadataImage { +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.Objects; +import java.util.TreeMap; +public class S3StreamMetadataImage { public static final S3StreamMetadataImage EMPTY = - new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED, - S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of()); + new S3StreamMetadataImage(S3StreamConstant.INVALID_STREAM_ID, S3StreamConstant.INIT_EPOCH, StreamState.CLOSED, S3StreamConstant.INIT_START_OFFSET, Collections.emptyList(), Collections.emptyList()); private final long streamId; private final long epoch; - private final int rangeIndex; - private final long startOffset; private final StreamState state; - private final Map ranges; + private final List ranges; - private final 
Map streamObjects; + final DeltaMap streamObjectsMap; + + private List sortedStreamObjects; + + private NavigableMap streamObjectOffsets; public S3StreamMetadataImage( - long streamId, long epoch, StreamState state, - int rangeIndex, - long startOffset, - Map ranges, - Map streamObjects) { + long streamId, long epoch, StreamState state, + long startOffset, + List ranges, + List sortedStreamObjects) { this.streamId = streamId; this.epoch = epoch; this.state = state; - this.rangeIndex = rangeIndex; this.startOffset = startOffset; this.ranges = ranges; - this.streamObjects = streamObjects; + DeltaMap streamObjectsMap = new DeltaMap<>(); + sortedStreamObjects.forEach(streamObject -> streamObjectsMap.put(streamObject.objectId(), streamObject)); + this.streamObjectsMap = streamObjectsMap; + this.sortedStreamObjects = sortedStreamObjects; + } + + public S3StreamMetadataImage( + long streamId, long epoch, StreamState state, + long startOffset, + List ranges, + DeltaMap streamObjectsMap) { + this.streamId = streamId; + this.epoch = epoch; + this.state = state; + this.startOffset = startOffset; + this.ranges = ranges; + this.streamObjectsMap = streamObjectsMap; } public void write(ImageWriter writer, ImageWriterOptions options) { writer.write(0, new S3StreamRecord() - .setStreamId(streamId) - .setRangeIndex(rangeIndex) - .setStreamState(state.toByte()) - .setEpoch(epoch) - .setStartOffset(startOffset)); - ranges.values().forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord())); - streamObjects.values().forEach(streamObject -> writer.write(streamObject.toRecord())); + .setStreamId(streamId) + .setRangeIndex(currentRangeIndex()) + .setStreamState(state.toByte()) + .setEpoch(epoch) + .setStartOffset(startOffset)); + ranges.forEach(rangeMetadata -> writer.write(rangeMetadata.toRecord())); + streamObjectsMap.forEach((id, obj) -> writer.write(obj.toRecord())); } - public Map getRanges() { + public List getRanges() { return ranges; } - public Map getStreamObjects() { + 
public RangeMetadata lastRange() { + if (ranges.isEmpty()) { + return null; + } + return ranges.get(ranges.size() - 1); + } + + public int currentRangeIndex() { + if (ranges.isEmpty()) { + return S3StreamConstant.INIT_RANGE_INDEX; + } + return lastRange().rangeIndex(); + } + + public int getRangeContainsOffset(long offset) { + return Collections.binarySearch(ranges, offset, (o1, o2) -> { + long startOffset; + long endOffset; + long offset1; + int revert = -1; + RangeMetadata range; + if (o1 instanceof RangeMetadata) { + range = (RangeMetadata) o1; + offset1 = (Long) o2; + revert = 1; + } else { + range = (RangeMetadata) o2; + offset1 = (Long) o1; + } + startOffset = range.startOffset(); + endOffset = range.rangeIndex() == currentRangeIndex() ? Long.MAX_VALUE : range.endOffset(); + + if (endOffset <= offset1) { + return -1 * revert; + } else if (startOffset > offset1) { + return revert; + } else { + return 0; + } + }); + } + + public List getStreamObjects() { + if (sortedStreamObjects != null) { + return sortedStreamObjects; + } + List streamObjects = new ArrayList<>(); + streamObjectsMap.forEach((objectId, streamObject) -> streamObjects.add(streamObject)); + streamObjects.sort(Comparator.comparingLong(S3StreamObject::startOffset)); + this.sortedStreamObjects = streamObjects; return streamObjects; } + public int floorStreamObjectIndex(long offset) { + List sortedStreamObjects = getStreamObjects(); + if (streamObjectOffsets == null) { + // TODO: optimize, get floor index without construct sorted map + NavigableMap streamObjectOffsets = new TreeMap<>(); + for (int i = 0; i < sortedStreamObjects.size(); i++) { + S3StreamObject streamObject = sortedStreamObjects.get(i); + streamObjectOffsets.put(streamObject.streamOffsetRange().startOffset(), i); + } + this.streamObjectOffsets = streamObjectOffsets; + } + Map.Entry entry = streamObjectOffsets.floorEntry(offset); + if (entry == null) { + return -1; + } + return entry.getValue(); + } + public long getEpoch() { return 
epoch; } @@ -93,11 +178,6 @@ public long getStartOffset() { return startOffset; } - public long getEndOffset() { - RangeMetadata range = ranges.get(rangeIndex); - return range == null ? startOffset : range.endOffset(); - } - public long getStreamId() { return streamId; } @@ -106,18 +186,10 @@ public long startOffset() { return startOffset; } - public int rangeIndex() { - return rangeIndex; - } - public StreamState state() { return state; } - public StreamOffsetRange offsetRange() { - return new StreamOffsetRange(streamId, startOffset, getEndOffset()); - } - @Override public boolean equals(Object o) { if (this == o) { @@ -128,30 +200,27 @@ public boolean equals(Object o) { } S3StreamMetadataImage that = (S3StreamMetadataImage) o; return this.streamId == that.streamId && - this.epoch == that.epoch && - this.state == that.state && - this.rangeIndex == that.rangeIndex && - this.startOffset == that.startOffset && - this.ranges.equals(that.ranges) && - this.streamObjects.equals(that.streamObjects); + this.epoch == that.epoch && + this.state == that.state && + this.startOffset == that.startOffset && + this.ranges.equals(that.ranges) && + this.streamObjectsMap.equals(that.streamObjectsMap); } @Override public int hashCode() { - return Objects.hash(streamId, epoch, state, rangeIndex, startOffset, ranges, streamObjects); + return Objects.hash(streamId, epoch, state, startOffset, ranges); } @Override public String toString() { return "S3StreamMetadataImage{" + - "streamId=" + streamId + - ", epoch=" + epoch + - ", rangeIndex=" + rangeIndex + - ", startOffset=" + startOffset + - ", state=" + state + - ", ranges=" + ranges + - ", streamObjects=" + streamObjects.entrySet().stream(). 
- map(e -> e.getKey() + ":" + e.getValue()).collect(Collectors.joining(", ")) + - '}'; + "streamId=" + streamId + + ", epoch=" + epoch + + ", startOffset=" + startOffset + + ", state=" + state + + ", ranges=" + ranges + + ", streamObjects=" + streamObjectsMap + + '}'; } } diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java index 02e0ca8e4a..e22992997a 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataDelta.java @@ -17,7 +17,6 @@ package org.apache.kafka.image; -import org.apache.kafka.common.metadata.AdvanceRangeRecord; import org.apache.kafka.common.metadata.AssignedStreamIdRecord; import org.apache.kafka.common.metadata.NodeWALMetadataRecord; import org.apache.kafka.common.metadata.RangeRecord; @@ -29,7 +28,6 @@ import org.apache.kafka.common.metadata.S3StreamObjectRecord; import org.apache.kafka.common.metadata.S3StreamRecord; import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; -import org.apache.kafka.metadata.stream.S3StreamSetObject; import java.util.HashMap; import java.util.HashSet; @@ -94,9 +92,6 @@ public void replay(RemoveRangeRecord record) { public void replay(S3StreamObjectRecord record) { getOrCreateStreamMetadataDelta(record.streamId()).replay(record); - getOrCreateStreamMetadataDelta(record.streamId()).replay(new AdvanceRangeRecord() - .setStartOffset(record.startOffset()) - .setEndOffset(record.endOffset())); } public void replay(RemoveS3StreamObjectRecord record) { @@ -105,12 +100,6 @@ public void replay(RemoveS3StreamObjectRecord record) { public void replay(S3StreamSetObjectRecord record) { getOrCreateNodeStreamMetadataDelta(record.nodeId()).replay(record); - S3StreamSetObject.decode(record.ranges()).forEach(index -> - getOrCreateStreamMetadataDelta(index.streamId()) - .replay( - new 
AdvanceRangeRecord().setStartOffset(index.startOffset()).setEndOffset(index.endOffset()) - ) - ); } public void replay(RemoveStreamSetObjectRecord record) { diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 64784dbc26..2d8336ee04 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -18,6 +18,7 @@ package org.apache.kafka.image; import com.automq.stream.s3.metadata.S3ObjectMetadata; +import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.StreamOffsetRange; import com.automq.stream.utils.biniarysearch.AbstractOrderedCollection; import com.automq.stream.utils.biniarysearch.ComparableItem; @@ -35,9 +36,7 @@ import java.util.Comparator; import java.util.LinkedList; import java.util.List; -import java.util.Map; import java.util.Objects; -import java.util.Queue; import java.util.stream.Collectors; public final class S3StreamsMetadataImage { @@ -74,34 +73,71 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { - S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId); - if (streamMetadata == null) { - return InRangeObjects.INVALID; - } - if (startOffset < streamMetadata.startOffset()) { - // start offset mismatch + S3StreamMetadataImage stream = streamsMetadata.get(streamId); + if (stream == null || startOffset < stream.startOffset()) { return InRangeObjects.INVALID; } - List objects = new ArrayList<>(); - long realEndOffset = startOffset; - List rangeSearchers = rangeSearchers(streamId, startOffset, endOffset); - // TODO: if one stream object in multiple ranges, we may get duplicate objects - for (RangeSearcher rangeSearcher : rangeSearchers) { - InRangeObjects inRangeObjects = 
rangeSearcher.getObjects(limit); - if (inRangeObjects == InRangeObjects.INVALID) { - break; + List objects = new LinkedList<>(); + long nextStartOffset = startOffset; + + int streamObjectIndex = stream.floorStreamObjectIndex(startOffset); + List streamObjects = stream.getStreamObjects(); + + int lastRangeIndex = -1; + List streamSetObjects = null; + int streamSetObjectIndex = 0; + for (; ; ) { + int roundStartObjectSize = objects.size(); + for (; streamObjectIndex != -1 && streamObjectIndex < streamObjects.size(); streamObjectIndex++) { + S3StreamObject streamObject = streamObjects.get(streamObjectIndex); + if (streamObject.startOffset() != nextStartOffset) { + if (!(objects.isEmpty() && streamObject.endOffset() > nextStartOffset)) { + // it's the first object, we only need the stream object contains the startOffset + break; + } + } + objects.add(streamObject.toMetadata()); + nextStartOffset = streamObject.endOffset(); + if (objects.size() >= limit || nextStartOffset >= endOffset) { + return new InRangeObjects(streamId, objects); + } } - if (inRangeObjects.objects().isEmpty()) { - throw new IllegalStateException("[BUG] expect getObjects return objects from " + rangeSearcher); + if (streamSetObjects == null) { + int rangeIndex = stream.getRangeContainsOffset(nextStartOffset); + if (rangeIndex < 0 || lastRangeIndex == rangeIndex) { + break; + } + lastRangeIndex = rangeIndex; + RangeMetadata range = stream.getRanges().get(rangeIndex); + NodeS3StreamSetObjectMetadataImage node = nodeStreamSetObjectMetadata.get(range.nodeId()); + streamSetObjects = node == null ? 
Collections.emptyList() : node.orderList(); + streamSetObjectIndex = 0; + } + + for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) { + S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); + StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId); + if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) { + continue; + } + if ((streamOffsetRange.startOffset() == nextStartOffset) + || (objects.isEmpty() && streamOffsetRange.startOffset() < nextStartOffset)) { + objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), + streamSetObject.dataTimeInMs())); + nextStartOffset = streamOffsetRange.endOffset(); + if (objects.size() >= limit || nextStartOffset >= endOffset) { + return new InRangeObjects(streamId, objects); + } + } else { + break; + } } - realEndOffset = inRangeObjects.endOffset(); - objects.addAll(inRangeObjects.objects()); - limit -= inRangeObjects.objects().size(); - if (limit <= 0 || realEndOffset >= endOffset) { - break; + if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) { + // move to the next range + streamSetObjects = null; } } - return new InRangeObjects(streamId, startOffset, realEndOffset, objects); + return new InRangeObjects(streamId, objects); } /** @@ -121,11 +157,11 @@ public List getStreamObjects(long streamId, long startOffset, lo if (stream == null) { throw new IllegalArgumentException("stream not found"); } - Map streamObjectsMetadata = stream.getStreamObjects(); + List streamObjectsMetadata = stream.getStreamObjects(); if (streamObjectsMetadata == null || streamObjectsMetadata.isEmpty()) { return Collections.emptyList(); } - return streamObjectsMetadata.values().stream().filter(obj -> { + return streamObjectsMetadata.stream().filter(obj -> { long objectStartOffset = obj.streamOffsetRange().startOffset(); long objectEndOffset 
= obj.streamOffsetRange().endOffset(); return objectStartOffset < endOffset && objectEndOffset > startOffset; @@ -140,162 +176,6 @@ public List getStreamSetObjects(int nodeId) { return wal.orderList(); } - private List rangeSearchers(long streamId, long startOffset, long endOffset) { - S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId); - List rangeSearchers = new ArrayList<>(); - // TODO: refactor to make ranges in order - List ranges = streamMetadata.getRanges().values().stream().sorted(Comparator.comparingInt(RangeMetadata::rangeIndex)).collect(Collectors.toList()); - for (RangeMetadata range : ranges) { - if (range.endOffset() <= startOffset) { - continue; - } - if (range.startOffset() >= endOffset) { - break; - } - long searchEndOffset = Math.min(range.endOffset(), endOffset); - long searchStartOffset = Math.max(range.startOffset(), startOffset); - if (searchStartOffset == searchEndOffset) { - continue; - } - rangeSearchers.add(new RangeSearcher(searchStartOffset, searchEndOffset, streamId, range.nodeId())); - } - return rangeSearchers; - } - - class RangeSearcher { - - private final long startOffset; - private final long endOffset; - private final long streamId; - private final int nodeId; - - public RangeSearcher(long startOffset, long endOffset, long streamId, int nodeId) { - this.startOffset = startOffset; - this.endOffset = endOffset; - this.streamId = streamId; - this.nodeId = nodeId; - } - - private Queue rangeOfStreamSetObjects() { - NodeS3StreamSetObjectMetadataImage streamSetObjectImage = nodeStreamSetObjectMetadata.get(nodeId); - List streamSetObjects = streamSetObjectImage.orderList(); - Queue s3ObjectMetadataList = new LinkedList<>(); - for (S3StreamSetObject obj : streamSetObjects) { - // TODO: cache the stream offset ranges to accelerate the search - // TODO: cache the last search index, to accelerate the search - List ranges = obj.offsetRangeList(); - int index = new StreamOffsetRanges(ranges).search(streamId); - if (index < 
0) { - continue; - } - StreamOffsetRange range = ranges.get(index); - if (range.startOffset() >= endOffset || range.endOffset() < startOffset) { - continue; - } - S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata( - obj.objectId(), obj.objectType(), ranges, obj.dataTimeInMs(), - obj.orderId()); - s3ObjectMetadataList.add(new S3ObjectMetadataWrapper(s3ObjectMetadata, range.startOffset(), range.endOffset())); - if (range.endOffset() >= endOffset) { - break; - } - } - return s3ObjectMetadataList; - } - - private Queue rangeOfStreamObjects() { - S3StreamMetadataImage stream = streamsMetadata.get(streamId); - Map streamObjectsMetadata = stream.getStreamObjects(); - // TODO: refactor to make stream objects in order - if (streamObjectsMetadata != null && !streamObjectsMetadata.isEmpty()) { - return streamObjectsMetadata.values().stream().filter(obj -> { - long objectStartOffset = obj.streamOffsetRange().startOffset(); - long objectEndOffset = obj.streamOffsetRange().endOffset(); - return objectStartOffset < endOffset && objectEndOffset > startOffset; - }).sorted(Comparator.comparing(S3StreamObject::streamOffsetRange)).map(obj -> { - long startOffset = obj.streamOffsetRange().startOffset(); - long endOffset = obj.streamOffsetRange().endOffset(); - S3ObjectMetadata s3ObjectMetadata = new S3ObjectMetadata( - obj.objectId(), obj.objectType(), List.of(obj.streamOffsetRange()), obj.dataTimeInMs()); - return new S3ObjectMetadataWrapper(s3ObjectMetadata, startOffset, endOffset); - }).collect(Collectors.toCollection(LinkedList::new)); - } - return new LinkedList<>(); - } - - public InRangeObjects getObjects(int limit) { - if (limit <= 0) { - return InRangeObjects.INVALID; - } - if (!nodeStreamSetObjectMetadata.containsKey(nodeId) || !streamsMetadata.containsKey(streamId)) { - return InRangeObjects.INVALID; - } - - Queue streamObjects = rangeOfStreamObjects(); - Queue streamSetObjects = rangeOfStreamSetObjects(); - List inRangeObjects = new ArrayList<>(); - long 
nextStartOffset = startOffset; - - while (limit > 0 - && nextStartOffset < endOffset - && (!streamObjects.isEmpty() || !streamSetObjects.isEmpty())) { - S3ObjectMetadataWrapper streamRange = null; - if (streamSetObjects.isEmpty() || (!streamObjects.isEmpty() && streamObjects.peek().startOffset() < streamSetObjects.peek().startOffset())) { - streamRange = streamObjects.poll(); - } else { - streamRange = streamSetObjects.poll(); - } - long objectStartOffset = streamRange.startOffset(); - long objectEndOffset = streamRange.endOffset(); - if (objectStartOffset > nextStartOffset) { - break; - } - if (objectEndOffset <= nextStartOffset) { - continue; - } - inRangeObjects.add(streamRange.metadata); - limit--; - nextStartOffset = objectEndOffset; - } - return new InRangeObjects(streamId, startOffset, nextStartOffset, inRangeObjects); - } - - @Override - public String toString() { - return "RangeSearcher{" + - "startOffset=" + startOffset + - ", endOffset=" + endOffset + - ", streamId=" + streamId + - ", nodeId=" + nodeId + - '}'; - } - } - - static class S3ObjectMetadataWrapper { - - private final S3ObjectMetadata metadata; - private final long startOffset; - private final long endOffset; - - public S3ObjectMetadataWrapper(S3ObjectMetadata metadata, long startOffset, long endOffset) { - this.metadata = metadata; - this.startOffset = startOffset; - this.endOffset = endOffset; - } - - public S3ObjectMetadata metadata() { - return metadata; - } - - public long startOffset() { - return startOffset; - } - - public long endOffset() { - return endOffset; - } - } - @Override public boolean equals(Object obj) { if (this == obj) { @@ -323,15 +203,6 @@ public DeltaMap streamsMetadata() { return streamsMetadata; } - public StreamOffsetRange offsetRange(long streamId) { - S3StreamMetadataImage streamMetadata = streamsMetadata.get(streamId); - if (streamMetadata == null) { - return StreamOffsetRange.INVALID; - } - return streamMetadata.offsetRange(); - } - - public long 
nextAssignedStreamId() { return nextAssignedStreamId; } @@ -341,6 +212,14 @@ public String toString() { return "S3StreamsMetadataImage{nextAssignedStreamId=" + nextAssignedStreamId + '}'; } + public static StreamOffsetRange search(List ranges, long streamId) { + int index = new StreamOffsetRanges(ranges).search(streamId); + if (index < 0) { + return null; + } + return ranges.get(index); + } + static class StreamOffsetRanges extends AbstractOrderedCollection { private final List ranges; diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java index 973e94df60..4629a1529f 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/InRangeObjects.java @@ -24,18 +24,18 @@ public class InRangeObjects { - public static final InRangeObjects INVALID = new InRangeObjects(-1, -1, -1, List.of()); + public static final InRangeObjects INVALID = new InRangeObjects(-1, List.of()); private final long streamId; private final long startOffset; private final long endOffset; private final List objects; - public InRangeObjects(long streamId, long startOffset, long endOffset, List objects) { + public InRangeObjects(long streamId, List objects) { this.streamId = streamId; - this.startOffset = startOffset; - this.endOffset = endOffset; this.objects = objects; + this.startOffset = objects.isEmpty() ? -1L : objects.get(0).startOffset(); + this.endOffset = objects.isEmpty() ? 
-1L : objects.get(objects.size() - 1).endOffset(); } public long streamId() { @@ -57,11 +57,11 @@ public List objects() { @Override public String toString() { return "InRangeObjects{" + - "streamId=" + streamId + - ", startOffset=" + startOffset + - ", endOffset=" + endOffset + - ", objects=" + objects + - '}'; + "streamId=" + streamId + + ", startOffset=" + startOffset + + ", endOffset=" + endOffset + + ", objects=" + objects + + '}'; } @Override @@ -74,9 +74,9 @@ public boolean equals(Object o) { } InRangeObjects that = (InRangeObjects) o; return streamId == that.streamId - && startOffset == that.startOffset - && endOffset == that.endOffset - && objects.equals(that.objects); + && startOffset == that.startOffset + && endOffset == that.endOffset + && objects.equals(that.objects); } @Override diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java index 012c43d000..b4902c04de 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamObject.java @@ -17,8 +17,10 @@ package org.apache.kafka.metadata.stream; +import java.util.List; import java.util.Objects; +import com.automq.stream.s3.metadata.S3ObjectMetadata; import com.automq.stream.s3.metadata.S3ObjectType; import com.automq.stream.s3.metadata.StreamOffsetRange; import org.apache.kafka.common.metadata.S3StreamObjectRecord; @@ -28,16 +30,32 @@ public class S3StreamObject { private final long objectId; private final long dataTimeInMs; - private final StreamOffsetRange streamOffsetRange; + private final long streamId; + private final long startOffset; + private final long endOffset; public S3StreamObject(long objectId, long streamId, long startOffset, long endOffset, long dataTimeInMs) { this.objectId = objectId; - this.streamOffsetRange = new StreamOffsetRange(streamId, startOffset, endOffset); + 
this.streamId = streamId; + this.startOffset = startOffset; + this.endOffset = endOffset; this.dataTimeInMs = dataTimeInMs; } public StreamOffsetRange streamOffsetRange() { - return streamOffsetRange; + return new StreamOffsetRange(streamId, startOffset, endOffset); + } + + public long streamId() { + return streamId; + } + + public long startOffset() { + return startOffset; + } + + public long endOffset() { + return endOffset; } public long objectId() { @@ -55,12 +73,16 @@ public long dataTimeInMs() { public ApiMessageAndVersion toRecord() { return new ApiMessageAndVersion(new S3StreamObjectRecord() .setObjectId(objectId) - .setStreamId(streamOffsetRange.streamId()) - .setStartOffset(streamOffsetRange.startOffset()) - .setEndOffset(streamOffsetRange.endOffset()) + .setStreamId(streamId) + .setStartOffset(startOffset) + .setEndOffset(endOffset) .setDataTimeInMs(dataTimeInMs), (short) 0); } + public S3ObjectMetadata toMetadata() { + return new S3ObjectMetadata(objectId, S3ObjectType.STREAM, List.of(streamOffsetRange()), dataTimeInMs); + } + public static S3StreamObject of(S3StreamObjectRecord record) { S3StreamObject s3StreamObject = new S3StreamObject( record.objectId(), record.streamId(), diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java index a3dba0547b..655bd2baae 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java @@ -21,17 +21,27 @@ import com.automq.stream.s3.metadata.S3StreamConstant; import com.automq.stream.s3.metadata.StreamOffsetRange; import com.github.luben.zstd.Zstd; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.Weigher; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import 
org.apache.kafka.common.metadata.S3StreamSetObjectRecord; import org.apache.kafka.server.common.ApiMessageAndVersion; +import java.time.Duration; import java.util.ArrayList; import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.concurrent.ExecutionException; public class S3StreamSetObject implements Comparable { + private static final Cache> RANGES_CACHE = CacheBuilder.newBuilder() + .expireAfterAccess(Duration.ofMinutes(1)) + .maximumWeight(500000) // expected max heap occupied size is 15MiB + .weigher((Weigher>) (key, value) -> value.size()) + .build(); public static final byte MAGIC = 0x01; public static final byte ZSTD_COMPRESSED = 1 << 1; private static final int COMPRESSION_THRESHOLD = 50; @@ -51,11 +61,11 @@ public class S3StreamSetObject implements Comparable { // Only used for testing public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId) { - this(objectId, nodeId, sortAndEncode(streamOffsetRanges), orderId, S3StreamConstant.INVALID_TS); + this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, S3StreamConstant.INVALID_TS); } public S3StreamSetObject(long objectId, int nodeId, final List streamOffsetRanges, long orderId, long dateTimeInMs) { - this(objectId, nodeId, sortAndEncode(streamOffsetRanges), orderId, dateTimeInMs); + this(objectId, nodeId, sortAndEncode(objectId, streamOffsetRanges), orderId, dateTimeInMs); } public S3StreamSetObject(long objectId, int nodeId, byte[] ranges, long orderId, long dataTimeInMs) { @@ -67,7 +77,11 @@ public S3StreamSetObject(long objectId, int nodeId, byte[] ranges, long orderId, } public List offsetRangeList() { - return decode(ranges); + try { + return RANGES_CACHE.get(objectId, () -> decode(ranges)); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } } public ApiMessageAndVersion toRecord() { @@ -136,9 +150,10 @@ public int compareTo(S3StreamSetObject o) { return Long.compare(this.orderId, 
o.orderId); } - public static byte[] sortAndEncode(List streamOffsetRanges) { + public static byte[] sortAndEncode(long objectId, List streamOffsetRanges) { streamOffsetRanges = new ArrayList<>(streamOffsetRanges); streamOffsetRanges.sort(Comparator.comparingLong(StreamOffsetRange::streamId)); + RANGES_CACHE.put(objectId, streamOffsetRanges); return encode(streamOffsetRanges); } diff --git a/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json b/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json deleted file mode 100644 index f9e50cd59a..0000000000 --- a/metadata/src/main/resources/common/metadata/AdvanceRangeRecord.json +++ /dev/null @@ -1,36 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one or more -// contributor license agreements. See the NOTICE file distributed with -// this work for additional information regarding copyright ownership. -// The ASF licenses this file to You under the Apache License, Version 2.0 -// (the "License"); you may not use this file except in compliance with -// the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. 
- -{ - "apiKey": 515, - "type": "metadata", - "name": "AdvanceRangeRecord", - "validVersions": "0", - "flexibleVersions": "0+", - "fields": [ - { - "name": "StartOffset", - "type": "int64", - "versions": "0+", - "about": "The start offset of the range" - }, - { - "name": "EndOffset", - "type": "int64", - "versions": "0+", - "about": "The end offset of the range" - } - ] -} \ No newline at end of file diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java index a6ec2f9e94..008978e821 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamMetadataImageTest.java @@ -17,14 +17,8 @@ package org.apache.kafka.image; -import static org.junit.jupiter.api.Assertions.assertEquals; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - import com.automq.stream.s3.metadata.S3StreamConstant; -import org.apache.kafka.common.metadata.AdvanceRangeRecord; +import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.common.metadata.RangeRecord; import org.apache.kafka.common.metadata.RemoveRangeRecord; import org.apache.kafka.common.metadata.RemoveS3StreamObjectRecord; @@ -35,12 +29,17 @@ import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.metadata.stream.RangeMetadata; import org.apache.kafka.metadata.stream.S3StreamObject; -import com.automq.stream.s3.metadata.StreamState; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.jupiter.api.Assertions.assertEquals; + @Timeout(value = 40) @Tag("S3Unit") public class S3StreamMetadataImageTest { @@ -58,16 +57,16 @@ public void testRanges() { 
S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0); // 1. create stream0 delta0Records.add(new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(STREAM0) - .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX) - .setStreamState(StreamState.OPENED.toByte()) - .setStartOffset(S3StreamConstant.INIT_START_OFFSET) - .setEpoch(0L) - .setStartOffset(0L), (short) 0)); + .setStreamId(STREAM0) + .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) + .setEpoch(0L) + .setStartOffset(0L), (short) 0)); RecordTestUtils.replayAll(delta0, delta0Records); // verify delta and check image's write S3StreamMetadataImage image1 = new S3StreamMetadataImage( - STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, Map.of(), Map.of()); + STREAM0, 0L, StreamState.OPENED, S3StreamConstant.INIT_START_OFFSET, Collections.emptyList(), Collections.emptyList()); assertEquals(image1, delta0.apply()); testToImageAndBack(image1); @@ -76,22 +75,22 @@ public void testRanges() { List delta1Records = new ArrayList<>(); S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1); delta1Records.add(new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(STREAM0) - .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX) - .setStreamState(StreamState.OPENED.toByte()) - .setStartOffset(S3StreamConstant.INIT_START_OFFSET) - .setEpoch(1L), (short) 0)); + .setStreamId(STREAM0) + .setRangeIndex(S3StreamConstant.INIT_RANGE_INDEX) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) + .setEpoch(1L), (short) 0)); delta1Records.add(new ApiMessageAndVersion(new RangeRecord() - .setStreamId(STREAM0) - .setRangeIndex(0) - .setEpoch(1L) - .setNodeId(BROKER0) - .setStartOffset(0L), (short) 0)); + .setStreamId(STREAM0) + .setRangeIndex(0) + .setEpoch(1L) + .setNodeId(BROKER0) + .setStartOffset(0L), (short) 0)); 
RecordTestUtils.replayAll(delta1, delta1Records); // verify delta and check image's write S3StreamMetadataImage image2 = new S3StreamMetadataImage( - STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_RANGE_INDEX, S3StreamConstant.INIT_START_OFFSET, - Map.of(0, new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Map.of()); + STREAM0, 1L, StreamState.OPENED, S3StreamConstant.INIT_START_OFFSET, + List.of(new RangeMetadata(STREAM0, 1L, 0, 0L, 0L, BROKER0)), Collections.emptyList()); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); @@ -99,27 +98,24 @@ public void testRanges() { List delta2Records = new ArrayList<>(); S3StreamMetadataDelta delta2 = new S3StreamMetadataDelta(image2); delta2Records.add(new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(STREAM0) - .setRangeIndex(0) - .setStreamState(StreamState.OPENED.toByte()) - .setStartOffset(S3StreamConstant.INIT_START_OFFSET) - .setEpoch(2L), (short) 0)); - delta2Records.add(new ApiMessageAndVersion(new AdvanceRangeRecord() - .setStartOffset(0L) - .setEndOffset(100L), (short) 0)); + .setStreamId(STREAM0) + .setRangeIndex(0) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(S3StreamConstant.INIT_START_OFFSET) + .setEpoch(2L), (short) 0)); delta2Records.add(new ApiMessageAndVersion(new RangeRecord() - .setStreamId(STREAM0) - .setRangeIndex(1) - .setEpoch(2L) - .setNodeId(BROKER1) - .setStartOffset(100L) - .setEndOffset(100L), (short) 0)); + .setStreamId(STREAM0) + .setRangeIndex(1) + .setEpoch(2L) + .setNodeId(BROKER1) + .setStartOffset(100L) + .setEndOffset(100L), (short) 0)); RecordTestUtils.replayAll(delta2, delta2Records); // verify delta and check image's write S3StreamMetadataImage image3 = new S3StreamMetadataImage( - STREAM0, 2L, StreamState.OPENED, 0, 0L, Map.of( - 0, new RangeMetadata(STREAM0, 1L, 0, 0, 100, BROKER0), - 1, new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Map.of()); + STREAM0, 2L, StreamState.OPENED, 0L, List.of( + new RangeMetadata(STREAM0, 
1L, 0, 0, 0, BROKER0), + new RangeMetadata(STREAM0, 2L, 1, 100, 100, BROKER1)), Collections.emptyList()); assertEquals(image3, delta2.apply()); testToImageAndBack(image3); @@ -127,45 +123,45 @@ public void testRanges() { List delta3Records = new ArrayList<>(); S3StreamMetadataDelta delta3 = new S3StreamMetadataDelta(image3); delta3Records.add(new ApiMessageAndVersion(new S3StreamRecord() - .setStreamId(STREAM0) - .setEpoch(2L) - .setRangeIndex(0) - .setStreamState(StreamState.OPENED.toByte()) - .setStartOffset(100L), (short) 0)); + .setStreamId(STREAM0) + .setEpoch(2L) + .setRangeIndex(0) + .setStreamState(StreamState.OPENED.toByte()) + .setStartOffset(100L), (short) 0)); delta3Records.add(new ApiMessageAndVersion(new RemoveRangeRecord() - .setStreamId(STREAM0) - .setRangeIndex(0), (short) 0)); + .setStreamId(STREAM0) + .setRangeIndex(0), (short) 0)); RecordTestUtils.replayAll(delta3, delta3Records); // verify delta and check image's write S3StreamMetadataImage image4 = new S3StreamMetadataImage( - STREAM0, 2L, StreamState.OPENED, 0, 100L, Map.of( - 1, new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Map.of()); + STREAM0, 2L, StreamState.OPENED, 100L, List.of( + new RangeMetadata(STREAM0, 2L, 1, 100L, 100L, BROKER1)), Collections.emptyList()); assertEquals(image4, delta3.apply()); } @Test public void testStreamObjects() { S3StreamMetadataImage image0 = new S3StreamMetadataImage( - STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of()); + STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), Collections.emptyList()); List delta0Records = new ArrayList<>(); S3StreamMetadataDelta delta0 = new S3StreamMetadataDelta(image0); // 1. 
create streamObject0 and streamObject1 delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord() - .setObjectId(0L) - .setStreamId(STREAM0) - .setStartOffset(0L) - .setEndOffset(100L), (short) 0)); + .setObjectId(0L) + .setStreamId(STREAM0) + .setStartOffset(0L) + .setEndOffset(100L), (short) 0)); delta0Records.add(new ApiMessageAndVersion(new S3StreamObjectRecord() - .setObjectId(1L) - .setStreamId(STREAM0) - .setStartOffset(100L) - .setEndOffset(200L), (short) 0)); + .setObjectId(1L) + .setStreamId(STREAM0) + .setStartOffset(100L) + .setEndOffset(200L), (short) 0)); RecordTestUtils.replayAll(delta0, delta0Records); // verify delta and check image's write S3StreamMetadataImage image1 = new S3StreamMetadataImage( - STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of( - 0L, new S3StreamObject(0L, 999, STREAM0, 0L, 100L), - 1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L))); + STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of( + new S3StreamObject(0L, 999, STREAM0, 0L, 100L), + new S3StreamObject(1L, 999, STREAM0, 100L, 200L))); assertEquals(image1, delta0.apply()); testToImageAndBack(image1); @@ -173,12 +169,12 @@ public void testStreamObjects() { List delta1Records = new ArrayList<>(); S3StreamMetadataDelta delta1 = new S3StreamMetadataDelta(image1); delta1Records.add(new ApiMessageAndVersion(new RemoveS3StreamObjectRecord() - .setObjectId(0L), (short) 0)); + .setObjectId(0L), (short) 0)); RecordTestUtils.replayAll(delta1, delta1Records); // verify delta and check image's write S3StreamMetadataImage image2 = new S3StreamMetadataImage( - STREAM0, 0L, StreamState.OPENED, -1, 0L, Map.of(), Map.of( - 1L, new S3StreamObject(1L, 999, STREAM0, 100L, 200L))); + STREAM0, 0L, StreamState.OPENED, 0L, Collections.emptyList(), List.of( + new S3StreamObject(1L, 999, STREAM0, 100L, 200L))); assertEquals(image2, delta1.apply()); testToImageAndBack(image2); } diff --git 
a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 74e656d729..d7845830d4 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -35,7 +35,6 @@ import org.junit.jupiter.api.Timeout; import java.util.List; -import java.util.Map; import java.util.stream.Collectors; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -114,17 +113,17 @@ public void testGetObjects() { broker0Objects); NodeS3StreamSetObjectMetadataImage broker1WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH, broker1Objects); - Map ranges = Map.of( - 0, new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0), - 1, new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1), - 2, new RangeMetadata(STREAM0, 2L, 2, 180L, 420L, BROKER0), - 3, new RangeMetadata(STREAM0, 3L, 3, 420L, 520L, BROKER1), - 4, new RangeMetadata(STREAM0, 4L, 4, 520L, 600L, BROKER0)); - Map streamObjects = Map.of( - 8L, new S3StreamObject(8, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS), - 9L, new S3StreamObject(9, STREAM0, 200L, 300L, S3StreamConstant.INVALID_TS), - 10L, new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS)); - S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 4, 10, ranges, streamObjects); + List ranges = List.of( + new RangeMetadata(STREAM0, 0L, 0, 10L, 140L, BROKER0), + new RangeMetadata(STREAM0, 1L, 1, 140L, 180L, BROKER1), + new RangeMetadata(STREAM0, 2L, 2, 180L, 420L, BROKER0), + new RangeMetadata(STREAM0, 3L, 3, 420L, 520L, BROKER1), + new RangeMetadata(STREAM0, 4L, 4, 520L, 600L, BROKER0)); + List streamObjects = List.of( + new S3StreamObject(8, STREAM0, 10L, 100L, S3StreamConstant.INVALID_TS), + new S3StreamObject(9, STREAM0, 200L, 300L, 
S3StreamConstant.INVALID_TS), + new S3StreamObject(10, STREAM0, 300L, 400L, S3StreamConstant.INVALID_TS)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects); S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), DeltaMap.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage)); @@ -148,14 +147,14 @@ public void testGetObjects() { // 4. search stream_0 in [20, 550) objects = streamsImage.getObjects(STREAM0, 20, 550, Integer.MAX_VALUE); - assertEquals(20, objects.startOffset()); + assertEquals(10, objects.startOffset()); assertEquals(600, objects.endOffset()); assertEquals(11, objects.objects().size()); assertEquals(expectedObjectIds, objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); // 5. search stream_0 in [20, 550) with limit 5 objects = streamsImage.getObjects(STREAM0, 20, 550, 5); - assertEquals(20, objects.startOffset()); + assertEquals(10, objects.startOffset()); assertEquals(180, objects.endOffset()); assertEquals(5, objects.objects().size()); assertEquals(expectedObjectIds.subList(0, 5), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); @@ -169,14 +168,21 @@ public void testGetObjects() { // 7. search stream_0 in [401, 519) objects = streamsImage.getObjects(STREAM0, 401, 519, Integer.MAX_VALUE); - assertEquals(401, objects.startOffset()); + assertEquals(400, objects.startOffset()); assertEquals(520, objects.endOffset()); assertEquals(2, objects.objects().size()); assertEquals(expectedObjectIds.subList(8, 10), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); // 8. 
search stream_0 in [399, 521) objects = streamsImage.getObjects(STREAM0, 399, 521, Integer.MAX_VALUE); - assertEquals(399, objects.startOffset()); + assertEquals(300, objects.startOffset()); + assertEquals(600, objects.endOffset()); + assertEquals(4, objects.objects().size()); + assertEquals(expectedObjectIds.subList(7, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); + + // 9. search stream_0 in [399, 1000) + objects = streamsImage.getObjects(STREAM0, 399, 1000, Integer.MAX_VALUE); + assertEquals(300, objects.startOffset()); assertEquals(600, objects.endOffset()); assertEquals(4, objects.objects().size()); assertEquals(expectedObjectIds.subList(7, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); From d9fbb18dbd26878f55d0be2aab73f824a5285739 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 11 Jan 2024 18:50:33 +0800 Subject: [PATCH 2/2] fix: CR Signed-off-by: Robin Han --- .../kafka/log/stream/s3/metadata/StreamMetadataManager.java | 3 --- .../java/org/apache/kafka/image/S3StreamMetadataImage.java | 5 +++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java index 7f366dc4b9..4ff14c998c 100644 --- a/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java +++ b/core/src/main/scala/kafka/log/stream/s3/metadata/StreamMetadataManager.java @@ -133,9 +133,6 @@ private synchronized CompletableFuture fetch0(long streamId, lon if (rst.objects().size() >= limit || rst.endOffset() >= endOffset || rst == InRangeObjects.INVALID) { return CompletableFuture.completedFuture(rst); } - if (rst.endOffset() >= endOffset || rst.objects().size() >= limit) { - return CompletableFuture.completedFuture(rst); - } LOGGER.info("[FetchObjects],[PENDING],streamId={} startOffset={} endOffset={} limit={}", streamId,
startOffset, endOffset, limit); CompletableFuture pendingCf = pendingFetch(); CompletableFuture rstCf = new CompletableFuture<>(); diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java index 3e10828f72..b721365428 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamMetadataImage.java @@ -64,7 +64,7 @@ public S3StreamMetadataImage( this.state = state; this.startOffset = startOffset; this.ranges = ranges; - DeltaMap streamObjectsMap = new DeltaMap<>(); + DeltaMap streamObjectsMap = new DeltaMap<>(new int[]{10}); sortedStreamObjects.forEach(streamObject -> streamObjectsMap.put(streamObject.objectId(), streamObject)); this.streamObjectsMap = streamObjectsMap; this.sortedStreamObjects = sortedStreamObjects; @@ -114,6 +114,7 @@ public int currentRangeIndex() { } public int getRangeContainsOffset(long offset) { + int currentRangeIndex = currentRangeIndex(); return Collections.binarySearch(ranges, offset, (o1, o2) -> { long startOffset; long endOffset; @@ -129,7 +130,7 @@ public int getRangeContainsOffset(long offset) { offset1 = (Long) o1; } startOffset = range.startOffset(); - endOffset = range.rangeIndex() == currentRangeIndex() ? Long.MAX_VALUE : range.endOffset(); + endOffset = range.rangeIndex() == currentRangeIndex ? Long.MAX_VALUE : range.endOffset(); if (endOffset <= offset1) { return -1 * revert;