Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,23 +151,29 @@ void getObjects0(GetObjectsContext ctx) {
return;
}
List<S3ObjectMetadata> objects = new LinkedList<>();
long nextStartOffset = startOffset;

// floor value < 0 means that all stream objects' ranges are greater than startOffset
int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset));

final List<S3StreamObject> streamObjects = stream.getStreamObjects();
final int streamObjectsSize = streamObjects.size();

int lastRangeIndex = -1;
List<S3StreamSetObject> streamSetObjects = null;
int streamSetObjectIndex = 0;
NodeS3StreamSetObjectMetadataImage node = null;
fillObjects(ctx, stream, objects, lastRangeIndex, streamObjectIndex, streamObjects, streamSetObjectIndex,
null, null);
ctx.cf.complete(new InRangeObjects(streamId, objects));
}

void fillObjects(GetObjectsContext ctx, S3StreamMetadataImage stream, List<S3ObjectMetadata> objects, int lastRangeIndex,
int streamObjectIndex, List<S3StreamObject> streamObjects,
int streamSetObjectIndex, List<S3StreamSetObject> streamSetObjects,
NodeS3StreamSetObjectMetadataImage node) {
long nextStartOffset = ctx.startOffset;
for (; ; ) {
int roundStartObjectSize = objects.size();

// try to find consistent stream objects
for (; streamObjectIndex < streamObjectsSize; streamObjectIndex++) {
for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) {
S3StreamObject streamObject = streamObjects.get(streamObjectIndex);
if (streamObject.startOffset() != nextStartOffset) {
//noinspection StatementWithEmptyBody
Expand All @@ -183,8 +189,8 @@ void getObjects0(GetObjectsContext ctx) {
}
objects.add(streamObject.toMetadata());
nextStartOffset = streamObject.endOffset();
if (objects.size() >= limit || (endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= endOffset)) {
ctx.cf.complete(new InRangeObjects(streamId, objects));
if (objects.size() >= ctx.limit || (ctx.endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= ctx.endOffset)) {
ctx.cf.complete(new InRangeObjects(ctx.streamId, objects));
return;
}
}
Expand All @@ -201,18 +207,26 @@ void getObjects0(GetObjectsContext ctx) {
node = nodeStreamSetObjectMetadata.get(range.nodeId());
if (node != null) {
streamSetObjects = node.orderList();
streamSetObjectIndex = node.floorStreamSetObjectIndex(streamId, nextStartOffset);
streamSetObjectIndex = node.floorStreamSetObjectIndex(ctx.streamId, nextStartOffset);
} else {
streamSetObjects = Collections.emptyList();
}
}

final int streamSetObjectsSize = streamSetObjects.size();
// load stream set object index
if (loadStreamSetObjectInfo(ctx, streamSetObjects, streamSetObjectIndex)) {
// load stream set object index
final int finalLastRangeIndex = lastRangeIndex;
final long finalNextStartOffset = nextStartOffset;
final int finalStreamObjectIndex = streamObjectIndex;
final int finalStreamSetObjectIndex = streamSetObjectIndex;
final List<S3StreamSetObject> finalStreamSetObjects = streamSetObjects;
final NodeS3StreamSetObjectMetadataImage finalNode = node;
loadStreamSetObjectInfo(ctx, streamSetObjects, streamSetObjectIndex).thenAccept(v -> {
ctx.startOffset = finalNextStartOffset;
fillObjects(ctx, stream, objects, finalLastRangeIndex, finalStreamObjectIndex, streamObjects,
finalStreamSetObjectIndex, finalStreamSetObjects, finalNode);
});
return;
}

final int streamSetObjectsSize = streamSetObjects.size();
for (; streamSetObjectIndex < streamSetObjectsSize; streamSetObjectIndex++) {
S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex);
StreamOffsetRange streamOffsetRange = findStreamInStreamSetObject(ctx, streamSetObject).orElse(null);
Expand All @@ -223,13 +237,13 @@ void getObjects0(GetObjectsContext ctx) {
if ((streamOffsetRange.startOffset() == nextStartOffset)
|| (objects.isEmpty() && streamOffsetRange.startOffset() < nextStartOffset)) {
if (node != null) {
node.recordStreamSetObjectIndex(streamId, nextStartOffset, streamSetObjectIndex);
node.recordStreamSetObjectIndex(ctx.streamId, nextStartOffset, streamSetObjectIndex);
}
objects.add(new S3ObjectMetadata(streamSetObject.objectId(), S3ObjectType.STREAM_SET, List.of(streamOffsetRange),
streamSetObject.dataTimeInMs()));
nextStartOffset = streamOffsetRange.endOffset();
if (objects.size() >= limit || (endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= endOffset)) {
ctx.cf.complete(new InRangeObjects(streamId, objects));
if (objects.size() >= ctx.limit || (ctx.endOffset != ObjectUtils.NOOP_OFFSET && nextStartOffset >= ctx.endOffset)) {
ctx.cf.complete(new InRangeObjects(ctx.streamId, objects));
return;
}
} else {
Expand All @@ -246,15 +260,14 @@ void getObjects0(GetObjectsContext ctx) {
streamSetObjects = null;
}
}
ctx.cf.complete(new InRangeObjects(streamId, objects));
}

/**
* Load the stream set object range info if it is missing
*
* @return async load
*/
private boolean loadStreamSetObjectInfo(GetObjectsContext ctx, List<S3StreamSetObject> streamSetObjects,
private CompletableFuture<Void> loadStreamSetObjectInfo(GetObjectsContext ctx, List<S3StreamSetObject> streamSetObjects,
int startSearchIndex) {
final int streamSetObjectsSize = streamSetObjects.size();
List<CompletableFuture<Void>> loadIndexCfList = new LinkedList<>();
Expand All @@ -273,15 +286,13 @@ private boolean loadStreamSetObjectInfo(GetObjectsContext ctx, List<S3StreamSetO
);
}
if (loadIndexCfList.isEmpty()) {
return false;
return CompletableFuture.completedFuture(null);
}
CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0]))
.thenAccept(nil -> getObjects0(ctx))
return CompletableFuture.allOf(loadIndexCfList.toArray(new CompletableFuture[0]))
.exceptionally(ex -> {
ctx.cf.completeExceptionally(ex);
return null;
});
return true;
}

private Optional<StreamOffsetRange> findStreamInStreamSetObject(GetObjectsContext ctx, S3StreamSetObject object) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.common.metadata.S3StreamSetObjectRecord;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.automq.AutoMQVersion;

Expand Down Expand Up @@ -185,6 +186,9 @@ public int compareTo(S3StreamSetObject o) {
}

public static byte[] sortAndEncode(long objectId, List<StreamOffsetRange> streamOffsetRanges) {
if (streamOffsetRanges == null || streamOffsetRanges.isEmpty()) {
return Bytes.EMPTY;
}
streamOffsetRanges = new ArrayList<>(streamOffsetRanges);
streamOffsetRanges.sort(Comparator.comparingLong(StreamOffsetRange::streamId));
RANGES_CACHE.put(objectId, streamOffsetRanges);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@
import com.google.common.collect.Range;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -54,6 +56,8 @@
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand All @@ -74,7 +78,7 @@ public class S3StreamsMetadataImageTest {
private static final long MB = 1024 * KB;

private static final long GB = 1024 * MB;
private final RangeGetter rangeGetter = (objectId, streamId) -> FutureUtil.failedFuture(new UnsupportedOperationException());
private final RangeGetter defaultRangeGetter = (objectId, streamId) -> FutureUtil.failedFuture(new UnsupportedOperationException());

static final S3StreamsMetadataImage IMAGE1;

Expand Down Expand Up @@ -126,19 +130,44 @@ private void testToImageAndBack(S3StreamsMetadataImage image) {
assertEquals(image, newImage);
}

@Test
public void testGetObjects() throws ExecutionException, InterruptedException {
/**
 * Builds a {@link RangeGetter} backed by a fixed in-memory table.
 *
 * <p>Object ids 0 through 7 resolve to a fixed {@code STREAM0} offset range; any other
 * object id resolves to an empty {@link Optional}. The requested stream id is ignored and
 * every returned future is already completed.
 *
 * @return a range getter that answers from the in-memory table
 */
private RangeGetter buildMemoryRangeGetter() {
    // Row i holds the {startOffset, endOffset} pair served for object id i.
    final long[][] offsetsByObjectId = {
        {100L, 120L},
        {120L, 140L},
        {180L, 200L},
        {400L, 420L},
        {520L, 600L},
        {140L, 160L},
        {160L, 180L},
        {420L, 520L},
    };
    return (objectId, streamId) -> {
        // Ids outside the table are unknown objects.
        if (objectId < 0 || objectId >= offsetsByObjectId.length) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        final long[] offsets = offsetsByObjectId[Math.toIntExact(objectId)];
        return CompletableFuture.completedFuture(
            Optional.of(new StreamOffsetRange(STREAM0, offsets[0], offsets[1])));
    };
}

@SuppressWarnings("NPathComplexity")
@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testGetObjects(boolean isHugeCluster) throws ExecutionException, InterruptedException {
DeltaMap<Long, S3StreamSetObject> broker0Objects = DeltaMap.of(
0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 100L, 120L)), 0L),
1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 120L, 140L)), 1L),
2L, new S3StreamSetObject(2, BROKER0, List.of(new StreamOffsetRange(STREAM0, 180L, 200L)), 2L),
3L, new S3StreamSetObject(3, BROKER0, List.of(
new StreamOffsetRange(STREAM0, 400L, 420L)), 3L),
4L, new S3StreamSetObject(4, BROKER0, List.of(new StreamOffsetRange(STREAM0, 520L, 600L)), 4L));
0L, new S3StreamSetObject(0, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 100L, 120L)), 0L),
1L, new S3StreamSetObject(1, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 120L, 140L)), 1L),
2L, new S3StreamSetObject(2, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 180L, 200L)), 2L),
3L, new S3StreamSetObject(3, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 400L, 420L)), 3L),
4L, new S3StreamSetObject(4, BROKER0, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 520L, 600L)), 4L));
DeltaMap<Long, S3StreamSetObject> broker1Objects = DeltaMap.of(
5L, new S3StreamSetObject(5, BROKER1, List.of(new StreamOffsetRange(STREAM0, 140L, 160L)), 0L),
6L, new S3StreamSetObject(6, BROKER1, List.of(new StreamOffsetRange(STREAM0, 160L, 180L)), 1L),
7L, new S3StreamSetObject(7, BROKER1, List.of(new StreamOffsetRange(STREAM0, 420L, 520L)), 2L));
5L, new S3StreamSetObject(5, BROKER1, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 140L, 160L)), 0L),
6L, new S3StreamSetObject(6, BROKER1, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 160L, 180L)), 1L),
7L, new S3StreamSetObject(7, BROKER1, isHugeCluster ? null : List.of(new StreamOffsetRange(STREAM0, 420L, 520L)), 2L));
NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH,
broker0Objects);
NodeS3StreamSetObjectMetadataImage broker1WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER1, S3StreamConstant.INVALID_BROKER_EPOCH,
Expand All @@ -157,6 +186,7 @@ public void testGetObjects() throws ExecutionException, InterruptedException {
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage, BROKER1, broker1WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0));

RangeGetter rangeGetter = isHugeCluster ? buildMemoryRangeGetter() : defaultRangeGetter;
// 1. search stream_1
InRangeObjects objects = streamsImage.getObjects(STREAM1, 10, 100, Integer.MAX_VALUE, rangeGetter).get();
assertEquals(InRangeObjects.INVALID, objects);
Expand Down Expand Up @@ -249,12 +279,12 @@ public void testGetObjectsWithFirstStreamObject() throws ExecutionException, Int
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, RegistryRef.NOOP, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage), new DeltaMap<>(), new DeltaMap<>(), new TimelineHashMap<>(RegistryRef.NOOP.registry(), 0));

InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4, rangeGetter).get();
InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4, defaultRangeGetter).get();
assertEquals(2, objects.objects().size());
assertEquals(20L, objects.startOffset());
assertEquals(60L, objects.endOffset());

objects = streamsImage.getObjects(STREAM0, 22L, 55, 1, rangeGetter).get();
objects = streamsImage.getObjects(STREAM0, 22L, 55, 1, defaultRangeGetter).get();
assertEquals(1, objects.objects().size());
assertEquals(20L, objects.startOffset());
assertEquals(40L, objects.endOffset());
Expand Down Expand Up @@ -327,12 +357,12 @@ private S3StreamsMetadataImage generateStreamImage(long streamId, Range<Long> st
public void testGetObjectsWithFirstStreamSetObject() throws ExecutionException, InterruptedException {
S3StreamsMetadataImage streamsImage = createStreamImage();

InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4, rangeGetter).get();
InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4, defaultRangeGetter).get();
assertEquals(2, objects.objects().size());
assertEquals(10L, objects.startOffset());
assertEquals(40L, objects.endOffset());

objects = streamsImage.getObjects(STREAM0, 12L, 30, 1, rangeGetter).get();
objects = streamsImage.getObjects(STREAM0, 12L, 30, 1, defaultRangeGetter).get();
assertEquals(1, objects.objects().size());
assertEquals(10L, objects.startOffset());
assertEquals(20L, objects.endOffset());
Expand Down Expand Up @@ -381,7 +411,7 @@ public Item(long start, long end, int limit) {
long end = r.nextLong(start, endOffset);
int limit = r.nextInt(3, 17);
Item item = new Item(start, end, limit);
result.put(item, streamsImage.getObjects(STREAM0, start, end, limit, rangeGetter).get());
result.put(item, streamsImage.getObjects(STREAM0, start, end, limit, defaultRangeGetter).get());
} catch (Exception e) {
hasException.set(true);
}
Expand All @@ -406,7 +436,7 @@ public Item(long start, long end, int limit) {
Item item = entry.getKey();
InRangeObjects objects = entry.getValue();
try {
assertEquals(streamsImage.getObjects(STREAM0, item.start, item.end, item.limit, rangeGetter).get(), objects);
assertEquals(streamsImage.getObjects(STREAM0, item.start, item.end, item.limit, defaultRangeGetter).get(), objects);
} catch (Throwable e) {
throw new RuntimeException(e);
}
Expand Down