From 40760c05c94d6cdd991c0a261dde55df292c5fed Mon Sep 17 00:00:00 2001 From: Robin Han Date: Sat, 27 Sep 2025 15:49:37 +0800 Subject: [PATCH] fix(issue2902): change Verification.waitingRequests to unbounded queue to fix the deadlock Signed-off-by: Robin Han --- core/src/main/scala/kafka/network/SocketServer.scala | 2 +- core/src/main/scala/kafka/server/ReplicaManager.scala | 5 +++-- .../server/streamaspect/ElasticReplicaManager.scala | 9 ++++++--- .../org/apache/kafka/network/SocketServerConfigs.java | 4 ++++ 4 files changed, 14 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala index b9e2990f8a..cdbacf5d2d 100644 --- a/core/src/main/scala/kafka/network/SocketServer.scala +++ b/core/src/main/scala/kafka/network/SocketServer.scala @@ -1183,7 +1183,7 @@ private[kafka] class Processor( // AutoMQ will pipeline the requests to accelerate the performance and also keep the request order. // Mute the channel if the inflight requests exceed the threshold. - if (channelContext.nextCorrelationId.size() >= 64 && !channel.isMuted) { + if (channelContext.nextCorrelationId.size() >= SocketServerConfigs.MAX_INFLIGHT_REQUESTS_PER_CONNECTION && !channel.isMuted) { if (isTraceEnabled) { trace(s"Mute channel ${channel.id} because the inflight requests exceed the threshold, inflight count is ${channelContext.nextCorrelationId.size()}.") } diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 750a3f280b..9b4d8c6886 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -67,7 +67,7 @@ import java.nio.file.{Files, Paths} import java.util import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong} import java.util.concurrent.locks.Lock -import java.util.concurrent.{ArrayBlockingQueue, CompletableFuture, Future, RejectedExecutionException, TimeUnit} +import java.util.concurrent.{CompletableFuture, Future, LinkedBlockingQueue, RejectedExecutionException, TimeUnit} import java.util.{Collections, Optional, OptionalInt, OptionalLong} import scala.collection.{Map, Seq, Set, mutable} import scala.compat.java8.OptionConverters._ @@ -942,7 +942,8 @@ class ReplicaManager(val config: KafkaConfig, case class Verification( hasInflight: AtomicBoolean, - waitingRequests: ArrayBlockingQueue[TransactionVerificationRequest], + // Use an unbounded queue to prevent deadlock from happening. See https://github.com/AutoMQ/automq/issues/2902 + waitingRequests: LinkedBlockingQueue[TransactionVerificationRequest], timestamp: AtomicLong, ) diff --git a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala index bee6265809..0cd75c3184 100644 --- a/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala +++ b/core/src/main/scala/kafka/server/streamaspect/ElasticReplicaManager.scala @@ -1438,7 +1438,7 @@ class ElasticReplicaManager( transactionWaitingForValidationMap.computeIfAbsent(producerId, _ => { Verification( new AtomicBoolean(false), - new ArrayBlockingQueue[TransactionVerificationRequest](5), + new LinkedBlockingQueue[TransactionVerificationRequest](), new AtomicLong(time.milliseconds())) }) } else { @@ -1474,17 +1474,20 @@ class ElasticReplicaManager( error("Error in transaction verification callback", e) } if (verification != null) { + var request: TransactionVerificationRequest = null verification.synchronized { verification.timestamp.set(time.milliseconds()) if (!verification.waitingRequests.isEmpty) { // Since the callback thread and task thread may be different, we need to ensure that the tasks are executed sequentially. - val request = verification.waitingRequests.poll() - request.task() + request = verification.waitingRequests.poll() } else { // If there are no tasks in the queue, set hasInflight to false verification.hasInflight.set(false) } } + if (request != null) { + request.task() + } val lastCleanTimestamp = lastTransactionCleanTimestamp.get(); val now = time.milliseconds() if (now - lastCleanTimestamp > 60 * 1000 && lastTransactionCleanTimestamp.compareAndSet(lastCleanTimestamp, now)) { diff --git a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java index b67527b28f..7ae04d7541 100644 --- a/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java +++ b/server/src/main/java/org/apache/kafka/network/SocketServerConfigs.java @@ -183,6 +183,10 @@ public class SocketServerConfigs { public static final int NUM_NETWORK_THREADS_DEFAULT = 3; public static final String NUM_NETWORK_THREADS_DOC = "The number of threads that the server uses for receiving requests from the network and sending responses to the network. Noted: each listener (except for controller listener) creates its own thread pool."; + // AutoMQ inject start + public static final int MAX_INFLIGHT_REQUESTS_PER_CONNECTION = 64; + // AutoMQ inject end + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(LISTENERS_CONFIG, STRING, LISTENERS_DEFAULT, HIGH, LISTENERS_DOC) .define(ADVERTISED_LISTENERS_CONFIG, STRING, null, HIGH, ADVERTISED_LISTENERS_DOC)