From 26bcfdbef95c4aecbff8aee0a587281f59b55bcf Mon Sep 17 00:00:00 2001 From: Carl Haferd Date: Mon, 22 May 2017 16:09:19 -0700 Subject: [PATCH] Add the option to set client.id to storm-kafka, STORM-2524 --- docs/storm-kafka.md | 9 ++++++--- .../src/jvm/org/apache/storm/kafka/SpoutConfig.java | 12 ++++++++++++ 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/docs/storm-kafka.md b/docs/storm-kafka.md index c1162ccd58d..20244f29669 100644 --- a/docs/storm-kafka.md +++ b/docs/storm-kafka.md @@ -57,11 +57,14 @@ The optional ClientId is used as a part of the ZooKeeper path where the spout's There are 2 extensions of KafkaConfig currently in use. -Spoutconfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling -behavior specific to KafkaSpout. The Zkroot will be used as root to store your consumer's offset. The id should uniquely -identify your spout. +SpoutConfig is an extension of KafkaConfig that supports additional fields with ZooKeeper connection info and for controlling +behavior specific to KafkaSpout. +The clientId will be used to identify requests which are made using the Kafka Protocol. +The zkRoot will be used as root to store your consumer's offset. +The id should uniquely identify your spout. ```java +public SpoutConfig(BrokerHosts hosts, String topic, String clientId, String zkRoot, String id); public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); ``` diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java index aa93c24b013..2a684c2e910 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java @@ -42,9 +42,21 @@ public class SpoutConfig extends KafkaConfig implements Serializable { public long retryDelayMaxMs = 60 * 1000; public int retryLimit = -1; + /** + * Create a SpoutConfig without setting client.id, which can make the source application ambiguous when tracing Kafka calls. + */ public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id; } + + /** + * Create a SpoutConfig with a client.id value. + */ + public SpoutConfig(BrokerHosts hosts, String topic, String clientId, String zkRoot, String id) { + super(hosts, topic, clientId); + this.zkRoot = zkRoot; + this.id = id; + } }