From 36b9324bd9940a43e28f98d31e27db282836fe2a Mon Sep 17 00:00:00 2001 From: Robin Han Date: Tue, 2 Apr 2024 20:36:17 +0800 Subject: [PATCH] fix(issues1078): fix RequestChannel shutdown blocking Signed-off-by: Robin Han --- .../src/main/scala/kafka/network/RequestChannel.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala index 5fdc0ed427..8d64b66012 100644 --- a/core/src/main/scala/kafka/network/RequestChannel.scala +++ b/core/src/main/scala/kafka/network/RequestChannel.scala @@ -363,6 +363,7 @@ class RequestChannel(val queueSize: Int, private val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric) private val callbackQueue = new ArrayBlockingQueue[BaseRequest](queueSize) private val multiCallbackQueue = new java.util.ArrayList[ArrayBlockingQueue[BaseRequest]]() + private var notifiedShutdown = false metricsGroup.newGauge(requestQueueSizeMetricName, () => { if (multiRequestQueue.size() != 0) { @@ -544,8 +545,14 @@ class RequestChannel(val queueSize: Int, } def sendShutdownRequest(): Unit = { - requestQueue.put(ShutdownRequest) - multiRequestQueue.forEach(q => q.put(ShutdownRequest)) + if (multiRequestQueue.size() != 0) { + if (!notifiedShutdown) { + notifiedShutdown = true + multiRequestQueue.forEach(q => q.put(ShutdownRequest)) + } + } else { + requestQueue.put(ShutdownRequest) + } } def sendCallbackRequest(request: CallbackRequest): Unit = {