Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static ChannelOffset of(ByteBuf buf) {
public static ChannelOffset of(short channelId, short orderHint, int channelOwnerNodeId, int attributes,
ByteBuf walRecordOffset) {
ByteBuf channelOffset = Unpooled.buffer(1 /* magic */ + 2 /* channelId */ + 2 /* orderHint */
+ 4 /* channelOwnerNodeId */ + 4 /* targetNodeId */ + walRecordOffset.readableBytes());
+ 4 /* channelOwnerNodeId */ + 4 /* attributes */ + walRecordOffset.readableBytes());
channelOffset.writeByte(MAGIC);
channelOffset.writeShort(channelId);
channelOffset.writeShort(orderHint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1027,18 +1027,24 @@ private boolean canMerge(AbstractObjectStorage.ReadTask readTask) {
return objectPath != null &&
objectPath.equals(readTask.objectPath) &&
dataSparsityRate <= this.maxMergeReadSparsityRate &&
readTask.end != RANGE_READ_TO_END;
// Don't allow merge read to end task.
readTask.end != RANGE_READ_TO_END &&
end != RANGE_READ_TO_END;
}

void handleReadCompleted(ByteBuf rst, Throwable ex) {
handleReadCompleted(this.readTasks, this.start, rst, ex);
}

static void handleReadCompleted(List<ReadTask> readTasks, long mergeReadStart, ByteBuf rst, Throwable ex) {
if (ex != null) {
readTasks.forEach(readTask -> readTask.cf.completeExceptionally(ex));
} else {
ArrayList<ByteBuf> sliceByteBufList = new ArrayList<>();
for (AbstractObjectStorage.ReadTask readTask : readTasks) {
int sliceStart = (int) (readTask.start - start);
int sliceStart = (int) (readTask.start - mergeReadStart);
if (readTask.end == RANGE_READ_TO_END) {
sliceByteBufList.add(rst.retainedSlice(sliceStart, rst.readableBytes()));
sliceByteBufList.add(rst.retainedSlice(sliceStart, rst.readableBytes() - sliceStart));
} else {
sliceByteBufList.add(rst.retainedSlice(sliceStart, (int) (readTask.end - readTask.start)));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public int size() {

@Override
public ByteBuf buffer() {
ByteBuf buffer = Unpooled.buffer(1 + 8 + 4);
ByteBuf buffer = Unpooled.buffer(1 + 8 + 8 + 4);
buffer.writeByte(MAGIC);
buffer.writeLong(epoch);
buffer.writeLong(this.offset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

import java.lang.reflect.Field;
import java.util.ArrayList;
Expand Down Expand Up @@ -58,6 +59,7 @@
import static org.mockito.Mockito.when;

@Tag("S3Unit")
@Timeout(10)
class AbstractObjectStorageTest {

AbstractObjectStorage objectStorage;
Expand Down Expand Up @@ -146,6 +148,19 @@ void testMergeRead() throws ExecutionException, InterruptedException {
buf.release();
}

@Test
void testHandleReadCompleted() throws Throwable {
ByteBuf data = TestUtils.random(4096);
CompletableFuture<ByteBuf> readToEndCf = new CompletableFuture<>();
CompletableFuture<ByteBuf> readRangeCf = new CompletableFuture<>();
AbstractObjectStorage.MergedReadTask.handleReadCompleted(List.of(
new AbstractObjectStorage.ReadTask(new ReadOptions(), "fake", 3000, -1, readToEndCf),
new AbstractObjectStorage.ReadTask(new ReadOptions(), "fake", 2000, 4096, readRangeCf)
), 2000, data.slice(2000, 4096 - 2000), null);
assertEquals(data.slice(3000, 4096 - 3000), readToEndCf.get());
assertEquals(data.slice(2000, 4096 - 2000), readRangeCf.get());
}

@Test
void testByteBufRefCnt() throws ExecutionException, InterruptedException {
objectStorage = new MemoryObjectStorage(false);
Expand All @@ -160,7 +175,6 @@ void testByteBufRefCnt() throws ExecutionException, InterruptedException {
}).get();
}


@Test
void testFastRetry() throws Throwable {
// Initialize memory storage and spy to track method calls
Expand Down Expand Up @@ -353,7 +367,6 @@ void testWaitWritePermit() throws Exception {
.untilAsserted(() -> assertEquals(0, firstBuffer.refCnt()));
}


@Test
void testReadToEndOfObject() throws ExecutionException, InterruptedException {
objectStorage = new MemoryObjectStorage(true);
Expand Down
Loading