diff --git a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java index 491b5dda46..fb564acd96 100644 --- a/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java +++ b/s3stream/src/main/java/com/automq/stream/s3/cache/blockcache/StreamReader.java @@ -303,25 +303,22 @@ private void getBlocks0(GetBlocksContext ctx, long startOffset, long endOffset, int finalRemainingSize = remainingSize; // use async to prevent recursive call cause stack overflow loadMoreBlocksCf.thenAcceptAsync(moreBlocks -> { - boolean readMore = false; if (ctx.readahead) { // If #loadMoreBlocksWithoutData result is empty, it means the stream is already loads to the end. - if (moreBlocks) { - readMore = true; + if (!moreBlocks) { + ctx.cf.complete(ctx.blocks); + return; } } else { if (!moreBlocks && endOffset > loadedBlockIndexEndOffset) { String errMsg = String.format("[BUG] streamId=%s expect load blocks to endOffset=%s, " + "current loadedBlockIndexEndOffset=%s", streamId, endOffset, loadedBlockIndexEndOffset); ctx.cf.completeExceptionally(new AutoMQException(errMsg)); - } else { - readMore = true; + return; } } - if (readMore) { - long nextStartOffset = ctx.blocks.isEmpty() ? startOffset : ctx.blocks.get(ctx.blocks.size() - 1).index.endOffset(); - getBlocks0(ctx, nextStartOffset, endOffset, finalRemainingSize); - } + long nextStartOffset = ctx.blocks.isEmpty() ? startOffset : ctx.blocks.get(ctx.blocks.size() - 1).index.endOffset(); + getBlocks0(ctx, nextStartOffset, endOffset, finalRemainingSize); }, eventLoop).exceptionally(ex -> { ctx.cf.completeExceptionally(ex); return null;