From dd615e1ed069efb041d9f0637b6a2a6558104ff4 Mon Sep 17 00:00:00 2001 From: bhupesh Date: Fri, 13 May 2016 16:48:53 -0700 Subject: [PATCH] SPOI-8049-Fixed getters and setters for topics and clusters --- .../kafka/AbstractKafkaInputOperator.java | 36 +++++++++++-------- .../malhar/kafka/KafkaInputOperatorTest.java | 13 +++---- 2 files changed, 28 insertions(+), 21 deletions(-) diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java index 3e709ebc27..650ea61745 100644 --- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java +++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java @@ -19,6 +19,8 @@ package org.apache.apex.malhar.kafka; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.HashMap; import java.util.Iterator; @@ -44,8 +46,6 @@ import org.apache.kafka.common.TopicPartition; import com.google.common.base.Joiner; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; import com.datatorrent.api.AutoMetric; import com.datatorrent.api.Context; @@ -413,33 +413,39 @@ public int getInitialPartitionCount() return initialPartitionCount; } - public void setClusters(String clusters) - { - this.clusters = clusters.split(";"); - } - /** * Same setting as bootstrap.servers property to KafkaConsumer * refer to http://kafka.apache.org/documentation.html#newconsumerconfigs - * To support multi cluster, you can have multiple bootstrap.servers separated by ";" + * To support multi cluster, you can have multiple elements in the array */ - public String getClusters() + public void setClusters(String[] clusters) { - return Joiner.on(';').join(clusters); + this.clusters = clusters; } - public void setTopics(String topics) + public String[] getClusters() { - this.topics = Iterables.toArray(Splitter.on(',').trimResults().omitEmptyStrings().split(topics), String.class); + return clusters; } /** - * The topics the operator consumes, separate by',' + * The topics the operator consumes * Topic name can only contain ASCII alphanumerics, '.', '_' and '-' */ - public String getTopics() + public void setTopics(String[] topics) + { + List temp = new ArrayList<>(); + for (String topic: topics) { + if (topic != null && topic.length() != 0) { + temp.add(topic); + } + } + this.topics = temp.toArray(new String[temp.size()]); + } + + public String[] getTopics() { - return Joiner.on(", ").join(topics); + return topics; } public void setStrategy(String policy) diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java index ede7f38bb6..0641cabe7c 100644 --- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java +++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java @@ -250,7 +250,7 @@ public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exc KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class); node.setInitialPartitionCount(1); // set topic - node.setTopics(testName); + node.setTopics(new String[]{testName}); node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); node.setClusters(getClusterConfig()); node.setStrategy(partition); @@ -306,12 +306,13 @@ private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag) operator.setMaxTuplesPerWindow(500); } - private String getClusterConfig() { + private String[] getClusterConfig() { String l = "localhost:"; - return l + TEST_KAFKA_BROKER_PORT[0][0] + - (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + - (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + - (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + String returnVal = l + TEST_KAFKA_BROKER_PORT[0][0] + + (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") + + (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") + + (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : ""); + return returnVal.split(";"); }