diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java index 4c94af6096..0d3d93e3a2 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java @@ -140,17 +140,35 @@ private CompletableFuture readAll0(FetchContext context, long startOffs if (nextFetchOffset >= endOffset) { return CompletableFuture.completedFuture(MemoryRecords.EMPTY); } - return fetch0(context, nextFetchOffset, endOffset, maxSize) - .thenApply(rst -> PooledMemoryRecords.of(baseOffset, rst, context.readOptions().pooledBuf())); + List results = new LinkedList<>(); + return fetch0(context, nextFetchOffset, endOffset, maxSize, results) + .whenComplete((nil, e) -> { + if (e != null) { + results.forEach(FetchResult::free); + results.clear(); + } + }) + .thenApply(nil -> PooledMemoryRecords.of(baseOffset, results, context.readOptions().pooledBuf())); } - private CompletableFuture> fetch0(FetchContext context, long startOffset, long endOffset, int maxSize) { + /** + * Fetch records from the {@link ElasticStreamSlice} + * + * @param context fetch context + * @param startOffset start offset + * @param endOffset end offset + * @param maxSize max size of the fetched records + * @param results result list to be filled + * @return a future that completes when reaching the end offset or the max size + */ + private CompletableFuture fetch0(FetchContext context, long startOffset, long endOffset, int maxSize, List results) { if (startOffset >= endOffset || maxSize <= 0) { - return CompletableFuture.completedFuture(new LinkedList<>()); + return CompletableFuture.completedFuture(null); } int adjustedMaxSize = Math.min(maxSize, 1024 * 1024); return streamSlice.fetch(context, startOffset, endOffset, adjustedMaxSize) .thenCompose(rst -> { + results.add(rst); long nextFetchOffset = startOffset; int readSize = 0; for (RecordBatchWithContext recordBatchWithContext : rst.recordBatchList()) { @@ -163,12 +181,7 @@ private CompletableFuture> fetch0(FetchContext context, } readSize += recordBatchWithContext.rawPayload().remaining(); } - return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize) - .thenApply(rstList -> { - // add to first since we need to reverse the order. - rstList.addFirst(rst); - return rstList; - }); + return fetch0(context, nextFetchOffset, endOffset, maxSize - readSize, results); }); }