[FLINK-2525] Add configuration support in Storm-compatibility
This closes #1046
ffbin authored and mjsax committed Oct 2, 2015
1 parent 9f71107 commit 9fe285a
Showing 22 changed files with 435 additions and 84 deletions.
31 changes: 31 additions & 0 deletions docs/apis/storm_compatibility.md
@@ -169,10 +169,41 @@ The input type is `Tuple1<String>` and `Fields("sentence")` specify that `input.

See [BoltTokenizerWordCountPojo](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountPojo.java) and [BoltTokenizerWordCountWithNames](https://github.com/apache/flink/tree/master/flink-contrib/flink-storm-compatibility/flink-storm-compatibility-examples/src/main/java/org/apache/flink/stormcompatibility/wordcount/BoltTokenizerWordCountWithNames.java) for examples.

## Configuring Spouts and Bolts

In Storm, Spouts and Bolts can be configured with a globally distributed `Map` object that is passed to the `submitTopology(...)` method of `LocalCluster` or `StormSubmitter`.
This `Map` is provided by the user alongside the topology and is forwarded as a parameter to the calls `Spout.open(...)` and `Bolt.prepare(...)`.
If a whole topology is executed using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.

For embedded usage, Flink's configuration mechanism must be used.
A global configuration can be set in a `StreamExecutionEnvironment` via `.getConfig().setGlobalJobParameters(...)`.
Flink's regular `Configuration` class can be used to configure Spouts and Bolts.
However, `Configuration` does not support arbitrary key data types as Storm does (only `String` keys are allowed).
Thus, Flink additionally provides the `StormConfig` class, which can be used like a raw `Map` to provide full compatibility with Storm.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
~~~java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

StormConfig config = new StormConfig();
// set config values
[...]

// set global Storm configuration
env.getConfig().setGlobalJobParameters(config);

// assemble program with embedded Spouts and/or Bolts
[...]
~~~
</div>
</div>
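
As a rough illustration of the hand-off described above, the following sketch shows how a component might read values from the raw `Map` it receives. It deliberately avoids the Storm and Flink classes so that it runs on a plain JDK: `ThresholdFilter`, its `prepare(Map)` method, and both configuration keys are hypothetical stand-ins, not part of either API. The non-`String` key is the case that a `String`-keyed `Configuration` could not represent.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a Bolt; only the configuration hand-off is modeled. */
class ThresholdFilter {
    /** Hypothetical configuration keys; note that the second one is not a String. */
    static final String MIN_LENGTH_KEY = "filter.min.length";
    static final Integer VERBOSE_KEY = 42;

    private int minLength = 1;
    private boolean verbose = false;

    /** Mirrors the shape of Bolt.prepare(Map conf, ...): the component receives a raw Map. */
    @SuppressWarnings("rawtypes")
    void prepare(Map conf) {
        Object min = conf.get(MIN_LENGTH_KEY);
        if (min instanceof Number) {
            minLength = ((Number) min).intValue();
        }
        Object v = conf.get(VERBOSE_KEY);
        if (v instanceof Boolean) {
            verbose = (Boolean) v;
        }
    }

    boolean accept(String word) {
        return word.length() >= minLength;
    }

    boolean isVerbose() {
        return verbose;
    }
}

class ConfigHandOffExample {
    public static void main(String[] args) {
        // The user-provided configuration may mix key types freely.
        Map<Object, Object> conf = new HashMap<>();
        conf.put(ThresholdFilter.MIN_LENGTH_KEY, 4);
        conf.put(ThresholdFilter.VERBOSE_KEY, true);

        ThresholdFilter filter = new ThresholdFilter();
        filter.prepare(conf);

        System.out.println(filter.accept("to"));    // prints "false"
        System.out.println(filter.accept("flink")); // prints "true"
    }
}
```

With the real classes, the same values would be put into a `StormConfig` and registered via `setGlobalJobParameters(...)` as shown in the snippet above.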

## Multiple Output Streams

Flink can also handle the declaration of multiple output streams for Spouts and Bolts.
If a whole topology is executed using `FlinkTopologyBuilder` etc., no special attention is required &ndash; it works as in regular Storm.

For embedded usage, the output stream will be of data type `SplitStreamType<T>` and must be split by using `DataStream.split(...)` and `SplitDataStream.select(...)`.
Flink already provides the predefined output selector `FlinkStormStreamSelector<T>` for use with `.split(...)`.
Furthermore, the wrapper type `SplitStreamType<T>` can be removed using `SplitStreamMapper<T>`.
@@ -3,7 +3,6 @@
The Storm compatibility layer allows spouts and bolts to be embedded unmodified within a regular Flink streaming program (`StormSpoutWrapper` and `StormBoltWrapper`). Additionally, a whole Storm topology can be submitted to Flink (see `FlinkTopologyBuilder`, `FlinkLocalCluster`, and `FlinkSubmitter`). Only a few minor changes to the original submitting code are required. The code that builds the topology itself can be reused unmodified. See `flink-storm-examples` for a simple word-count example.

The following Storm features are not (yet/fully) supported by the compatibility layer:
* the spout/bolt configuration within `open()`/`prepare()` is not yet supported (ie, `Map conf` parameter)
* topology and tuple meta information (ie, `TopologyContext` not fully supported)
* no fault-tolerance guarantees (ie, calls to `ack()`/`fail()` and anchoring is ignored)
* for whole Storm topologies the following is not supported by Flink:
@@ -32,6 +32,7 @@
import backtype.storm.utils.Utils;

import com.google.common.collect.Lists;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.Client;
import org.apache.flink.client.program.JobWithJars;
@@ -47,6 +48,7 @@
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.stormcompatibility.util.StormConfig;

import scala.Some;
import scala.concurrent.Await;
@@ -66,7 +68,6 @@
public class FlinkClient {

/** The client's configuration */
@SuppressWarnings("unused")
private final Map<?,?> conf;
/** The jobmanager's host name */
private final String jobManagerHost;
@@ -161,7 +162,7 @@ public void submitTopology(final String name, final String uploadedJarLocation,
*/
public void submitTopologyWithOpts(final String name, final String uploadedJarLocation, final FlinkTopology
topology)
throws AlreadyAliveException, InvalidTopologyException {
throws AlreadyAliveException, InvalidTopologyException {

if (this.getTopologyJobId(name) != null) {
throw new AlreadyAliveException();
@@ -174,11 +175,15 @@ public void submitTopologyWithOpts(final String name, final String uploadedJarLo
throw new RuntimeException("Problem with jar file " + uploadedJarFile.getAbsolutePath(), e);
}

/* set storm configuration */
if (this.conf != null) {
topology.getConfig().setGlobalJobParameters(new StormConfig(this.conf));
}

final JobGraph jobGraph = topology.getStreamGraph().getJobGraph(name);
jobGraph.addJar(new Path(uploadedJarFile.getAbsolutePath()));

final Configuration configuration = jobGraph.getJobConfiguration();

configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, jobManagerHost);
configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, jobManagerPort);

@@ -31,6 +31,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.stormcompatibility.util.StormConfig;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,11 +46,11 @@ public class FlinkLocalCluster {

/** The log used by this mini cluster */
private static final Logger LOG = LoggerFactory.getLogger(FlinkLocalCluster.class);

/** The flink mini cluster on which to execute the programs */
private final FlinkMiniCluster flink;


public FlinkLocalCluster() {
this.flink = new LocalFlinkMiniCluster(new Configuration(), true, StreamingMode.STREAMING);
this.flink.start();
@@ -59,17 +60,22 @@ public FlinkLocalCluster(FlinkMiniCluster flink) {
this.flink = Objects.requireNonNull(flink);
}

public void submitTopology(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology)
@SuppressWarnings("rawtypes")
public void submitTopology(final String topologyName, final Map conf, final FlinkTopology topology)
throws Exception {
this.submitTopologyWithOpts(topologyName, conf, topology, null);
}

public void submitTopologyWithOpts(final String topologyName, final Map<?, ?> conf, final FlinkTopology topology,
final SubmitOptions submitOpts) throws Exception {

@SuppressWarnings("rawtypes")
public void submitTopologyWithOpts(final String topologyName, final Map conf, final FlinkTopology topology, final SubmitOptions submitOpts) throws Exception {
LOG.info("Running Storm topology on FlinkLocalCluster");

if(conf != null) {
topology.getConfig().setGlobalJobParameters(new StormConfig(conf));
}

JobGraph jobGraph = topology.getStreamGraph().getJobGraph(topologyName);
flink.submitJobDetached(jobGraph);
this.flink.submitJobDetached(jobGraph);
}

public void killTopology(final String topologyName) {
@@ -115,7 +121,7 @@ public TopologyInfo getTopologyInfo(final String id) {
// ------------------------------------------------------------------------
// Access to default local cluster
// ------------------------------------------------------------------------

// A different {@link FlinkLocalCluster} to be used for execution of ITCases
private static LocalClusterFactory currentFactory = new DefaultLocalClusterFactory();

@@ -138,7 +144,7 @@ public static FlinkLocalCluster getLocalCluster() {
public static void initialize(LocalClusterFactory clusterFactory) {
currentFactory = Objects.requireNonNull(clusterFactory);
}

// ------------------------------------------------------------------------
// Cluster factory
// ------------------------------------------------------------------------
@@ -159,7 +165,7 @@ public static interface LocalClusterFactory {
* A factory that instantiates a FlinkLocalCluster.
*/
public static class DefaultLocalClusterFactory implements LocalClusterFactory {

@Override
public FlinkLocalCluster createLocalCluster() {
return new FlinkLocalCluster();
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.util;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;

import backtype.storm.Config;

/**
* {@link StormConfig} is used to provide a user-defined Storm configuration (ie, a raw {@link Map} or {@link Config}
* object) for embedded Spouts and Bolts.
*/
@SuppressWarnings("rawtypes")
public final class StormConfig extends GlobalJobParameters implements Map {
private static final long serialVersionUID = 8019519109673698490L;

/** Contains the actual configuration that is provided to Spouts and Bolts. */
private final Map config = new HashMap();

/**
* Creates an empty configuration.
*/
public StormConfig() {
}

/**
* Creates a configuration with initial values provided by the given {@code Map}.
*
* @param config
* Initial values for this configuration.
*/
@SuppressWarnings("unchecked")
public StormConfig(Map config) {
this.config.putAll(config);
}


@Override
public int size() {
return this.config.size();
}

@Override
public boolean isEmpty() {
return this.config.isEmpty();
}

@Override
public boolean containsKey(Object key) {
return this.config.containsKey(key);
}

@Override
public boolean containsValue(Object value) {
return this.config.containsValue(value);
}

@Override
public Object get(Object key) {
return this.config.get(key);
}

@SuppressWarnings("unchecked")
@Override
public Object put(Object key, Object value) {
return this.config.put(key, value);
}

@Override
public Object remove(Object key) {
return this.config.remove(key);
}

@SuppressWarnings("unchecked")
@Override
public void putAll(Map m) {
this.config.putAll(m);
}

@Override
public void clear() {
this.config.clear();
}

@SuppressWarnings("unchecked")
@Override
public Set<Object> keySet() {
return this.config.keySet();
}

@SuppressWarnings("unchecked")
@Override
public Collection<Object> values() {
return this.config.values();
}

@SuppressWarnings("unchecked")
@Override
public Set<java.util.Map.Entry<Object, Object>> entrySet() {
return this.config.entrySet();
}

}
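
The delegation pattern used by `StormConfig` can be tried out in isolation. The sketch below is not the class from this commit: `DelegatingConfig` is a hypothetical stand-in that extends `AbstractMap` (instead of implementing every `Map` method by hand) and omits `GlobalJobParameters`, so that it compiles without Flink on the classpath. It illustrates two properties of the class above: keys of any type are accepted, and the constructor copies the initial values rather than keeping a reference to the given map.

```java
import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for StormConfig: a Map facade over a private HashMap. */
class DelegatingConfig extends AbstractMap<Object, Object> {
    /** Holds the actual configuration entries. */
    private final Map<Object, Object> config = new HashMap<>();

    /** Creates an empty configuration. */
    DelegatingConfig() {
    }

    /** Copies the initial values, so later changes to {@code initial} are not visible here. */
    DelegatingConfig(Map<?, ?> initial) {
        this.config.putAll(initial);
    }

    @Override
    public Object put(Object key, Object value) {
        return this.config.put(key, value);
    }

    @Override
    public Set<Map.Entry<Object, Object>> entrySet() {
        return this.config.entrySet();
    }
}

class DelegatingConfigExample {
    public static void main(String[] args) {
        Map<Object, Object> source = new HashMap<>();
        source.put("topology.debug", true);

        DelegatingConfig conf = new DelegatingConfig(source);
        source.put("added.later", 1);     // not copied: the constructor already ran
        conf.put(7, "non-String key");    // arbitrary key types are accepted

        System.out.println(conf.containsKey("added.later")); // prints "false"
        System.out.println(conf.get(7));                     // prints "non-String key"
    }
}
```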
@@ -23,9 +23,11 @@
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;

@@ -99,7 +101,19 @@ public AbstractStormSpoutWrapper(final IRichSpout spout,
@Override
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
this.spout.open(null,

GlobalJobParameters config = super.getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();

if (config != null) {
if (config instanceof StormConfig) {
stormConfig = (StormConfig) config;
} else {
stormConfig.putAll(config.toMap());
}
}

this.spout.open(stormConfig,
StormWrapperSetupHelper
.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
new SpoutOutputCollector(this.collector));
@@ -24,11 +26,13 @@
import backtype.storm.topology.IRichBolt;
import backtype.storm.tuple.Fields;

import org.apache.flink.api.common.ExecutionConfig.GlobalJobParameters;
import org.apache.flink.api.java.tuple.Tuple0;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.stormcompatibility.util.SplitStreamType;
import org.apache.flink.stormcompatibility.util.StormConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
@@ -205,7 +207,18 @@ public void open(final Configuration parameters) throws Exception {
this.numberOfAttributes, flinkCollector));
}

this.bolt.prepare(null, topologyContext, stormCollector);
GlobalJobParameters config = super.executionConfig.getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();

if (config != null) {
if (config instanceof StormConfig) {
stormConfig = (StormConfig) config;
} else {
stormConfig.putAll(config.toMap());
}
}

this.bolt.prepare(stormConfig, topologyContext, stormCollector);
}

@Override

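Both wrappers resolve the configuration the same way before calling `open(...)` or `prepare(...)`: use the `StormConfig` directly if one was registered, otherwise copy the entries of `toMap()` into a fresh one. Below is a minimal sketch of that decision, assuming hypothetical stand-in types (`JobParams` for `GlobalJobParameters`, `RawMapParams` for `StormConfig`) so that it runs without Flink. The `null` check on the result of `toMap()` is a defensive choice of this sketch and is not present in the commit.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Flink's GlobalJobParameters. */
abstract class JobParams {
    /** String-only view of the parameters. */
    abstract Map<String, String> toMap();
}

/** Hypothetical stand-in for StormConfig: job parameters that carry a raw Map. */
class RawMapParams extends JobParams {
    final Map<Object, Object> values = new HashMap<>();

    @Override
    Map<String, String> toMap() {
        return new HashMap<>();
    }
}

class ConfigResolver {
    private ConfigResolver() {
    }

    /**
     * Returns the map to hand to a Spout or Bolt:
     * the raw map itself if one was registered, otherwise a copy of the
     * String-keyed entries, otherwise an empty map.
     */
    static Map<Object, Object> resolve(JobParams params) {
        if (params instanceof RawMapParams) {
            // Already in the form Storm components expect; no copy needed.
            return ((RawMapParams) params).values;
        }
        Map<Object, Object> resolved = new HashMap<>();
        if (params != null) {
            Map<String, String> flat = params.toMap();
            if (flat != null) { // defensive; an assumption of this sketch
                resolved.putAll(flat);
            }
        }
        return resolved;
    }
}
```

The design choice worth noting is that the component never receives `null`: when no global parameters are set, it still gets an empty map.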