From a080174205a08233b9e405486b3ca3fc93d7d8bb Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Thu, 18 Jan 2024 16:35:09 +0800 Subject: [PATCH 1/3] fix: get objects corner cases; add more tests and comments Signed-off-by: Curtis Wan --- .../kafka/image/S3StreamsMetadataImage.java | 112 +++++++++++++++--- .../image/S3StreamsMetadataImageTest.java | 61 ++++++++++ 2 files changed, 154 insertions(+), 19 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 472f80188c..39b7384b53 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -72,15 +72,87 @@ public void write(ImageWriter writer, ImageWriterOptions options) { nodeStreamSetObjectMetadata.forEach((k, v) -> v.write(writer, options)); } + /** + * Get the first object containing targetOffset. + * It will firstly search the stream objects, if not found, it will search the stream set objects. + * It is generally used to find the first object containing the start offset of the stream and get the next + * searching start offset. + * @param streamId stream id + * @param targetOffset target offset in the stream + * @param objects the list to store the first object metadata containing targetOffset + * @return the end offset of the stream object or the end range of the stream set object, -1 if not found + */ + private long findAndAddFirstHitObjectMetadata(long streamId, long targetOffset, List objects) { + S3StreamMetadataImage stream = streamsMetadata.get(streamId); + if (stream == null || targetOffset < stream.startOffset()) { + return -1; + } + + int streamObjectIndex = stream.floorStreamObjectIndex(targetOffset); + if (streamObjectIndex >= 0) { + List streamObjects = stream.getStreamObjects(); + for(int i = streamObjectIndex; i < streamObjects.size(); i++) { + S3StreamObject streamObject = streamObjects.get(i); + if (streamObject.startOffset() <= targetOffset && streamObject.endOffset() > targetOffset) { + // 1. find the first stream object containing targetOffset + objects.add(streamObject.toMetadata()); + return streamObject.endOffset(); + } + if (streamObject.startOffset() > targetOffset) { + break; + } + } + } + + // try to find the first object from stream set objects + int rangeIndex = stream.getRangeContainsOffset(targetOffset); + if (rangeIndex < 0 ) { + return -1; + } + RangeMetadata range = stream.getRanges().get(rangeIndex); + NodeS3StreamSetObjectMetadataImage node = nodeStreamSetObjectMetadata.get(range.nodeId()); + List streamSetObjects = node == null ? Collections.emptyList() : node.orderList(); + for (S3StreamSetObject streamSetObject : streamSetObjects) { + StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId); + if (streamOffsetRange == null || streamOffsetRange.endOffset() <= targetOffset) { + continue; + } + // 2. find the first stream set object containing targetOffset + if (streamOffsetRange.startOffset() <= targetOffset) { + objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), + streamSetObject.dataTimeInMs())); + return streamOffsetRange.endOffset(); + } else { + break; + } + } + + // 3. can not find any object containing targetOffset + return -1; + + } + + public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { + if (streamId < 0 || startOffset >= endOffset || limit <=0 ) { + return InRangeObjects.INVALID; + } S3StreamMetadataImage stream = streamsMetadata.get(streamId); if (stream == null || startOffset < stream.startOffset()) { return InRangeObjects.INVALID; } + + // try to find the first object List objects = new LinkedList<>(); - long nextStartOffset = startOffset; + // Note that nextStartOffset is the exact start offset to search now. + long nextStartOffset = findAndAddFirstHitObjectMetadata(streamId, startOffset, objects); + if (nextStartOffset < 0) { + return InRangeObjects.INVALID; + } + if (objects.size() >= limit || nextStartOffset >= endOffset) { + return new InRangeObjects(streamId, objects); + } - int streamObjectIndex = stream.floorStreamObjectIndex(startOffset); List streamObjects = stream.getStreamObjects(); int lastRangeIndex = -1; @@ -88,28 +160,24 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset int streamSetObjectIndex = 0; for (; ; ) { int roundStartObjectSize = objects.size(); - for (; streamObjectIndex != -1 && streamObjectIndex < streamObjects.size(); streamObjectIndex++) { - S3StreamObject streamObject = streamObjects.get(streamObjectIndex); - if (streamObject.startOffset() != nextStartOffset) { - //noinspection StatementWithEmptyBody - if (objects.isEmpty() && streamObject.startOffset() <= nextStartOffset && streamObject.endOffset() > nextStartOffset) { - // it's the first object, we only need the stream object contains the nextStartOffset - } else if (streamObject.endOffset() <= nextStartOffset) { - // the stream object not match the requirement, move to the next stream object - continue; - } else { - // the streamObject.startOffset() > nextStartOffset - break; - } + + // try to find consistent stream objects + for (int i = stream.floorStreamObjectIndex(nextStartOffset); i >= 0 && i < streamObjects.size(); i ++) { + S3StreamObject s3StreamObject = streamObjects.get(i); + if (s3StreamObject.startOffset() != nextStartOffset) { + break; } - objects.add(streamObject.toMetadata()); - nextStartOffset = streamObject.endOffset(); + objects.add(s3StreamObject.toMetadata()); + nextStartOffset = s3StreamObject.endOffset(); if (objects.size() >= limit || nextStartOffset >= endOffset) { return new InRangeObjects(streamId, objects); } } + if (streamSetObjects == null) { int rangeIndex = stream.getRangeContainsOffset(nextStartOffset); + // 1. can not find the range containing nextStartOffset, or + // 2. the range is the same as the last one, which means the nextStartOffset does not move on. if (rangeIndex < 0 || lastRangeIndex == rangeIndex) { break; } @@ -123,11 +191,12 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) { S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId); + // skip the stream set object not containing the stream or the range is before the nextStartOffset if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) { continue; } - if ((streamOffsetRange.startOffset() == nextStartOffset) - || (objects.isEmpty() && streamOffsetRange.startOffset() < nextStartOffset)) { + // find the exact next stream set object + if (streamOffsetRange.startOffset() == nextStartOffset) { objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), streamSetObject.dataTimeInMs())); nextStartOffset = streamOffsetRange.endOffset(); @@ -135,11 +204,16 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset return new InRangeObjects(streamId, objects); } } else { + // We keep the corresponding object ( with a range startOffset > nextStartOffset) by not changing + // the streamSetObjectIndex. This object may be picked up in the next round. break; } } + // case 1. streamSetObjectIndex >= streamSetObjects.size(), which means we have reached the end of the stream set objects. + // case 2. objects.size() == roundStartObjectSize, which means we have not found any new object in this round. if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) { // move to the next range + // This can ensure that we can break the loop. streamSetObjects = null; } } diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index 4329b44cfb..e9554f55ff 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -193,4 +193,65 @@ public void testGetObjects() { assertEquals(7, objects.objects().size()); assertEquals(expectedObjectIds.subList(1, 8), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList())); } + + /** + * Test get objects with the first hit object is a stream object. + */ + @Test + public void testGetObjectsWithFirstStreamObject() { + DeltaMap broker0Objects = DeltaMap.of( + 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 20L, 40L)), 0L)); + NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, + broker0Objects); + List ranges = List.of( + new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0), + new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0)); + List streamObjects = List.of( + new S3StreamObject(8, STREAM0, 10L, 20L, S3StreamConstant.INVALID_TS), + new S3StreamObject(8, STREAM0, 40L, 60L, S3StreamConstant.INVALID_TS)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, broker0WALMetadataImage)); + + InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4); + assertEquals(2, objects.objects().size()); + assertEquals(20L, objects.startOffset()); + assertEquals(60L, objects.endOffset()); + + objects = streamsImage.getObjects(STREAM0, 22L, 55, 1); + assertEquals(1, objects.objects().size()); + assertEquals(20L, objects.startOffset()); + assertEquals(40L, objects.endOffset()); + } + + + /** + * Test get objects with the first hit object is a stream set object. + */ + @Test + public void testGetObjectsWithFirstStreamSetObject() { + DeltaMap broker0Objects = DeltaMap.of( + 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 10L, 20L)), 0L), + 1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 40L, 60L)), 1L)); + NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, + broker0Objects); + List ranges = List.of( + new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0), + new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0)); + List streamObjects = List.of( + new S3StreamObject(8, STREAM0, 20L, 40L, S3StreamConstant.INVALID_TS)); + S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects); + S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage), + DeltaMap.of(BROKER0, broker0WALMetadataImage)); + + InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4); + assertEquals(2, objects.objects().size()); + assertEquals(10L, objects.startOffset()); + assertEquals(40L, objects.endOffset()); + + objects = streamsImage.getObjects(STREAM0, 12L, 30, 1); + assertEquals(1, objects.objects().size()); + assertEquals(10L, objects.startOffset()); + assertEquals(20L, objects.endOffset()); + } } From 839280cdec4ddc25841cdc040fdf378f48ecfaf0 Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Thu, 18 Jan 2024 18:59:00 +0800 Subject: [PATCH 2/3] refactor: make sure streamObjectIndex >= 0 Signed-off-by: Curtis Wan --- .../kafka/image/S3StreamsMetadataImage.java | 101 ++++-------------- 1 file changed, 21 insertions(+), 80 deletions(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 39b7384b53..8e8d1e4ba0 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -72,87 +72,19 @@ public void write(ImageWriter writer, ImageWriterOptions options) { nodeStreamSetObjectMetadata.forEach((k, v) -> v.write(writer, options)); } - /** - * Get the first object containing targetOffset. - * It will firstly search the stream objects, if not found, it will search the stream set objects. - * It is generally used to find the first object containing the start offset of the stream and get the next - * searching start offset. - * @param streamId stream id - * @param targetOffset target offset in the stream - * @param objects the list to store the first object metadata containing targetOffset - * @return the end offset of the stream object or the end range of the stream set object, -1 if not found - */ - private long findAndAddFirstHitObjectMetadata(long streamId, long targetOffset, List objects) { - S3StreamMetadataImage stream = streamsMetadata.get(streamId); - if (stream == null || targetOffset < stream.startOffset()) { - return -1; - } - - int streamObjectIndex = stream.floorStreamObjectIndex(targetOffset); - if (streamObjectIndex >= 0) { - List streamObjects = stream.getStreamObjects(); - for(int i = streamObjectIndex; i < streamObjects.size(); i++) { - S3StreamObject streamObject = streamObjects.get(i); - if (streamObject.startOffset() <= targetOffset && streamObject.endOffset() > targetOffset) { - // 1. find the first stream object containing targetOffset - objects.add(streamObject.toMetadata()); - return streamObject.endOffset(); - } - if (streamObject.startOffset() > targetOffset) { - break; - } - } - } - - // try to find the first object from stream set objects - int rangeIndex = stream.getRangeContainsOffset(targetOffset); - if (rangeIndex < 0 ) { - return -1; - } - RangeMetadata range = stream.getRanges().get(rangeIndex); - NodeS3StreamSetObjectMetadataImage node = nodeStreamSetObjectMetadata.get(range.nodeId()); - List streamSetObjects = node == null ? Collections.emptyList() : node.orderList(); - for (S3StreamSetObject streamSetObject : streamSetObjects) { - StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId); - if (streamOffsetRange == null || streamOffsetRange.endOffset() <= targetOffset) { - continue; - } - // 2. find the first stream set object containing targetOffset - if (streamOffsetRange.startOffset() <= targetOffset) { - objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), - streamSetObject.dataTimeInMs())); - return streamOffsetRange.endOffset(); - } else { - break; - } - } - - // 3. can not find any object containing targetOffset - return -1; - - } - - public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { - if (streamId < 0 || startOffset >= endOffset || limit <=0 ) { + if (streamId < 0 || startOffset >= endOffset || limit <= 0) { return InRangeObjects.INVALID; } S3StreamMetadataImage stream = streamsMetadata.get(streamId); if (stream == null || startOffset < stream.startOffset()) { return InRangeObjects.INVALID; } - - // try to find the first object List objects = new LinkedList<>(); - // Note that nextStartOffset is the exact start offset to search now. - long nextStartOffset = findAndAddFirstHitObjectMetadata(streamId, startOffset, objects); - if (nextStartOffset < 0) { - return InRangeObjects.INVALID; - } - if (objects.size() >= limit || nextStartOffset >= endOffset) { - return new InRangeObjects(streamId, objects); - } + long nextStartOffset = startOffset; + // floor value < 0 means that all stream objects' ranges are greater than startOffset + int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset)); List streamObjects = stream.getStreamObjects(); int lastRangeIndex = -1; @@ -162,13 +94,22 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset int roundStartObjectSize = objects.size(); // try to find consistent stream objects - for (int i = stream.floorStreamObjectIndex(nextStartOffset); i >= 0 && i < streamObjects.size(); i ++) { - S3StreamObject s3StreamObject = streamObjects.get(i); - if (s3StreamObject.startOffset() != nextStartOffset) { - break; + for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) { + S3StreamObject streamObject = streamObjects.get(streamObjectIndex); + if (streamObject.startOffset() != nextStartOffset) { + //noinspection StatementWithEmptyBody + if (objects.isEmpty() && streamObject.startOffset() <= nextStartOffset && streamObject.endOffset() > nextStartOffset) { + // it's the first object, we only need the stream object contains the nextStartOffset + } else if (streamObject.endOffset() <= nextStartOffset) { + // the stream object not match the requirement, move to the next stream object + continue; + } else { + // the streamObject.startOffset() > nextStartOffset + break; + } } - objects.add(s3StreamObject.toMetadata()); - nextStartOffset = s3StreamObject.endOffset(); + objects.add(streamObject.toMetadata()); + nextStartOffset = streamObject.endOffset(); if (objects.size() >= limit || nextStartOffset >= endOffset) { return new InRangeObjects(streamId, objects); } @@ -195,8 +136,8 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) { continue; } - // find the exact next stream set object - if (streamOffsetRange.startOffset() == nextStartOffset) { + if ((streamOffsetRange.startOffset() == nextStartOffset) + || (objects.isEmpty() && streamOffsetRange.startOffset() < nextStartOffset)) { objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), streamSetObject.dataTimeInMs())); nextStartOffset = streamOffsetRange.endOffset(); From 49ecacd8519699cfb771b0e71d4864ed5cc8524b Mon Sep 17 00:00:00 2001 From: Curtis Wan Date: Thu, 18 Jan 2024 20:28:14 +0800 Subject: [PATCH 3/3] fix: remove equal cases Signed-off-by: Curtis Wan --- .../java/org/apache/kafka/image/S3StreamsMetadataImage.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 8e8d1e4ba0..f2628fe632 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -73,7 +73,7 @@ public void write(ImageWriter writer, ImageWriterOptions options) { } public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) { - if (streamId < 0 || startOffset >= endOffset || limit <= 0) { + if (streamId < 0 || startOffset > endOffset || limit < 0) { return InRangeObjects.INVALID; } S3StreamMetadataImage stream = streamsMetadata.get(streamId);