diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 472f80188c..f2628fe632 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -73,6 +73,9 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { + if (streamId < 0 || startOffset > endOffset || limit < 0) { + return InRangeObjects.INVALID; + } S3StreamMetadataImage stream = streamsMetadata.get(streamId); if (stream == null || startOffset < stream.startOffset()) { return InRangeObjects.INVALID; @@ -80,7 +83,8 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset List objects = new LinkedList<>(); long nextStartOffset = startOffset; - int streamObjectIndex = stream.floorStreamObjectIndex(startOffset); + // floor value < 0 means that all stream objects' ranges are greater than startOffset + int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset)); List streamObjects = stream.getStreamObjects(); int lastRangeIndex = -1; @@ -88,7 +92,9 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset int streamSetObjectIndex = 0; for (; ; ) { int roundStartObjectSize = objects.size(); - for (; streamObjectIndex != -1 && streamObjectIndex < streamObjects.size(); streamObjectIndex++) { + + // try to find consistent stream objects + for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) { S3StreamObject streamObject = streamObjects.get(streamObjectIndex); if (streamObject.startOffset() != nextStartOffset) { //noinspection StatementWithEmptyBody @@ -108,8 +114,11 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset return new InRangeObjects(streamId, objects); } } + if (streamSetObjects == null) { int rangeIndex = stream.getRangeContainsOffset(nextStartOffset); + // 1. can not find the range containing nextStartOffset, or + // 2. the range is the same as the last one, which means the nextStartOffset does not move on. if (rangeIndex < 0 || lastRangeIndex == rangeIndex) { break; } @@ -123,6 +132,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) { S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId); + // skip the stream set object not containing the stream or the range is before the nextStartOffset if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) { continue; } @@ -135,11 +145,16 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset return new InRangeObjects(streamId, objects); } } else { + // We keep the corresponding object ( with a range startOffset > nextStartOffset) by not changing + // the streamSetObjectIndex. This object may be picked up in the next round. break; } } + // case 1. streamSetObjectIndex >= streamSetObjects.size(), which means we have reached the end of the stream set objects. + // case 2. objects.size() == roundStartObjectSize, which means we have not found any new object in this round. if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) { // move to the next range + // This can ensure that we can break the loop. streamSetObjects = null; } } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 4329b44cfb..e9554f55ff 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -193,4 +193,65 @@ public void testGetObjects() { assertEquals(7, objects.objects().size()); assertEquals(expectedObjectIds.subList(1, 8), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); } + + /** + * Test get objects with the first hit object is a stream object. + */ + @Test + public void testGetObjectsWithFirstStreamObject() { + DeltaMap broker0Objects = DeltaMap.of( + 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 20L, 40L)), 0L)); + NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, + broker0Objects); + List ranges = List.of( + new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0), + new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0)); + List streamObjects = List.of( + new S3StreamObject(8, STREAM0, 10L, 20L, S3StreamConstant.INVALID_TS), + new S3StreamObject(8, STREAM0, 40L, 60L, S3StreamConstant.INVALID_TS)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, broker0WALMetadataImage)); + + InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4); + assertEquals(2, objects.objects().size()); + assertEquals(20L, objects.startOffset()); + assertEquals(60L, objects.endOffset()); + + objects = streamsImage.getObjects(STREAM0, 22L, 55, 1); + assertEquals(1, objects.objects().size()); + assertEquals(20L, objects.startOffset()); + assertEquals(40L, objects.endOffset()); + } + + + /** + * Test get objects with the first hit object is a stream set object. + */ + @Test + public void testGetObjectsWithFirstStreamSetObject() { + DeltaMap broker0Objects = DeltaMap.of( + 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 10L, 20L)), 0L), + 1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 40L, 60L)), 1L)); + NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, + broker0Objects); + List ranges = List.of( + new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0), + new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0)); + List streamObjects = List.of( + new S3StreamObject(8, STREAM0, 20L, 40L, S3StreamConstant.INVALID_TS)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, broker0WALMetadataImage)); + + InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4); + assertEquals(2, objects.objects().size()); + assertEquals(10L, objects.startOffset()); + assertEquals(40L, objects.endOffset()); + + objects = streamsImage.getObjects(STREAM0, 12L, 30, 1); + assertEquals(1, objects.objects().size()); + assertEquals(10L, objects.startOffset()); + assertEquals(20L, objects.endOffset()); + } }