diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
index c48a28fd40..cff9534132 100644
--- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
+++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java
@@ -28,8 +28,6 @@
 import com.automq.stream.utils.LogSuppressor;
 import com.automq.stream.utils.threads.EventLoop;
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -71,7 +69,7 @@ public class StreamReader {
     private final Function<S3ObjectMetadata, ObjectReader> objectReaderFactory;
     private final DataBlockCache dataBlockCache;
     long nextReadOffset;
-    private CompletableFuture<Map<Long, Block>> inflightLoadIndexCf;
+    private CompletableFuture<Void> inflightLoadIndexCf;
     private long lastAccessTimestamp = System.currentTimeMillis();
     private boolean closed = false;
@@ -142,7 +140,7 @@ public void close() {
         blocksMap.forEach((k, v) -> v.markRead());
     }
 
-    void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
+    void read0(ReadContext ctx, final long startOffset, final long endOffset, final int maxBytes) {
         // 1. get blocks
         CompletableFuture<List<Block>> getBlocksCf = getBlocks(startOffset, endOffset, maxBytes, false);
@@ -170,6 +168,11 @@ void read0(ReadContext ctx, long startOffset, long endOffset, int maxBytes) {
             ctx.cf.completeExceptionally(failedBlock.get().exception);
             return;
         }
+        if (blocks.isEmpty()) {
+            ctx.cf.completeExceptionally(new AutoMQException(String.format("[UNEXPECTED] streamId=%d get empty blocks [%s, %s), maxBytes=%s",
+                streamId, startOffset, endOffset, maxBytes)));
+            return;
+        }
         ctx.blocks.addAll(blocks);
         int remainingSize = maxBytes;
         long nextStartOffset = startOffset;
@@ -260,7 +263,7 @@ private CompletableFuture<List<Block>> getBlocks(long startOffset, long endOffse
     private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset, int maxBytes) {
         Long floorKey = blocksMap.floorKey(startOffset);
-        CompletableFuture<Map<Long, Block>> loadMoreBlocksCf;
+        CompletableFuture<Boolean> loadMoreBlocksCf;
         int remainingSize = maxBytes;
         if (floorKey == null || startOffset >= loadedBlockIndexEndOffset) {
             loadMoreBlocksCf = loadMoreBlocksWithoutData(endOffset);
@@ -299,17 +302,23 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
         }
         int finalRemainingSize = remainingSize;
         // use async to prevent recursive call cause stack overflow
-        loadMoreBlocksCf.thenAcceptAsync(rst -> {
-            if (rst.isEmpty()) {
-                // it's already load to the end
-                if (endOffset != -1L && endOffset > loadedBlockIndexEndOffset) {
+        loadMoreBlocksCf.thenAcceptAsync(moreBlocks -> {
+            boolean readMore = false;
+            if (ctx.readahead) {
+                // If #loadMoreBlocksWithoutData returns false, the block indexes are already loaded to the end of the stream.
+                if (moreBlocks) {
+                    readMore = true;
+                }
+            } else {
+                if (!moreBlocks && endOffset > loadedBlockIndexEndOffset) {
                     String errMsg = String.format("[BUG] streamId=%s expect load blocks to endOffset=%s, " +
                         "current loadedBlockIndexEndOffset=%s", streamId, endOffset, loadedBlockIndexEndOffset);
                     ctx.cf.completeExceptionally(new AutoMQException(errMsg));
                 } else {
-                    ctx.cf.complete(ctx.blocks);
+                    readMore = true;
                 }
-            } else {
+            }
+            if (readMore) {
                 long nextStartOffset = ctx.blocks.isEmpty() ? startOffset : ctx.blocks.get(ctx.blocks.size() - 1).index.endOffset();
                 getBlocks0(ctx, nextStartOffset, endOffset, finalRemainingSize);
             }
@@ -322,20 +331,24 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset,
 
     /**
      * Load more block indexes
      *
-     * @return new block indexes
+     * @return whether more block indexes were loaded
      */
-    private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData(long endOffset) {
+    private CompletableFuture<Boolean> loadMoreBlocksWithoutData(long endOffset) {
+        long oldLoadedBlockIndexEndOffset = loadedBlockIndexEndOffset;
+        return loadMoreBlocksWithoutData0(endOffset).thenApply(nil -> loadedBlockIndexEndOffset != oldLoadedBlockIndexEndOffset);
+    }
+
+    private CompletableFuture<Void> loadMoreBlocksWithoutData0(long endOffset) {
         if (inflightLoadIndexCf != null) {
-            return inflightLoadIndexCf.thenCompose(rst -> loadMoreBlocksWithoutData(endOffset));
+            return inflightLoadIndexCf.thenCompose(rst -> loadMoreBlocksWithoutData0(endOffset));
         }
         if (endOffset != -1L && endOffset <= loadedBlockIndexEndOffset) {
-            return CompletableFuture.completedFuture(Collections.emptyMap());
+            return CompletableFuture.completedFuture(null);
         }
         inflightLoadIndexCf = new CompletableFuture<>();
         long nextLoadingOffset = calWindowBlocksEndOffset();
         AtomicLong nextFindStartOffset = new AtomicLong(nextLoadingOffset);
-        Map<Long, Block> newDataBlockIndex = new HashMap<>();
         TimerUtil time = new TimerUtil();
         // 1. get objects
         CompletableFuture<List<S3ObjectMetadata>> getObjectsCf = objectManager.getObjects(streamId, nextLoadingOffset, endOffset, GET_OBJECT_STEP);
@@ -362,7 +375,6 @@ private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData(long endOf
                         // After object compaction, the blocks get from different objectManager#getObjects maybe not continuous.
                         throw new BlockNotContinuousException();
                     }
-                    newDataBlockIndex.put(objectMetadata.objectId(), block);
                     nextFindStartOffset.set(streamDataBlock.getEndOffset());
                 }),
                 eventLoop
@@ -377,9 +389,9 @@ private CompletableFuture<Map<Long, Block>> loadMoreBlocksWithoutData(long endOf
                 return;
             }
             StorageOperationStats.getInstance().getIndicesTimeFindIndexStats.record(time.elapsedAs(TimeUnit.NANOSECONDS));
-            CompletableFuture<Map<Long, Block>> cf = inflightLoadIndexCf;
+            CompletableFuture<Void> cf = inflightLoadIndexCf;
             inflightLoadIndexCf = null;
-            cf.complete(newDataBlockIndex);
+            cf.complete(null);
         }, eventLoop);
         return inflightLoadIndexCf;
     }
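
Note on the core refactor: loadMoreBlocksWithoutData used to hand back a map of newly loaded index entries, but the only thing callers ever did with it was check isEmpty(). The patch therefore returns a boolean "did the loaded-index watermark advance" instead, computed by snapshotting loadedBlockIndexEndOffset before the load and comparing after. The sketch below is a minimal single-threaded model of that watermark pattern; the names (WatermarkLoader, loadMore, fetchIndexes) are illustrative stand-ins, not the real StreamReader API.

    import java.util.concurrent.CompletableFuture;

    // Minimal model of the pattern adopted above: report progress by comparing
    // a watermark before/after a (possibly shared) load, instead of returning
    // the newly loaded entries themselves. Assumes a single event-loop thread,
    // as StreamReader does.
    class WatermarkLoader {
        private long loadedEndOffset;             // like loadedBlockIndexEndOffset
        private CompletableFuture<Void> inflight; // like inflightLoadIndexCf

        // Like the new loadMoreBlocksWithoutData: true iff the watermark
        // advanced for THIS caller.
        CompletableFuture<Boolean> loadMore(long endOffset) {
            long before = loadedEndOffset;
            return loadMore0(endOffset).thenApply(nil -> loadedEndOffset != before);
        }

        private CompletableFuture<Void> loadMore0(long endOffset) {
            if (inflight != null) {
                // Chain behind the in-flight load, then re-check; the retry
                // sees the updated watermark and terminates.
                return inflight.thenCompose(nil -> loadMore0(endOffset));
            }
            if (endOffset != -1L && endOffset <= loadedEndOffset) {
                return CompletableFuture.completedFuture(null);
            }
            CompletableFuture<Void> cf = new CompletableFuture<>();
            inflight = cf;
            fetchIndexes(endOffset).thenAccept(newEnd -> {
                loadedEndOffset = Math.max(loadedEndOffset, newEnd);
                inflight = null;
                cf.complete(null);
            });
            return cf;
        }

        // Stand-in for the real async index fetch (objectManager#getObjects
        // plus the find step); here it trivially reaches endOffset.
        private CompletableFuture<Long> fetchIndexes(long endOffset) {
            return CompletableFuture.completedFuture(endOffset);
        }
    }

Because each caller captures its own "before" snapshot, callers that piggyback on a shared in-flight load still get an accurate per-caller answer, which is what getBlocks0 needs to choose between recursing and stopping.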