From 3f30b81a6e123879f33f95bc025c35808860fedc Mon Sep 17 00:00:00 2001 From: Siyuan Hua Date: Fri, 26 Aug 2016 16:58:12 -0700 Subject: [PATCH] APEXMALHAR-2199 #closes #380 #resolve #comment Simplify the zookeeper url parser to use whatever user specified and support chroot path --- .../contrib/kafka/HighlevelKafkaConsumer.java | 2 +- .../contrib/kafka/KafkaConsumer.java | 27 ++++++++----------- .../contrib/kafka/KafkaMetadataUtil.java | 8 ++++-- .../contrib/kafka/KafkaInputOperatorTest.java | 10 +++---- 4 files changed, 22 insertions(+), 25 deletions(-) diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java index 2f7cece07c..5b9c5ed56b 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/HighlevelKafkaConsumer.java @@ -122,7 +122,7 @@ public void start() // create high level consumer for every cluster Properties config = new Properties(); config.putAll(consumerConfig); - config.setProperty("zookeeper.connect", Joiner.on(',').join(zookeeperMap.get(cluster))); + config.setProperty("zookeeper.connect", zookeeperMap.get(cluster).iterator().next()); // create consumer connector will start a daemon thread to monitor the metadata change // we want to start this thread until the operator is activated standardConsumer.put(cluster, kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(config))); diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java index 805fdc4fb8..a67ff4812d 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaConsumer.java @@ -86,8 +86,16 @@ public KafkaConsumer(String zks, String topic) protected String topic = "default_topic"; 
/** - * A zookeeper map keyed by cluster id - * It's mandatory field + * A zookeeper map keyed by cluster id. + * It's a mandatory field.
+ * Each cluster should have only one connection string that contains all nodes in the cluster.
+ * A zookeeper chroot path is also supported.
+ * + * Single cluster zookeeper example:
+ *    node1:2181,node2:2181,node3:2181/your/kafka/data
+ * Multi-cluster zookeeper example:
+ *    cluster1::node1:2181,node2:2181,node3:2181/cluster1;cluster2::node1:2181/cluster2 + * */ @NotNull @Bind(JavaSerializer.class) @@ -535,20 +543,7 @@ private SetMultimap parseZookeeperStr(String zookeeper) for (String zk : zookeeper.split(";")) { String[] parts = zk.split("::"); String clusterId = parts.length == 1 ? KafkaPartition.DEFAULT_CLUSTERID : parts[0]; - String[] hostNames = parts.length == 1 ? parts[0].split(",") : parts[1].split(","); - String portId = ""; - for (int idx = hostNames.length - 1; idx >= 0; idx--) { - String[] zkParts = hostNames[idx].split(":"); - if (zkParts.length == 2) { - portId = zkParts[1]; - } - if (!portId.isEmpty() && portId != "") { - theClusters.put(clusterId, zkParts[0] + ":" + portId); - } else { - throw new IllegalArgumentException("Wrong zookeeper string: " + zookeeper + "\n" - + " Expected format should be cluster1::zookeeper1,zookeeper2:port1;cluster2::zookeeper3:port2 or zookeeper1:port1,zookeeper:port2"); - } - } + theClusters.put(clusterId, parts.length == 1 ? 
parts[0] : parts[1]); } return theClusters; } diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java index af5045abff..b9d4b1b55f 100644 --- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java +++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java @@ -105,10 +105,14 @@ public List transformEntry(String key, Collection bs) }}); } - + /** + * There is always only one string in zkHost + * @param zkHost + * @return + */ public static Set getBrokers(Set zkHost){ - ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$); + ZkClient zkclient = new ZkClient(zkHost.iterator().next(), 30000, 30000, ZKStringSerializer$.MODULE$); Set brokerHosts = new HashSet(); for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) { brokerHosts.add(b.connectionString()); diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java index e4a4dec89e..f3af37fb96 100644 --- a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java +++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaInputOperatorTest.java @@ -488,14 +488,12 @@ public void testZookeeper() throws Exception consumer.setTopic(TEST_TOPIC); testMeta.operator.setConsumer(consumer); - testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182;cluster2::node4:2181"); + testMeta.operator.setZookeeper("cluster1::node0,node1,node2:2181,node3:2182/chroot/dir;cluster2::node4:2181"); latch.await(500, TimeUnit.MILLISECONDS); - Assert.assertEquals("Total size of clusters ", 5, testMeta.operator.getConsumer().zookeeperMap.size()); - Assert.assertEquals("Number of nodes in cluster1 ", 4, 
testMeta.operator.getConsumer().zookeeperMap.get("cluster1").size()); - Assert.assertEquals("Nodes in cluster1 ", "[node0:2181, node2:2181, node3:2182, node1:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").toString()); - Assert.assertEquals("Number of nodes in cluster2 ", 1, testMeta.operator.getConsumer().zookeeperMap.get("cluster2").size()); - Assert.assertEquals("Nodes in cluster2 ", "[node4:2181]", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").toString()); + Assert.assertEquals("Total size of clusters ", 2, testMeta.operator.getConsumer().zookeeperMap.size()); + Assert.assertEquals("Connection url for cluster1 ", "node0,node1,node2:2181,node3:2182/chroot/dir", testMeta.operator.getConsumer().zookeeperMap.get("cluster1").iterator().next()); + Assert.assertEquals("Connection url for cluster 2 ", "node4:2181", testMeta.operator.getConsumer().zookeeperMap.get("cluster2").iterator().next()); } }