Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-6581: Fix the ConsumerGroupCommand indefinite execution if one of the partitions is unavailable. #4612

Open
wants to merge 1 commit into
base: 0.10.0
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 54 additions & 11 deletions core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
Expand Up @@ -18,6 +18,7 @@
package kafka.admin

import java.util.Properties
import java.util.concurrent.{Callable, ExecutorService, Executors, FutureTask}

import joptsimple.{OptionParser, OptionSpec}
import kafka.api.{OffsetFetchRequest, OffsetFetchResponse, OffsetRequest, PartitionOffsetRequestInfo}
Expand All @@ -35,10 +36,16 @@ import org.apache.kafka.common.security.JaasUtils
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.utils.Utils

import scala.concurrent._
import ExecutionContext.Implicits.global
import scala.collection.JavaConverters._
import scala.collection.{Set, mutable}
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success, Try}

object ConsumerGroupCommand {
val DEFAULT_CONSUMER_POSITION_TIMEOUT = 500

def main(args: Array[String]) {
val opts = new ConsumerGroupCommandOptions(args)
Expand Down Expand Up @@ -302,6 +309,15 @@ object ConsumerGroupCommand {

class KafkaConsumerGroupService(val opts: ConsumerGroupCommandOptions) extends ConsumerGroupService {

// ZooKeeper handle built from the connect string supplied through `opts.zkConnectOpt`.
// NOTE(review): the two 30000 arguments look like session/connection timeouts in ms — confirm
// against ZkUtils.apply.
private val zkUtils = ZkUtils(
  opts.options.valueOf(opts.zkConnectOpt),
  30000,
  30000,
  JaasUtils.isZkSecurityEnabled)

// Value of the `consumerPositionTimeoutMsOpt` command line option; getLogEndOffset uses it as
// the number of milliseconds to wait for the end-offset lookup before giving up.
private val consumerPositionTimeoutOpt = opts.options.valueOf(opts.consumerPositionTimeoutMsOpt)

// Set to true by getLogEndOffset when the end-offset lookup times out or throws; consulted by
// getConsumer when deciding whether to build a new consumer.
private var lastFailed = false;

private val adminClient = createAdminClient()

// `consumer` is only needed for `describe`, so we instantiate it lazily
Expand Down Expand Up @@ -332,12 +348,33 @@ object ConsumerGroupCommand {
}

protected def getLogEndOffset(topic: String, partition: Int): LogEndOffsetResult = {
val consumer = getConsumer()
val topicPartition = new TopicPartition(topic, partition)
consumer.assign(List(topicPartition).asJava)
consumer.seekToEnd(List(topicPartition).asJava)
val logEndOffset = consumer.position(topicPartition)
LogEndOffsetResult.LogEndOffset(logEndOffset)
zkUtils.getLeaderForPartition(topic, partition) match {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A better approach would be to use KafkaConsumer.endOffsets. Although its doc claims that it blocks indefinitely, it is in fact bound by the request timeout. I will submit a separate patch to correct this.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

KafkaConsumer in 0.10.0 doesn't have endOffsets; it was introduced in 0.10.1, I guess. 0.10.0 doesn't have a request timeout either.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, that's true. Is there any particular reason you need this in 0.10.0? We are unlikely to do another bug fix release for that version. Note that it should be possible to use tools from more recent versions on an 0.10.0 broker.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We rely on this for consumer offset metrics and lose visibility even if a single partition becomes unavailable. We could, but would rather not, maintain two versions just for this.
I see 0.10.0.2 listed as unreleased; is there no possibility there?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's very unlikely we'll release 0.10.0.2 at this point. Your next closest version is 0.11.0.2, which already has the fix that I suggested above.

case Some(-1) => LogEndOffsetResult.Unknown
case Some(brokerId) =>
var consumer = getConsumer()
val topicPartition = new TopicPartition(topic, partition)
// it can become unavailable after the check above
val future = Future {
consumer.assign(List(topicPartition).asJava)
consumer.seekToEnd(List(topicPartition).asJava)
val logEndOffset = consumer.position(topicPartition)
LogEndOffsetResult.LogEndOffset(logEndOffset)
}

try {
Await.result(future, Duration.create(consumerPositionTimeoutOpt, "millisecond"))
} catch {
case t : TimeoutException =>
lastFailed = true
LogEndOffsetResult.Unknown
case t : Exception =>
lastFailed = true
LogEndOffsetResult.Ignore
}
case None =>
println(s"No broker for partition ${new TopicPartition(topic, partition)}")
LogEndOffsetResult.Ignore
}
}

def close() {
Expand All @@ -352,7 +389,7 @@ object ConsumerGroupCommand {
}

// Returns the shared consumer, creating it on first use and replacing it once after
// getLogEndOffset has flagged a failed lookup through `lastFailed`.
private def getConsumer() = {
  if (consumer == null || lastFailed) {
    consumer = createNewConsumer()
    // Clear the flag so a single failure triggers exactly one replacement; otherwise every
    // later call would build yet another consumer.
    // NOTE(review): the replaced consumer is not closed here because the timed-out lookup may
    // still be running on it — confirm how it should be cleaned up.
    lastFailed = false
  }
  consumer
}
Expand All @@ -376,6 +413,7 @@ object ConsumerGroupCommand {

sealed trait LogEndOffsetResult


object LogEndOffsetResult {
case class LogEndOffset(value: Long) extends LogEndOffsetResult
case object Unknown extends LogEndOffsetResult
Expand Down Expand Up @@ -425,25 +463,30 @@ object ConsumerGroupCommand {
.withRequiredArg
.describedAs("command config property file")
.ofType(classOf[String])
// Help text and option spec for `--consumer-position-timeout`: a required Long argument that
// falls back to DEFAULT_CONSUMER_POSITION_TIMEOUT when the option is not given.
// The help text is never reassigned in the visible code, so it is a `val` like the option spec.
val consumerPositionTimeoutOptDoc = "Timeout for consumer.position() in ms"
val consumerPositionTimeoutMsOpt = parser.accepts("consumer-position-timeout", consumerPositionTimeoutOptDoc)
  .withRequiredArg
  .ofType(classOf[Long])
  .defaultsTo(DEFAULT_CONSUMER_POSITION_TIMEOUT)

val options = parser.parse(args : _*)

val allConsumerGroupLevelOpts: Set[OptionSpec[_]] = Set(listOpt, describeOpt, deleteOpt)

def checkArgs() {

CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)

// check required args
if (options.has(newConsumerOpt)) {
CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)

if (options.has(zkConnectOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $zkConnectOpt is not valid with $newConsumerOpt")

if (options.has(deleteOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $deleteOpt is not valid with $newConsumerOpt. Note that " +
"there's no need to delete group metadata for the new consumer as it is automatically deleted when the last " +
"member leaves")

} else {
CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)

if (options.has(bootstrapServerOpt))
CommandLineUtils.printUsageAndDie(parser, s"Option $bootstrapServerOpt is only valid with $newConsumerOpt")
Expand Down