From b0ed19863ff316f2f761a043166b49542cfa8398 Mon Sep 17 00:00:00 2001 From: Robin Han Date: Thu, 22 Feb 2024 15:34:58 +0800 Subject: [PATCH] fix(issues826): fix consume records leak in closing channel Signed-off-by: Robin Han --- .../org/apache/kafka/common/network/KafkaChannel.java | 8 ++++++++ .../java/org/apache/kafka/common/network/Selector.java | 1 + 2 files changed, 9 insertions(+) diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java index ea2751b38a..3970f3828b 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java +++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java @@ -202,6 +202,7 @@ public void prepare() throws AuthenticationException, IOException { } public void disconnect() { + release(); disconnected = true; if (state == ChannelState.NOT_CONNECTED && remoteAddress != null) { //if we captured the remote address we can provide more information @@ -210,6 +211,13 @@ public void disconnect() { transportLayer.disconnect(); } + private void release() { + if (send != null) { + send.release(); + } + waitingSend.forEach(NetworkSend::release); + } + public void state(ChannelState state) { this.state = state; } diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java index 67e27e9a29..57eb99c00d 100644 --- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java +++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java @@ -389,6 +389,7 @@ public void send(NetworkSend send) { if (closingChannels.containsKey(connectionId)) { // ensure notification via `disconnected`, leave channel in the state in which closing was triggered this.failedSends.add(connectionId); + send.release(); } else { try { channel.setSend(send);