diff --git a/core/src/main/java/kafka/automq/zerozone/ChannelOffset.java b/core/src/main/java/kafka/automq/zerozone/ChannelOffset.java index 80bdacf2cc..a31a40d1d9 100644 --- a/core/src/main/java/kafka/automq/zerozone/ChannelOffset.java +++ b/core/src/main/java/kafka/automq/zerozone/ChannelOffset.java @@ -43,7 +43,7 @@ public static ChannelOffset of(ByteBuf buf) { public static ChannelOffset of(short channelId, short orderHint, int channelOwnerNodeId, int attributes, ByteBuf walRecordOffset) { ByteBuf channelOffset = Unpooled.buffer(1 /* magic */ + 2 /* channelId */ + 2 /* orderHint */ - + 4 /* channelOwnerNodeId */ + 4 /* targetNodeId */ + walRecordOffset.readableBytes()); + + 4 /* channelOwnerNodeId */ + 4 /* attributes */ + walRecordOffset.readableBytes()); channelOffset.writeByte(MAGIC); channelOffset.writeShort(channelId); channelOffset.writeShort(orderHint); diff --git a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java index 6660ffa180..471f524523 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java +++ b/s3stream/src/main/java/com/automq/stream/s3/operator/AbstractObjectStorage.java @@ -1027,18 +1027,24 @@ private boolean canMerge(AbstractObjectStorage.ReadTask readTask) { return objectPath != null && objectPath.equals(readTask.objectPath) && dataSparsityRate <= this.maxMergeReadSparsityRate && - readTask.end != RANGE_READ_TO_END; + // Don't allow merge read to end task. + readTask.end != RANGE_READ_TO_END && + end != RANGE_READ_TO_END; } void handleReadCompleted(ByteBuf rst, Throwable ex) { + handleReadCompleted(this.readTasks, this.start, rst, ex); + } + + static void handleReadCompleted(List readTasks, long mergeReadStart, ByteBuf rst, Throwable ex) { if (ex != null) { readTasks.forEach(readTask -> readTask.cf.completeExceptionally(ex)); } else { ArrayList sliceByteBufList = new ArrayList<>(); for (AbstractObjectStorage.ReadTask readTask : readTasks) { - int sliceStart = (int) (readTask.start - start); + int sliceStart = (int) (readTask.start - mergeReadStart); if (readTask.end == RANGE_READ_TO_END) { - sliceByteBufList.add(rst.retainedSlice(sliceStart, rst.readableBytes())); + sliceByteBufList.add(rst.retainedSlice(sliceStart, rst.readableBytes() - sliceStart)); } else { sliceByteBufList.add(rst.retainedSlice(sliceStart, (int) (readTask.end - readTask.start))); } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/DefaultRecordOffset.java b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/DefaultRecordOffset.java index 1e19997a4a..fd0801a52d 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/impl/DefaultRecordOffset.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/impl/DefaultRecordOffset.java @@ -72,7 +72,7 @@ public int size() { @Override public ByteBuf buffer() { - ByteBuf buffer = Unpooled.buffer(1 + 8 + 4); + ByteBuf buffer = Unpooled.buffer(1 + 8 + 8 + 4); buffer.writeByte(MAGIC); buffer.writeLong(epoch); buffer.writeLong(this.offset); diff --git a/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java b/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java index 4242bb67d2..8169151c48 100644 --- a/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java +++ b/s3stream/src/test/java/com/automq/stream/s3/operator/AbstractObjectStorageTest.java @@ -29,6 +29,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; import java.lang.reflect.Field; import java.util.ArrayList; @@ -58,6 +59,7 @@ import static org.mockito.Mockito.when; @Tag("S3Unit") +@Timeout(10) class AbstractObjectStorageTest { AbstractObjectStorage objectStorage; @@ -146,6 +148,19 @@ void testMergeRead() throws ExecutionException, InterruptedException { buf.release(); } + @Test + void testHandleReadCompleted() throws Throwable { + ByteBuf data = TestUtils.random(4096); + CompletableFuture readToEndCf = new CompletableFuture<>(); + CompletableFuture readRangeCf = new CompletableFuture<>(); + AbstractObjectStorage.MergedReadTask.handleReadCompleted(List.of( + new AbstractObjectStorage.ReadTask(new ReadOptions(), "fake", 3000, -1, readToEndCf), + new AbstractObjectStorage.ReadTask(new ReadOptions(), "fake", 2000, 4096, readRangeCf) + ), 2000, data.slice(2000, 4096 - 2000), null); + assertEquals(data.slice(3000, 4096 - 3000), readToEndCf.get()); + assertEquals(data.slice(2000, 4096 - 2000), readRangeCf.get()); + } + @Test void testByteBufRefCnt() throws ExecutionException, InterruptedException { objectStorage = new MemoryObjectStorage(false); @@ -160,7 +175,6 @@ void testByteBufRefCnt() throws ExecutionException, InterruptedException { }).get(); } - @Test void testFastRetry() throws Throwable { // Initialize memory storage and spy to track method calls @@ -353,7 +367,6 @@ void testWaitWritePermit() throws Exception { .untilAsserted(() -> assertEquals(0, firstBuffer.refCnt())); } - @Test void testReadToEndOfObject() throws ExecutionException, InterruptedException { objectStorage = new MemoryObjectStorage(true);