From 7b19d6b8a32726fb45c6095111a27e13411efd66 Mon Sep 17 00:00:00 2001 From: vesense Date: Tue, 25 Apr 2017 12:55:24 +0800 Subject: [PATCH 1/3] STORM-2490: Lambda support --- examples/storm-starter/README.markdown | 1 + .../apache/storm/starter/LambdaTopology.java | 52 +++++++++++ external/storm-rocketmq/README.md | 2 +- .../storm/lambda/AbstractLambdaBolt.java | 30 ++++++ .../storm/lambda/LambdaBiConsumerBolt.java | 36 +++++++ .../storm/lambda/LambdaConsumerBolt.java | 36 +++++++ .../org/apache/storm/lambda/LambdaSpout.java | 54 +++++++++++ .../storm/lambda/SerializableBiConsumer.java | 23 +++++ .../storm/lambda/SerializableConsumer.java | 23 +++++ .../storm/lambda/SerializableSupplier.java | 23 +++++ .../storm/topology/TopologyBuilder.java | 93 +++++++++++++++++++ 11 files changed, 372 insertions(+), 1 deletion(-) create mode 100644 examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/SerializableBiConsumer.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/SerializableConsumer.java create mode 100644 storm-client/src/jvm/org/apache/storm/lambda/SerializableSupplier.java diff --git a/examples/storm-starter/README.markdown b/examples/storm-starter/README.markdown index 6a884657766..a57d4a2a1cf 100644 --- a/examples/storm-starter/README.markdown +++ b/examples/storm-starter/README.markdown @@ -39,6 +39,7 @@ these topologies first: 2. [WordCountTopology](src/jvm/org/apache/storm/starter/WordCountTopology.java): Basic topology that makes use of multilang by implementing one bolt in Python 3. 
[ReachTopology](src/jvm/org/apache/storm/starter/ReachTopology.java): Example of complex DRPC on top of Storm +4. [LambdaTopology](src/jvm/org/apache/storm/starter/LambdaTopology.java): Example of writing spout/bolt using Java8 lambda expression After you have familiarized yourself with these topologies, take a look at the other topopologies in [src/jvm/org/apache/storm/starter/](src/jvm/org/apache/storm/starter/) such as [RollingTopWords](src/jvm/org/apache/storm/starter/RollingTopWords.java) diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java new file mode 100644 index 00000000000..d2be6be412c --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.topology.ConfigurableTopology; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.tuple.Values; + +import java.util.UUID; + +public class LambdaTopology extends ConfigurableTopology { + public static void main(String[] args) { + ConfigurableTopology.start(new LambdaTopology(), args); + } + + @Override + protected int run(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + // example. spout1: generate random strings + // bolt1: get the first part of a string + // bolt2: output the tuple + builder.setSpout("spout1", () -> UUID.randomUUID().toString()); + builder.setBolt("bolt1", (tuple, collector) -> { + String[] parts = tuple.getStringByField("lambda").split("\\-"); + collector.emit(new Values(parts[0])); + }).shuffleGrouping("spout1"); + builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); + + Config conf = new Config(); + conf.setDebug(true); + conf.setNumWorkers(2); + + return submit("lambda-demo", conf, builder); + } +} diff --git a/external/storm-rocketmq/README.md b/external/storm-rocketmq/README.md index 160118a1bcc..341bb0712d4 100644 --- a/external/storm-rocketmq/README.md +++ b/external/storm-rocketmq/README.md @@ -1,4 +1,4 @@ -#Storm RocketMQ +# Storm RocketMQ Storm/Trident integration for [RocketMQ](https://rocketmq.incubator.apache.org/). This package includes the core spout, bolt and trident states that allows a storm topology to either write storm tuples into a topic or read from topics in a storm topology. 
diff --git a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java new file mode 100644 index 00000000000..3ecc4572461 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.lambda; + +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; + +public abstract class AbstractLambdaBolt extends BaseBasicBolt { + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("lambda")); + } +} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java new file mode 100644 index 00000000000..96d48a251a4 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.lambda; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.tuple.Tuple; + +public class LambdaBiConsumerBolt extends AbstractLambdaBolt { + + private SerializableBiConsumer biConsumer; + + public LambdaBiConsumerBolt(SerializableBiConsumer biConsumer) { + this.biConsumer = biConsumer; + } + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + biConsumer.accept(input, collector); + } + +} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java new file mode 100644 index 00000000000..29bb32e0d48 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.lambda; + +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.tuple.Tuple; + +public class LambdaConsumerBolt extends AbstractLambdaBolt { + + private SerializableConsumer consumer; + + public LambdaConsumerBolt(SerializableConsumer consumer) { + this.consumer = consumer; + } + + @Override + public void execute(Tuple input, BasicOutputCollector collector) { + consumer.accept(input); + } + +} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java new file mode 100644 index 00000000000..51593b50720 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.lambda; + +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Values; + +import java.util.Map; + +public class LambdaSpout extends BaseRichSpout { + private SerializableSupplier supplier; + private SpoutOutputCollector collector; + + public LambdaSpout(SerializableSupplier supplier) { + this.supplier = supplier; + } + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + this.collector = collector; + } + + @Override + public void nextTuple() { + Object obj = supplier.get(); + if (obj != null) { + collector.emit(new Values(obj)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("lambda")); + } +} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/SerializableBiConsumer.java b/storm-client/src/jvm/org/apache/storm/lambda/SerializableBiConsumer.java new file mode 100644 index 00000000000..e7a6865ff53 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/SerializableBiConsumer.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.lambda; + +import java.io.Serializable; +import java.util.function.BiConsumer; + +public interface SerializableBiConsumer extends BiConsumer, Serializable {} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/SerializableConsumer.java b/storm-client/src/jvm/org/apache/storm/lambda/SerializableConsumer.java new file mode 100644 index 00000000000..0f781c8ece3 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/SerializableConsumer.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.lambda; + +import java.io.Serializable; +import java.util.function.Consumer; + +public interface SerializableConsumer extends Consumer, Serializable {} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/SerializableSupplier.java b/storm-client/src/jvm/org/apache/storm/lambda/SerializableSupplier.java new file mode 100644 index 00000000000..238f9be3512 --- /dev/null +++ b/storm-client/src/jvm/org/apache/storm/lambda/SerializableSupplier.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.storm.lambda; + +import java.io.Serializable; +import java.util.function.Supplier; + +public interface SerializableSupplier extends Supplier, Serializable {} diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 15def4e3382..15e19457efb 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -22,6 +22,12 @@ import org.apache.storm.grouping.CustomStreamGrouping; import org.apache.storm.grouping.PartialKeyGrouping; import org.apache.storm.hooks.IWorkerHook; +import org.apache.storm.lambda.LambdaBiConsumerBolt; +import org.apache.storm.lambda.LambdaConsumerBolt; +import org.apache.storm.lambda.LambdaSpout; +import org.apache.storm.lambda.SerializableBiConsumer; +import org.apache.storm.lambda.SerializableConsumer; +import org.apache.storm.lambda.SerializableSupplier; import org.apache.storm.spout.CheckpointSpout; import org.apache.storm.state.State; import org.apache.storm.task.OutputCollector; @@ -315,6 +321,68 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt(new StatefulWindowedBoltExecutor(bolt)), parallelism_hint); } + /** + * Define a new bolt in this topology. This defines a lambda basic bolt, which is a + * simpler to use but more restricted kind of bolt. Basic bolts are intended + * for non-aggregation processing and automate the anchoring/acking process to + * achieve proper reliability in the topology. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. 
+ * @param biConsumer lambda expression which is the instance of functional interface BiConsumer + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer) throws IllegalArgumentException { + return setBolt(id, biConsumer, null); + } + + /** + * Define a new bolt in this topology. This defines a lambda basic bolt, which is a + * simpler to use but more restricted kind of bolt. Basic bolts are intended + * for non-aggregation processing and automate the anchoring/acking process to + * achieve proper reliability in the topology. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param biConsumer lambda expression which is the instance of functional interface BiConsumer + * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster. + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, Number parallelism_hint) throws IllegalArgumentException { + return setBolt(id, new LambdaBiConsumerBolt(biConsumer), parallelism_hint); + } + + /** + * Define a new bolt in this topology. This defines a lambda basic bolt, which is a + * simpler to use but more restricted kind of bolt. Basic bolts are intended + * for non-aggregation processing and automate the anchoring/acking process to + * achieve proper reliability in the topology. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. 
+ * @param consumer lambda expression which is the instance of functional interface Consumer + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt(String id, SerializableConsumer consumer) throws IllegalArgumentException { + return setBolt(id, consumer, null); + } + + /** + * Define a new bolt in this topology. This defines a lambda basic bolt, which is a + * simpler to use but more restricted kind of bolt. Basic bolts are intended + * for non-aggregation processing and automate the anchoring/acking process to + * achieve proper reliability in the topology. + * + * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. + * @param consumer lambda expression which is the instance of functional interface Consumer + * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster. + * @return use the returned object to declare the inputs to this component + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public BoltDeclarer setBolt(String id, SerializableConsumer consumer, Number parallelism_hint) throws IllegalArgumentException { + return setBolt(id, new LambdaConsumerBolt(consumer), parallelism_hint); + } + /** * Define a new spout in this topology. * @@ -352,6 +420,31 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel // TODO: finish } + /** + * Define a new spout in this topology. + * + * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs. 
+ * @param supplier lambda expression which is the instance of functional interface Supplier + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) throws IllegalArgumentException { + return setSpout(id, supplier, null); + } + + /** + * Define a new spout in this topology with the specified parallelism. If the spout declares + * itself as non-distributed, the parallelism_hint will be ignored and only one task + * will be allocated to this component. + * + * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs. + * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster. + * @param supplier lambda expression which is the instance of functional interface Supplier + * @throws IllegalArgumentException if {@code parallelism_hint} is not positive + */ + public SpoutDeclarer setSpout(String id, SerializableSupplier supplier, Number parallelism_hint) throws IllegalArgumentException { + return setSpout(id, new LambdaSpout(supplier), parallelism_hint); + } + /** * Add a new worker lifecycle hook * From 4d8efaee4577ebc369ccd2153c4e2bafe2de3c6b Mon Sep 17 00:00:00 2001 From: vesense Date: Wed, 26 Apr 2017 13:16:50 +0800 Subject: [PATCH 2/3] STORM-2490: add NOTE for lambda variable usage --- .../apache/storm/starter/LambdaTopology.java | 24 ++++++++++++++++++- .../storm/topology/TopologyBuilder.java | 12 +++++----- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index d2be6be412c..66307ef6429 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ 
b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -22,6 +22,7 @@ import org.apache.storm.topology.TopologyBuilder; import org.apache.storm.tuple.Values; +import java.io.Serializable; import java.util.UUID; public class LambdaTopology extends ConfigurableTopology { @@ -36,10 +37,18 @@ protected int run(String[] args) throws Exception { // example. spout1: generate random strings // bolt1: get the first part of a string // bolt2: output the tuple + + // NOTE: Variable used in lambda expression should be final or effectively final + // (or it will cause compilation error), + // and variable type should implement the Serializable interface if it isn't primitive type + // (or it will cause not serializable exception). + Prefix prefix = new Prefix("Hello lambda:"); + String suffix = ":so cool!"; + builder.setSpout("spout1", () -> UUID.randomUUID().toString()); builder.setBolt("bolt1", (tuple, collector) -> { String[] parts = tuple.getStringByField("lambda").split("\\-"); - collector.emit(new Values(parts[0])); + collector.emit(new Values(prefix + parts[0] + suffix)); }).shuffleGrouping("spout1"); builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); @@ -50,3 +59,16 @@ protected int run(String[] args) throws Exception { return submit("lambda-demo", conf, builder); } } + +class Prefix implements Serializable { + private String str; + + public Prefix(String str) { + this.str = str; + } + + @Override + public String toString() { + return this.str; + } +} diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 15e19457efb..23c55387265 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -328,7 +328,7 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt consumer) thr * achieve proper reliability in 
the topology. * * @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs. - * @param consumer lambda expression which is the instance of functional interface Consumer + * @param consumer lambda expression that implements tuple processing for this bolt * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster. * @return use the returned object to declare the inputs to this component * @throws IllegalArgumentException if {@code parallelism_hint} is not positive @@ -424,7 +424,7 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel * Define a new spout in this topology. * * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs. - * @param supplier lambda expression which is the instance of functional interface Supplier + * @param supplier lambda expression that implements tuple generating for this spout * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) throws IllegalArgumentException { @@ -438,7 +438,7 @@ public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) * * @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs. * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somewhere around the cluster. 
- * @param supplier lambda expression which is the instance of functional interface Supplier + * @param supplier lambda expression that implements tuple generating for this spout * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ public SpoutDeclarer setSpout(String id, SerializableSupplier supplier, Number parallelism_hint) throws IllegalArgumentException { From ce58ae5388e5d8c60a511e239ecb57bdbfc17437 Mon Sep 17 00:00:00 2001 From: vesense Date: Wed, 26 Apr 2017 17:20:52 +0800 Subject: [PATCH 3/3] STORM-2490: support user defined output fields --- .../apache/storm/starter/LambdaTopology.java | 5 ++-- .../storm/lambda/AbstractLambdaBolt.java | 30 ------------------- .../storm/lambda/LambdaBiConsumerBolt.java | 14 +++++++-- .../storm/lambda/LambdaConsumerBolt.java | 8 ++++- .../org/apache/storm/lambda/LambdaSpout.java | 4 +-- .../storm/topology/TopologyBuilder.java | 14 +++++---- 6 files changed, 32 insertions(+), 43 deletions(-) delete mode 100644 storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java index 66307ef6429..61b02dbbd8c 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/LambdaTopology.java @@ -44,12 +44,13 @@ protected int run(String[] args) throws Exception { // (or it will cause not serializable exception). 
Prefix prefix = new Prefix("Hello lambda:"); String suffix = ":so cool!"; + int tag = 999; builder.setSpout("spout1", () -> UUID.randomUUID().toString()); builder.setBolt("bolt1", (tuple, collector) -> { String[] parts = tuple.getStringByField("lambda").split("\\-"); - collector.emit(new Values(prefix + parts[0] + suffix)); - }).shuffleGrouping("spout1"); + collector.emit(new Values(prefix + parts[0] + suffix, tag)); + }, "strValue", "intValue").shuffleGrouping("spout1"); builder.setBolt("bolt2", tuple -> System.out.println(tuple)).shuffleGrouping("bolt1"); Config conf = new Config(); diff --git a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java deleted file mode 100644 index 3ecc4572461..00000000000 --- a/storm-client/src/jvm/org/apache/storm/lambda/AbstractLambdaBolt.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.storm.lambda; - -import org.apache.storm.topology.OutputFieldsDeclarer; -import org.apache.storm.topology.base.BaseBasicBolt; -import org.apache.storm.tuple.Fields; - -public abstract class AbstractLambdaBolt extends BaseBasicBolt { - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("lambda")); - } -} diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java index 96d48a251a4..7e7de9cf1bf 100644 --- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaBiConsumerBolt.java @@ -18,14 +18,20 @@ package org.apache.storm.lambda; import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.tuple.Fields; import org.apache.storm.tuple.Tuple; -public class LambdaBiConsumerBolt extends AbstractLambdaBolt { +public class LambdaBiConsumerBolt extends BaseBasicBolt { private SerializableBiConsumer biConsumer; - public LambdaBiConsumerBolt(SerializableBiConsumer biConsumer) { + private String[] fields; + + public LambdaBiConsumerBolt(SerializableBiConsumer biConsumer, String[] fields) { this.biConsumer = biConsumer; + this.fields = fields; } @Override @@ -33,4 +39,8 @@ public void execute(Tuple input, BasicOutputCollector collector) { biConsumer.accept(input, collector); } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields(fields)); + } } diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java index 29bb32e0d48..d9114ed8ebc 100644 --- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java +++ 
b/storm-client/src/jvm/org/apache/storm/lambda/LambdaConsumerBolt.java @@ -18,9 +18,11 @@ package org.apache.storm.lambda; import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.base.BaseBasicBolt; import org.apache.storm.tuple.Tuple; -public class LambdaConsumerBolt extends AbstractLambdaBolt { +public class LambdaConsumerBolt extends BaseBasicBolt { private SerializableConsumer consumer; @@ -33,4 +35,8 @@ public void execute(Tuple input, BasicOutputCollector collector) { consumer.accept(input); } + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + // this bolt doesn't emit to downstream bolts + } } diff --git a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java index 51593b50720..6d0ba3a10c6 100644 --- a/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java +++ b/storm-client/src/jvm/org/apache/storm/lambda/LambdaSpout.java @@ -27,10 +27,10 @@ import java.util.Map; public class LambdaSpout extends BaseRichSpout { - private SerializableSupplier supplier; + private SerializableSupplier supplier; private SpoutOutputCollector collector; - public LambdaSpout(SerializableSupplier supplier) { + public LambdaSpout(SerializableSupplier supplier) { this.supplier = supplier; } diff --git a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java index 23c55387265..d8d871145bc 100644 --- a/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java +++ b/storm-client/src/jvm/org/apache/storm/topology/TopologyBuilder.java @@ -329,11 +329,12 @@ public BoltDeclarer setBolt(String id, IStatefulWindowedBolt biConsumer) throws IllegalArgumentException { - return setBolt(id, biConsumer, null); + public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, String...
fields) throws IllegalArgumentException { + return setBolt(id, biConsumer, null, fields); } /** @@ -344,12 +345,13 @@ public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, Number parallelism_hint) throws IllegalArgumentException { - return setBolt(id, new LambdaBiConsumerBolt(biConsumer), parallelism_hint); + public BoltDeclarer setBolt(String id, SerializableBiConsumer biConsumer, Number parallelism_hint, String... fields) throws IllegalArgumentException { + return setBolt(id, new LambdaBiConsumerBolt(biConsumer, fields), parallelism_hint); } /** @@ -427,7 +429,7 @@ public void setStateSpout(String id, IRichStateSpout stateSpout, Number parallel * @param supplier lambda expression that implements tuple generating for this spout * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) throws IllegalArgumentException { + public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) throws IllegalArgumentException { return setSpout(id, supplier, null); } @@ -441,7 +443,7 @@ public SpoutDeclarer setSpout(String id, SerializableSupplier supplier) * @param supplier lambda expression that implements tuple generating for this spout * @throws IllegalArgumentException if {@code parallelism_hint} is not positive */ - public SpoutDeclarer setSpout(String id, SerializableSupplier supplier, Number parallelism_hint) throws IllegalArgumentException { + public SpoutDeclarer setSpout(String id, SerializableSupplier supplier, Number parallelism_hint) throws IllegalArgumentException { return setSpout(id, new LambdaSpout(supplier), parallelism_hint); }