diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
index a15b9d875d..530625a740 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
@@ -18,7 +18,9 @@ package kafka.log.streamaspect;
 
 import com.automq.stream.api.ReadOptions;
+import com.automq.stream.s3.DirectByteBufAlloc;
 import com.automq.stream.utils.FutureUtil;
+import io.netty.buffer.ByteBuf;
 import io.netty.buffer.CompositeByteBuf;
 import io.netty.buffer.Unpooled;
 import com.automq.stream.api.FetchResult;
@@ -276,21 +278,29 @@ protected RecordBatchIterator batchIterator(long startOffset, long
 
     public static class PooledMemoryRecords extends AbstractRecords implements PooledResource {
-        private final List<FetchResult> fetchResults;
+        private final ByteBuf pack;
         private final MemoryRecords memoryRecords;
         private final long lastOffset;
 
         private PooledMemoryRecords(List<FetchResult> fetchResults) {
-            this.fetchResults = fetchResults;
-            CompositeByteBuf compositeByteBuf = Unpooled.compositeBuffer(Integer.MAX_VALUE);
             long lastOffset = 0;
+            int size = 0;
             for (FetchResult fetchResult : fetchResults) {
                 for (RecordBatchWithContext recordBatchWithContext : fetchResult.recordBatchList()) {
-                    compositeByteBuf.addComponent(true, Unpooled.wrappedBuffer(recordBatchWithContext.rawPayload()));
+                    size += recordBatchWithContext.rawPayload().remaining();
                     lastOffset = recordBatchWithContext.lastOffset();
                 }
             }
-            this.memoryRecords = MemoryRecords.readableRecords(compositeByteBuf.nioBuffer());
+            // TODO: create a new ByteBufMemoryRecords data structure to avoid the copy
+            this.pack = DirectByteBufAlloc.byteBuffer(size);
+            for (FetchResult fetchResult : fetchResults) {
+                for (RecordBatchWithContext recordBatchWithContext : fetchResult.recordBatchList()) {
+                    pack.writeBytes(recordBatchWithContext.rawPayload());
+                }
+            }
+            fetchResults.forEach(FetchResult::free);
+            fetchResults.clear();
+            this.memoryRecords = MemoryRecords.readableRecords(pack.nioBuffer());
             this.lastOffset = lastOffset;
         }
@@ -325,8 +335,7 @@ public long writeTo(TransferableChannel channel, long position, int length) thro
 
         @Override
         public void release() {
-            fetchResults.forEach(FetchResult::free);
-            fetchResults.clear();
+            pack.release();
         }
 
         public long lastOffset() {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index f5567d9ac5..8752f9ccde 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -2754,8 +2754,8 @@ class ReplicaManager(val config: KafkaConfig,
   def awaitAllPartitionShutdown(): Unit = {
     val start = System.currentTimeMillis()
-    // await 5s partitions transfer to other alive brokers.
-    // when there are no alive brokers, it will still await 5s.
+    // await up to 15s for partitions to transfer to other alive brokers.
+    // when there are no alive brokers, it will still wait the full 15s.
     while (!checkAllPartitionClosed() && (System.currentTimeMillis() - start) < 15000) {
       info("still has opening partition, retry check later")
       Thread.sleep(1000)
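
Note on the first hunk: the old `CompositeByteBuf` only wrapped the fetch results' payloads, so the fetch results had to stay alive until `release()`. The new code pays one copy into a single pooled direct buffer, which lets the sources be freed as soon as the constructor returns and reduces `release()` to dropping that one buffer. A minimal standalone sketch of the pattern, assuming a hypothetical `FetchChunk` stand-in for `FetchResult`/`RecordBatchWithContext` and Netty's `PooledByteBufAllocator` in place of `DirectByteBufAlloc`:

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;

public class PackDemo {

    /** Hypothetical stand-in for a fetch result holding one raw payload. */
    static final class FetchChunk {
        private final ByteBuffer rawPayload;

        FetchChunk(byte[] bytes) {
            this.rawPayload = ByteBuffer.wrap(bytes);
        }

        ByteBuffer rawPayload() {
            // duplicate() so repeated reads don't disturb the position
            return rawPayload.duplicate();
        }

        void free() {
            // in the real code this returns the cached block to the stream layer
        }
    }

    /** Size the pack in one pass, copy in a second pass, then free the sources. */
    static ByteBuf pack(List<FetchChunk> chunks) {
        int size = 0;
        for (FetchChunk chunk : chunks) {
            size += chunk.rawPayload().remaining();
        }
        ByteBuf pack = PooledByteBufAllocator.DEFAULT.directBuffer(size);
        for (FetchChunk chunk : chunks) {
            pack.writeBytes(chunk.rawPayload());
        }
        // the pack now owns the only copy, so the sources can be freed eagerly
        chunks.forEach(FetchChunk::free);
        return pack;
    }

    public static void main(String[] args) {
        ByteBuf pack = pack(List.of(
                new FetchChunk("hello ".getBytes(StandardCharsets.UTF_8)),
                new FetchChunk("world".getBytes(StandardCharsets.UTF_8))));
        try {
            // nioBuffer() exposes the packed bytes without another copy,
            // mirroring MemoryRecords.readableRecords(pack.nioBuffer())
            ByteBuffer view = pack.nioBuffer();
            byte[] out = new byte[view.remaining()];
            view.get(out);
            System.out.println(new String(out, StandardCharsets.UTF_8));
        } finally {
            pack.release(); // mirrors PooledMemoryRecords.release()
        }
    }
}
```

The trade-off is the one the TODO names: an extra copy up front buys a single-owner lifecycle in which only `pack` needs reference counting; a `ByteBufMemoryRecords` type backed by the original buffers could remove the copy later.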