From e102ff6b54dd539bbf45deb90cc8731842ec1bf6 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Thu, 13 Aug 2015 13:36:13 +0100
Subject: [PATCH 1/5] KAFKA-2015 - ported patch from JIRA and altered to match
 existing ConsoleProducer template

---
 .../scala/kafka/consumer/BaseConsumer.scala   |  72 ++++
 .../scala/kafka/tools/ConsoleConsumer.scala   | 340 ++++++++++--------
 .../kafka/tools/ConsoleConsumerTest.scala     |  95 +++++
 3 files changed, 364 insertions(+), 143 deletions(-)
 create mode 100644 core/src/main/scala/kafka/consumer/BaseConsumer.scala
 create mode 100644 core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala

diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
new file mode 100644
index 0000000000000..b8774afd7047b
--- /dev/null
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.consumer
+
+import java.util.Properties
+
+/**
+ * A base consumer used to abstract both the old and new consumer.
+ * This class (along with BaseProducer) should be removed
+ * once we deprecate the old consumer.
+ */
+trait BaseConsumer {
+  def receive(): BaseConsumerRecord
+  def close()
+}
+
+case class BaseConsumerRecord(topic: String, partition: Int, offset: Long, key: Array[Byte], value: Array[Byte])
+
+class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseConsumer {
+  import org.apache.kafka.clients.consumer.KafkaConsumer
+
+  val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
+  consumer.subscribe(topic)
+  var recordIter = consumer.poll(0).iterator()
+
+  override def receive(): BaseConsumerRecord = {
+    while (!recordIter.hasNext)
+      recordIter = consumer.poll(0).iterator()
+
+    val record = recordIter.next()
+    BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value())
+  }
+
+  override def close() {
+    this.consumer.close()
+  }
+}
+
+class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends BaseConsumer {
+  import kafka.serializer.DefaultDecoder
+
+  val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
+  val stream: KafkaStream[Array[Byte], Array[Byte]] =
+    consumerConnector.createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()).head
+  val iter = stream.iterator()
+
+  override def receive(): BaseConsumerRecord = {
+    // we do not need to check hasNext for KafkaStream iterator
+    val messageAndMetadata = iter.next()
+    BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key(), messageAndMetadata.message())
+  }
+
+  override def close() {
+    this.consumerConnector.shutdown()
+  }
+}
+
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index a3bee58a207ec..bbd217fc7b84a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,91 +17,217 @@
 package kafka.tools
 
-import scala.collection.JavaConversions._
-import org.I0Itec.zkclient._
-import joptsimple._
-import java.util.Properties
-import java.util.Random
 import java.io.PrintStream
+import java.util.{Properties, Random}
+import joptsimple._
+import kafka.consumer._
 import kafka.message._
-import kafka.serializer._
-import kafka.utils._
 import kafka.metrics.KafkaMetricsReporter
-import kafka.consumer.{Blacklist,Whitelist,ConsumerConfig,Consumer}
+import kafka.utils._
+import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.utils.Utils
 
+import scala.collection.JavaConversions._
+
 /**
- * Consumer that dumps messages out to standard out.
- *
+ * Consumer that dumps messages to standard out.
  */
 object ConsoleConsumer extends Logging {
 
   def main(args: Array[String]) {
+    val conf = new ConsumerConfig(args)
+    run(conf)
+    System.exit(0)
+  }
+
+  def run(conf: ConsumerConfig) {
+
+    val consumer =
+      if (conf.useNewConsumer) {
+        new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf))
+      } else {
+        checkZk(conf);
+        new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
+      }
+
+    addShutdownHook(consumer, conf)
+
+    process(conf.maxMessages, conf.formatter, consumer)
+  }
+
+  def checkZk(config: ConsumerConfig) {
+    if (!checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/brokers/ids")) {
+      System.err.println("No brokers found in ZK.")
+      System.exit(1)
+    }
+
+    if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
+      checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id") + "/offsets")) {
+      System.err.println("Found previous offset information for this group " + config.consumerProps.getProperty("group.id")
+        + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+  }
+
+  def addShutdownHook(consumer: BaseConsumer, conf: ConsumerConfig) {
+    Runtime.getRuntime.addShutdownHook(new Thread() {
+      override def run() {
+        consumer.close()
+
+        // if awe generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
+        if(!conf.groupIdPassed)
+          ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
+      }
+    })
+  }
+
+  def process(maxMessages: Integer, formatter: MessageFormatter, consumer: BaseConsumer) {
+    var messageCount = 0
+    while (messageCount < maxMessages || maxMessages == -1) {
+      messageCount += 1
+      val msg: BaseConsumerRecord = consumer.receive()
+      formatter.writeTo(msg.key, msg.value, System.out)
+      checkErr(formatter)
+    }
+    println("Processed a total of %d messages".format(messageCount))
+  }
+
+  def checkErr(formatter: MessageFormatter) {
+    if (System.out.checkError()) {
+      // This means no one is listening to our output stream any more, time to shutdown
+      System.err.println("Unable to write to standard out, closing consumer.")
+      formatter.close()
+      System.exit(1)
+    }
+  }
+
+  def getOldConsumerProps(config: ConsumerConfig): Properties = {
+    val props = new Properties
+
+    props.putAll(config.consumerProps)
+    props.put("auto.offset.reset", if (config.fromBeginning) "smallest" else "largest")
+    props.put("zookeeper.connect", config.zkConnectionStr)
+
+    if (!config.options.has(config.deleteConsumerOffsetsOpt) && config.options.has(config.resetBeginningOpt) &&
+      checkZkPathExists(config.options.valueOf(config.zkConnectOpt), "/consumers/" + props.getProperty("group.id") + "/offsets")) {
+      System.err.println("Found previous offset information for this group " + props.getProperty("group.id")
+        + ". Please use --delete-consumer-offsets to delete previous offsets metadata")
+      System.exit(1)
+    }
+
+    if (config.options.has(config.deleteConsumerOffsetsOpt))
+      ZkUtils.maybeDeletePath(config.options.valueOf(config.zkConnectOpt), "/consumers/" + config.consumerProps.getProperty("group.id"))
+
+    props
+  }
+
+  def getNewConsumerProps(config: ConsumerConfig): Properties = {
+    val props = new Properties
+
+    props.putAll(config.consumerProps)
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.StringDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if(config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+
+    props
+  }
+
+  class ConsumerConfig(args: Array[String]) {
     val parser = new OptionParser
     val topicIdOpt = parser.accepts("topic", "The topic id to consume on.")
-          .withRequiredArg
-          .describedAs("topic")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("topic")
+      .ofType(classOf[String])
     val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.")
-          .withRequiredArg
-          .describedAs("whitelist")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("whitelist")
+      .ofType(classOf[String])
     val blacklistOpt = parser.accepts("blacklist", "Blacklist of topics to exclude from consumption.")
-          .withRequiredArg
-          .describedAs("blacklist")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("blacklist")
+      .ofType(classOf[String])
     val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the form host:port. " +
-          "Multiple URLS can be given to allow fail-over.")
-          .withRequiredArg
-          .describedAs("urls")
-          .ofType(classOf[String])
-
+      "Multiple URLS can be given to allow fail-over.")
+      .withRequiredArg
+      .describedAs("urls")
+      .ofType(classOf[String])
     val consumerConfigOpt = parser.accepts("consumer.config", "Consumer config properties file.")
-          .withRequiredArg
-          .describedAs("config file")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("config file")
+      .ofType(classOf[String])
    val messageFormatterOpt = parser.accepts("formatter", "The name of a class to use for formatting kafka messages for display.")
-          .withRequiredArg
-          .describedAs("class")
-          .ofType(classOf[String])
-          .defaultsTo(classOf[DefaultMessageFormatter].getName)
+      .withRequiredArg
+      .describedAs("class")
+      .ofType(classOf[String])
+      .defaultsTo(classOf[DefaultMessageFormatter].getName)
     val messageFormatterArgOpt = parser.accepts("property")
-          .withRequiredArg
-          .describedAs("prop")
-          .ofType(classOf[String])
+      .withRequiredArg
+      .describedAs("prop")
+      .ofType(classOf[String])
     val deleteConsumerOffsetsOpt = parser.accepts("delete-consumer-offsets", "If specified, the consumer path in zookeeper is deleted when starting up")
     val resetBeginningOpt = parser.accepts("from-beginning", "If the consumer does not already have an established offset to consume from, " +
-          "start with the earliest message present in the log rather than the latest message.")
+      "start with the earliest message present in the log rather than the latest message.")
     val maxMessagesOpt = parser.accepts("max-messages", "The maximum number of messages to consume before exiting. If not set, consumption is continual.")
-          .withRequiredArg
-          .describedAs("num_messages")
-          .ofType(classOf[java.lang.Integer])
+      .withRequiredArg
+      .describedAs("num_messages")
+      .ofType(classOf[java.lang.Integer])
     val skipMessageOnErrorOpt = parser.accepts("skip-message-on-error", "If there is an error when processing a message, " +
-          "skip it instead of halt.")
+      "skip it instead of halt.")
     val csvMetricsReporterEnabledOpt = parser.accepts("csv-reporter-enabled", "If set, the CSV metrics reporter will be enabled")
     val metricsDirectoryOpt = parser.accepts("metrics-dir", "If csv-reporter-enable is set, and this parameter is" +
-          "set, the csv metrics will be outputed here")
+      "set, the csv metrics will be outputed here")
       .withRequiredArg
-      .describedAs("metrics dictory")
+      .describedAs("metrics directory")
       .ofType(classOf[java.lang.String])
+    val useNewConsumerOpt = parser.accepts("new-consumer", "Use the new consumer implementation.")
+    val bootstrapServerOpt = parser.accepts("bootstrap-server")
+      .withRequiredArg
+      .describedAs("server to connect to")
+      .ofType(classOf[String])
+    val keyDeserializerOpt = parser.accepts("key-deserializer")
+      .withRequiredArg
+      .describedAs("deserializer for key")
+      .ofType(classOf[String])
+    val valueDeserializerOpt = parser.accepts("value-deserializer")
+      .withRequiredArg
+      .describedAs("deserializer for values")
+      .ofType(classOf[String])
 
-    if(args.length == 0)
+    if (args.length == 0)
       CommandLineUtils.printUsageAndDie(parser, "The console consumer is a tool that reads data from Kafka and outputs it to standard output.")
-
+
     var groupIdPassed = true
     val options: OptionSet = tryParse(parser, args)
-    CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+    val useNewConsumer = options.has(useNewConsumerOpt)
+
+    val filterOpt = List(whitelistOpt, blacklistOpt).filter(options.has)
     val topicOrFilterOpt = List(topicIdOpt, whitelistOpt, blacklistOpt).filter(options.has)
-    if (topicOrFilterOpt.size != 1)
-      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
     val topicArg = options.valueOf(topicOrFilterOpt.head)
-    val filterSpec = if (options.has(blacklistOpt))
-      new Blacklist(topicArg)
-    else
-      new Whitelist(topicArg)
+    val filterSpec = if (options.has(blacklistOpt)) new Blacklist(topicArg) else new Whitelist(topicArg)
+    val consumerProps = CommandLineUtils.parseKeyValueArgs(options.valuesOf(consumerConfigOpt))
+    val zkConnectionStr = options.valueOf(zkConnectOpt)
+    val fromBeginning = options.has(resetBeginningOpt)
+    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
+    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
+    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
+    val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
+    val bootstrapServer = options.valueOf(bootstrapServerOpt);
+    val keyDeserializer = options.valueOf(keyDeserializerOpt);
+    val valueDeserializer = options.valueOf(valueDeserializerOpt);
+    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
+    formatter.init(formatterArgs)
+
+    if (!useNewConsumer) {
+      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
+    }
+    else {
+      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
+    }
 
-    val csvMetricsReporterEnabled = options.has(csvMetricsReporterEnabledOpt)
-    if (csvMetricsReporterEnabled) {
+    if (!useNewConsumer && topicOrFilterOpt.size != 1)
+      CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
+
+    if (options.has(csvMetricsReporterEnabledOpt)) {
       val csvReporterProps = new Properties()
       csvReporterProps.put("kafka.metrics.polling.interval.secs", "5")
       csvReporterProps.put("kafka.metrics.reporters", "kafka.metrics.KafkaCSVMetricsReporter")
@@ -114,102 +240,27 @@ object ConsoleConsumer extends Logging {
       KafkaMetricsReporter.startReporters(verifiableProps)
     }
 
-    val consumerProps = if (options.has(consumerConfigOpt))
-      Utils.loadProps(options.valueOf(consumerConfigOpt))
-    else
-      new Properties()
-
-    if(!consumerProps.containsKey("group.id")) {
-      consumerProps.put("group.id","console-consumer-" + new Random().nextInt(100000))
+    //Provide the consumer with a randomly assigned group id
+    if(!consumerProps.containsKey(ConsumerConfig.GROUP_ID_CONFIG)) {
+      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG,"console-consumer-" + new Random().nextInt(100000))
       groupIdPassed=false
     }
 
-    consumerProps.put("auto.offset.reset", if(options.has(resetBeginningOpt)) "smallest" else "largest")
-    consumerProps.put("zookeeper.connect", options.valueOf(zkConnectOpt))
-
-    if (!checkZkPathExists(options.valueOf(zkConnectOpt),"/brokers/ids")) {
-      System.err.println("No brokers found.")
-      System.exit(1)
-    }
-
-    if (!options.has(deleteConsumerOffsetsOpt) && options.has(resetBeginningOpt) &&
-      checkZkPathExists(options.valueOf(zkConnectOpt),"/consumers/" + consumerProps.getProperty("group.id")+ "/offsets")) {
-      System.err.println("Found previous offset information for this group "+consumerProps.getProperty("group.id")
-        +". Please use --delete-consumer-offsets to delete previous offsets metadata")
-      System.exit(1)
-    }
-
-    if(options.has(deleteConsumerOffsetsOpt))
-      ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.getProperty("group.id"))
-
-    val config = new ConsumerConfig(consumerProps)
-    val skipMessageOnError = if (options.has(skipMessageOnErrorOpt)) true else false
-    val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
-    val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
-    val maxMessages = if(options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-    val connector = Consumer.create(config)
-
-    Runtime.getRuntime.addShutdownHook(new Thread() {
-      override def run() {
-        connector.shutdown()
-        // if there is no group specified then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!groupIdPassed)
-          ZkUtils.maybeDeletePath(options.valueOf(zkConnectOpt), "/consumers/" + consumerProps.get("group.id"))
-      }
-    })
-
-    var numMessages = 0L
-    val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
-    formatter.init(formatterArgs)
-    try {
-      val stream = connector.createMessageStreamsByFilter(filterSpec, 1, new DefaultDecoder(), new DefaultDecoder()).get(0)
-      val iter = if(maxMessages >= 0)
-        stream.slice(0, maxMessages)
-      else
-        stream
-
-      for(messageAndTopic <- iter) {
-        try {
-          formatter.writeTo(messageAndTopic.key, messageAndTopic.message, System.out)
-          numMessages += 1
-        } catch {
-          case e: Throwable =>
-            if (skipMessageOnError)
-              error("Error processing message, skipping this message: ", e)
-            else
-              throw e
-        }
-        if(System.out.checkError()) {
-          // This means no one is listening to our output stream any more, time to shutdown
-          System.err.println("Unable to write to standard out, closing consumer.")
-          System.err.println("Consumed %d messages".format(numMessages))
-          formatter.close()
-          connector.shutdown()
-          System.exit(1)
+    def tryParse(parser: OptionParser, args: Array[String]) = {
+      try {
+        parser.parse(args: _*)
+      } catch {
+        case e: OptionException => {
+          Utils.croak(e.getMessage)
+          null
         }
       }
-    } catch {
-      case e: Throwable => error("Error processing message, stopping consumer: ", e)
-    }
-    System.err.println("Consumed %d messages".format(numMessages))
-    System.out.flush()
-    formatter.close()
-    connector.shutdown()
-  }
-
-  def tryParse(parser: OptionParser, args: Array[String]) = {
-    try {
-      parser.parse(args : _*)
-    } catch {
-      case e: OptionException => {
-        Utils.croak(e.getMessage)
-        null
-      }
     }
   }
 
   def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val zk = ZkUtils.createZkClient(zkUrl, 30*1000,30*1000);
+      val zk = ZkUtils.createZkClient(zkUrl, 30 * 1000, 30 * 1000);
       zk.exists(path)
     } catch {
       case _: Throwable => false
@@ -219,7 +270,9 @@ object ConsoleConsumer extends Logging {
 
 trait MessageFormatter {
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream)
+
   def init(props: Properties) {}
+
   def close() {}
 }
 
@@ -229,16 +282,16 @@ class DefaultMessageFormatter extends MessageFormatter {
   var lineSeparator = "\n".getBytes
 
   override def init(props: Properties) {
-    if(props.containsKey("print.key"))
+    if (props.containsKey("print.key"))
       printKey = props.getProperty("print.key").trim.toLowerCase.equals("true")
-    if(props.containsKey("key.separator"))
+    if (props.containsKey("key.separator"))
       keySeparator = props.getProperty("key.separator").getBytes
-    if(props.containsKey("line.separator"))
+    if (props.containsKey("line.separator"))
       lineSeparator = props.getProperty("line.separator").getBytes
   }
 
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {
-    if(printKey) {
+    if (printKey) {
       output.write(if (key == null) "null".getBytes() else key)
       output.write(keySeparator)
     }
@@ -249,6 +302,7 @@ class DefaultMessageFormatter extends MessageFormatter {
 
 class NoOpMessageFormatter extends MessageFormatter {
   override def init(props: Properties) {}
+
   def writeTo(key: Array[Byte], value: Array[Byte], output: PrintStream) {}
 }
 
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
new file mode 100644
index 0000000000000..08b0ad5cb58f2
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.tools
+
+import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
+import kafka.tools.{ConsoleConsumer, MessageFormatter}
+import org.easymock.EasyMock
+import org.junit.Assert._
+import org.junit.Test
+import org.scalatest.junit.JUnit3Suite
+
+class ConsoleConsumerTest extends JUnit3Suite {
+
+  override def setUp() {
+    super.setUp()
+  }
+
+  override def tearDown() {
+    super.tearDown()
+  }
+
+  @Test
+  def testShouldLimitReadsToMaxMessageLimit() {
+    //Mocks
+    val consumer = EasyMock.createNiceMock(classOf[BaseConsumer])
+    val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
+
+    //Stubs
+    val record = new BaseConsumerRecord("foo", 1, 1, Array[Byte](), Array[Byte]())
+
+    //Expectations
+    val messageLimit: Int = 10
+    EasyMock.expect(formatter.writeTo(EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject())).times(messageLimit)
+    EasyMock.expect(consumer.receive()).andReturn(record).times(messageLimit)
+
+    EasyMock.replay(consumer)
+    EasyMock.replay(formatter)
+
+    //Test
+    ConsoleConsumer.process(messageLimit, formatter, consumer)
+  }
+
+  @Test
+  def testShouldParseValidOldConsumerValidConfig() {
+    //Given
+    val args: Array[String] = Array(
+      "--zookeeper", "localhost:2181",
+      "--topic", "test",
+      "--from-beginning")
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertFalse(config.useNewConsumer)
+    assertEquals("localhost:2181", config.zkConnectionStr)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+  @Test
+  def testShouldParseValidNewConsumerValidConfig() {
+    //Given
+    val args: Array[String] = Array(
+      "--bootstrap-server", "localhost:9092",
+      "--topic", "test",
+      "--from-beginning",
+      "--new-consumer") //new
+
+    //When
+    val config = new ConsoleConsumer.ConsumerConfig(args)
+
+    //Then
+    assertTrue(config.useNewConsumer)
+    assertEquals("localhost:9092", config.bootstrapServer)
+    assertEquals("test", config.topicArg)
+    assertEquals(true, config.fromBeginning)
+  }
+
+}

From 883a626201efb5f0b52fb7db9280711e1fa16798 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 18 Aug 2015 15:12:40 +0100
Subject: [PATCH 2/5] Patch for KAFKA-2015: incorporating comments to date.

---
 .../scala/kafka/consumer/BaseConsumer.scala   | 18 +++++-----
 .../scala/kafka/tools/ConsoleConsumer.scala   | 33 ++++++++-----------
 .../kafka/tools/ConsoleConsumerTest.scala     | 20 ++++-------
 3 files changed, 28 insertions(+), 43 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index b8774afd7047b..dd808d077de38 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -36,18 +36,18 @@ class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseCon
 
   val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](consumerProps)
   consumer.subscribe(topic)
-  var recordIter = consumer.poll(0).iterator()
+  var recordIter = consumer.poll(0).iterator
 
   override def receive(): BaseConsumerRecord = {
     while (!recordIter.hasNext)
-      recordIter = consumer.poll(0).iterator()
+      recordIter = consumer.poll(0).iterator
 
-    val record = recordIter.next()
-    BaseConsumerRecord(record.topic(), record.partition(), record.offset(), record.key(), record.value())
+    val record = recordIter.next
+    BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
   }
 
   override def close() {
-    this.consumer.close()
+    this.consumer.close
   }
 }
 
@@ -57,16 +57,16 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   val consumerConnector = Consumer.create(new ConsumerConfig(consumerProps))
   val stream: KafkaStream[Array[Byte], Array[Byte]] =
     consumerConnector.createMessageStreamsByFilter(topicFilter, 1, new DefaultDecoder(), new DefaultDecoder()).head
-  val iter = stream.iterator()
+  val iter = stream.iterator
 
   override def receive(): BaseConsumerRecord = {
     // we do not need to check hasNext for KafkaStream iterator
-    val messageAndMetadata = iter.next()
-    BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key(), messageAndMetadata.message())
+    val messageAndMetadata = iter.next
+    BaseConsumerRecord(messageAndMetadata.topic, messageAndMetadata.partition, messageAndMetadata.offset, messageAndMetadata.key, messageAndMetadata.message)
   }
 
   override def close() {
-    this.consumerConnector.shutdown()
+    this.consumerConnector.shutdown
  }
 }
 
diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index bbd217fc7b84a..8478b8d1c428a 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -37,7 +37,6 @@ object ConsoleConsumer extends Logging {
   def main(args: Array[String]) {
     val conf = new ConsumerConfig(args)
     run(conf)
-    System.exit(0)
   }
 
   def run(conf: ConsumerConfig) {
@@ -46,7 +45,7 @@ object ConsoleConsumer extends Logging {
       if (conf.useNewConsumer) {
         new NewShinyConsumer(conf.topicArg, getNewConsumerProps(conf))
       } else {
-        checkZk(conf);
+        checkZk(conf)
         new OldConsumer(conf.filterSpec, getOldConsumerProps(conf))
       }
 
@@ -74,8 +73,8 @@ object ConsoleConsumer extends Logging {
       override def run() {
         consumer.close()
 
-        // if awe generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
-        if(!conf.groupIdPassed)
+        // if we generated a random group id (as none specified explicitly) then avoid polluting zookeeper with persistent group data, this is a hack
+        if (!conf.groupIdPassed)
           ZkUtils.maybeDeletePath(conf.options.valueOf(conf.zkConnectOpt), "/consumers/" + conf.consumerProps.get("group.id"))
       }
     })
@@ -89,7 +88,7 @@ object ConsoleConsumer extends Logging {
       formatter.writeTo(msg.key, msg.value, System.out)
       checkErr(formatter)
     }
-    println("Processed a total of %d messages".format(messageCount))
+    println(s"Processed a total of %messageCount messages")
   }
 
   def checkErr(formatter: MessageFormatter) {
@@ -128,7 +127,7 @@ object ConsoleConsumer extends Logging {
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, if (config.options.has(config.resetBeginningOpt)) "earliest" else "latest")
     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config.bootstrapServer)
     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, if (config.keyDeserializer != null) config.keyDeserializer else "org.apache.kafka.common.serialization.StringDeserializer")
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if(config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, if (config.valueDeserializer != null) config.valueDeserializer else "org.apache.kafka.common.serialization.ByteArrayDeserializer")
 
     props
   }
@@ -211,18 +210,13 @@ object ConsoleConsumer extends Logging {
     val messageFormatterClass = Class.forName(options.valueOf(messageFormatterOpt))
     val formatterArgs = CommandLineUtils.parseKeyValueArgs(options.valuesOf(messageFormatterArgOpt))
     val maxMessages = if (options.has(maxMessagesOpt)) options.valueOf(maxMessagesOpt).intValue else -1
-    val bootstrapServer = options.valueOf(bootstrapServerOpt);
-    val keyDeserializer = options.valueOf(keyDeserializerOpt);
-    val valueDeserializer = options.valueOf(valueDeserializerOpt);
+    val bootstrapServer = options.valueOf(bootstrapServerOpt)
+    val keyDeserializer = options.valueOf(keyDeserializerOpt)
+    val valueDeserializer = options.valueOf(valueDeserializerOpt)
     val formatter: MessageFormatter = messageFormatterClass.newInstance().asInstanceOf[MessageFormatter]
     formatter.init(formatterArgs)
 
-    if (!useNewConsumer) {
-      CommandLineUtils.checkRequiredArgs(parser, options, zkConnectOpt)
-    }
-    else {
-      CommandLineUtils.checkRequiredArgs(parser, options, bootstrapServerOpt)
-    }
+    CommandLineUtils.checkRequiredArgs(parser, options, if (useNewConsumer) bootstrapServerOpt else zkConnectOpt)
 
     if (!useNewConsumer && topicOrFilterOpt.size != 1)
       CommandLineUtils.printUsageAndDie(parser, "Exactly one of whitelist/blacklist/topic is required.")
@@ -247,20 +241,19 @@ object ConsoleConsumer extends Logging {
     }
 
     def tryParse(parser: OptionParser, args: Array[String]) = {
-      try {
+      try
        parser.parse(args: _*)
-      } catch {
-        case e: OptionException => {
+      catch {
+        case e: OptionException =>
          Utils.croak(e.getMessage)
          null
-        }
      }
    }
  }
 
   def checkZkPathExists(zkUrl: String, path: String): Boolean = {
     try {
-      val zk = ZkUtils.createZkClient(zkUrl, 30 * 1000, 30 * 1000);
+      val zk = ZkUtils.createZkClient(zkUrl, 30 * 1000, 30 * 1000)
       zk.exists(path)
     } catch {
       case _: Throwable => false

diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 08b0ad5cb58f2..426df8635202a 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -21,21 +21,13 @@ import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
 import kafka.tools.{ConsoleConsumer, MessageFormatter}
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.Test
-import org.scalatest.junit.JUnit3Suite
+import org.junit.{After, Before, Test}
+import org.scalatest.junit.{JUnitSuite, JUnit3Suite}
 
-class ConsoleConsumerTest extends JUnit3Suite {
-
-  override def setUp() {
-    super.setUp()
-  }
-
-  override def tearDown() {
-    super.tearDown()
-  }
+class ConsoleConsumerTest extends JUnitSuite {
 
   @Test
-  def testShouldLimitReadsToMaxMessageLimit() {
+  def shouldLimitReadsToMaxMessageLimit() {
     //Mocks
     val consumer = EasyMock.createNiceMock(classOf[BaseConsumer])
     val formatter = EasyMock.createNiceMock(classOf[MessageFormatter])
@@ -56,7 +48,7 @@ class ConsoleConsumerTest extends JUnit3Suite {
   }
 
   @Test
-  def testShouldParseValidOldConsumerValidConfig() {
+  def shouldParseValidOldConsumerValidConfig() {
     //Given
     val args: Array[String] = Array(
       "--zookeeper", "localhost:2181",
@@ -74,7 +66,7 @@ class ConsoleConsumerTest extends JUnit3Suite {
   }
 
   @Test
-  def testShouldParseValidNewConsumerValidConfig() {
+  def sShouldParseValidNewConsumerValidConfig() {
     //Given
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",

From 739457c480bf4135ab5ea3fc01475347cedf7357 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 18 Aug 2015 16:33:18 +0100
Subject: [PATCH 3/5] Patch for KAFKA-2015: switched to blocking poll + typo +
 fixed to match style guide

---
 core/src/main/scala/kafka/consumer/BaseConsumer.scala     | 6 +++---
 .../test/scala/unit/kafka/tools/ConsoleConsumerTest.scala | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/kafka/consumer/BaseConsumer.scala b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
index dd808d077de38..5017c95600c91 100644
--- a/core/src/main/scala/kafka/consumer/BaseConsumer.scala
+++ b/core/src/main/scala/kafka/consumer/BaseConsumer.scala
@@ -40,14 +40,14 @@ class NewShinyConsumer(topic: String, consumerProps: Properties) extends BaseCon
 
   override def receive(): BaseConsumerRecord = {
     while (!recordIter.hasNext)
-      recordIter = consumer.poll(0).iterator
+      recordIter = consumer.poll(Long.MaxValue).iterator
 
     val record = recordIter.next
     BaseConsumerRecord(record.topic, record.partition, record.offset, record.key, record.value)
   }
 
   override def close() {
-    this.consumer.close
+    this.consumer.close()
   }
 }
 
@@ -66,7 +66,7 @@ class OldConsumer(topicFilter: TopicFilter, consumerProps: Properties) extends B
   }
 
   override def close() {
-    this.consumerConnector.shutdown
+    this.consumerConnector.shutdown()
   }
 }
 
diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 426df8635202a..0d2de299fc2a2 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -66,7 +66,7 @@ class ConsoleConsumerTest extends JUnitSuite {
   }
 
   @Test
-  def sShouldParseValidNewConsumerValidConfig() {
+  def shouldParseValidNewConsumerValidConfig() {
     //Given
     val args: Array[String] = Array(
       "--bootstrap-server", "localhost:9092",

From 6e08bf436fecf51da08ee7ef6a49e853959144b2 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Tue, 18 Aug 2015 16:42:43 +0100
Subject: [PATCH 4/5] Patch for KAFKA-2015: fixed formatting error

---
 core/src/main/scala/kafka/tools/ConsoleConsumer.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
index 8478b8d1c428a..22c63b3c09ce1 100755
--- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
+++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala
@@ -88,7 +88,7 @@ object ConsoleConsumer extends Logging {
       formatter.writeTo(msg.key, msg.value, System.out)
       checkErr(formatter)
     }
-    println(s"Processed a total of %messageCount messages")
+    println(s"Processed a total of $messageCount messages")
   }
 
   def checkErr(formatter: MessageFormatter) {

From 5058a7b6db91e4c90fc198f0b8e4957c38abb563 Mon Sep 17 00:00:00 2001
From: Ben Stopford
Date: Thu, 20 Aug 2015 10:18:39 +0100
Subject: [PATCH 5/5] Patch for KAFKA-2015: removed unused imports

---
 .../test/scala/unit/kafka/tools/ConsoleConsumerTest.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
index 0d2de299fc2a2..254ba7b7cf774 100644
--- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala
@@ -21,8 +21,8 @@ import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
 import kafka.tools.{ConsoleConsumer, MessageFormatter}
 import org.easymock.EasyMock
 import org.junit.Assert._
-import org.junit.{After, Before, Test}
-import org.scalatest.junit.{JUnitSuite, JUnit3Suite}
+import org.junit.Test
+import org.scalatest.junit.JUnitSuite
 
 class ConsoleConsumerTest extends JUnitSuite {
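
Editor's note: the sketch below is not part of the patch series. It is a minimal, hedged illustration of how the process loop introduced in PATCH 1 can be driven end to end with a hand-rolled stub in place of EasyMock, mirroring what testShouldLimitReadsToMaxMessageLimit does. The object name ProcessLoopSketch and the canned record contents are invented for illustration; the classes and signatures come from the patches above, and the code assumes the patched kafka.consumer and kafka.tools packages are on the classpath.

import java.util.Properties

import kafka.consumer.{BaseConsumer, BaseConsumerRecord}
import kafka.tools.{ConsoleConsumer, DefaultMessageFormatter}

object ProcessLoopSketch extends App {
  // Stub consumer: replays the same canned record on every receive().
  val record = BaseConsumerRecord("sketch-topic", 0, 0L, "k".getBytes, "v".getBytes)
  val stub = new BaseConsumer {
    override def receive(): BaseConsumerRecord = record
    override def close() {}
  }

  // DefaultMessageFormatter is the default --formatter from the patched
  // ConsoleConsumer.scala; "print.key" is one of the properties its init() reads.
  val props = new Properties()
  props.put("print.key", "true")
  val formatter = new DefaultMessageFormatter
  formatter.init(props)

  // process() reads exactly maxMessages records (here 3), writes each through
  // the formatter to stdout, then prints "Processed a total of 3 messages".
  ConsoleConsumer.process(3, formatter, stub)
  stub.close()
}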
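
Similarly, a sketch of the two ConsumerConfig entry points that the parsing tests exercise: without --new-consumer the tool requires --zookeeper, with it the required argument becomes --bootstrap-server. ConfigParsingSketch is an invented name; the flags and fields are exactly those added by the series.

import kafka.tools.ConsoleConsumer

object ConfigParsingSketch extends App {
  // Old consumer path: --zookeeper is the required connection argument.
  val oldConf = new ConsoleConsumer.ConsumerConfig(Array(
    "--zookeeper", "localhost:2181",
    "--topic", "test",
    "--from-beginning"))
  assert(!oldConf.useNewConsumer && oldConf.fromBeginning)

  // New consumer path: --new-consumer makes --bootstrap-server required instead.
  val newConf = new ConsoleConsumer.ConsumerConfig(Array(
    "--new-consumer",
    "--bootstrap-server", "localhost:9092",
    "--topic", "test"))
  assert(newConf.useNewConsumer && newConf.bootstrapServer == "localhost:9092")
}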