diff --git a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala index c41f37d17e..79f18064af 100644 --- a/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala +++ b/core/src/main/scala/kafka/log/streamaspect/ElasticLogManager.scala @@ -191,7 +191,7 @@ object ElasticLogManager { INSTANCE.get.newSegment(topicPartition, baseOffset, time, fileSuffix) } - def shutdownNow(): Unit = { + def shutdown(): Unit = { INSTANCE.foreach(_.shutdownNow()) } } diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala index 75663b1200..f7116dd5ff 100644 --- a/core/src/main/scala/kafka/server/BrokerServer.scala +++ b/core/src/main/scala/kafka/server/BrokerServer.scala @@ -602,8 +602,6 @@ class BrokerServer( if (logManager != null) CoreUtils.swallow(logManager.shutdown(), this) - CoreUtils.swallow(ElasticLogManager.shutdownNow(), this) - // log manager need clientToControllerChannelManager to send request to controller. if (clientToControllerChannelManager != null) CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this) diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala index 8752f9ccde..e0f337dafa 100644 --- a/core/src/main/scala/kafka/server/ReplicaManager.scala +++ b/core/src/main/scala/kafka/server/ReplicaManager.scala @@ -2769,8 +2769,12 @@ class ReplicaManager(val config: KafkaConfig, info(s"try force stop partitions ${partitionsToClose.keys}") stopPartitions(partitionsToClose) } - checkAllPartitionClosed() + while (!checkAllPartitionClosed() && (System.currentTimeMillis() - start) < 30000) { + info("still has opening partition, retry check later") + Thread.sleep(1000) + } info("await all partitions closed") + CoreUtils.swallow(ElasticLogManager.shutdown(), this) } def checkAllPartitionClosed(): Boolean = {