@@ -46,4 +46,8 @@ public long size() {
         return send.size();
     }
 
+    @Override
+    public void release() {
+        send.release();
+    }
 }
@@ -42,4 +42,6 @@ public interface Send {
      */
     long size();
 
+    default void release() {}
+
 }
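
The default no-op keeps every existing Send implementation source-compatible; only sends backed by pooled memory need to override it. A minimal sketch of such an implementation, assuming a hypothetical PooledBuffer type and trimming the Send interface down to the two methods relevant here:

// Sketch only: PooledBuffer is an illustrative stand-in, not a Kafka class.
final class PooledBuffer {
    void free() { /* return the memory to the pool */ }
}

interface Send {                    // trimmed view of the interface above
    long size();
    default void release() {}       // no-op for non-pooled implementations
}

final class PooledSend implements Send {
    private final PooledBuffer buffer;
    private final long size;

    PooledSend(PooledBuffer buffer, long size) {
        this.buffer = buffer;
        this.size = size;
    }

    @Override
    public long size() { return size; }

    @Override
    public void release() {
        buffer.free();              // hand the backing memory back to the pool
    }
}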
@@ -28,7 +28,7 @@
  * Encapsulation for holding records that require down-conversion in a lazy, chunked manner (KIP-283). See
  * {@link LazyDownConversionRecordsSend} for the actual chunked send implementation.
  */
-public class LazyDownConversionRecords implements BaseRecords {
+public class LazyDownConversionRecords implements BaseRecords, PooledResource {
     private final TopicPartition topicPartition;
     private final Records records;
     private final byte toMagic;
@@ -124,6 +124,13 @@ public java.util.Iterator<ConvertedRecords<?>> iterator(long maximumReadSize) {
         return new Iterator(records, maximumReadSize, firstBatch);
     }
 
+    @Override
+    public void release() {
+        if (records instanceof PooledResource) {
+            ((PooledResource) records).release();
+        }
+    }
+
     /**
      * Implementation for being able to iterate over down-converted records. Goal of this implementation is to keep
      * it as memory-efficient as possible by not having to maintain all down-converted records in-memory. Maintains
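
The instanceof guard above is the recurring idiom of this patch: release() only propagates to records that actually hold pooled memory, while plain heap-backed records pass through untouched. The same guard, factored into a helper for illustration (releaseIfPooled is a hypothetical name, and PooledResource is assumed to be the single-method interface the diff implements):

interface PooledResource {          // assumed shape: a single release() hook
    void release();
}

final class PooledResources {
    private PooledResources() {}

    // Hypothetical helper: release the object only if it participates in pooling.
    static void releaseIfPooled(Object maybePooled) {
        if (maybePooled instanceof PooledResource) {
            ((PooledResource) maybePooled).release();
        }
    }
}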
@@ -25,7 +25,9 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Queue;
 
@@ -35,6 +37,7 @@
 public class MultiRecordsSend implements Send {
     private static final Logger log = LoggerFactory.getLogger(MultiRecordsSend.class);
 
+    private final List<Send> sendList;
     private final Queue<Send> sendQueue;
     private final long size;
     private Map<TopicPartition, RecordConversionStats> recordConversionStats;
@@ -47,6 +50,7 @@ public class MultiRecordsSend implements Send {
      * progresses (on completion, it will be empty).
      */
    public MultiRecordsSend(Queue<Send> sends) {
+        this.sendList = new ArrayList<>(sends);
         this.sendQueue = sends;
 
         long size = 0;
@@ -58,6 +62,7 @@ public MultiRecordsSend(Queue<Send> sends) {
     }
 
     public MultiRecordsSend(Queue<Send> sends, long size) {
+        this.sendList = new ArrayList<>(sends);
         this.sendQueue = sends;
         this.size = size;
         this.current = sendQueue.poll();
@@ -138,4 +143,9 @@ private void updateRecordConversionStats(Send completedSend) {
             recordConversionStats.put(lazyRecordsSend.topicPartition(), lazyRecordsSend.recordConversionStats());
         }
     }
+
+    @Override
+    public void release() {
+        sendList.forEach(Send::release);
+    }
 }
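
sendQueue is polled empty as the transfer progresses, so by the time release() can be called there is nothing left in it; the new sendList snapshot exists purely so release() can still reach every child send, including those already written out. The pattern in isolation (CompositeSend is an illustrative name, reusing the trimmed Send interface from the sketch above):

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class CompositeSend {
    private final List<Send> snapshot;  // retained for release(), never drained
    private final Queue<Send> pending;  // polled empty as writes progress

    CompositeSend(Queue<Send> sends) {
        this.snapshot = new ArrayList<>(sends);  // copy before the first poll()
        this.pending = sends;
    }

    void release() {
        // Reaches children that were already polled off the queue and sent.
        snapshot.forEach(Send::release);
    }
}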
@@ -39,11 +39,7 @@ protected RecordsSend(T records, int maxBytesToWrite) {
 
     @Override
     public boolean completed() {
-        boolean completed = remaining <= 0 && !pending;
-        if (completed) {
-            release();
-        }
-        return completed;
+        return remaining <= 0 && !pending;
     }
 
     @Override
@@ -86,7 +82,7 @@ protected T records() {
      */
     protected abstract long writeTo(TransferableChannel channel, long previouslyWritten, int remaining) throws IOException;
 
-    private void release() {
+    public void release() {
         if (records instanceof PooledResource) {
             ((PooledResource) records).release();
         }
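
Before this change, completed() released the pooled records as a side effect of reporting completion; now it is a pure query and release() is public, so the component that owns the send decides when the memory goes back to the pool. A sketch of the resulting contract with a hypothetical driver loop (the patch's real release sites are in SocketServer, further down):

import java.io.IOException;
import java.nio.channels.GatheringByteChannel;

final class SendDriver {
    private SendDriver() {}

    // Trimmed stand-in for the send surface the loop below relies on.
    interface DrainableSend {
        boolean completed();
        long writeTo(GatheringByteChannel channel) throws IOException;
        void release();
    }

    // Illustrative only, not the actual Processor logic: the owner releases
    // exactly once, after the transfer ends either normally or by exception.
    static void writeFully(DrainableSend send, GatheringByteChannel channel) throws IOException {
        try {
            while (!send.completed()) {
                send.writeTo(channel);
            }
        } finally {
            send.release();  // previously happened implicitly inside completed()
        }
    }
}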
@@ -101,20 +101,20 @@ public long appendedOffset() {
 
     public CompletableFuture<Records> read(long startOffset, long maxOffset, int maxSize) {
         if (ReadHint.isReadAll()) {
-            return readAll0(startOffset, maxOffset, maxSize);
+            ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).pooledBuf(true).build();
+            return readAll0(startOffset, maxOffset, maxSize, readOptions);
         } else {
             return CompletableFuture.completedFuture(new BatchIteratorRecordsAdaptor(this, startOffset, maxOffset, maxSize));
         }
     }
 
-    private CompletableFuture<Records> readAll0(long startOffset, long maxOffset, int maxSize) {
+    private CompletableFuture<Records> readAll0(long startOffset, long maxOffset, int maxSize, ReadOptions readOptions) {
         // calculate the relative offset in the segment, which may start from 0.
         long nextFetchOffset = startOffset - baseOffset;
         long endOffset = Utils.min(this.committedOffset.get(), maxOffset) - baseOffset;
         if (nextFetchOffset >= endOffset) {
             return CompletableFuture.completedFuture(null);
         }
-        ReadOptions readOptions = ReadOptions.builder().fastRead(ReadHint.isFastRead()).build();
         return fetch0(nextFetchOffset, endOffset, maxSize, readOptions)
             .thenApply(PooledMemoryRecords::of);
     }
@@ -463,7 +463,7 @@ private void ensureAllLoaded() throws IOException {
         }
         Records records = null;
         try {
-            records = elasticLogFileRecords.readAll0(startOffset, maxOffset, fetchSize).get();
+            records = elasticLogFileRecords.readAll0(startOffset, maxOffset, fetchSize, ReadOptions.DEFAULT).get();
         } catch (Throwable t) {
             throw new IOException(FutureUtil.cause(t));
         }
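
The builder gains pooledBuf(true) on the fetch path, which presumably asks the stream layer to back the read with pooled buffers; that is why the resulting PooledMemoryRecords need a release() hook downstream, while the ensureAllLoaded path passes ReadOptions.DEFAULT and presumably leaves pooling off. A sketch of the wrapper side, assuming a Netty ByteBuf as the pooled backing memory (an assumption; the diff does not show PooledMemoryRecords' internals) and reusing the PooledResource stand-in from the earlier sketch:

import io.netty.buffer.ByteBuf;

final class PooledRecordsSketch implements PooledResource {
    private final ByteBuf buf;  // pooled backing memory (assumed Netty-based)

    PooledRecordsSketch(ByteBuf buf) {
        this.buf = buf;
    }

    @Override
    public void release() {
        buf.release();  // decrements refCnt; memory returns to the pool at zero
    }
}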
9 changes: 8 additions & 1 deletion core/src/main/scala/kafka/network/SocketServer.scala
@@ -1052,6 +1052,8 @@ private[kafka] class Processor(
       val send = new NetworkSend(connectionId, responseSend)
       selector.send(send)
       inflightResponses.put(send, response)
+    } else {
+      responseSend.release()
     }
   }
 
@@ -1144,6 +1146,7 @@
     if (response == null) {
       throw new IllegalStateException(s"Send for ${send.destinationId} completed, but not in `inflightResponses`")
     }
+    send.release()
     updateRequestMetrics(response)
 
     // Invoke send completion callback
@@ -1179,7 +1182,11 @@
       val remoteHost = ConnectionId.fromString(connectionId).getOrElse {
         throw new IllegalStateException(s"connectionId has unexpected format: $connectionId")
       }.remoteHost
-      inflightResponses.entrySet().removeIf(e => connectionId.equals(e.getValue.request.context.connectionId))
+      inflightResponses.entrySet().removeIf(e => {
+        val remove = connectionId.equals(e.getValue.request.context.connectionId)
+        e.getKey.release()
+        remove
+      })
       // the channel has been closed by the selector but the quotas still need to be updated
       connectionQuotas.dec(listenerName, InetAddress.getByName(remoteHost))
     } catch {
9 changes: 9 additions & 0 deletions core/src/main/scala/kafka/server/KafkaApis.scala
@@ -993,6 +993,14 @@ class KafkaApis(val requestChannel: RequestChannel,
       }
     }
 
+    def release(): Unit = {
+      partitions.values().forEach(data => {
+        if (data.records() != null) {
+          data.records().asInstanceOf[PooledResource].release()
+        }
+      })
+    }
+
     if (fetchRequest.isFromFollower) {
       // We've already evaluated against the quota and are good to go. Just need to record it now.
       unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
@@ -1028,6 +1036,7 @@
         }
         // If throttling is required, return an empty response.
         unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
+        release()
       } else {
         // Get the actual response. This will update the fetch context.
         unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
11 changes: 11 additions & 0 deletions core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -1388,6 +1388,15 @@ class ReplicaManager(val config: KafkaConfig,
     // Note that the use of limitBytes and minOneMessage parameters have been changed here.
     val limitBytes = params.maxBytes
     val result = new mutable.ArrayBuffer[(TopicIdPartition, LogReadResult)]
+
+    def release(): Unit = {
+      result.foreach { case (_, logReadResult) =>
+        if (logReadResult.info != null && logReadResult.info.records != null && logReadResult.info.records.isInstanceOf[PooledResource]) {
+          logReadResult.info.records.asInstanceOf[PooledResource].release()
+        }
+      }
+    }
+
     val minOneMessage = !params.hardMaxBytesLimit
 
     val remainingBytes = new AtomicInteger(limitBytes)
@@ -1413,6 +1422,7 @@
       CompletableFuture.allOf(readCfArray.toArray: _*).get()
     }
     if (fastReadFastFail.get() != null) {
+      release()
       throw fastReadFastFail.get()
     }
 
@@ -1433,6 +1443,7 @@
     }
     CompletableFuture.allOf(remainingCfArray.toArray: _*).get()
     if (fastReadFastFail.get() != null) {
+      release()
       throw fastReadFastFail.get()
     }
     result
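
Both fast-fail sites call release() on whatever partial reads have already accumulated in result before rethrowing; without that, the pooled buffers behind those reads would leak, since the exception prevents them from ever reaching the network layer where release normally happens. The same cleanup shape in a condensed Java sketch (readAll and loadPartition are hypothetical stand-ins):

import java.util.ArrayList;
import java.util.List;

final class PartialReadCleanup {
    private PartialReadCleanup() {}

    // On failure, release the partial results accumulated so far, then rethrow.
    static List<PooledResource> readAll(List<String> partitions) {
        List<PooledResource> acc = new ArrayList<>();
        try {
            for (String partition : partitions) {
                acc.add(loadPartition(partition));
            }
            return acc;  // success: the caller now owns every element
        } catch (RuntimeException e) {
            acc.forEach(PooledResource::release);  // avoid leaking pooled buffers
            throw e;
        }
    }

    private static PooledResource loadPartition(String partition) {
        throw new UnsupportedOperationException("stand-in for the real pooled read");
    }
}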
2 changes: 1 addition & 1 deletion gradle/dependencies.gradle
@@ -128,7 +128,7 @@ versions += [
   zookeeper: "3.6.3",
   zstd: "1.5.2-1",
   commonLang: "3.12.0",
-  s3stream: "0.6.5-SNAPSHOT",
+  s3stream: "0.6.6-SNAPSHOT",
 ]
 libs += [
   activation: "javax.activation:activation:$versions.activation",