diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 2d8336ee04..472f80188c 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -91,8 +91,14 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset for (; streamObjectIndex != -1 && streamObjectIndex < streamObjects.size(); streamObjectIndex++) { S3StreamObject streamObject = streamObjects.get(streamObjectIndex); if (streamObject.startOffset() != nextStartOffset) { - if (!(objects.isEmpty() && streamObject.endOffset() > nextStartOffset)) { - // it's the first object, we only need the stream object contains the startOffset + //noinspection StatementWithEmptyBody + if (objects.isEmpty() && streamObject.startOffset() <= nextStartOffset && streamObject.endOffset() > nextStartOffset) { + // it's the first object, we only need the stream object contains the nextStartOffset + } else if (streamObject.endOffset() <= nextStartOffset) { + // the stream object not match the requirement, move to the next stream object + continue; + } else { + // the streamObject.startOffset() > nextStartOffset break; } } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index d7845830d4..4329b44cfb 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -186,5 +186,11 @@ public void testGetObjects() { assertEquals(600, objects.endOffset()); assertEquals(4, objects.objects().size()); assertEquals(expectedObjectIds.subList(7, 11), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); + + objects = streamsImage.getObjects(STREAM0, 101, 400L, Integer.MAX_VALUE); + assertEquals(100L, objects.startOffset()); + assertEquals(400L, objects.endOffset()); + assertEquals(7, objects.objects().size()); + assertEquals(expectedObjectIds.subList(1, 8), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); } }