Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,22 +73,28 @@ public void write(ImageWriter writer, ImageWriterOptions options) {
}

public InRangeObjects getObjects(long streamId, long startOffset, long endOffset, int limit) {
if (streamId < 0 || startOffset > endOffset || limit < 0) {
return InRangeObjects.INVALID;
}
S3StreamMetadataImage stream = streamsMetadata.get(streamId);
if (stream == null || startOffset < stream.startOffset()) {
return InRangeObjects.INVALID;
}
List<S3ObjectMetadata> objects = new LinkedList<>();
long nextStartOffset = startOffset;

int streamObjectIndex = stream.floorStreamObjectIndex(startOffset);
// floor value < 0 means that all stream objects' ranges are greater than startOffset
int streamObjectIndex = Math.max(0, stream.floorStreamObjectIndex(startOffset));
List<S3StreamObject> streamObjects = stream.getStreamObjects();

int lastRangeIndex = -1;
List<S3StreamSetObject> streamSetObjects = null;
int streamSetObjectIndex = 0;
for (; ; ) {
int roundStartObjectSize = objects.size();
for (; streamObjectIndex != -1 && streamObjectIndex < streamObjects.size(); streamObjectIndex++) {

// try to find consistent stream objects
for (; streamObjectIndex < streamObjects.size(); streamObjectIndex++) {
S3StreamObject streamObject = streamObjects.get(streamObjectIndex);
if (streamObject.startOffset() != nextStartOffset) {
//noinspection StatementWithEmptyBody
Expand All @@ -108,8 +114,11 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
return new InRangeObjects(streamId, objects);
}
}

if (streamSetObjects == null) {
int rangeIndex = stream.getRangeContainsOffset(nextStartOffset);
// 1. can not find the range containing nextStartOffset, or
// 2. the range is the same as the last one, which means the nextStartOffset does not move on.
if (rangeIndex < 0 || lastRangeIndex == rangeIndex) {
break;
}
Expand All @@ -123,6 +132,7 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
for (; streamSetObjectIndex < streamSetObjects.size(); streamSetObjectIndex++) {
S3StreamSetObject streamSetObject = streamSetObjects.get(streamSetObjectIndex);
StreamOffsetRange streamOffsetRange = search(streamSetObject.offsetRangeList(), streamId);
// skip the stream set object not containing the stream or the range is before the nextStartOffset
if (streamOffsetRange == null || streamOffsetRange.endOffset() <= nextStartOffset) {
continue;
}
Expand All @@ -135,11 +145,16 @@ public InRangeObjects getObjects(long streamId, long startOffset, long endOffset
return new InRangeObjects(streamId, objects);
}
} else {
// We keep the corresponding object ( with a range startOffset > nextStartOffset) by not changing
// the streamSetObjectIndex. This object may be picked up in the next round.
break;
}
}
// case 1. streamSetObjectIndex >= streamSetObjects.size(), which means we have reached the end of the stream set objects.
// case 2. objects.size() == roundStartObjectSize, which means we have not found any new object in this round.
if (streamSetObjectIndex >= streamSetObjects.size() || objects.size() == roundStartObjectSize) {
// move to the next range
// This can ensure that we can break the loop.
streamSetObjects = null;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,4 +193,65 @@ public void testGetObjects() {
assertEquals(7, objects.objects().size());
assertEquals(expectedObjectIds.subList(1, 8), objects.objects().stream().map(S3ObjectMetadata::objectId).collect(Collectors.toList()));
}

/**
* Test get objects with the first hit object is a stream object.
*/
@Test
public void testGetObjectsWithFirstStreamObject() {
DeltaMap<Long, S3StreamSetObject> broker0Objects = DeltaMap.of(
0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 20L, 40L)), 0L));
NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH,
broker0Objects);
List<RangeMetadata> ranges = List.of(
new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0),
new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0));
List<S3StreamObject> streamObjects = List.of(
new S3StreamObject(8, STREAM0, 10L, 20L, S3StreamConstant.INVALID_TS),
new S3StreamObject(8, STREAM0, 40L, 60L, S3StreamConstant.INVALID_TS));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage));

InRangeObjects objects = streamsImage.getObjects(STREAM0, 22L, 55, 4);
assertEquals(2, objects.objects().size());
assertEquals(20L, objects.startOffset());
assertEquals(60L, objects.endOffset());

objects = streamsImage.getObjects(STREAM0, 22L, 55, 1);
assertEquals(1, objects.objects().size());
assertEquals(20L, objects.startOffset());
assertEquals(40L, objects.endOffset());
}


/**
* Test get objects with the first hit object is a stream set object.
*/
@Test
public void testGetObjectsWithFirstStreamSetObject() {
DeltaMap<Long, S3StreamSetObject> broker0Objects = DeltaMap.of(
0L, new S3StreamSetObject(0, BROKER0, List.of(new StreamOffsetRange(STREAM0, 10L, 20L)), 0L),
1L, new S3StreamSetObject(1, BROKER0, List.of(new StreamOffsetRange(STREAM0, 40L, 60L)), 1L));
NodeS3StreamSetObjectMetadataImage broker0WALMetadataImage = new NodeS3StreamSetObjectMetadataImage(BROKER0, S3StreamConstant.INVALID_BROKER_EPOCH,
broker0Objects);
List<RangeMetadata> ranges = List.of(
new RangeMetadata(STREAM0, 0L, 0, 10L, 40L, BROKER0),
new RangeMetadata(STREAM0, 2L, 2, 40L, 60L, BROKER0));
List<S3StreamObject> streamObjects = List.of(
new S3StreamObject(8, STREAM0, 20L, 40L, S3StreamConstant.INVALID_TS));
S3StreamMetadataImage streamImage = new S3StreamMetadataImage(STREAM0, 4L, StreamState.OPENED, 10, ranges, streamObjects);
S3StreamsMetadataImage streamsImage = new S3StreamsMetadataImage(STREAM0, DeltaMap.of(STREAM0, streamImage),
DeltaMap.of(BROKER0, broker0WALMetadataImage));

InRangeObjects objects = streamsImage.getObjects(STREAM0, 12L, 30, 4);
assertEquals(2, objects.objects().size());
assertEquals(10L, objects.startOffset());
assertEquals(40L, objects.endOffset());

objects = streamsImage.getObjects(STREAM0, 12L, 30, 1);
assertEquals(1, objects.objects().size());
assertEquals(10L, objects.startOffset());
assertEquals(20L, objects.endOffset());
}
}