From da1c92c1f1a3c659532ce03d11a3ea376aa16e6c Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 6 Dec 2016 00:22:33 +0530 Subject: [PATCH 01/11] STORM-1308: tick-tuple-test conversion to Java --- .../jvm/org/apache/storm/TickTupleTest.java | 96 +++++++++++++++++++ 1 file changed, 96 insertions(+) create mode 100644 storm-core/test/jvm/org/apache/storm/TickTupleTest.java diff --git a/storm-core/test/jvm/org/apache/storm/TickTupleTest.java b/storm-core/test/jvm/org/apache/storm/TickTupleTest.java new file mode 100644 index 00000000000..a9f6c466887 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/TickTupleTest.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.OutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.IRichSpout; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseRichBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Map; + +public class TickTupleTest { + + @Test + public void testTickTupleWorksWithSystemBolt() throws Exception { + ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build(); + StormTopology topology = createNoOpTopology(); + Config stormConf = new Config(); + stormConf.putAll(Utils.readDefaultConfig()); + stormConf.put("storm.cluster.mode", "local"); + stormConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1); + cluster.submitTopology("test", stormConf, topology); + cluster.advanceClusterTime(2); + Assert.assertTrue("Test is passed", true); + } + + private IRichSpout makeNoOpSpout() { + return new BaseRichSpout() { + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("tuple")); + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + } + + @Override + public void nextTuple() { + } + + private void writeObject(java.io.ObjectOutputStream stream) { + } + }; + } + + private BaseRichBolt makeNoOpBolt() { + return new BaseRichBolt() { + @Override + public void prepare(Map conf, TopologyContext topologyContext, OutputCollector outputCollector) {} + @Override + public void execute(Tuple tuple) {} + + @Override + public void cleanup() { } + + @Override + public void declareOutputFields(OutputFieldsDeclarer ofd) {} + + private void writeObject(java.io.ObjectOutputStream stream) {} + }; + } + + private StormTopology createNoOpTopology() { + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("1", makeNoOpSpout()); + builder.setBolt("2", makeNoOpBolt()).shuffleGrouping("1"); + return builder.createTopology(); + } +} From 50bebd90eabcce0bdaf4d5cb7d65e92bbfbe4985 Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Sun, 11 Dec 2016 12:21:07 +0530 Subject: [PATCH 02/11] fix --- .../jvm/org/apache/storm/MessagingTest.java | 87 +++++++++++++++++++ .../jvm/org/apache/storm/TickTupleTest.java | 26 +++--- 2 files changed, 103 insertions(+), 10 deletions(-) create mode 100644 storm-core/test/jvm/org/apache/storm/MessagingTest.java diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java new file mode 100644 index 00000000000..8fb7d22a391 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/MessagingTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.generated.Grouping; +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.*; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.utils.Utils; +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +/** + * Created by ruchi on 29-11-2016. + */ +public class MessagingTest { + + @Test + public void testLocalTransport() throws Exception { + Config stormConf = new Config(); + stormConf.putAll(Utils.readDefaultConfig()); + stormConf.put(Config.TOPOLOGY_WORKERS, 2); + stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, true); + List seeds = new ArrayList<>(); + seeds.add("localhost"); + stormConf.put(Config.NIMBUS_HOST, "localhost"); + stormConf.put(Config.NIMBUS_SEEDS, seeds); + stormConf.put("storm.cluster.mode", "local"); + stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost"); + stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); + ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2) + .withDaemonConf(stormConf).withNimbusDaemon(true).build(); + Thrift.SpoutDetails spoutDetails = Thrift.prepareSpoutDetails(new TestWordSpout(false), 2); + Map inputs = new HashMap<>(); + inputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()); + Thrift.BoltDetails boltDetails = Thrift.prepareBoltDetails(inputs, new TestGlobalCount(), 6); + Map spoutMap = new HashMap<>(); + spoutMap.put("1", spoutDetails); + Map boltMap = new HashMap<>(); + boltMap.put("2", boltDetails); + //StormTopology stormTopology = Thrift.buildTopology(spoutMap, boltMap); + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("1", new TestWordSpout(false), 2); + builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1"); + StormTopology stormTopology = builder.createTopology(); + FixedTuple[] fixedTuple = {new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), + new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b"))}; + Map> data = new HashMap<>(); + data.put("1", (List)Arrays.asList(fixedTuple)); + MockedSources mockedSources = new MockedSources(data); + CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam(); + completeTopologyParam.setMockedSources(mockedSources); + Map> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam); + Assert.assertEquals(6*4, Testing.readTuples(results,"2").size()); + + } + +} diff --git a/storm-core/test/jvm/org/apache/storm/TickTupleTest.java b/storm-core/test/jvm/org/apache/storm/TickTupleTest.java index a9f6c466887..503bd2aeb8b 100644 --- a/storm-core/test/jvm/org/apache/storm/TickTupleTest.java +++ b/storm-core/test/jvm/org/apache/storm/TickTupleTest.java @@ -39,15 +39,21 @@ public class TickTupleTest { @Test public void testTickTupleWorksWithSystemBolt() throws Exception { - ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build(); - StormTopology topology = createNoOpTopology(); - Config stormConf = new Config(); - stormConf.putAll(Utils.readDefaultConfig()); - stormConf.put("storm.cluster.mode", "local"); - stormConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1); - cluster.submitTopology("test", stormConf, topology); - cluster.advanceClusterTime(2); - Assert.assertTrue("Test is passed", true); + ILocalCluster cluster = null; + try { + cluster = new LocalCluster.Builder().withSimulatedTime().withNimbusDaemon(true).build(); + StormTopology topology = createNoOpTopology(); + Config stormConf = new Config(); + stormConf.putAll(Utils.readDefaultConfig()); + stormConf.put("storm.cluster.mode", "local"); + stormConf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 1); + cluster.submitTopology("test", stormConf, topology); + cluster.advanceClusterTime(2); + Assert.assertTrue("Test is passed", true); + } finally { + cluster.close(); + } + } private IRichSpout makeNoOpSpout() { @@ -90,7 +96,7 @@ private void writeObject(java.io.ObjectOutputStream stream) {} private StormTopology createNoOpTopology() { TopologyBuilder builder = new TopologyBuilder(); builder.setSpout("1", makeNoOpSpout()); - builder.setBolt("2", makeNoOpBolt()).shuffleGrouping("1"); + builder.setBolt("2", makeNoOpBolt()).fieldsGrouping("1", new Fields("tuple")); return builder.createTopology(); } } From ffcefd5c5e020a7d149b3fe5bc3d3b1173d35552 Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Sun, 11 Dec 2016 12:26:35 +0530 Subject: [PATCH 03/11] remove unwanted file --- .../jvm/org/apache/storm/MessagingTest.java | 87 ------------------- 1 file changed, 87 deletions(-) delete mode 100644 storm-core/test/jvm/org/apache/storm/MessagingTest.java diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java deleted file mode 100644 index 8fb7d22a391..00000000000 --- a/storm-core/test/jvm/org/apache/storm/MessagingTest.java +++ /dev/null @@ -1,87 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.storm; - -import org.apache.storm.generated.GlobalStreamId; -import org.apache.storm.generated.Grouping; -import org.apache.storm.generated.StormTopology; -import org.apache.storm.testing.*; -import org.apache.storm.topology.TopologyBuilder; -import org.apache.storm.utils.Utils; -import org.junit.Assert; -import org.junit.Test; - -import java.util.*; - -/** - * Created by ruchi on 29-11-2016. - */ -public class MessagingTest { - - @Test - public void testLocalTransport() throws Exception { - Config stormConf = new Config(); - stormConf.putAll(Utils.readDefaultConfig()); - stormConf.put(Config.TOPOLOGY_WORKERS, 2); - stormConf.put(Config.STORM_LOCAL_MODE_ZMQ, true); - List seeds = new ArrayList<>(); - seeds.add("localhost"); - stormConf.put(Config.NIMBUS_HOST, "localhost"); - stormConf.put(Config.NIMBUS_SEEDS, seeds); - stormConf.put("storm.cluster.mode", "local"); - stormConf.put(Config.STORM_LOCAL_HOSTNAME, "localhost"); - stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); - ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime().withSupervisors(1).withPortsPerSupervisor(2) - .withDaemonConf(stormConf).withNimbusDaemon(true).build(); - Thrift.SpoutDetails spoutDetails = Thrift.prepareSpoutDetails(new TestWordSpout(false), 2); - Map inputs = new HashMap<>(); - inputs.put(Utils.getGlobalStreamId("1", null), Thrift.prepareShuffleGrouping()); - Thrift.BoltDetails boltDetails = Thrift.prepareBoltDetails(inputs, new TestGlobalCount(), 6); - Map spoutMap = new HashMap<>(); - spoutMap.put("1", spoutDetails); - Map boltMap = new HashMap<>(); - boltMap.put("2", boltDetails); - //StormTopology stormTopology = Thrift.buildTopology(spoutMap, boltMap); - TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("1", new TestWordSpout(false), 2); - builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1"); - StormTopology stormTopology = builder.createTopology(); - FixedTuple[] fixedTuple = {new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b")), - new FixedTuple(( List)Arrays.asList((Object)"a")),new FixedTuple(( List)Arrays.asList((Object)"b"))}; - Map> data = new HashMap<>(); - data.put("1", (List)Arrays.asList(fixedTuple)); - MockedSources mockedSources = new MockedSources(data); - CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam(); - completeTopologyParam.setMockedSources(mockedSources); - Map> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam); - Assert.assertEquals(6*4, Testing.readTuples(results,"2").size()); - - } - -} From ffc00dc44b1180312df750351778385e236cdffc Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Sun, 11 Dec 2016 12:28:58 +0530 Subject: [PATCH 04/11] removed old clj test --- .../clj/org/apache/storm/tick_tuple_test.clj | 54 ------------------- 1 file changed, 54 deletions(-) delete mode 100644 storm-core/test/clj/org/apache/storm/tick_tuple_test.clj diff --git a/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj b/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj deleted file mode 100644 index f478dd404d7..00000000000 --- a/storm-core/test/clj/org/apache/storm/tick_tuple_test.clj +++ /dev/null @@ -1,54 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.tick-tuple-test - (:use [clojure test]) - (:use [org.apache.storm config]) - (:use [org.apache.storm.internal clojure]) - (:import [org.apache.storm Thrift LocalCluster$Builder]) - (:import [org.apache.storm.utils Utils])) - -(defbolt noop-bolt ["tuple"] {:prepare true} - [conf context collector] - (bolt - (execute [tuple]))) - -(defspout noop-spout ["tuple"] - [conf context collector] - (spout - (nextTuple []))) - -(deftest test-tick-tuple-works-with-system-bolt - (with-open [cluster (.build (doto (LocalCluster$Builder.) - (.withSimulatedTime)))] - (let [topology (Thrift/buildTopology - {"1" (Thrift/prepareSpoutDetails noop-spout)} - {"2" (Thrift/prepareBoltDetails - {(Utils/getGlobalStreamId "1" nil) - (Thrift/prepareFieldsGrouping ["tuple"])} - noop-bolt)})] - (try - (.submitTopology cluster - "test" - {TOPOLOGY-TICK-TUPLE-FREQ-SECS 1} - topology) - (.advanceClusterTime cluster 2) - ;; if reaches here, it means everything works ok. - (is true) - (catch Exception e - (is false)))))) - - - From 4a0ccf7f8bf12acf685b095d101b550d7901efcb Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 17 Jan 2017 23:35:26 +0530 Subject: [PATCH 05/11] STORM-1292: port backtype.storm.messaging-test to java --- .../jvm/org/apache/storm/MessagingTest.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) create mode 100644 storm-core/test/jvm/org/apache/storm/MessagingTest.java diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java new file mode 100644 index 00000000000..52ed1d9cfbf --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/MessagingTest.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.topology.TopologyBuilder; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.*; + +public class MessagingTest { + + @Test + public void testLocalTransport() throws Exception { + Config stormConf = new Config(); + stormConf.put(Config.TOPOLOGY_WORKERS, 2); + stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); + + try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime() + .withSupervisors(1).withPortsPerSupervisor(2) + .withDaemonConf(stormConf).build()) { + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("1", new TestWordSpout(false), 2); + builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1"); + StormTopology stormTopology = builder.createTopology(); + + List fixedTuples = new ArrayList<>(); + for (int i = 0; i < 12; i++) { + fixedTuples.add(new FixedTuple(Collections.singletonList("a"))); + fixedTuples.add(new FixedTuple(Collections.singletonList("b"))); + } + Map> data = new HashMap<>(); + data.put("1", fixedTuples); + MockedSources mockedSources = new MockedSources(data); + CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam(); + completeTopologyParam.setMockedSources(mockedSources); + Map> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam); + Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size()); + } + } + + + +} From db06090ad22b2573a633faf235b4447fc9e4204d Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 17 Jan 2017 23:38:08 +0530 Subject: [PATCH 06/11] changes done --- .../clj/org/apache/storm/messaging_test.clj | 61 ------------------- 1 file changed, 61 deletions(-) delete mode 100644 storm-core/test/clj/org/apache/storm/messaging_test.clj diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj deleted file mode 100644 index c9843363e51..00000000000 --- a/storm-core/test/clj/org/apache/storm/messaging_test.clj +++ /dev/null @@ -1,61 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.messaging-test - (:use [clojure test]) - (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt]) - (:use [org.apache.storm config]) - (:import [org.apache.storm Testing Thrift LocalCluster$Builder]) - (:import [org.apache.storm.utils Utils])) - -(deftest test-local-transport - (doseq [transport-on? [false true]] - (with-open [cluster (.build (doto (LocalCluster$Builder.) - (.withSimulatedTime) - (.withSupervisors 1) - (.withPortsPerSupervisor 2) - (.withDaemonConf {TOPOLOGY-WORKERS 2 - STORM-LOCAL-MODE-ZMQ - (if transport-on? true false) - STORM-MESSAGING-TRANSPORT - "org.apache.storm.messaging.netty.Context"})))] - (let [topology (Thrift/buildTopology - {"1" (Thrift/prepareSpoutDetails - (TestWordSpout. true) (Integer. 2))} - {"2" (Thrift/prepareBoltDetails - {(Utils/getGlobalStreamId "1" nil) - (Thrift/prepareShuffleGrouping)} - (TestGlobalCount.) (Integer. 6)) - }) - results (Testing/completeTopology cluster - topology - (doto (CompleteTopologyParam.) - ;; important for test that - ;; #tuples = multiple of 4 and 6 - (.setMockedSources (MockedSources. {"1" [["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ]} - ))))] - (is (= (* 6 4) (.size (Testing/readTuples results "2")))))))) From a8e07e7c0e0c0e33b1f4c8303b245b5af6bc81f2 Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 17 Jan 2017 23:43:00 +0530 Subject: [PATCH 07/11] STORM-1292: port backtype.storm.messaging-test to java --- storm-core/test/jvm/org/apache/storm/MessagingTest.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java index 52ed1d9cfbf..6320e49eca2 100644 --- a/storm-core/test/jvm/org/apache/storm/MessagingTest.java +++ b/storm-core/test/jvm/org/apache/storm/MessagingTest.java @@ -29,7 +29,11 @@ import org.junit.Assert; import org.junit.Test; -import java.util.*; +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; public class MessagingTest { @@ -62,7 +66,4 @@ public void testLocalTransport() throws Exception { Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size()); } } - - - } From 2dc87c477b36937d4e921631961aadbb2b0b031d Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 17 Jan 2017 23:35:26 +0530 Subject: [PATCH 08/11] STORM-1292: port backtype.storm.messaging-test to java changes done STORM-1292: port backtype.storm.messaging-test to java --- .../clj/org/apache/storm/messaging_test.clj | 61 ---------------- .../jvm/org/apache/storm/MessagingTest.java | 69 +++++++++++++++++++ 2 files changed, 69 insertions(+), 61 deletions(-) delete mode 100644 storm-core/test/clj/org/apache/storm/messaging_test.clj create mode 100644 storm-core/test/jvm/org/apache/storm/MessagingTest.java diff --git a/storm-core/test/clj/org/apache/storm/messaging_test.clj b/storm-core/test/clj/org/apache/storm/messaging_test.clj deleted file mode 100644 index c9843363e51..00000000000 --- a/storm-core/test/clj/org/apache/storm/messaging_test.clj +++ /dev/null @@ -1,61 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.messaging-test - (:use [clojure test]) - (:import [org.apache.storm.testing CompleteTopologyParam MockedSources TestWordCounter TestWordSpout TestGlobalCount TestEventLogSpout TestEventOrderCheckBolt]) - (:use [org.apache.storm config]) - (:import [org.apache.storm Testing Thrift LocalCluster$Builder]) - (:import [org.apache.storm.utils Utils])) - -(deftest test-local-transport - (doseq [transport-on? [false true]] - (with-open [cluster (.build (doto (LocalCluster$Builder.) - (.withSimulatedTime) - (.withSupervisors 1) - (.withPortsPerSupervisor 2) - (.withDaemonConf {TOPOLOGY-WORKERS 2 - STORM-LOCAL-MODE-ZMQ - (if transport-on? true false) - STORM-MESSAGING-TRANSPORT - "org.apache.storm.messaging.netty.Context"})))] - (let [topology (Thrift/buildTopology - {"1" (Thrift/prepareSpoutDetails - (TestWordSpout. true) (Integer. 2))} - {"2" (Thrift/prepareBoltDetails - {(Utils/getGlobalStreamId "1" nil) - (Thrift/prepareShuffleGrouping)} - (TestGlobalCount.) (Integer. 6)) - }) - results (Testing/completeTopology cluster - topology - (doto (CompleteTopologyParam.) - ;; important for test that - ;; #tuples = multiple of 4 and 6 - (.setMockedSources (MockedSources. {"1" [["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ["a"] ["b"] - ]} - ))))] - (is (= (* 6 4) (.size (Testing/readTuples results "2")))))))) diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java new file mode 100644 index 00000000000..6320e49eca2 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/MessagingTest.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.storm; + +import org.apache.storm.generated.StormTopology; +import org.apache.storm.testing.TestWordSpout; +import org.apache.storm.testing.TestGlobalCount; +import org.apache.storm.testing.FixedTuple; +import org.apache.storm.testing.MockedSources; +import org.apache.storm.testing.CompleteTopologyParam; +import org.apache.storm.topology.TopologyBuilder; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.List; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Map; +import java.util.HashMap; + +public class MessagingTest { + + @Test + public void testLocalTransport() throws Exception { + Config stormConf = new Config(); + stormConf.put(Config.TOPOLOGY_WORKERS, 2); + stormConf.put(Config.STORM_MESSAGING_TRANSPORT , "org.apache.storm.messaging.netty.Context"); + + try (ILocalCluster cluster = new LocalCluster.Builder().withSimulatedTime() + .withSupervisors(1).withPortsPerSupervisor(2) + .withDaemonConf(stormConf).build()) { + + TopologyBuilder builder = new TopologyBuilder(); + builder.setSpout("1", new TestWordSpout(false), 2); + builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1"); + StormTopology stormTopology = builder.createTopology(); + + List fixedTuples = new ArrayList<>(); + for (int i = 0; i < 12; i++) { + fixedTuples.add(new FixedTuple(Collections.singletonList("a"))); + fixedTuples.add(new FixedTuple(Collections.singletonList("b"))); + } + Map> data = new HashMap<>(); + data.put("1", fixedTuples); + MockedSources mockedSources = new MockedSources(data); + CompleteTopologyParam completeTopologyParam = new CompleteTopologyParam(); + completeTopologyParam.setMockedSources(mockedSources); + Map> results = Testing.completeTopology(cluster, stormTopology, completeTopologyParam); + Assert.assertEquals(6 * 4, Testing.readTuples(results, "2").size()); + } + } +} From 78403790e5f6b8c3e5ddc7e9f393e36947604273 Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Thu, 19 Jan 2017 23:07:30 +0530 Subject: [PATCH 09/11] STORM-1292: port backtype.storm.messaging-test to java --- storm-core/test/jvm/org/apache/storm/MessagingTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java index 6320e49eca2..26ca6cf24eb 100644 --- a/storm-core/test/jvm/org/apache/storm/MessagingTest.java +++ b/storm-core/test/jvm/org/apache/storm/MessagingTest.java @@ -48,7 +48,7 @@ public void testLocalTransport() throws Exception { .withDaemonConf(stormConf).build()) { TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("1", new TestWordSpout(false), 2); + builder.setSpout("1", new TestWordSpout(true), 2); builder.setBolt("2", new TestGlobalCount(), 6).shuffleGrouping("1"); StormTopology stormTopology = builder.createTopology(); From 86859df609ef628b53bdab4b18e9643e2e9e7951 Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 21 Feb 2017 12:47:47 +0530 Subject: [PATCH 10/11] update --- storm-core/test/jvm/org/apache/storm/MessagingTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/storm-core/test/jvm/org/apache/storm/MessagingTest.java b/storm-core/test/jvm/org/apache/storm/MessagingTest.java index 3d8f39ef49f..2f348d855eb 100644 --- a/storm-core/test/jvm/org/apache/storm/MessagingTest.java +++ b/storm-core/test/jvm/org/apache/storm/MessagingTest.java @@ -29,11 +29,7 @@ import org.junit.Assert; import org.junit.Test; -import java.util.List; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Map; -import java.util.HashMap; +import java.util.*; public class MessagingTest { @@ -67,4 +63,6 @@ public void testLocalTransport() throws Exception { } } + + } From c6c3fa0b5e8967be8555a4f70a345b7a364c8c5b Mon Sep 17 00:00:00 2001 From: kamleshbhatt Date: Tue, 21 Feb 2017 13:20:09 +0530 Subject: [PATCH 11/11] local state test port to java --- .../clj/org/apache/storm/local_state_test.clj | 59 ------------------- .../jvm/org/apache/storm/LocalStateTest.java | 54 +++++++++++++++++ 2 files changed, 54 insertions(+), 59 deletions(-) delete mode 100644 storm-core/test/clj/org/apache/storm/local_state_test.clj create mode 100644 storm-core/test/jvm/org/apache/storm/LocalStateTest.java diff --git a/storm-core/test/clj/org/apache/storm/local_state_test.clj b/storm-core/test/clj/org/apache/storm/local_state_test.clj deleted file mode 100644 index e5baa67d6ef..00000000000 --- a/storm-core/test/clj/org/apache/storm/local_state_test.clj +++ /dev/null @@ -1,59 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. -(ns org.apache.storm.local-state-test - (:use [clojure test]) - (:import [org.apache.storm.testing TmpPath]) - (:import [org.apache.storm.utils LocalState] - [org.apache.storm.generated GlobalStreamId] - [org.apache.commons.io FileUtils] - [java.io File])) - -(deftest test-local-state - (with-open [dir1-tmp (TmpPath.) - dir2-tmp (TmpPath.)] - (let [dir1 (.getPath dir1-tmp) - dir2 (.getPath dir2-tmp) - gs-a (GlobalStreamId. "a" "a") - gs-b (GlobalStreamId. "b" "b") - gs-c (GlobalStreamId. "c" "c") - gs-d (GlobalStreamId. "d" "d") - ls1 (LocalState. dir1) - ls2 (LocalState. dir2)] - (is (= {} (.snapshot ls1))) - (.put ls1 "a" gs-a) - (.put ls1 "b" gs-b) - (is (= {"a" gs-a "b" gs-b} (.snapshot ls1))) - (is (= {} (.snapshot ls2))) - (is (= gs-a (.get ls1 "a"))) - (is (= nil (.get ls1 "c"))) - (is (= gs-b (.get ls1 "b"))) - (is (= {"a" gs-a "b" gs-b} (.snapshot (LocalState. dir1)))) - (.put ls2 "b" gs-a) - (.put ls2 "b" gs-b) - (.put ls2 "b" gs-c) - (.put ls2 "b" gs-d) - (is (= gs-d (.get ls2 "b")))))) - -(deftest empty-state - (with-open [tmp-dir (TmpPath.)] - (let [dir (.getPath tmp-dir) - ls (LocalState. dir) - gs-a (GlobalStreamId. "a" "a") - data (FileUtils/openOutputStream (File. dir "12345")) - version (FileUtils/openOutputStream (File. dir "12345.version"))] - (is (= nil (.get ls "c"))) - (.put ls "a" gs-a) - (is (= gs-a (.get ls "a")))))) diff --git a/storm-core/test/jvm/org/apache/storm/LocalStateTest.java b/storm-core/test/jvm/org/apache/storm/LocalStateTest.java new file mode 100644 index 00000000000..bf5cb63c950 --- /dev/null +++ b/storm-core/test/jvm/org/apache/storm/LocalStateTest.java @@ -0,0 +1,54 @@ +package org.apache.storm; + +import org.apache.storm.generated.GlobalStreamId; +import org.apache.storm.testing.TmpPath; +import org.apache.storm.utils.LocalState; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; + +public class LocalStateTest { + + @Test + public void testLocalState() throws IOException{ + TmpPath dir1_tmp = new TmpPath(); + TmpPath dir2_tmp = new TmpPath(); + GlobalStreamId globalStreamId_a = new GlobalStreamId("a","a"); + GlobalStreamId globalStreamId_b = new GlobalStreamId("b","b"); + GlobalStreamId globalStreamId_c = new GlobalStreamId("c","c"); + GlobalStreamId globalStreamId_d = new GlobalStreamId("d","d"); + + LocalState ls1 = new LocalState(dir1_tmp.getPath()); + LocalState ls2 = new LocalState(dir2_tmp.getPath()); + Assert.assertTrue(ls1.snapshot().isEmpty()); + + ls1.put("a",globalStreamId_a); + ls1.put("b",globalStreamId_b); + Assert.assertTrue(ls1.snapshot().containsKey("a")); + Assert.assertTrue(ls1.snapshot().containsValue(globalStreamId_a)); + Assert.assertTrue(ls1.snapshot().containsKey("b")); + Assert.assertTrue(ls1.snapshot().containsValue(globalStreamId_b)); + + ls2.put("b",globalStreamId_a); + ls2.put("b",globalStreamId_b); + ls2.put("b",globalStreamId_c); + ls2.put("b",globalStreamId_d); + Assert.assertEquals(globalStreamId_d, ls2.get("b")); + } + + @Test + public void testEmptyState() throws IOException { + + TmpPath tmp_dir = new TmpPath(); + String dir = tmp_dir.getPath(); + LocalState ls = new LocalState(dir); + GlobalStreamId gs_a = new GlobalStreamId("a","a"); + //FileOutputStream data =FileUtils.openOutputStream(new File(dir,"12345")); + //FileOutputStream version =FileUtils.openOutputStream(new File(dir,"12345.version")); + Assert.assertNull(ls.get("c")); + ls.put("a",gs_a); + Assert.assertEquals(gs_a, ls.get("a")); + + } +}