diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java index b6fd89a0a5..a8d8b9f8ed 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/BlockWALService.java @@ -308,7 +308,10 @@ private void recoverWALHeader() throws IOException { for (int i = 0; i < WAL_HEADER_COUNT; i++) { try { final ByteBuffer byteBuffer = ByteBuffer.allocate(WAL_HEADER_SIZE); - walChannel.read(byteBuffer, i * WAL_HEADER_CAPACITY); + int read = walChannel.read(byteBuffer, i * WAL_HEADER_CAPACITY); + if (read != WAL_HEADER_SIZE) { + continue; + } WALHeaderCoreData walHeaderCoreData = WALHeaderCoreData.unmarshal(byteBuffer.position(0).limit(WAL_HEADER_SIZE)); if (walHeaderCoreDataAvailable == null || walHeaderCoreDataAvailable.lastWriteTimestamp3 < walHeaderCoreData.lastWriteTimestamp3) { walHeaderCoreDataAvailable = walHeaderCoreData; diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java index 0b6efdf512..c2472e0da2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java @@ -113,6 +113,7 @@ public void write(ByteBuffer src, long position) throws IOException { int writen = 0; do { ByteBuffer slice = byteBufferWrite.slice().position(writen).limit(remaining); + // FIXME: make sure the position is aligned int write = randomAccessFile.write(slice, position + writen); if (write == -1) { throw new IOException("write -1"); @@ -127,20 +128,28 @@ public void write(ByteBuffer src, long position) throws IOException { @Override public int read(ByteBuffer dst, long position) throws IOException { + // FIXME: a small dst will lead to a zero size read int bufferDirectIOAlignedSize = (int) WALUtil.alignSmallByBlockSize(dst.capacity()); makeThreadLocalBytebufferMatchDirectIO(bufferDirectIOAlignedSize); ByteBuffer tlDirectBuffer = threadLocalByteBuffer.get(); tlDirectBuffer.position(0).limit(bufferDirectIOAlignedSize); - int count = randomAccessFile.read(tlDirectBuffer, position); - if (count == -1) { - throw new IOException("read -1"); + + int bytesRead = 0; + while (tlDirectBuffer.hasRemaining()) { + // FIXME: make sure the position is aligned + int read = randomAccessFile.read(tlDirectBuffer, position + bytesRead); + if (read == -1) { + break; + } + bytesRead += read; } - tlDirectBuffer.position(0).limit(count); + tlDirectBuffer.position(0).limit(bytesRead); + // FIXME: newPosition is wrong dst.position(0).put(tlDirectBuffer); - dst.position(0).limit(count); - return count; + dst.position(0).limit(bytesRead); + return bytesRead; } } diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java index 0491abe4be..89edf4b7e2 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java @@ -38,6 +38,9 @@ public interface WALChannel { */ void write(ByteBuffer src, long position) throws IOException; + /** + * Read bytes into the given buffer from the given position until the buffer is full or the end of the file. + */ int read(ByteBuffer dst, long position) throws IOException; class WALChannelBuilder { diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java index cfba08962a..e4d37d3975 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java @@ -96,6 +96,14 @@ public void write(ByteBuffer src, long position) throws IOException { @Override public int read(ByteBuffer dst, long position) throws IOException { - return fileChannel.read(dst, position); + int bytesRead = 0; + while (dst.hasRemaining()) { + int read = fileChannel.read(dst, position + bytesRead); + if (read == -1) { + break; + } + bytesRead += read; + } + return bytesRead; } }