From cd0e5e2e4039b9cd84ab569d03d4ae61c326561b Mon Sep 17 00:00:00 2001 From: Rick Kellogg Date: Thu, 26 Feb 2015 11:51:42 -0500 Subject: [PATCH 1/5] Removed legacy constructor for SpoutConfig with only three parameters. No longer exists. --- external/storm-kafka/README.md | 1 - 1 file changed, 1 deletion(-) diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md index c5ed4a5311e..91d34f107f7 100644 --- a/external/storm-kafka/README.md +++ b/external/storm-kafka/README.md @@ -56,7 +56,6 @@ behavior specific to KafkaSpout. The Zkroot will be used as root to store your c identify your spout. ```java public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id); -public SpoutConfig(BrokerHosts hosts, String topic, String id); ``` In addition to these parameters, SpoutConfig contains the following fields that control how KafkaSpout behaves: ```java From 328a29cd1fc708e34d3397b5d2e49cc9de9185ee Mon Sep 17 00:00:00 2001 From: Rick Kellogg Date: Thu, 26 Feb 2015 11:55:11 -0500 Subject: [PATCH 2/5] Removed extraneous implements Serializable clause. Already present in base class (KafkaConfig). --- external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index 61d0b355667..65f49c7aa37 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -21,7 +21,7 @@ import java.util.List; -public class SpoutConfig extends KafkaConfig implements Serializable { +public class SpoutConfig extends KafkaConfig { public List zkServers = null; public Integer zkPort = null; public String zkRoot = null; From 0e8be57613d034022d94dcd2fd4c7bb88c745af2 Mon Sep 17 00:00:00 2001 From: Rick Kellogg Date: Thu, 26 Feb 2015 11:59:51 -0500 Subject: [PATCH 3/5] Removed unnecessary initialization to null on instance variables. --- external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index 65f49c7aa37..f6ce01952a6 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -22,10 +22,10 @@ public class SpoutConfig extends KafkaConfig { - public List zkServers = null; - public Integer zkPort = null; - public String zkRoot = null; - public String id = null; + public List zkServers; + public Integer zkPort; + public String zkRoot; + public String id; // setting for how often to save the current kafka offset to ZooKeeper public long stateUpdateIntervalMs = 2000; From 324ee017f1f2b85dace6495641678fa24d985310 Mon Sep 17 00:00:00 2001 From: Rick Kellogg Date: Thu, 26 Feb 2015 12:20:09 -0500 Subject: [PATCH 4/5] Import cleanup. --- external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index f6ce01952a6..a49f6fd7685 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -17,10 +17,8 @@ */ package storm.kafka; -import java.io.Serializable; import java.util.List; - public class SpoutConfig extends KafkaConfig { public List zkServers; public Integer zkPort; From f2d25d3b31a106ccb6f4fb2e6e3ed62dd91d3299 Mon Sep 17 00:00:00 2001 From: Rick Kellogg Date: Thu, 26 Feb 2015 14:37:44 -0500 Subject: [PATCH 5/5] Added additional constructor with all required properties. Marked incomplete constructor as deprecated. Extensive Javadocs added. --- .../src/jvm/storm/kafka/SpoutConfig.java | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java index a49f6fd7685..a2f8f609ccb 100644 --- a/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java +++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java @@ -19,10 +19,31 @@ import java.util.List; +/** + * Configuration passed to constructor of KafkaSpout. + * + * As the Spout reads messages from Kafka, it maintains the KafkaSpout offsets within Zookeeper. + * + * These offset values are stored under {zkRoot}/{id}. + * + * @see KafkaSpout + */ public class SpoutConfig extends KafkaConfig { + /** + * List of Zookeeper host names used for Spout offset storage (without port number or other path). + */ public List zkServers; + /** + * Zookeeper port used for Spout offset storage + */ public Integer zkPort; + /** + * Path within Zookeeper used for Spout offset data (requires leading slash) + */ public String zkRoot; + /** + * Identifier used within Zookeeper to identity unique Spouts. + */ public String id; // setting for how often to save the current kafka offset to ZooKeeper @@ -34,9 +55,37 @@ public class SpoutConfig extends KafkaConfig { public double retryDelayMultiplier = 1.0; public long retryDelayMaxMs = 60 * 1000; + /** + * Construct configuration for KafkaSpout + * + * @param hosts Hosts with Kafka topic (required) + * @param topic Topic name (required) + * @param zkRoot Zookeeper path for storage of Spout offsets (required) + * @param id Identifier used to identify Spout (required) + * + * @deprecated + */ public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) { super(hosts, topic); this.zkRoot = zkRoot; this.id = id; } + + /** + * Construct configuration for KafkaSpout + * + * @param hosts Hosts with Kafka topic (required) + * @param topic Topic name (required) + * @param zkServers Zookeeper host names for Spout offset storage (required) + * @param zkPort Zookeeper port number for Spout offset storage (required) + * @param zkRoot Zookeeper path for storage of Spout offsets (required) + * @param id Identifier used to identify Spout (required) + */ + public SpoutConfig(BrokerHosts hosts, String topic, List zkServers, int zkPort, String zkRoot, String id) { + super(hosts, topic); + this.zkServers = zkServers; + this.zkPort = zkPort; + this.zkRoot = zkRoot; + this.id = id; + } }