From ac5194b4cbf4d1e65d8742da5d5cac083a816a87 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 18:44:26 +0000 Subject: [PATCH 1/2] Introduce `producer.config` property to `ConsoleProducer` This makes it easier to pass security properties in the same way to `ConsoleConsumer` and `ConsoleProducer`. --- .../scala/kafka/tools/ConsoleProducer.scala | 24 ++++++++++++------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 979c1bde9fb07..4a8b3e544caeb 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -28,6 +28,7 @@ import java.io._ import joptsimple._ import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.utils.Utils object ConsoleProducer { @@ -76,10 +77,7 @@ object ConsoleProducer { } def getOldProducerProps(config: ProducerConfig): Properties = { - - val props = new Properties - - props.putAll(config.extraProducerProps) + val props = producerProps(config) props.put("metadata.broker.list", config.brokerList) props.put("compression.codec", config.compressionCodec) @@ -101,11 +99,17 @@ object ConsoleProducer { props } - def getNewProducerProps(config: ProducerConfig): Properties = { - - val props = new Properties - + private def producerProps(config: ProducerConfig): Properties = { + val props = + if (config.options.has(config.producerConfigOpt)) + Utils.loadProps(config.options.valueOf(config.producerConfigOpt)) + else new Properties props.putAll(config.extraProducerProps) + props + } + + def getNewProducerProps(config: ProducerConfig): Properties = { + val props = producerProps(config) props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config.brokerList) props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, config.compressionCodec) @@ -237,6 +241,10 @@ object ConsoleProducer { .withRequiredArg .describedAs("producer_prop") .ofType(classOf[String]) + val producerConfigOpt = parser.accepts("producer.config", "Producer config properties file.") + .withRequiredArg + .describedAs("config file") + .ofType(classOf[String]) val useOldProducerOpt = parser.accepts("old-producer", "Use the old producer implementation.") val options = parser.parse(args : _*) From e743fbaa7c99a0ce74ec647736c0aa3ce77345a7 Mon Sep 17 00:00:00 2001 From: Ismael Juma Date: Tue, 17 Nov 2015 23:24:06 +0000 Subject: [PATCH 2/2] Mention that `producer-property` has precedence over `producer.config` As suggested by Jun. --- core/src/main/scala/kafka/tools/ConsoleProducer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/kafka/tools/ConsoleProducer.scala b/core/src/main/scala/kafka/tools/ConsoleProducer.scala index 4a8b3e544caeb..bce819e254923 100644 --- a/core/src/main/scala/kafka/tools/ConsoleProducer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleProducer.scala @@ -241,7 +241,7 @@ object ConsoleProducer { .withRequiredArg .describedAs("producer_prop") .ofType(classOf[String]) - val producerConfigOpt = parser.accepts("producer.config", "Producer config properties file.") + val producerConfigOpt = parser.accepts("producer.config", s"Producer config properties file. Note that $producerPropertyOpt takes precedence over this config.") .withRequiredArg .describedAs("config file") .ofType(classOf[String])