From 653240b149dae8fb396ba1b45991384e7c93d05f Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Fri, 12 May 2017 08:47:41 +0100 Subject: [PATCH] Improve shutdown sequence --- core/src/main/scala/kafka/server/KafkaServer.scala | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 431d192169f16..fdf837cb65627 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -589,28 +589,35 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP if (shutdownLatch.getCount > 0 && isShuttingDown.compareAndSet(false, true)) { CoreUtils.swallow(controlledShutdown()) brokerState.newState(BrokerShuttingDown) + if (socketServer != null) CoreUtils.swallow(socketServer.shutdown()) if (requestHandlerPool != null) CoreUtils.swallow(requestHandlerPool.shutdown()) + CoreUtils.swallow(kafkaScheduler.shutdown()) + if (apis != null) CoreUtils.swallow(apis.close()) CoreUtils.swallow(authorizer.foreach(_.close())) - if (replicaManager != null) - CoreUtils.swallow(replicaManager.shutdown()) if (adminManager != null) CoreUtils.swallow(adminManager.shutdown()) + if (transactionCoordinator != null) CoreUtils.swallow(transactionCoordinator.shutdown()) if (groupCoordinator != null) CoreUtils.swallow(groupCoordinator.shutdown()) + + if (replicaManager != null) + CoreUtils.swallow(replicaManager.shutdown()) if (logManager != null) CoreUtils.swallow(logManager.shutdown()) + if (kafkaController != null) CoreUtils.swallow(kafkaController.shutdown()) if (zkUtils != null) CoreUtils.swallow(zkUtils.close()) + if (metrics != null) CoreUtils.swallow(metrics.close())