From 95779d7d509c012e9e3d1931fc3b6fd465075513 Mon Sep 17 00:00:00 2001 From: huxi Date: Thu, 20 Oct 2016 17:03:31 +0800 Subject: [PATCH] Fix for kafka-4295: kafka-console-consumer.sh does not delete the temporary group in zookeeper Since consumer stop logic and zk node removal code are in separate threads, so when two threads execute in an interleaving manner, persistent node '/consumers/' might not be removed for those console consumer groups which do not specify "group.id". This will pollute Zookeeper with lots of inactive console consumer offset information. --- .../main/scala/kafka/tools/ConsoleConsumer.scala | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 361bef29585b0..db8c8dc45fdfd 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -74,14 +74,9 @@ object ConsoleConsumer extends Logging { try { process(conf.maxMessages, conf.formatter, consumer, System.out, conf.skipMessageOnError) } finally { - consumer.cleanup() conf.formatter.close() reportRecordCount() - - // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack - if (!conf.groupIdPassed) - ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id")) - + shutDownConsumerAndRemoveItsConsumersNode(consumer, conf) shutdownLatch.countDown() } } @@ -103,7 +98,7 @@ object ConsoleConsumer extends Logging { def addShutdownHook(consumer: BaseConsumer, conf: ConsumerConfig) { Runtime.getRuntime.addShutdownHook(new Thread() { override def run() { - consumer.stop() + shutDownConsumerAndRemoveItsConsumersNode(consumer, conf) shutdownLatch.await() @@ -114,6 +109,13 @@ object ConsoleConsumer extends Logging { }) } + def shutDownConsumerAndRemoveItsConsumersNode(consumer: BaseConsumer, conf: ConsumerConfig): Unit = { + consumer.stop(); + // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack + if (!conf.groupIdPassed) + ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id")) + } + def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer, output: PrintStream, skipMessageOnError: Boolean) { while (messageCount < maxMessages || maxMessages == -1) { val msg: BaseConsumerRecord = try {