From 6e216e026f45661b32fd9b58fb7b56e2f984d8d2 Mon Sep 17 00:00:00 2001 From: Vahid Hashemian Date: Tue, 21 Jun 2016 10:36:53 -0700 Subject: [PATCH] MINOR: KAFKA-3176 Follow-up to fix minor issues Co-authored with @ijuma. --- .../scala/kafka/consumer/BaseConsumer.scala | 42 +++++++++----- .../scala/kafka/tools/ConsoleConsumer.scala | 58 ++++++++++--------- 2 files changed, 56 insertions(+), 44 deletions(-) diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala index b39da1906b8c..6e232a865f0d 100644 --- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala +++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala @@ -67,22 +67,32 @@ class NewShinyConsumer(topic: Option[String], partitionId: Option[Int], offset: var recordIter = consumer.poll(0).iterator def consumerInit() { - if (topic.isDefined) - if (partitionId.isDefined) { - val topicPartition = new TopicPartition(topic.get, partitionId.get) - consumer.assign(List(topicPartition)) - offset.get match { - case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition)) - case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition)) - case _ => consumer.seek(topicPartition, offset.get) - } - } - else - consumer.subscribe(List(topic.get)) - else if (whitelist.isDefined) - consumer.subscribe(Pattern.compile(whitelist.get), new NoOpConsumerRebalanceListener()) - else - throw new IllegalArgumentException("Exactly one of topic or whitelist has to be provided.") + (topic, partitionId, offset, whitelist) match { + case (Some(topic), Some(partitionId), Some(offset), None) => + seek(topic, partitionId, offset) + case (Some(topic), Some(partitionId), None, None) => + // default to latest if no offset is provided + seek(topic, partitionId, OffsetRequest.LatestTime) + case (Some(topic), None, None, None) => + consumer.subscribe(List(topic)) + case (None, None, None, Some(whitelist)) => + consumer.subscribe(Pattern.compile(whitelist), new NoOpConsumerRebalanceListener()) + case _ => + throw new IllegalArgumentException("An invalid combination of arguments is provided. " + + "Exactly one of 'topic' or 'whitelist' must be provided. " + + "If 'topic' is provided, an optional 'partition' may also be provided. " + + "If 'partition' is provided, an optional 'offset' may also be provided, otherwise, consumption starts from the end of the partition.") + } + } + + def seek(topic: String, partitionId: Int, offset: Long) { + val topicPartition = new TopicPartition(topic, partitionId) + consumer.assign(List(topicPartition)) + offset match { + case OffsetRequest.EarliestTime => consumer.seekToBeginning(List(topicPartition)) + case OffsetRequest.LatestTime => consumer.seekToEnd(List(topicPartition)) + case _ => consumer.seek(topicPartition, offset) + } } override def receive(): BaseConsumerRecord = { diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 0b6502a63e8d..17cf5bd1d367 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -19,8 +19,7 @@ package kafka.tools import java.io.PrintStream import java.util.concurrent.CountDownLatch -import java.util.{Properties, Random} - +import java.util.{Locale, Properties, Random} import joptsimple._ import kafka.api.OffsetRequest import kafka.common.{MessageFormatter, StreamEndException} @@ -34,7 +33,6 @@ import org.apache.kafka.common.record.TimestampType import org.apache.kafka.common.serialization.Deserializer import org.apache.kafka.common.utils.Utils import org.apache.log4j.Logger - import scala.collection.JavaConverters._ /** @@ -225,7 +223,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("consume offset") .ofType(classOf[String]) - .defaultsTo("earliest") + .defaultsTo("latest") val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " + "Multiple URLS can be given to allow fail-over.") .withRequiredArg @@ -296,8 +294,6 @@ object ConsoleConsumer extends Logging { // If using new consumer, topic must be specified. var topicArg: String = null var whitelistArg: String = null - var partitionArg: Option[Int] = None - var offsetArg: Long = OffsetRequest.LatestTime var filterSpec: TopicFilter = null val extraConsumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerPropertyOpt).asScala) val consumerProps = if (options.has(consumerConfigOpt)) @@ -306,7 +302,7 @@ object ConsoleConsumer extends Logging { new Properties() val zkConnectionStr = options.valueOf(zkConnectOpt) val fromBeginning = options.has(resetBeginningOpt) - partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None + val partitionArg = if (options.has(partitionIdOpt)) Some(options.valueOf(partitionIdOpt).intValue) else None val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt)) val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt).asScala) @@ -331,33 +327,39 @@ object ConsoleConsumer extends Logging { topicArg = options.valueOf(topicOrFilterOpt.head) filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg) } - + + if (!useNewConsumer && (partitionArg.isDefined || options.has(offsetOpt))) + CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.") + if (partitionArg.isDefined) { - if (!useNewConsumer) - CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.") if (!options.has(topicIdOpt)) CommandLineUtils.printUsageAndDie(parser, "The topic is required when partition is specified.") if (fromBeginning && options.has(offsetOpt)) CommandLineUtils.printUsageAndDie(parser, "Options from-beginning and offset cannot be specified together.") - if (options.has(offsetOpt) && - !(options.valueOf(offsetOpt).toLowerCase().equals("earliest") || - options.valueOf(offsetOpt).toLowerCase().equals("latest") || - (options.valueOf(offsetOpt) forall Character.isDigit))) - CommandLineUtils.printUsageAndDie(parser, "The provided offset value is incorrect. Valid values are 'earliest', 'latest', or non-negative numbers.") - } else if (options.has(offsetOpt)) { - if (!useNewConsumer) - CommandLineUtils.printUsageAndDie(parser, "Partition-offset based consumption is supported in the new consumer only.") - else - CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.") - } - - offsetArg = if (options.has(offsetOpt)) { - options.valueOf(offsetOpt).toLowerCase() match { - case "earliest" => OffsetRequest.EarliestTime - case "latest" => OffsetRequest.LatestTime - case _ => options.valueOf(offsetOpt).toLong + } else if (options.has(offsetOpt)) + CommandLineUtils.printUsageAndDie(parser, "The partition is required when offset is specified.") + + def invalidOffset(offset: String): Nothing = + CommandLineUtils.printUsageAndDie(parser, s"The provided offset value '$offset' is incorrect. Valid values are " + + "'earliest', 'latest', or a non-negative long.") + + val offsetArg = + if (options.has(offsetOpt)) { + options.valueOf(offsetOpt).toLowerCase(Locale.ROOT) match { + case "earliest" => OffsetRequest.EarliestTime + case "latest" => OffsetRequest.LatestTime + case offsetString => + val offset = + try offsetString.toLong + catch { + case e: NumberFormatException => invalidOffset(offsetString) + } + if (offset < 0) invalidOffset(offsetString) + offset + } } - } else if (fromBeginning) OffsetRequest.EarliestTime else OffsetRequest.LatestTime + else if (fromBeginning) OffsetRequest.EarliestTime + else OffsetRequest.LatestTime CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)