diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java deleted file mode 100644 index 88ff3992e3..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/AbstractWALChannel.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.utils.Threads; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import io.netty.buffer.ByteBuf; - -public abstract class AbstractWALChannel implements WALChannel { - - private static final Logger LOGGER = LoggerFactory.getLogger(AbstractWALChannel.class); - - /** - * Flag to indicate if the WAL has failed. - * It will be set to true if an IO operation fails continuously, and it will never be reset. - * Any IO operation will fail immediately if this flag is true. - */ - private volatile boolean failed = false; - - @Override - public void markFailed() { - this.failed = true; - } - - @Override - public void write(ByteBuf src, long position) throws IOException { - checkFailed(); - doWrite(src, position); - } - - @Override - public void retryWrite(ByteBuf src, long position, long retryIntervalMillis, - long retryTimeoutMillis) throws IOException { - checkFailed(); - retry(() -> write(src, position), retryIntervalMillis, retryTimeoutMillis); - } - - @Override - public void flush() throws IOException { - checkFailed(); - doFlush(); - } - - @Override - public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - checkFailed(); - retry(this::flush, retryIntervalMillis, retryTimeoutMillis); - } - - @Override - public int read(ByteBuf dst, long position, int length) throws IOException { - checkFailed(); - return doRead(dst, position, length); - } - - @Override - public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, - long retryTimeoutMillis) throws IOException { - checkFailed(); - return retry(() -> read(dst, position, length), retryIntervalMillis, retryTimeoutMillis); - } - - protected void retry(IORunnable runnable, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - retry(IOSupplier.from(runnable), retryIntervalMillis, retryTimeoutMillis); - } - - private T retry(IOSupplier supplier, long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - long start = System.nanoTime(); - long retryTimeoutNanos = TimeUnit.MILLISECONDS.toNanos(retryTimeoutMillis); - while (true) { - try { - return supplier.get(); - } catch (IOException e) { - if (System.nanoTime() - start > retryTimeoutNanos) { - failed = true; - LOGGER.error("Failed to execute IO operation, retry timeout", e); - throw e; - } - checkFailed(); - LOGGER.warn("Failed to execute IO operation, retrying in {}ms, error: {}", retryIntervalMillis, e.getMessage()); - Threads.sleep(retryIntervalMillis); - } - } - } - - protected void checkFailed() throws IOException { - if (failed) { - IOException e = new IOException("Failed to execute IO operation, WAL failed"); - LOGGER.error("Failed to execute IO operation, WAL failed", e); - throw e; - } - } - - protected abstract void doWrite(ByteBuf src, long position) throws IOException; - - protected abstract void doFlush() throws IOException; - - protected abstract int doRead(ByteBuf dst, long position, int length) throws IOException; - - private interface IOSupplier { - T get() throws IOException; - - static IOSupplier from(IORunnable runnable) { - return () -> { - runnable.run(); - return null; - }; - } - } - - private interface IORunnable { - void run() throws IOException; - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java deleted file mode 100644 index 945526d513..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannel.java +++ /dev/null @@ -1,347 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.s3.wal.exception.WALCapacityMismatchException; -import com.automq.stream.s3.wal.exception.WALNotInitializedException; -import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOLib; -import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectIOUtils; -import com.automq.stream.thirdparty.moe.cnkirito.kdio.DirectRandomAccessFile; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.concurrent.ExecutionException; - -import io.netty.buffer.ByteBuf; -import io.netty.util.concurrent.FastThreadLocal; - -import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; -import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice; - -public class WALBlockDeviceChannel extends AbstractWALChannel { - private static final Logger LOGGER = LoggerFactory.getLogger(WALBlockDeviceChannel.class); - private static final String CHECK_DIRECT_IO_AVAILABLE_FORMAT = "%s.check_direct_io_available"; - final String path; - final long capacityWant; - final boolean recoveryMode; - final DirectIOLib directIOLib; - /** - * 0 means allocate on demand - */ - final int initTempBufferSize; - /** - * 0 means no limit - */ - final int maxTempBufferSize; - /** - * Flag indicating whether unaligned write is allowed. - * Currently, it is only allowed when testing. - */ - public boolean unalignedWrite = false; - - long capacityFact = 0; - DirectRandomAccessFile randomAccessFile; - - FastThreadLocal threadLocalByteBuffer = new FastThreadLocal<>() { - @Override - protected ByteBuffer initialValue() { - return DirectIOUtils.allocateForDirectIO(directIOLib, initTempBufferSize); - } - }; - - public WALBlockDeviceChannel(String path, long capacityWant) { - this(path, capacityWant, 0, 0, false); - } - - public WALBlockDeviceChannel(String path, long capacityWant, int initTempBufferSize, int maxTempBufferSize, - boolean recoveryMode) { - this.path = path; - this.recoveryMode = recoveryMode; - if (recoveryMode) { - this.capacityWant = CAPACITY_NOT_SET; - } else { - assert capacityWant > 0; - this.capacityWant = capacityWant; - if (!WALUtil.isAligned(capacityWant)) { - throw new RuntimeException("wal capacity must be aligned by block size when using block device"); - } - } - this.initTempBufferSize = initTempBufferSize; - this.maxTempBufferSize = maxTempBufferSize; - - DirectIOLib lib = DirectIOLib.getLibForPath(path); - if (null == lib) { - throw new RuntimeException("O_DIRECT not supported"); - } - int blockSize = lib.blockSize(); - if (WALUtil.BLOCK_SIZE % blockSize != 0) { - throw new RuntimeException(String.format("block size %d is not a multiple of %d, update it by jvm option: -D%s=%d", - WALUtil.BLOCK_SIZE, blockSize, WALUtil.BLOCK_SIZE_PROPERTY, blockSize)); - } - this.directIOLib = lib; - } - - /** - * Check whether the {@link WALBlockDeviceChannel} is available for the given path. - * - * @return null if available, otherwise the reason why it's not available - */ - public static String checkAvailable(String path) { - if (!DirectIOLib.binit) { - return "O_DIRECT not supported"; - } - if (!DirectIOUtils.allocatorAvailable()) { - return "java.nio.DirectByteBuffer.(long, int) not available." + - " Add --add-opens=java.base/java.nio=ALL-UNNAMED and -Dio.netty.tryReflectionSetAccessible=true to JVM options may fix this."; - } - if (!isBlockDevice(path)) { - String reason = tryOpenFileWithDirectIO(String.format(CHECK_DIRECT_IO_AVAILABLE_FORMAT, path)); - if (null != reason) { - return "O_DIRECT not supported by the file system, path: " + path + ", reason: " + reason; - } - } - return null; - } - - /** - * Try to open a file with O_DIRECT flag to check whether the file system supports O_DIRECT. - * The file will be deleted after the test. - * - * @return null if the file is opened successfully, otherwise the reason why it's not available - */ - private static String tryOpenFileWithDirectIO(String path) { - File file = new File(path); - try { - DirectRandomAccessFile randomAccessFile = new DirectRandomAccessFile(file, "rw"); - randomAccessFile.close(); - return null; - } catch (IOException e) { - return e.getMessage(); - } finally { - // the file may be created in {@link DirectRandomAccessFile(File, String)}, so delete it - file.delete(); - } - } - - @Override - public void open(CapacityReader reader) throws IOException { - if (!isBlockDevice(path)) { - openAndCheckFile(); - } else { - try { - long capacity = WALUtil.getBlockDeviceCapacity(path); - if (!recoveryMode && capacityWant > capacity) { - // the real capacity of the block device is smaller than requested - throw new WALCapacityMismatchException(path, capacityWant, capacity); - } - } catch (ExecutionException e) { - LOGGER.warn("failed to get the real capacity of the block device {}, just skip checking", path, e); - } - // We could not get the real capacity of the WAL in block device, so we just use the `capacityWant` as the capacity here - // It will be checked and updated in `checkCapacity` later - capacityFact = capacityWant; - } - - randomAccessFile = new DirectRandomAccessFile(new File(path), "rw"); - - checkCapacity(reader); - } - - /** - * Create the file and set length if not exists, and check the file size if exists. - */ - private void openAndCheckFile() throws IOException { - File file = new File(path); - if (file.exists()) { - if (!file.isFile()) { - throw new IOException(path + " is not a file"); - } - capacityFact = file.length(); - if (!recoveryMode && capacityFact != capacityWant) { - // the file exists but not the same size as requested - throw new WALCapacityMismatchException(path, capacityWant, capacityFact); - } - } else { - // the file does not exist - if (recoveryMode) { - throw new WALNotInitializedException("try to open an uninitialized WAL in recovery mode: file not exists. path: " + path); - } - WALUtil.createFile(path, capacityWant); - capacityFact = capacityWant; - } - } - - private void checkCapacity(CapacityReader reader) throws IOException { - if (null == reader) { - return; - } - Long capacity = reader.capacity(this); - if (null == capacity) { - if (recoveryMode) { - throw new WALNotInitializedException("try to open an uninitialized WAL in recovery mode: empty header. path: " + path); - } - } else if (capacityFact == CAPACITY_NOT_SET) { - // recovery mode on block device - capacityFact = capacity; - } else if (capacityFact != capacity) { - throw new WALCapacityMismatchException(path, capacityFact, capacity); - } - assert capacityFact != CAPACITY_NOT_SET; - } - - @Override - public void close() { - try { - if (randomAccessFile != null) { - randomAccessFile.close(); - } - } catch (IOException ignored) { - } - } - - @Override - public long capacity() { - return capacityFact; - } - - @Override - public String path() { - return path; - } - - private ByteBuffer getBuffer(int alignedSize) { - assert WALUtil.isAligned(alignedSize); - - ByteBuffer currentBuf = threadLocalByteBuffer.get(); - if (alignedSize <= currentBuf.capacity()) { - return currentBuf; - } - if (maxTempBufferSize > 0 && alignedSize > maxTempBufferSize) { - throw new RuntimeException("too large write size"); - } - - ByteBuffer newBuf = DirectIOUtils.allocateForDirectIO(directIOLib, alignedSize); - threadLocalByteBuffer.set(newBuf); - DirectIOUtils.releaseDirectBuffer(currentBuf); - return newBuf; - } - - @Override - public void doWrite(ByteBuf src, long position) throws IOException { - if (unalignedWrite) { - // unaligned write, just used for testing - unalignedWrite(src, position); - return; - } - assert WALUtil.isAligned(position); - - int alignedSize = (int) WALUtil.alignLargeByBlockSize(src.readableBytes()); - assert position + alignedSize <= capacity(); - ByteBuffer tmpBuf = getBuffer(alignedSize); - tmpBuf.clear(); - - for (ByteBuffer buffer : src.nioBuffers()) { - tmpBuf.put(buffer); - } - tmpBuf.position(0).limit(alignedSize); - - write(tmpBuf, position); - } - - private void unalignedWrite(ByteBuf src, long position) throws IOException { - long start = position; - long end = position + src.readableBytes(); - long alignedStart = WALUtil.alignSmallByBlockSize(start); - long alignedEnd = WALUtil.alignLargeByBlockSize(end); - int alignedSize = (int) (alignedEnd - alignedStart); - - // read the data in the range [alignedStart, alignedEnd) to tmpBuf - ByteBuffer tmpBuf = getBuffer(alignedSize); - tmpBuf.position(0).limit(alignedSize); - read(tmpBuf, alignedStart); - - // overwrite the data in the range [start, end) in tmpBuf - for (ByteBuffer buffer : src.nioBuffers()) { - tmpBuf.position((int) (start - alignedStart)); - start += buffer.remaining(); - tmpBuf.put(buffer); - } - tmpBuf.position(0).limit(alignedSize); - - // write it - write(tmpBuf, alignedStart); - } - - private int write(ByteBuffer src, long position) throws IOException { - assert WALUtil.isAligned(src.remaining()); - - int bytesWritten = 0; - while (src.hasRemaining()) { - int written = randomAccessFile.write(src, position + bytesWritten); - // kdio will throw an exception rather than return -1, so we don't need to check for -1 - bytesWritten += written; - } - return bytesWritten; - } - - @Override - public void doFlush() { - } - - @Override - public int doRead(ByteBuf dst, long position, int length) throws IOException { - long start = position; - length = Math.min(length, dst.writableBytes()); - long end = position + length; - long alignedStart = WALUtil.alignSmallByBlockSize(start); - long alignedEnd = WALUtil.alignLargeByBlockSize(end); - int alignedSize = (int) (alignedEnd - alignedStart); - // capacity may be CAPACITY_NOT_SET only when we call {@link CapacityReader#capacity} in recovery mode - assert CAPACITY_NOT_SET == capacity() || alignedEnd <= capacity(); - - ByteBuffer tmpBuf = getBuffer(alignedSize); - tmpBuf.position(0).limit(alignedSize); - - read(tmpBuf, alignedStart); - tmpBuf.position((int) (start - alignedStart)).limit((int) (end - alignedStart)); - - dst.writeBytes(tmpBuf); - return (int) (end - start); - } - - private int read(ByteBuffer dst, long position) throws IOException { - int bytesRead = 0; - while (dst.hasRemaining()) { - int read = randomAccessFile.read(dst, position + bytesRead); - // kdio will throw an exception rather than return -1, so we don't need to check for -1 - bytesRead += read; - } - return bytesRead; - } - - @Override - public boolean useDirectIO() { - return true; - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java deleted file mode 100644 index 1ab5ebf322..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALCachedChannel.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.s3.ByteBufAlloc; - -import java.io.IOException; - -import io.netty.buffer.ByteBuf; - -import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; - -/** - * A wrapper of {@link WALChannel} that caches for read to reduce I/O. - */ -public class WALCachedChannel implements WALChannel { - - private static final int DEFAULT_CACHE_SIZE = 1 << 20; - - private final WALChannel channel; - private final int cacheSize; - - private ByteBuf cache; - private long cachePosition = -1; - - private WALCachedChannel(WALChannel channel, int cacheSize) { - this.channel = channel; - this.cacheSize = cacheSize; - } - - public static WALCachedChannel of(WALChannel channel) { - return new WALCachedChannel(channel, DEFAULT_CACHE_SIZE); - } - - public static WALCachedChannel of(WALChannel channel, int cacheSize) { - return new WALCachedChannel(channel, cacheSize); - } - - @Override - public void markFailed() { - channel.markFailed(); - } - - @Override - public int read(ByteBuf dst, long position, int length) throws IOException { - return read(channel::read, dst, position, length); - } - - @Override - public int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, - long retryTimeoutMillis) throws IOException { - Reader reader = (buf, pos, len) -> channel.retryRead(buf, pos, len, retryIntervalMillis, retryTimeoutMillis); - return read(reader, dst, position, length); - } - - /** - * As we use a common cache for all threads, we need to synchronize the read. - */ - private synchronized int read(Reader reader, ByteBuf dst, long position, int length) throws IOException { - if (CAPACITY_NOT_SET == channel.capacity()) { - // If we don't know the capacity now, we can't cache. - return reader.read(dst, position, length); - } - - long start = position; - length = Math.min(length, dst.writableBytes()); - long end = position + length; - - ByteBuf cache = getCache(); - if (length > cache.capacity()) { - // If the length is larger than the cache capacity, we can't cache. - return reader.read(dst, position, length); - } - - boolean fallWithinCache = cachePosition >= 0 && cachePosition <= start && end <= cachePosition + cache.readableBytes(); - if (!fallWithinCache) { - cache.clear(); - cachePosition = start; - // Make sure the cache is not larger than the channel capacity. - int cacheLength = (int) Math.min(cache.writableBytes(), channel.capacity() - cachePosition); - reader.read(cache, cachePosition, cacheLength); - } - - // Now the cache is ready. - int relativePosition = (int) (start - cachePosition); - dst.writeBytes(cache, relativePosition, length); - return length; - } - - @Override - public void close() { - releaseCache(); - this.channel.close(); - } - - /** - * Release the cache if it is not null. - * This method should be called when no more {@link #read}s will be called to release the allocated memory. - */ - public synchronized void releaseCache() { - if (this.cache != null) { - this.cache.release(); - this.cache = null; - } - this.cachePosition = -1; - } - - /** - * Get the cache. If the cache is not initialized, initialize it. - * Should be called under synchronized. - */ - private ByteBuf getCache() { - if (this.cache == null) { - this.cache = ByteBufAlloc.byteBuffer(cacheSize); - } - return this.cache; - } - - private interface Reader { - int read(ByteBuf dst, long position, int length) throws IOException; - } - - @Override - public void open(CapacityReader reader) throws IOException { - this.channel.open(reader); - } - - @Override - public long capacity() { - return this.channel.capacity(); - } - - @Override - public String path() { - return this.channel.path(); - } - - @Override - public void write(ByteBuf src, long position) throws IOException { - this.channel.write(src, position); - } - - @Override - public void retryWrite(ByteBuf src, long position, long retryIntervalMillis, - long retryTimeoutMillis) throws IOException { - channel.retryWrite(src, position, retryIntervalMillis, retryTimeoutMillis); - } - - @Override - public void flush() throws IOException { - this.channel.flush(); - } - - @Override - public void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException { - channel.retryFlush(retryIntervalMillis, retryTimeoutMillis); - } - - @Override - public boolean useDirectIO() { - return channel.useDirectIO(); - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java deleted file mode 100644 index d15b6f991b..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALChannel.java +++ /dev/null @@ -1,214 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.s3.wal.exception.WALCapacityMismatchException; -import com.automq.stream.s3.wal.exception.WALNotInitializedException; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.util.concurrent.TimeUnit; - -import io.netty.buffer.ByteBuf; - -import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; -import static com.automq.stream.s3.wal.util.WALUtil.isBlockDevice; - -/** - * There are two implementations of WALChannel: - * 1. WALFileChannel based on file system, which calls fsync after each write to ensure data is flushed to disk. - * 2. WALBlockDeviceChannel based on block device, which uses O_DIRECT to bypass page cache. - */ -public interface WALChannel { - - long DEFAULT_RETRY_INTERVAL = TimeUnit.MILLISECONDS.toMillis(100); - long DEFAULT_RETRY_TIMEOUT = TimeUnit.MINUTES.toMillis(1); - - static WALChannelBuilder builder(String path) { - return new WALChannelBuilder(path); - } - - /** - * Open the channel for read and write. - * If {@code reader} is null, checks will be skipped. - * - * @param reader the reader to get the capacity of the channel - * @throws WALCapacityMismatchException if the capacity of the channel does not match the expected capacity - * @throws WALNotInitializedException if try to open an un-initialized channel in recovery mode - * @throws IOException if any I/O error happens - */ - void open(CapacityReader reader) throws IOException; - - default void open() throws IOException { - open(null); - } - - void close(); - - /** - * Mark the channel as failed. - * Note: Once this method is called, the channel cannot be used anymore. - */ - void markFailed(); - - long capacity(); - - String path(); - - /** - * Write bytes from the given buffer to the given position of the channel from the current reader index - * to the end of the buffer. It only returns when all bytes are written successfully. - * {@link #flush()} should be called after this method to ensure data is flushed to disk. - * This method will change the reader index of the given buffer to the end of the written bytes. - * This method will not change the writer index of the given buffer. - */ - void write(ByteBuf src, long position) throws IOException; - - default void retryWrite(ByteBuf src, long position) throws IOException { - retryWrite(src, position, DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); - } - - /** - * Retry {@link #write(ByteBuf, long)} with the given interval until success or timeout. - */ - void retryWrite(ByteBuf src, long position, long retryIntervalMillis, long retryTimeoutMillis) throws IOException; - - /** - * Flush to disk. - */ - void flush() throws IOException; - - default void retryFlush() throws IOException { - retryFlush(DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); - } - - /** - * Retry {@link #flush()} with the given interval until success or timeout. - */ - void retryFlush(long retryIntervalMillis, long retryTimeoutMillis) throws IOException; - - default int read(ByteBuf dst, long position) throws IOException { - return read(dst, position, dst.writableBytes()); - } - - /** - * Read bytes from the given position of the channel to the given buffer from the current writer index - * until reaching the given length or the end of the channel. - * This method will change the writer index of the given buffer to the end of the read bytes. - * This method will not change the reader index of the given buffer. - * If the given length is larger than the writable bytes of the given buffer, only the first - * {@code dst.writableBytes()} bytes will be read. - */ - int read(ByteBuf dst, long position, int length) throws IOException; - - default int retryRead(ByteBuf dst, long position) throws IOException { - return retryRead(dst, position, dst.writableBytes(), DEFAULT_RETRY_INTERVAL, DEFAULT_RETRY_TIMEOUT); - } - - /** - * Retry {@link #read(ByteBuf, long, int)} with the given interval until success or timeout. - */ - int retryRead(ByteBuf dst, long position, int length, long retryIntervalMillis, long retryTimeoutMillis) throws IOException; - - boolean useDirectIO(); - - interface CapacityReader { - /** - * Get the capacity of the given channel. - * It returns null if the channel has not been initialized before. - */ - Long capacity(WALChannel channel) throws IOException; - } - - class WALChannelBuilder { - private static final Logger LOGGER = LoggerFactory.getLogger(WALChannelBuilder.class); - private final String path; - private Boolean direct; - private long capacity; - private int initBufferSize; - private int maxBufferSize; - private boolean recoveryMode; - - private WALChannelBuilder(String path) { - this.path = path; - } - - public WALChannelBuilder direct(boolean direct) { - this.direct = direct; - return this; - } - - public WALChannelBuilder capacity(long capacity) { - assert capacity == CAPACITY_NOT_SET || WALUtil.isAligned(capacity); - this.capacity = capacity; - return this; - } - - public WALChannelBuilder initBufferSize(int initBufferSize) { - this.initBufferSize = initBufferSize; - return this; - } - - public WALChannelBuilder maxBufferSize(int maxBufferSize) { - this.maxBufferSize = maxBufferSize; - return this; - } - - public WALChannelBuilder recoveryMode(boolean recoveryMode) { - this.recoveryMode = recoveryMode; - return this; - } - - public WALChannel build() { - String directNotAvailableMsg = WALBlockDeviceChannel.checkAvailable(path); - boolean isBlockDevice = isBlockDevice(path); - boolean useDirect = false; - if (direct != null) { - // Set by user. - useDirect = direct; - } else if (isBlockDevice) { - // We can only use direct IO for block devices. - useDirect = true; - } else if (directNotAvailableMsg == null) { - // If direct IO is available, we use it by default. - useDirect = true; - } - - if (useDirect && directNotAvailableMsg != null) { - throw new IllegalArgumentException(directNotAvailableMsg); - } - - if (!isBlockDevice) { - LOGGER.warn("WAL in a file system, which may cause performance degradation. path: {}", new File(path).getAbsolutePath()); - } - - if (useDirect) { - return new WALBlockDeviceChannel(path, capacity, initBufferSize, maxBufferSize, recoveryMode); - } else { - LOGGER.warn("Direct IO not used for WAL, which may cause performance degradation. path: {}, isBlockDevice: {}, reason: {}", - new File(path).getAbsolutePath(), isBlockDevice, directNotAvailableMsg); - return new WALFileChannel(path, capacity, recoveryMode); - } - } - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java deleted file mode 100644 index 8810bb7c0a..0000000000 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALFileChannel.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.s3.wal.exception.WALCapacityMismatchException; -import com.automq.stream.s3.wal.exception.WALNotInitializedException; - -import java.io.File; -import java.io.IOException; -import java.io.RandomAccessFile; -import java.nio.ByteBuffer; -import java.nio.channels.FileChannel; - -import io.netty.buffer.ByteBuf; - -import static com.automq.stream.s3.Constants.CAPACITY_NOT_SET; - -public class WALFileChannel extends AbstractWALChannel { - final String filePath; - final long fileCapacityWant; - /** - * When set to true, the file should exist and the file size does not need to be verified. - */ - final boolean recoveryMode; - long fileCapacityFact = 0; - RandomAccessFile randomAccessFile; - FileChannel fileChannel; - - public WALFileChannel(String filePath, long fileCapacityWant, boolean recoveryMode) { - this.filePath = filePath; - this.recoveryMode = recoveryMode; - if (recoveryMode) { - this.fileCapacityWant = CAPACITY_NOT_SET; - } else { - assert fileCapacityWant > 0; - this.fileCapacityWant = fileCapacityWant; - } - } - - @Override - public void open(CapacityReader reader) throws IOException { - File file = new File(filePath); - if (file.exists()) { - if (!file.isFile()) { - throw new IOException(filePath + " is not a file"); - } - randomAccessFile = new RandomAccessFile(file, "rw"); - fileCapacityFact = randomAccessFile.length(); - if (!recoveryMode && fileCapacityFact != fileCapacityWant) { - // the file exists but not the same size as requested - throw new WALCapacityMismatchException(filePath, fileCapacityWant, fileCapacityFact); - } - } else { - // the file does not exist - if (recoveryMode) { - throw new WALNotInitializedException("try to open an uninitialized WAL in recovery mode: file not exists: " + filePath); - } - WALUtil.createFile(filePath, fileCapacityWant); - randomAccessFile = new RandomAccessFile(filePath, "rw"); - fileCapacityFact = fileCapacityWant; - } - - fileChannel = randomAccessFile.getChannel(); - - checkCapacity(reader); - } - - private void checkCapacity(CapacityReader reader) throws IOException { - if (null == reader) { - return; - } - Long capacity = reader.capacity(this); - if (null == capacity) { - if (recoveryMode) { - throw new WALNotInitializedException("try to open an uninitialized WAL in recovery mode: empty header. path: " + filePath); - } - } else if (fileCapacityFact != capacity) { - throw new WALCapacityMismatchException(filePath, fileCapacityFact, capacity); - } - assert fileCapacityFact != CAPACITY_NOT_SET; - } - - @Override - public void close() { - try { - fileChannel.close(); - randomAccessFile.close(); - } catch (IOException ignored) { - } - } - - @Override - public long capacity() { - return fileCapacityFact; - } - - @Override - public String path() { - return filePath; - } - - @Override - public void doWrite(ByteBuf src, long position) throws IOException { - assert src.readableBytes() + position <= capacity(); - ByteBuffer[] nioBuffers = src.nioBuffers(); - for (ByteBuffer nioBuffer : nioBuffers) { - int bytesWritten = write(nioBuffer, position); - position += bytesWritten; - } - } - - @Override - public void doFlush() throws IOException { - fileChannel.force(false); - } - - @Override - public int doRead(ByteBuf dst, long position, int length) throws IOException { - length = Math.min(length, dst.writableBytes()); - assert position + length <= capacity(); - int bytesRead = 0; - while (dst.isWritable()) { - int read = dst.writeBytes(fileChannel, position + bytesRead, length); - if (read == -1) { - // EOF - break; - } - bytesRead += read; - } - return bytesRead; - } - - private int write(ByteBuffer src, long position) throws IOException { - int bytesWritten = 0; - while (src.hasRemaining()) { - int written = fileChannel.write(src, position + bytesWritten); - if (written == -1) { - throw new IOException("write -1"); - } - bytesWritten += written; - } - return bytesWritten; - } - - @Override - public boolean useDirectIO() { - return false; - } -} diff --git a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java index 063510ab7e..2be513bb10 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java +++ b/s3stream/src/main/java/com/automq/stream/s3/wal/util/WALUtil.java @@ -1,27 +1,9 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package com.automq.stream.s3.wal.util; import com.automq.stream.s3.ByteBufAlloc; import com.automq.stream.s3.wal.common.Record; import com.automq.stream.s3.wal.common.RecordHeader; +import com.automq.stream.s3.wal.exception.WALCapacityMismatchException; import com.automq.stream.utils.CommandResult; import com.automq.stream.utils.CommandUtils; @@ -124,11 +106,15 @@ public static boolean isAligned(long offset) { return offset % BLOCK_SIZE == 0; } + public static RandomAccessFile createFile(String path, long length) throws IOException { + return createFile(path, length, "rw"); + } + /** * Create a file with the given path and length. * Note {@code path} must NOT exist. */ - public static void createFile(String path, long length) throws IOException { + public static RandomAccessFile createFile(String path, long length, String openMode) throws IOException { File file = new File(path); assert !file.exists(); @@ -136,20 +122,16 @@ public static void createFile(String path, long length) throws IOException { if (null != parent && !parent.exists() && !parent.mkdirs()) { throw new IOException("mkdirs " + parent + " fail"); } - if (!file.createNewFile()) { - throw new IOException("create " + path + " fail"); - } - if (!file.setReadable(true)) { - throw new IOException("set " + path + " readable fail"); - } - if (!file.setWritable(true)) { - throw new IOException("set " + path + " writable fail"); - } - - // set length - try (RandomAccessFile raf = new RandomAccessFile(file, "rw")) { + RandomAccessFile raf = new RandomAccessFile(file, openMode); + long realLength = raf.length(); + if (realLength == 0) { + // set length raf.setLength(length); + } else if (realLength != length) { + // the file exists but not the same size as requested + throw new WALCapacityMismatchException(path, length, realLength); } + return raf; } /** diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java deleted file mode 100644 index 9e522d019b..0000000000 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALBlockDeviceChannelTest.java +++ /dev/null @@ -1,247 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.s3.TestUtils; -import com.automq.stream.utils.ThreadUtils; -import com.automq.stream.utils.Threads; - -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.condition.EnabledOnOs; -import org.junit.jupiter.api.condition.OS; - -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.TimeUnit; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; -import io.netty.buffer.Unpooled; - -import static com.automq.stream.s3.wal.util.WALChannelTest.TEST_BLOCK_DEVICE_KEY; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; - -@Tag("S3Unit") -@EnabledOnOs(OS.LINUX) -public class WALBlockDeviceChannelTest { - - static final String TEST_BLOCK_DEVICE = System.getenv(TEST_BLOCK_DEVICE_KEY); - - private String getTestPath() { - return Optional.ofNullable(TEST_BLOCK_DEVICE).orElse(TestUtils.tempFilePath()); - } - - @Test - public void testSingleThreadWriteBasic() throws IOException { - final int size = 4096 + 1; - final int count = 100; - final long capacity = WALUtil.alignLargeByBlockSize(size) * count; - - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), capacity); - channel.open(); - - for (int i = 0; i < count; i++) { - ByteBuf data = TestUtils.random(size); - long pos = WALUtil.alignLargeByBlockSize(size) * i; - writeAndFlush(channel, data, pos); - } - - channel.close(); - } - - @Test - public void testSingleThreadWriteComposite() throws IOException { - final int maxSize = 4096 * 4; - final int count = 100; - final int batch = 10; - final long capacity = WALUtil.alignLargeByBlockSize(maxSize) * count; - - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), capacity); - channel.open(); - - for (int i = 0; i < count; i += batch) { - CompositeByteBuf data = Unpooled.compositeBuffer(); - for (int j = 0; j < batch; j++) { - int size = ThreadLocalRandom.current().nextInt(1, maxSize); - data.addComponent(true, TestUtils.random(size)); - } - long pos = WALUtil.alignLargeByBlockSize(maxSize) * i; - writeAndFlush(channel, data, pos); - } - - channel.close(); - } - - @Test - public void testMultiThreadWrite() throws IOException, InterruptedException { - final int size = 4096 + 1; - final int count = 1000; - final int threads = 8; - final long capacity = WALUtil.alignLargeByBlockSize(size) * count; - - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), capacity); - channel.open(); - - ExecutorService executor = Threads.newFixedThreadPool(threads, - ThreadUtils.createThreadFactory("test-block-device-channel-write-%d", false), null); - for (int i = 0; i < count; i++) { - final int index = i; - executor.submit(() -> { - ByteBuf data = TestUtils.random(size); - long pos = WALUtil.alignLargeByBlockSize(size) * index; - try { - writeAndFlush(channel, data, pos); - } catch (IOException e) { - throw new RuntimeException(e); - } - }); - } - executor.shutdown(); - assertTrue(executor.awaitTermination(10, TimeUnit.SECONDS)); - - channel.close(); - } - - @Test - public void testWriteNotAlignedBufferSize() throws IOException { - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), 1 << 20); - channel.open(); - - ByteBuf data = TestUtils.random(42); - // It's ok to do this - assertDoesNotThrow(() -> writeAndFlush(channel, data, 0)); - - channel.close(); - } - - @Test - public void testWriteNotAlignedPosition() throws IOException { - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), 1 << 20); - channel.open(); - - ByteBuf data = TestUtils.random(4096); - assertThrows(AssertionError.class, () -> writeAndFlush(channel, data, 42)); - - channel.close(); - } - - @Test - public void testWriteOutOfBound() throws IOException { - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), 4096); - channel.open(); - - ByteBuf data = TestUtils.random(4096); - assertThrows(AssertionError.class, () -> writeAndFlush(channel, data, 8192)); - - channel.close(); - } - - @Test - public void testReadBasic() throws IOException { - final int size = 4096 + 1; - final int count = 100; - final long capacity = WALUtil.alignLargeByBlockSize(size) * count; - final String path = getTestPath(); - - WALBlockDeviceChannel wChannel = new WALBlockDeviceChannel(path, capacity); - wChannel.open(); - WALBlockDeviceChannel rChannel = new WALBlockDeviceChannel(path, capacity); - rChannel.open(); - - for (int i = 0; i < count; i++) { - ByteBuf data = TestUtils.random(size); - long pos = ThreadLocalRandom.current().nextLong(0, capacity - size); - pos = WALUtil.alignSmallByBlockSize(pos); - writeAndFlush(wChannel, data, pos); - - ByteBuf buf = Unpooled.buffer(size); - int read = rChannel.read(buf, pos); - assert read == size; - assert data.equals(buf); - } - - rChannel.close(); - wChannel.close(); - } - - @Test - public void testReadInside() throws IOException { - final int size = 4096 * 4 + 1; - final int count = 100; - final long capacity = WALUtil.alignLargeByBlockSize(size) * count; - final String path = getTestPath(); - - WALBlockDeviceChannel wChannel = new WALBlockDeviceChannel(path, capacity); - wChannel.open(); - WALBlockDeviceChannel rChannel = new WALBlockDeviceChannel(path, capacity); - rChannel.open(); - - for (int i = 0; i < count; i++) { - ByteBuf data = TestUtils.random(size); - long pos = ThreadLocalRandom.current().nextLong(0, capacity - size); - pos = WALUtil.alignSmallByBlockSize(pos); - writeAndFlush(wChannel, data, pos); - - int start = ThreadLocalRandom.current().nextInt(0, size - 1); - int end = ThreadLocalRandom.current().nextInt(start + 1, size); - ByteBuf buf = Unpooled.buffer(end - start); - int read = rChannel.read(buf, pos + start); - assert read == end - start; - assert data.slice(start, end - start).equals(buf); - } - - rChannel.close(); - wChannel.close(); - } - - @Test - public void testReadNotAlignedBufferSize() throws IOException { - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), 1 << 20); - channel.open(); - - ByteBuf data = Unpooled.buffer(42); - // It's ok to do this - assertDoesNotThrow(() -> channel.read(data, 0)); - - channel.close(); - } - - @Test - public void testReadNotAlignedPosition() throws IOException { - WALBlockDeviceChannel channel = new WALBlockDeviceChannel(getTestPath(), 1 << 20); - channel.open(); - - ByteBuf data = Unpooled.buffer(4096); - // It's ok to do this - assertDoesNotThrow(() -> channel.read(data, 42)); - - channel.close(); - } - - private void writeAndFlush(WALChannel channel, ByteBuf src, long position) throws IOException { - channel.write(src, position); - channel.flush(); - } -} diff --git a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java b/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java deleted file mode 100644 index cbe9e7aa2f..0000000000 --- a/s3stream/src/test/java/com/automq/stream/s3/wal/util/WALChannelTest.java +++ /dev/null @@ -1,92 +0,0 @@ -/* - * Copyright 2025, AutoMQ HK Limited. - * - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.automq.stream.s3.wal.util; - -import com.automq.stream.s3.TestUtils; - -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Tag; -import org.junit.jupiter.api.Test; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; - -@Tag("S3Unit") -public class WALChannelTest { - public static final String TEST_BLOCK_DEVICE_KEY = "WAL_TEST_BLOCK_DEVICE"; - - WALChannel walChannel; - - @BeforeEach - void setUp() { - walChannel = WALChannel.builder(String.format("%s/WALChannelUnitTest.data", TestUtils.tempFilePath())).direct(false).capacity(1024 * 1024 * 20).build(); - try { - walChannel.open(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @AfterEach - void tearDown() { - walChannel.close(); - } - - ByteBuffer createRandomTextByteBuffer(int size) { - ByteBuffer byteBuffer = ByteBuffer.allocate(size); - - for (int i = 0; i < size; i++) { - byteBuffer.put("ABCDEFGH".getBytes()[i % 8]); - } - - return byteBuffer.flip(); - } - - @Test - void testWriteAndRead() throws IOException { - ByteBuf data = TestUtils.random(1024 * 3); - for (int i = 0; i < 100; i++) { - try { - walChannel.write(data, (long) i * data.readableBytes()); - walChannel.flush(); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - final String content = "Hello World"; - walChannel.write(Unpooled.wrappedBuffer(content.getBytes()), 100); - walChannel.flush(); - - ByteBuf readBuffer = Unpooled.buffer(content.length()); - int read = walChannel.read(readBuffer, 100); - - String readString = new String(readBuffer.array()); - System.out.println(new String(readBuffer.array())); - System.out.println(read); - - assert read == content.length(); - assert readString.equals(content); - } -}