From 7b8e1ec087fbf7aa7796df7606480ff78f5a94e3 Mon Sep 17 00:00:00 2001 From: Sahil Aggarwal Date: Thu, 22 Feb 2018 16:29:16 +0530 Subject: [PATCH] KAFKA-6581: Fix the ConsumerGroupCommand indefinite execution if one of the partition is unavailable. * Checks if partition available before calling consumer.position * Adds timeout on consumer.position() call. --- .../kafka/admin/ConsumerGroupCommand.scala | 65 +++++++++++++++---- 1 file changed, 54 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala index 03864048c1f8..707d70d486d5 100755 --- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala +++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala @@ -18,6 +18,7 @@ package kafka.admin import java.util.Properties +import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask} import joptsimple.{OptionParser, OptionSpec} import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo} @@ -35,10 +36,16 @@ import org.apache.kafka.common.security.JaasUtils import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.utils.Utils +import scala.concurrent._ +import ExecutionContext.Implicits.global import scala.collection.JavaConverters._ import scala.collection.{Set, mutable} +import scala.concurrent.{Await, Future, TimeoutException} +import scala.concurrent.duration.Duration +import scala.util.{Failure, Success, Try} object ConsumerGroupCommand { + val DEFAULT_CONSUMER_POSITION_TIMEOUT = 500 def main(args: Array[String]) { val opts = new ConsumerGroupCommandOptions(args) @@ -302,6 +309,15 @@ object ConsumerGroupCommand { class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService { + private val zkUtils = { + val zkUrl = opts.options.valueOf(opts.zkConnectOpt) + ZkUtils(zkUrl, 30000, 30000, JaasUtils.isZkSecurityEnabled) + } + + private val consumerPositionTimeoutOpt = opts.options.valueOf(opts.consumerPositionTimeoutMsOpt) + + private var lastFailed = false; + private val adminClient = createAdminClient() // `consumer` is only needed for `describe`, so we instantiate it lazily @@ -332,12 +348,33 @@ object ConsumerGroupCommand { } protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = { - val consumer = getConsumer() - val topicPartition = new TopicPartition(topic, partition) - consumer.assign(List(topicPartition).asJava) - consumer.seekToEnd(List(topicPartition).asJava) - val logEndOffset = consumer.position(topicPartition) - LogEndOffsetResult.LogEndOffset(logEndOffset) + zkUtils.getLeaderForPartition(topic, partition) match { + case Some(-1) => LogEndOffsetResult.Unknown + case Some(brokerId) => + var consumer = getConsumer() + val topicPartition = new TopicPartition(topic, partition) + // it can become unavailable after the check above + val future = Future { + consumer.assign(List(topicPartition).asJava) + consumer.seekToEnd(List(topicPartition).asJava) + val logEndOffset = consumer.position(topicPartition) + LogEndOffsetResult.LogEndOffset(logEndOffset) + } + + try { + Await.result(future, Duration.create(consumerPositionTimeoutOpt, "millisecond")) + } catch { + case t : TimeoutException => + lastFailed = true + LogEndOffsetResult.Unknown + case t : Exception => + lastFailed = true + LogEndOffsetResult.Ignore + } + case None => + println(s"No broker for partition ${new TopicPartition(topic, partition)}") + LogEndOffsetResult.Ignore + } } def close() { @@ -352,7 +389,7 @@ object ConsumerGroupCommand { } private def getConsumer() = { - if (consumer == null) + if (consumer == null || lastFailed) consumer = createNewConsumer() consumer } @@ -376,6 +413,7 @@ object ConsumerGroupCommand { sealed trait LogEndOffsetResult + object LogEndOffsetResult { case class LogEndOffset(value: Long) extends LogEndOffsetResult case object Unknown extends LogEndOffsetResult @@ -425,25 +463,30 @@ object ConsumerGroupCommand { .withRequiredArg .describedAs("command config property file") .ofType(classOf[String]) + var consumerPositionTimeoutOptDoc = "Timeout for consumer.position() in ms" + val consumerPositionTimeoutMsOpt = parser.accepts("consumer-position-timeout", consumerPositionTimeoutOptDoc) + .withRequiredArg() + .ofType(classOf[Long]) + .defaultsTo(DEFAULT_CONSUMER_POSITION_TIMEOUT) + val options = parser.parse(args : _*) val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt) def checkArgs() { + + CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) + // check required args if (options.has(newConsumerOpt)) { CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt) - if (options.has(zkConnectOpt)) - CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt") - if (options.has(deleteOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " + "there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " + "member leaves") } else { - CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt) if (options.has(bootstrapServerOpt)) CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")