From 95d0bbfdf95ae3249d2583f66e1e5e8f9a1a9bad Mon Sep 17 00:00:00 2001
From: Robin Han
Date: Fri, 1 Dec 2023 18:15:47 +0800
Subject: [PATCH] feat(issues500): fetch use direct bytebuf

Signed-off-by: Robin Han
---
 .../org/apache/kafka/common/network/NetworkSend.java   |  4 ++++
 .../java/org/apache/kafka/common/network/Send.java     |  2 ++
 .../common/record/LazyDownConversionRecords.java       |  9 ++++++++-
 .../apache/kafka/common/record/MultiRecordsSend.java   | 10 ++++++++++
 .../org/apache/kafka/common/record/RecordsSend.java    |  8 ++------
 .../kafka/log/streamaspect/ElasticLogFileRecords.java  |  8 ++++----
 core/src/main/scala/kafka/network/SocketServer.scala   |  9 ++++++++-
 core/src/main/scala/kafka/server/KafkaApis.scala       |  9 +++++++++
 core/src/main/scala/kafka/server/ReplicaManager.scala  | 11 +++++++++++
 gradle/dependencies.gradle                             |  2 +-
 10 files changed, 59 insertions(+), 13 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
index 2a51a56932..7861b3e0b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkSend.java
@@ -46,4 +46,8 @@ public long size() {
         return send.size();
     }
 
+    @Override
+    public void release() {
+        send.release();
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Send.java b/clients/src/main/java/org/apache/kafka/common/network/Send.java
index 1a7b0a906d..2442b0f652 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Send.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Send.java
@@ -42,4 +42,6 @@ public interface Send {
      */
     long size();
 
+    default void release() {}
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
index 56ef8e1609..01704f1aa0 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LazyDownConversionRecords.java
@@ -28,7 +28,7 @@
  * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
  * {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
  */
-public class LazyDownConversionRecords implements BaseRecords {
+public class LazyDownConversionRecords implements BaseRecords, PooledResource {
     private final TopicPartition topicPartition;
     private final Records records;
     private final byte toMagic;
@@ -124,6 +124,13 @@ public java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
         return new Iterator(records, maximumReadSize, firstBatch);
     }
 
+    @Override
+    public void release() {
+        if (records instanceof PooledResource) {
+            ((PooledResource) records).release();
+        }
+    }
+
     /**
      * Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
      * it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
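Taken together, the three files above introduce the release hook this patch threads through the fetch path: Send gains a no-op default release(), NetworkSend forwards it to the wrapped send, and LazyDownConversionRecords passes it on to its underlying pooled records. Below is a self-contained sketch of that ownership pattern using stand-in types (PooledBuffer, BufferBackedSend, and ReleaseContractSketch are illustrative names, not classes from this patch): a send that holds pooled memory opts in by overriding release(), and whoever owns the send calls it once after the bytes are written or the send is abandoned.

// Self-contained sketch of the release contract; all types here are stand-ins, not Kafka classes.
interface PooledResource {
    void release();
}

interface Send {
    long size();

    // Opt-in hook: sends that hold no pooled memory inherit the no-op.
    default void release() {}
}

final class PooledBuffer {
    private boolean freed;

    void free() {
        // Return the memory to its pool; guarded so a double release is harmless in this sketch.
        if (!freed) {
            freed = true;
            System.out.println("buffer returned to pool");
        }
    }
}

final class BufferBackedSend implements Send, PooledResource {
    private final PooledBuffer buffer = new PooledBuffer();

    @Override
    public long size() {
        return 0L;
    }

    @Override
    public void release() {
        buffer.free();
    }
}

public class ReleaseContractSketch {
    public static void main(String[] args) {
        Send send = new BufferBackedSend();
        // The owner of the send (the network layer in this patch) releases it once,
        // after the bytes are written or the send is abandoned.
        send.release();
    }
}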
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
index 22883b278a..85b4c92f0a 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MultiRecordsSend.java
@@ -25,7 +25,9 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -35,6 +37,7 @@ public class MultiRecordsSend implements Send {
     private static final Logger log = LoggerFactory.getLogger(MultiRecordsSend.class);
 
+    private final List<Send> sendList;
     private final Queue<Send> sendQueue;
     private final long size;
     private Map<TopicPartition, RecordConversionStats> recordConversionStats;
 
@@ -47,6 +50,7 @@ public class MultiRecordsSend implements Send {
      * progresses (on completion, it will be empty).
      */
    public MultiRecordsSend(Queue<Send> sends) {
+        this.sendList = new ArrayList<>(sends);
        this.sendQueue = sends;
 
        long size = 0;
@@ -58,6 +62,7 @@ public MultiRecordsSend(Queue<Send> sends) {
     }
 
     public MultiRecordsSend(Queue<Send> sends, long size) {
+        this.sendList = new ArrayList<>(sends);
         this.sendQueue = sends;
         this.size = size;
         this.current = sendQueue.poll();
     }
@@ -138,4 +143,9 @@ private void updateRecordConversionStats(Send completedSend) {
             recordConversionStats.put(lazyRecordsSend.topicPartition(), lazyRecordsSend.recordConversionStats());
         }
     }
+
+    @Override
+    public void release() {
+        sendList.forEach(Send::release);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
index 32d9051df2..11ddee0659 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsSend.java
@@ -39,11 +39,7 @@ protected RecordsSend(T records, int maxBytesToWrite) {
 
     @Override
     public boolean completed() {
-        boolean completed = remaining <= 0 && !pending;
-        if (completed) {
-            release();
-        }
-        return completed;
+        return remaining <= 0 && !pending;
     }
 
     @Override
@@ -86,7 +82,7 @@ protected T records() {
      */
     protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException;
 
-    private void release() {
+    public void release() {
         if (records instanceof PooledResource) {
             ((PooledResource) records).release();
         }
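One behavioural detail in the two files above is worth spelling out: RecordsSend.completed() no longer releases the records implicitly, and MultiRecordsSend drains sendQueue as writing progresses, so by the time anyone could clean up, the queue may already be empty. That is why both constructors now take a sendList snapshot: it keeps a reference to every child send so release() can still reach all of them. A minimal sketch of that reasoning, using plain collections rather than the Kafka classes:

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class DrainedQueueSketch {
    public static void main(String[] args) {
        Queue<String> sendQueue = new ArrayDeque<>(List.of("send-1", "send-2", "send-3"));
        // Snapshot taken up front, mirroring the sendList copy in the patch.
        List<String> sendList = new ArrayList<>(sendQueue);

        // Writing progresses: each completed child send is polled off the queue.
        while (sendQueue.poll() != null) {
            // transfer the child's bytes to the channel
        }

        System.out.println("left in queue: " + sendQueue.size()); // 0 -- nothing left to release here
        System.out.println("held in list:  " + sendList.size());  // 3 -- every child is still reachable
    }
}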
diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
index 1b1f377949..09545d1fa3 100644
--- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
+++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogFileRecords.java
@@ -101,20 +101,20 @@ public long appendedOffset() {
 
     public CompletableFuture<Records> read(long startOffset, long maxOffset, int maxSize) {
         if (ReadHint.isReadAll()) {
-            return readAll0(startOffset, maxOffset, maxSize);
+            ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).pooledBuf(true).build();
+            return readAll0(startOffset, maxOffset, maxSize, readOptions);
         } else {
             return CompletableFuture.completedFuture(new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize));
         }
     }
 
-    private CompletableFuture<Records> readAll0(long startOffset, long maxOffset, int maxSize) {
+    private CompletableFuture<Records> readAll0(long startOffset, long maxOffset, int maxSize, ReadOptions readOptions) {
         // calculate the relative offset in the segment, which may start from 0.
         long nextFetchOffset = startOffset - baseOffset;
         long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset;
         if (nextFetchOffset >= endOffset) {
             return CompletableFuture.completedFuture(null);
         }
-        ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).build();
         return fetch0(nextFetchOffset, endOffset, maxSize, readOptions)
                 .thenApply(PooledMemoryRecords::of);
     }
@@ -463,7 +463,7 @@ private void ensureAllLoaded() throws IOException {
             }
             Records records = null;
             try {
-                records = elasticLogFileRecords.readAll0(startOffset, maxOffset, fetchSize).get();
+                records = elasticLogFileRecords.readAll0(startOffset, maxOffset, fetchSize, ReadOptions.DEFAULT).get();
             } catch (Throwable t) {
                 throw new IOException(FutureUtil.cause(t));
             }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index bb4395a31c..28bff1d5dd 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -1052,6 +1052,8 @@ private[kafka] class Processor(
       val send = new NetworkSend(connectionId, responseSend)
       selector.send(send)
       inflightResponses.put(send, response)
+    } else {
+      responseSend.release()
     }
   }
 
@@ -1144,6 +1146,7 @@ private[kafka] class Processor(
        if (response == null) {
          throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
        }
+        send.release()
        updateRequestMetrics(response)
 
        // Invoke send completion callback
@@ -1179,7 +1182,11 @@ private[kafka] class Processor(
        val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
          throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
        }.remoteHost
-        inflightResponses.entrySet().removeIf(e => connectionId.equals(e.getValue.request.context.connectionId))
+        inflightResponses.entrySet().removeIf(e => {
+          val remove = connectionId.equals(e.getValue.request.context.connectionId)
+          e.getKey.release()
+          remove
+        })
        // the channel has been closed by the selector but the quotas still need to be updated
        connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
      } catch {
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index 636cafad19..b72eccbe5f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -993,6 +993,14 @@ class KafkaApis(val requestChannel: RequestChannel,
        }
      }
 
+      def release(): Unit = {
+        partitions.values().forEach(data => {
+          if (data.records() != null) {
+            data.records().asInstanceOf[PooledResource].release()
+          }
+        })
+      }
+
      if (fetchRequest.isFromFollower) {
        // We've already evaluated against the quota and are good to go. Just need to record it now.
        unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
@@ -1028,6 +1036,7 @@ class KafkaApis(val requestChannel: RequestChannel,
          }
          // If throttling is required, return an empty response.
          unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
+          release()
        } else {
          // Get the actual response. This will update the fetch context.
          unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 69e52e29ff..3a66df7ae0 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1388,6 +1388,15 @@ class ReplicaManager(val config: KafkaConfig,
    // Note that the use of limitBytes and minOneMessage parameters have been changed here.
    val limitBytes = params.maxBytes
    val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
+
+    def release(): Unit = {
+      result.foreach { case (_, logReadResult) =>
+        if (logReadResult.info != null && logReadResult.info.records != null && logReadResult.info.records.isInstanceOf[PooledResource]) {
+          logReadResult.info.records.asInstanceOf[PooledResource].release()
+        }
+      }
+    }
+
    val minOneMessage = !params.hardMaxBytesLimit
    val remainingBytes = new AtomicInteger(limitBytes)
 
@@ -1413,6 +1422,7 @@ class ReplicaManager(val config: KafkaConfig,
        CompletableFuture.allOf(readCfArray.toArray: _*).get()
      }
      if (fastReadFastFail.get() != null) {
+        release()
        throw fastReadFastFail.get()
      }
 
@@ -1433,6 +1443,7 @@ class ReplicaManager(val config: KafkaConfig,
      }
      CompletableFuture.allOf(remainingCfArray.toArray: _*).get()
      if (fastReadFastFail.get() != null) {
+        release()
        throw fastReadFastFail.get()
      }
      result
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index bda20cb329..182e0bbb01 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -128,7 +128,7 @@ versions += [
   zookeeper: "3.6.3",
   zstd: "1.5.2-1",
   commonLang: "3.12.0",
-  s3stream: "0.6.5-SNAPSHOT",
+  s3stream: "0.6.6-SNAPSHOT",
 ]
 libs += [
   activation: "javax.activation:activation:$versions.activation",
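The broker-side hunks follow a single rule: whichever path prevents a pooled fetch result from reaching the network layer becomes responsible for releasing it — the throttled branch in KafkaApis, the fast-read failure paths in ReplicaManager, and the unsent or disconnected paths in SocketServer — while the normal path defers the release to send completion. A compact, self-contained sketch of that rule with illustrative types only (PooledRecords, the throttled flag, and the helper methods are not from this patch):

import java.util.ArrayList;
import java.util.List;

public class ReleaseOnAbandonSketch {
    interface PooledRecords {
        void release();
    }

    // Either hand the pooled records to a consumer that now owns them,
    // or release them on the spot -- never both, never neither.
    static void respond(List<PooledRecords> partitions, boolean throttled) {
        if (throttled) {
            // The response is replaced by an empty one; nothing downstream will free the buffers.
            partitions.forEach(PooledRecords::release);
            return;
        }
        sendToNetworkLayer(partitions); // ownership transferred; released after the send completes
    }

    static void sendToNetworkLayer(List<PooledRecords> partitions) {
        partitions.forEach(PooledRecords::release); // stands in for "release on send completion"
    }

    public static void main(String[] args) {
        List<PooledRecords> partitions = new ArrayList<>();
        partitions.add(() -> System.out.println("partition buffer released"));
        respond(partitions, true);
    }
}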