From 8ddbafcfe848263d61b49af117fd52acd99d23e9 Mon Sep 17 00:00:00 2001 From: darionyaphet Date: Mon, 27 Jun 2016 22:28:52 +0800 Subject: [PATCH] STORM-1931 : Share mapper and selector in Storm-Kafka --- .../apache/storm/kafka/bolt/KafkaBolt.java | 8 ++-- .../FieldNameBasedTupleToKafkaMapper.java | 8 ++-- .../{bolt => }/mapper/TupleToKafkaMapper.java | 8 ++-- .../selector/DefaultTopicSelector.java | 6 +-- .../selector/FieldIndexTopicSelector.java | 6 +-- .../selector/FieldNameTopicSelector.java | 6 +-- .../selector/KafkaTopicSelector.java | 6 +-- .../kafka/trident/TridentKafkaState.java | 8 ++-- .../trident/TridentKafkaStateFactory.java | 8 ++-- .../FieldNameBasedTupleToKafkaMapper.java | 41 ------------------- .../mapper/TridentTupleToKafkaMapper.java | 28 ------------- .../selector/DefaultTopicSelector.java | 34 --------------- .../trident/selector/KafkaTopicSelector.java | 26 ------------ .../apache/storm/kafka/TridentKafkaTest.java | 10 ++--- .../storm/kafka/TridentKafkaTopology.java | 5 +-- 15 files changed, 39 insertions(+), 169 deletions(-) rename external/storm-kafka/src/jvm/org/apache/storm/kafka/{bolt => }/mapper/FieldNameBasedTupleToKafkaMapper.java (90%) rename external/storm-kafka/src/jvm/org/apache/storm/kafka/{bolt => }/mapper/TupleToKafkaMapper.java (87%) rename external/storm-kafka/src/jvm/org/apache/storm/kafka/{bolt => }/selector/DefaultTopicSelector.java (89%) rename external/storm-kafka/src/jvm/org/apache/storm/kafka/{bolt => }/selector/FieldIndexTopicSelector.java (92%) rename external/storm-kafka/src/jvm/org/apache/storm/kafka/{bolt => }/selector/FieldNameTopicSelector.java (92%) rename external/storm-kafka/src/jvm/org/apache/storm/kafka/{bolt => }/selector/KafkaTopicSelector.java (88%) delete mode 100644 external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java delete mode 100644 external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java delete mode 100644 external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java delete mode 100644 external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java index 0ceac3a8a5e..40ad9515af2 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java @@ -17,6 +17,10 @@ */ package org.apache.storm.kafka.bolt; +import org.apache.storm.kafka.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.selector.DefaultTopicSelector; +import org.apache.storm.kafka.selector.KafkaTopicSelector; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.OutputFieldsDeclarer; @@ -29,10 +33,6 @@ import org.apache.kafka.clients.producer.Callback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper; -import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper; -import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector; -import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector; import java.util.concurrent.Future; import java.util.concurrent.ExecutionException; import java.util.Map; diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/mapper/FieldNameBasedTupleToKafkaMapper.java similarity index 90% rename from external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java rename to external/storm-kafka/src/jvm/org/apache/storm/kafka/mapper/FieldNameBasedTupleToKafkaMapper.java index 672da8eb993..69ee00dae35 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/mapper/FieldNameBasedTupleToKafkaMapper.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka.bolt.mapper; +package org.apache.storm.kafka.mapper; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; public class FieldNameBasedTupleToKafkaMapper implements TupleToKafkaMapper { @@ -36,13 +36,13 @@ public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageF } @Override - public K getKeyFromTuple(Tuple tuple) { + public K getKeyFromTuple(ITuple tuple) { //for backward compatibility, we return null when key is not present. return tuple.contains(boltKeyField) ? (K) tuple.getValueByField(boltKeyField) : null; } @Override - public V getMessageFromTuple(Tuple tuple) { + public V getMessageFromTuple(ITuple tuple) { return (V) tuple.getValueByField(boltMessageField); } } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/mapper/TupleToKafkaMapper.java similarity index 87% rename from external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java rename to external/storm-kafka/src/jvm/org/apache/storm/kafka/mapper/TupleToKafkaMapper.java index 38904134ec6..26b07ed522e 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/mapper/TupleToKafkaMapper.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka.bolt.mapper; +package org.apache.storm.kafka.mapper; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; import java.io.Serializable; @@ -27,6 +27,6 @@ * @param type of value. */ public interface TupleToKafkaMapper extends Serializable { - K getKeyFromTuple(Tuple tuple); - V getMessageFromTuple(Tuple tuple); + K getKeyFromTuple(ITuple tuple); + V getMessageFromTuple(ITuple tuple); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/DefaultTopicSelector.java similarity index 89% rename from external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java rename to external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/DefaultTopicSelector.java index 2aafc78ea72..daef9cd5e7f 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/DefaultTopicSelector.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka.bolt.selector; +package org.apache.storm.kafka.selector; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; public class DefaultTopicSelector implements KafkaTopicSelector { @@ -28,7 +28,7 @@ public DefaultTopicSelector(final String topicName) { } @Override - public String getTopic(Tuple tuple) { + public String getTopic(ITuple tuple) { return topicName; } } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/FieldIndexTopicSelector.java similarity index 92% rename from external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java rename to external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/FieldIndexTopicSelector.java index 7b52403b0ca..8675a4302e7 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/FieldIndexTopicSelector.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka.bolt.selector; +package org.apache.storm.kafka.selector; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,7 +37,7 @@ public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) { } @Override - public String getTopic(Tuple tuple) { + public String getTopic(ITuple tuple) { if (fieldIndex < tuple.size()) { return tuple.getString(fieldIndex); } else { diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/FieldNameTopicSelector.java similarity index 92% rename from external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java rename to external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/FieldNameTopicSelector.java index a622e8f1340..974d68d9b15 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/FieldNameTopicSelector.java @@ -15,9 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka.bolt.selector; +package org.apache.storm.kafka.selector; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,7 +38,7 @@ public FieldNameTopicSelector(String fieldName, String defaultTopicName) { } @Override - public String getTopic(Tuple tuple) { + public String getTopic(ITuple tuple) { if (tuple.contains(fieldName)) { return tuple.getStringByField(fieldName); } else { diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/KafkaTopicSelector.java similarity index 88% rename from external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java rename to external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/KafkaTopicSelector.java index cb7fb44a047..f345dbf2e5c 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/selector/KafkaTopicSelector.java @@ -15,12 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.storm.kafka.bolt.selector; +package org.apache.storm.kafka.selector; -import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.ITuple; import java.io.Serializable; public interface KafkaTopicSelector extends Serializable { - String getTopic(Tuple tuple); + String getTopic(ITuple tuple); } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java index 5741dc78fd7..a574e099507 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java @@ -17,6 +17,8 @@ */ package org.apache.storm.kafka.trident; +import org.apache.storm.kafka.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.selector.KafkaTopicSelector; import org.apache.storm.task.OutputCollector; import org.apache.storm.topology.FailedException; import org.apache.commons.lang.Validate; @@ -25,8 +27,6 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; -import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; import org.apache.storm.trident.operation.TridentCollector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.tuple.TridentTuple; @@ -42,10 +42,10 @@ public class TridentKafkaState implements State { private KafkaProducer producer; private OutputCollector collector; - private TridentTupleToKafkaMapper mapper; + private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; - public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { + public TridentKafkaState withTridentTupleToKafkaMapper(TupleToKafkaMapper mapper) { this.mapper = mapper; return this; } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java index f564510775d..c594dd8856a 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java @@ -17,11 +17,11 @@ */ package org.apache.storm.kafka.trident; +import org.apache.storm.kafka.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.selector.KafkaTopicSelector; import org.apache.storm.task.IMetricsContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; -import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; import org.apache.storm.trident.state.State; import org.apache.storm.trident.state.StateFactory; @@ -32,11 +32,11 @@ public class TridentKafkaStateFactory implements StateFactory { private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class); - private TridentTupleToKafkaMapper mapper; + private TupleToKafkaMapper mapper; private KafkaTopicSelector topicSelector; private Properties producerProperties = new Properties(); - public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) { + public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TupleToKafkaMapper mapper) { this.mapper = mapper; return this; } diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java deleted file mode 100644 index 2d049715d71..00000000000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.trident.mapper; - -import org.apache.storm.trident.tuple.TridentTuple; - -public class FieldNameBasedTupleToKafkaMapper implements TridentTupleToKafkaMapper { - - public final String keyFieldName; - public final String msgFieldName; - - public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) { - this.keyFieldName = keyFieldName; - this.msgFieldName = msgFieldName; - } - - @Override - public K getKeyFromTuple(TridentTuple tuple) { - return (K) tuple.getValueByField(keyFieldName); - } - - @Override - public V getMessageFromTuple(TridentTuple tuple) { - return (V) tuple.getValueByField(msgFieldName); - } -} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java deleted file mode 100644 index 28c6c89465e..00000000000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.trident.mapper; - -import org.apache.storm.tuple.Tuple; -import org.apache.storm.trident.tuple.TridentTuple; - -import java.io.Serializable; - -public interface TridentTupleToKafkaMapper extends Serializable { - K getKeyFromTuple(TridentTuple tuple); - V getMessageFromTuple(TridentTuple tuple); -} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java deleted file mode 100644 index 7ae49a3d104..00000000000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.trident.selector; - -import org.apache.storm.trident.tuple.TridentTuple; - -public class DefaultTopicSelector implements KafkaTopicSelector { - - private final String topicName; - - public DefaultTopicSelector(final String topicName) { - this.topicName = topicName; - } - - @Override - public String getTopic(TridentTuple tuple) { - return topicName; - } -} diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java deleted file mode 100644 index 012a6c73b5c..00000000000 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.storm.kafka.trident.selector; - -import org.apache.storm.trident.tuple.TridentTuple; - -import java.io.Serializable; - -public interface KafkaTopicSelector extends Serializable { - String getTopic(TridentTuple tuple); -} diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java index 7a6073aa3d5..c3be70d013f 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTest.java @@ -17,16 +17,16 @@ */ package org.apache.storm.kafka; +import org.apache.storm.kafka.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.mapper.TupleToKafkaMapper; +import org.apache.storm.kafka.selector.DefaultTopicSelector; +import org.apache.storm.kafka.selector.KafkaTopicSelector; import org.apache.storm.tuple.Fields; import kafka.javaapi.consumer.SimpleConsumer; import org.junit.After; import org.junit.Before; import org.junit.Test; import org.apache.storm.kafka.trident.TridentKafkaState; -import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; -import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper; -import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; -import org.apache.storm.kafka.trident.selector.KafkaTopicSelector; import org.apache.storm.trident.tuple.TridentTuple; import org.apache.storm.trident.tuple.TridentTupleView; @@ -42,7 +42,7 @@ public class TridentKafkaTest { public void setup() { broker = new KafkaTestBroker(); simpleConsumer = TestUtils.getKafkaConsumer(broker); - TridentTupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message"); + TupleToKafkaMapper mapper = new FieldNameBasedTupleToKafkaMapper("key", "message"); KafkaTopicSelector topicSelector = new DefaultTopicSelector(TestUtils.TOPIC); state = new TridentKafkaState() .withKafkaTopicSelector(topicSelector) diff --git a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java index fdc6752d3ff..91329759a07 100644 --- a/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java +++ b/external/storm-kafka/src/test/org/apache/storm/kafka/TridentKafkaTopology.java @@ -20,13 +20,12 @@ import org.apache.storm.Config; import org.apache.storm.LocalCluster; import org.apache.storm.generated.StormTopology; +import org.apache.storm.kafka.mapper.FieldNameBasedTupleToKafkaMapper; +import org.apache.storm.kafka.selector.DefaultTopicSelector; import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Values; -import com.google.common.collect.ImmutableMap; import org.apache.storm.kafka.trident.TridentKafkaStateFactory; import org.apache.storm.kafka.trident.TridentKafkaUpdater; -import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper; -import org.apache.storm.kafka.trident.selector.DefaultTopicSelector; import org.apache.storm.trident.Stream; import org.apache.storm.trident.TridentTopology; import org.apache.storm.trident.testing.FixedBatchSpout;