diff --git a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java index 2990e07aa6..4e8ccd93cd 100644 --- a/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java +++ b/metadata/src/main/java/org/apache/kafka/image/S3StreamsMetadataImage.java @@ -151,23 +151,29 @@ void getObjects0(GetObjectsContext ctx) { return; } List objects = new LinkedList<>(); - long nextStartOffset = startOffset; // floor value < 0 means that all stream objects' ranges are greater than startOffset int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset)); final List streamObjects = stream.getStreamObjects(); - final int streamObjectsSize = streamObjects.size(); int lastRangeIndex = -1; - List streamSetObjects = null; int streamSetObjectIndex = 0; - NodeS3StreamSetObjectMetadataImage node = null; + fillObjects(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, streamSetObjectIndex, + null, null); + ctx.cf.complete(new InRangeObjects(streamId, objects)); + } + + void fillObjects(GetObjectsContext ctx, S3StreamMetadataImage stream, List objects, int lastRangeIndex, + int streamObjectIndex, List streamObjects, + int streamSetObjectIndex, List streamSetObjects, + NodeS3StreamSetObjectMetadataImage node) { + long nextStartOffset = ctx.startOffset; for (; ; ) { int roundStartObjectSize = objects.size(); // try to find consistent stream objects - for (; streamObjectIndex < streamObjectsSize; streamObjectIndex++) { + for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) { S3StreamObject streamObject = streamObjects.get(streamObjectIndex); if (streamObject.startOffset() != nextStartOffset) { //noinspection StatementWithEmptyBody @@ -183,8 +189,8 @@ void getObjects0(GetObjectsContext ctx) { } objects.add(streamObject.toMetadata()); nextStartOffset = streamObject.endOffset(); - if (objects.size() >= limit || (endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= endOffset)) { - ctx.cf.complete(new InRangeObjects(streamId, objects)); + if (objects.size() >= ctx.limit || (ctx.endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= ctx.endOffset)) { + ctx.cf.complete(new InRangeObjects(ctx.streamId, objects)); return; } } @@ -201,18 +207,26 @@ void getObjects0(GetObjectsContext ctx) { node = nodeStreamSetObjectMetadata.get(range.nodeId()); if (node != null) { streamSetObjects = node.orderList(); - streamSetObjectIndex = node.floorStreamSetObjectIndex(streamId, nextStartOffset); + streamSetObjectIndex = node.floorStreamSetObjectIndex(ctx.streamId, nextStartOffset); } else { streamSetObjects = Collections.emptyList(); } - } - - final int streamSetObjectsSize = streamSetObjects.size(); - // load stream set object index - if (loadStreamSetObjectInfo(ctx, streamSetObjects, streamSetObjectIndex)) { + // load stream set object index + final int finalLastRangeIndex = lastRangeIndex; + final long finalNextStartOffset = nextStartOffset; + final int finalStreamObjectIndex = streamObjectIndex; + final int finalStreamSetObjectIndex = streamSetObjectIndex; + final List finalStreamSetObjects = streamSetObjects; + final NodeS3StreamSetObjectMetadataImage finalNode = node; + loadStreamSetObjectInfo(ctx, streamSetObjects, streamSetObjectIndex).thenAccept(v -> { + ctx.startOffset = finalNextStartOffset; + fillObjects(ctx, stream, objects, finalLastRangeIndex, finalStreamObjectIndex, streamObjects, + finalStreamSetObjectIndex, finalStreamSetObjects, finalNode); + }); return; } + final int streamSetObjectsSize = streamSetObjects.size(); for (; streamSetObjectIndex < streamSetObjectsSize; streamSetObjectIndex++) { S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex); StreamOffsetRange streamOffsetRange = findStreamInStreamSetObject(ctx, streamSetObject).orElse(null); @@ -223,13 +237,13 @@ void getObjects0(GetObjectsContext ctx) { if ((streamOffsetRange.startOffset() == nextStartOffset) || (objects.isEmpty() && streamOffsetRange.startOffset() < nextStartOffset)) { if (node != null) { - node.recordStreamSetObjectIndex(streamId, nextStartOffset, streamSetObjectIndex); + node.recordStreamSetObjectIndex(ctx.streamId, nextStartOffset, streamSetObjectIndex); } objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange), streamSetObject.dataTimeInMs())); nextStartOffset = streamOffsetRange.endOffset(); - if (objects.size() >= limit || (endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= endOffset)) { - ctx.cf.complete(new InRangeObjects(streamId, objects)); + if (objects.size() >= ctx.limit || (ctx.endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= ctx.endOffset)) { + ctx.cf.complete(new InRangeObjects(ctx.streamId, objects)); return; } } else { @@ -246,7 +260,6 @@ void getObjects0(GetObjectsContext ctx) { streamSetObjects = null; } } - ctx.cf.complete(new InRangeObjects(streamId, objects)); } /** @@ -254,7 +267,7 @@ void getObjects0(GetObjectsContext ctx) { * * @return async load */ - private boolean loadStreamSetObjectInfo(GetObjectsContext ctx, List streamSetObjects, + private CompletableFuture loadStreamSetObjectInfo(GetObjectsContext ctx, List streamSetObjects, int startSearchIndex) { final int streamSetObjectsSize = streamSetObjects.size(); List> loadIndexCfList = new LinkedList<>(); @@ -273,15 +286,13 @@ private boolean loadStreamSetObjectInfo(GetObjectsContext ctx, List getObjects0(ctx)) + return CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0])) .exceptionally(ex -> { ctx.cf.completeExceptionally(ex); return null; }); - return true; } private Optional findStreamInStreamSetObject(GetObjectsContext ctx, S3StreamSetObject object) { diff --git a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java index dc038d37c0..d06f78488d 100644 --- a/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java +++ b/metadata/src/main/java/org/apache/kafka/metadata/stream/S3StreamSetObject.java @@ -37,6 +37,7 @@ import java.util.Optional; import java.util.concurrent.ExecutionException; import org.apache.kafka.common.metadata.S3StreamSetObjectRecord; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.automq.AutoMQVersion; @@ -185,6 +186,9 @@ public int compareTo(S3StreamSetObject o) { } public static byte[] sortAndEncode(long objectId, List streamOffsetRanges) { + if (streamOffsetRanges == null || streamOffsetRanges.isEmpty()) { + return Bytes.EMPTY; + } streamOffsetRanges = new ArrayList<>(streamOffsetRanges); streamOffsetRanges.sort(Comparator.comparingLong(StreamOffsetRange::streamId)); RANGES_CACHE.put(objectId, streamOffsetRanges); diff --git a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java index a57da80d32..fd9f02440a 100644 --- a/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java +++ b/metadata/src/test/java/org/apache/kafka/image/S3StreamsMetadataImageTest.java @@ -26,7 +26,9 @@ import com.google.common.collect.Range; import java.util.ArrayList; import java.util.List; +import java.util.Optional; import java.util.Set; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; @@ -54,6 +56,8 @@ import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -74,7 +78,7 @@ public class S3StreamsMetadataImageTest { private static final long MB = 1024 * KB; private static final long GB = 1024 * MB; - private final RangeGetter rangeGetter = (objectId, streamId) -> FutureUtil.failedFuture(new UnsupportedOperationException()); + private final RangeGetter defaultRangeGetter = (objectId, streamId) -> FutureUtil.failedFuture(new UnsupportedOperationException()); static final S3StreamsMetadataImage IMAGE1; @@ -126,19 +130,44 @@ private void testToImageAndBack(S3StreamsMetadataImage image) { assertEquals(image, newImage); } - @Test - public void testGetObjects() throws ExecutionException, InterruptedException { + private RangeGetter buildMemoryRangeGetter() { + return (objectId, streamId) -> { + if (objectId == 0) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 100L, 120L))); + } else if (objectId == 1) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 120L, 140L))); + } else if (objectId == 2) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 180L, 200L))); + } else if (objectId == 3) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 400L, 420L))); + } else if (objectId == 4) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 520L, 600L))); + } else if (objectId == 5) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 140L, 160L))); + } else if (objectId == 6) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 160L, 180L))); + } else if (objectId == 7) { + return CompletableFuture.completedFuture(Optional.of(new StreamOffsetRange(STREAM0, 420L, 520L))); + } else { + return CompletableFuture.completedFuture(Optional.empty()); + } + }; + } + + @SuppressWarnings("NPathComplexity") + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testGetObjects(boolean isHugeCluster) throws ExecutionException, InterruptedException { DeltaMap broker0Objects = DeltaMap.of( - 0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 100L, 120L)), 0L), - 1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 120L, 140L)), 1L), - 2L, new S3StreamSetObject(2, BROKER0, List.of(new StreamOffsetRange(STREAM0, 180L, 200L)), 2L), - 3L, new S3StreamSetObject(3, BROKER0, List.of( - new StreamOffsetRange(STREAM0, 400L, 420L)), 3L), - 4L, new S3StreamSetObject(4, BROKER0, List.of(new StreamOffsetRange(STREAM0, 520L, 600L)), 4L)); + 0L, new S3StreamSetObject(0, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 100L, 120L)), 0L), + 1L, new S3StreamSetObject(1, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 120L, 140L)), 1L), + 2L, new S3StreamSetObject(2, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 180L, 200L)), 2L), + 3L, new S3StreamSetObject(3, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 400L, 420L)), 3L), + 4L, new S3StreamSetObject(4, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 520L, 600L)), 4L)); DeltaMap broker1Objects = DeltaMap.of( - 5L, new S3StreamSetObject(5, BROKER1, List.of(new StreamOffsetRange(STREAM0, 140L, 160L)), 0L), - 6L, new S3StreamSetObject(6, BROKER1, List.of(new StreamOffsetRange(STREAM0, 160L, 180L)), 1L), - 7L, new S3StreamSetObject(7, BROKER1, List.of(new StreamOffsetRange(STREAM0, 420L, 520L)), 2L)); + 5L, new S3StreamSetObject(5, BROKER1, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 140L, 160L)), 0L), + 6L, new S3StreamSetObject(6, BROKER1, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 160L, 180L)), 1L), + 7L, new S3StreamSetObject(7, BROKER1, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 420L, 520L)), 2L)); NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH, broker0Objects); NodeS3StreamSetObjectMetadataImage broker1WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH, @@ -157,6 +186,7 @@ public void testGetObjects() throws ExecutionException, InterruptedException { S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage), DeltaMap.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0)); + RangeGetter rangeGetter = isHugeCluster ? buildMemoryRangeGetter() : defaultRangeGetter; // 1. search stream_1 InRangeObjects objects = streamsImage.getObjects(STREAM1, 10, 100, Integer.MAX_VALUE, rangeGetter).get(); assertEquals(InRangeObjects.INVALID, objects); @@ -249,12 +279,12 @@ public void testGetObjectsWithFirstStreamObject() throws ExecutionException, Int S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage), DeltaMap.of(BROKER0, broker0WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0)); - InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4, rangeGetter).get(); + InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4, defaultRangeGetter).get(); assertEquals(2, objects.objects().size()); assertEquals(20L, objects.startOffset()); assertEquals(60L, objects.endOffset()); - objects = streamsImage.getObjects(STREAM0, 22L, 55, 1, rangeGetter).get(); + objects = streamsImage.getObjects(STREAM0, 22L, 55, 1, defaultRangeGetter).get(); assertEquals(1, objects.objects().size()); assertEquals(20L, objects.startOffset()); assertEquals(40L, objects.endOffset()); @@ -327,12 +357,12 @@ private S3StreamsMetadataImage generateStreamImage(long streamId, Range st public void testGetObjectsWithFirstStreamSetObject() throws ExecutionException, InterruptedException { S3StreamsMetadataImage streamsImage = createStreamImage(); - InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4, rangeGetter).get(); + InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4, defaultRangeGetter).get(); assertEquals(2, objects.objects().size()); assertEquals(10L, objects.startOffset()); assertEquals(40L, objects.endOffset()); - objects = streamsImage.getObjects(STREAM0, 12L, 30, 1, rangeGetter).get(); + objects = streamsImage.getObjects(STREAM0, 12L, 30, 1, defaultRangeGetter).get(); assertEquals(1, objects.objects().size()); assertEquals(10L, objects.startOffset()); assertEquals(20L, objects.endOffset()); @@ -381,7 +411,7 @@ public Item(long start, long end, int limit) { long end = r.nextLong(start, endOffset); int limit = r.nextInt(3, 17); Item item = new Item(start, end, limit); - result.put(item, streamsImage.getObjects(STREAM0, start, end, limit, rangeGetter).get()); + result.put(item, streamsImage.getObjects(STREAM0, start, end, limit, defaultRangeGetter).get()); } catch (Exception e) { hasException.set(true); } @@ -406,7 +436,7 @@ public Item(long start, long end, int limit) { Item item = entry.getKey(); InRangeObjects objects = entry.getValue(); try { - assertEquals(streamsImage.getObjects(STREAM0, item.start, item.end, item.limit, rangeGetter).get(), objects); + assertEquals(streamsImage.getObjects(STREAM0, item.start, item.end, item.limit, defaultRangeGetter).get(), objects); } catch (Throwable e) { throw new RuntimeException(e); }