Skip to content

Commit

Permalink
[FLINK-2837][storm] various improvements for the compatibility layer
Browse files Browse the repository at this point in the history
- refactor to use Storm's topology builder
- remove FlinkTopologyBuilder
- instantiate context-based StreamExecutionEnvironment (local or remote)
- remove some of the Flink and Storm behavior replicating classes
- modify FlinkTopology to parse Storm topology directly
- replace StormTestBase with StreamingTestBase
- add print example
- FlinkTopologyBuilder changes (check if all inputs are available before processing)
- correct package typo
- two input support
- add join example
- update docs

This closes #1398.
  • Loading branch information
mxm committed Dec 2, 2015
1 parent 20fe2af commit 8863679
Show file tree
Hide file tree
Showing 71 changed files with 1,132 additions and 1,009 deletions.
12 changes: 5 additions & 7 deletions docs/apis/storm_compatibility.md
Expand Up @@ -57,20 +57,18 @@ See *WordCount Storm* within `flink-storm-examples/pom.xml` for an example how t

Flink provides a Storm compatible API (`org.apache.flink.storm.api`) that offers replacements for the following classes:

- `TopologyBuilder` replaced by `FlinkTopologyBuilder`
- `StormSubmitter` replaced by `FlinkSubmitter`
- `NimbusClient` and `Client` replaced by `FlinkClient`
- `LocalCluster` replaced by `FlinkLocalCluster`

In order to submit a Storm topology to Flink, it is sufficient to replace the used Storm classes with their Flink replacements in the Storm *client code that assembles* the topology.
The actual runtime code, ie, Spouts and Bolts, can be uses *unmodified*.
If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively.
If a parameter is not specified, the value is taken from `flink-conf.yaml`.
The actual runtime code, ie, Spouts and Bolts, can be used *unmodified*.
If a topology is executed in a remote cluster, parameters `nimbus.host` and `nimbus.thrift.port` are used as `jobmanger.rpc.address` and `jobmanger.rpc.port`, respectively. If a parameter is not specified, the value is taken from `flink-conf.yaml`.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
FlinkTopologyBuilder builder = new FlinkTopologyBuilder(); // replaces: TopologyBuilder builder = new FlinkTopology();
TopologyBuilder builder = new TopologyBuilder(); // the Storm topology builder

// actual topology assembling code and used Spouts/Bolts can be used as-is
builder.setSpout("source", new FileSpout(inputFilePath));
Expand All @@ -81,12 +79,12 @@ builder.setBolt("sink", new BoltFileSink(outputFilePath)).shuffleGrouping("count
Config conf = new Config();
if(runLocal) { // submit to test cluster
FlinkLocalCluster cluster = new FlinkLocalCluster(); // replaces: LocalCluster cluster = new LocalCluster();
cluster.submitTopology("WordCount", conf, builder.createTopology());
cluster.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder));
} else { // submit to remote cluster
// optional
// conf.put(Config.NIMBUS_HOST, "remoteHost");
// conf.put(Config.NIMBUS_THRIFT_PORT, 6123);
FlinkSubmitter.submitTopology("WordCount", conf, builder.createTopology()); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
FlinkSubmitter.submitTopology("WordCount", conf, FlinkTopology.createTopology(builder)); // replaces: StormSubmitter.submitTopology(topologyId, conf, builder.createTopology());
}
~~~
</div>
Expand Down
10 changes: 9 additions & 1 deletion flink-contrib/flink-storm-examples/pom.xml
Expand Up @@ -61,6 +61,14 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.storm</groupId>
<artifactId>storm-starter</artifactId>
<version>0.9.4</version>
</dependency>


</dependencies>

<build>
Expand Down Expand Up @@ -226,7 +234,7 @@ under the License.
</execution>

<!-- WordCount Storm topology-->
<!-- Example for whole topologies (ie, if FlinkTopologyBuilder is used) -->
<!-- Example for whole topologies (ie, if FlinkTopology is used) -->
<!-- We cannot use maven-jar-plugin because 'defaults.yaml' must be included in jar.
However, we excluded 'defaults.yaml' in dependency-plugin to get clean Eclipse environment.
Thus, 'defaults.yaml' is not available for maven-jar-plugin.
Expand Down
Expand Up @@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.excamation.operators.ExclamationBolt;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;

/**
* Implements the "Exclamation" program that attaches five exclamation mark to every line of a text files in a streaming
Expand Down Expand Up @@ -58,15 +58,17 @@ public static void main(final String[] args) throws Exception {
}

// build Topology the Storm way
final FlinkTopologyBuilder builder = ExclamationTopology.buildTopology();
final TopologyBuilder builder = ExclamationTopology.buildTopology();

// execute program locally
Config conf = new Config();
conf.put(ExclamationBolt.EXCLAMATION_COUNT, ExclamationTopology.getExclamation());

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, conf, builder.createTopology());
cluster.submitTopology(topologyId, conf, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);
cluster.shutdown();
}

}
Expand Up @@ -15,17 +15,17 @@
* limitations under the License.
*/

package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;

import backtype.storm.topology.TopologyBuilder;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.excamation.operators.ExclamationBolt;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;
import org.apache.flink.storm.util.FiniteFileSpout;
import org.apache.flink.storm.util.FiniteInMemorySpout;
import org.apache.flink.storm.util.OutputFormatter;
import org.apache.flink.storm.util.SimpleOutputFormatter;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.BoltPrintSink;

/**
* Implements the "Exclamation" program that attaches two exclamation marks to every line of a text files in a streaming
Expand All @@ -51,8 +51,8 @@ public class ExclamationTopology {
public final static String sinkId = "sink";
private final static OutputFormatter formatter = new SimpleOutputFormatter();

public static FlinkTopologyBuilder buildTopology() {
final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();
public static TopologyBuilder buildTopology() {
final TopologyBuilder builder = new TopologyBuilder();

// get input data
if (fileInputOutput) {
Expand Down
Expand Up @@ -16,19 +16,18 @@
* limitations under the License.
*/

package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;

import backtype.storm.utils.Utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
import org.apache.flink.storm.excamation.operators.ExclamationBolt;
import org.apache.flink.storm.exclamation.operators.ExclamationBolt;
import org.apache.flink.storm.util.StormConfig;
import org.apache.flink.storm.wrappers.BoltWrapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import backtype.storm.utils.Utils;

/**
* Implements the "Exclamation" program that attaches 3+x exclamation marks to every line of a text files in a streaming
* fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
Expand Down
Expand Up @@ -16,8 +16,9 @@
* limitations under the License.
*/

package org.apache.flink.storm.excamation;
package org.apache.flink.storm.exclamation;

import backtype.storm.utils.Utils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
Expand All @@ -28,8 +29,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import backtype.storm.utils.Utils;

/**
* Implements the "Exclamation" program that attaches six exclamation marks to every line of a text files in a streaming
* fashion. The program is constructed as a regular {@link backtype.storm.generated.StormTopology}.
Expand Down
Expand Up @@ -16,7 +16,7 @@
* limitations under the License.
*/

package org.apache.flink.storm.excamation.operators;
package org.apache.flink.storm.exclamation.operators;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
Expand Down
@@ -0,0 +1,88 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.join;

import backtype.storm.Config;
import backtype.storm.testing.FeederSpout;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.TupleOutputFormatter;
import storm.starter.bolt.PrinterBolt;
import storm.starter.bolt.SingleJoinBolt;


public class SingleJoinExample {

public static void main(String[] args) throws Exception {
final FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender", "hobbies"));
final FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("gender", genderSpout);

builder.setSpout("age", ageSpout);

builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
.fieldsGrouping("gender", new Fields("id"))
.fieldsGrouping("age", new Fields("id"));

// emit result
if (args.length > 0) {
// read the text file from given input path
builder.setBolt("fileOutput", new BoltFileSink(args[0], new TupleOutputFormatter()))
.shuffleGrouping("join");
} else {
builder.setBolt("print", new PrinterBolt()).shuffleGrouping("join");
}

Config conf = new Config();
conf.setDebug(true);

String[] hobbies = new String[] {"reading", "biking", "travelling", "watching tv"};

for (int i = 0; i < 10; i++) {
String gender;
if (i % 2 == 0) {
gender = "male";
}
else {
gender = "female";
}
genderSpout.feed(new Values(i, gender, hobbies[i % hobbies.length]));
}

for (int i = 9; i >= 0; i--) {
ageSpout.feed(new Values(i, i + 20));
}


final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("joinTopology", conf, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

cluster.shutdown();

}
}
@@ -0,0 +1,61 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.storm.print;

import backtype.storm.Config;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.utils.Utils;
import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopology;
import storm.starter.bolt.PrinterBolt;
import storm.starter.spout.TwitterSampleSpout;

import java.util.Arrays;

/**
* Prints incoming tweets. Tweets can be filtered by keywords.
*/
public class PrintSampleStream {
public static void main(String[] args) throws Exception {
String consumerKey = args[0];
String consumerSecret = args[1];
String accessToken = args[2];
String accessTokenSecret = args[3];

// keywords start with the 5th parameter
String[] keyWords = Arrays.copyOfRange(args, 4, args.length);

TopologyBuilder builder = new TopologyBuilder();

builder.setSpout("twitter", new TwitterSampleSpout(consumerKey, consumerSecret,
accessToken, accessTokenSecret, keyWords));
builder.setBolt("print", new PrinterBolt())
.shuffleGrouping("twitter");


Config conf = new Config();

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology("Print", conf, FlinkTopology.createTopology(builder));

Utils.sleep(10 * 1000);

cluster.shutdown();
}
}
Expand Up @@ -23,8 +23,6 @@
import java.io.IOException;
import java.util.Map;

import org.apache.flink.storm.util.FiniteSpout;

/**
* Implements a Spout that reads data from a given local file. The spout stops automatically
* when it reached the end of the file.
Expand Down
Expand Up @@ -18,9 +18,6 @@

package org.apache.flink.storm.util;

import org.apache.flink.storm.util.FiniteSpout;


/**
* Implements a Spout that reads String[] data stored in memory. The Spout stops automatically when it emitted all of
* the data.
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.storm.wordcount;

import backtype.storm.topology.IRichBolt;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.examples.java.wordcount.util.WordCountData;
Expand Down
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.storm.wordcount;

import backtype.storm.topology.IRichBolt;

import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.PojoCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
Expand Down
Expand Up @@ -19,7 +19,6 @@

import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.java.io.CsvInputFormat;
import org.apache.flink.api.java.io.TupleCsvInputFormat;
import org.apache.flink.api.java.tuple.Tuple;
Expand Down

0 comments on commit 8863679

Please sign in to comment.