Skip to content

Commit

Permalink
[storm-compat] adapted layer to new streaming API
Browse files Browse the repository at this point in the history
  - re-introduced possibility to specify output type for addSource()
  - split StormCollector into Abstract-, Bolt-, and Spout-Collector
additional minor changes:
  - removed unnecessary "unused" tags
  - fixed comment typos
  - added missing license header in SingleJoin example files
  • Loading branch information
mjsax authored and mbalassi committed Jun 14, 2015
1 parent 09e5be4 commit e39699f
Show file tree
Hide file tree
Showing 45 changed files with 771 additions and 550 deletions.
Expand Up @@ -122,7 +122,7 @@ public FlinkClient(final String host, final int port, final Integer timeout) {
@SuppressWarnings("rawtypes")
public static FlinkClient getConfiguredClient(final Map conf) {
final String nimbusHost = (String) conf.get(Config.NIMBUS_HOST);
final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT));
final int nimbusPort = Utils.getInt(conf.get(Config.NIMBUS_THRIFT_PORT)).intValue();
return new FlinkClient(nimbusHost, nimbusPort);
}

Expand All @@ -133,7 +133,6 @@ public static FlinkClient getConfiguredClient(final Map conf) {
*
* @return A reference to itself.
*/
@SuppressWarnings("unused")
public FlinkClient getClient() {
// No separate client object is created; this instance itself is handed back.
return this;
}
Expand Down
Expand Up @@ -65,27 +65,22 @@ public void shutdown() {
ClusterUtil.stopOnMiniCluster();
}

@SuppressWarnings("unused")
public String getTopologyConf(final String id) {
// Stub: the given id is ignored and null is always returned.
return null;
}

@SuppressWarnings("unused")
public StormTopology getTopology(final String id) {
// Stub: the given id is ignored and null is always returned.
return null;
}

@SuppressWarnings("unused")
public ClusterSummary getClusterInfo() {
// Stub: no cluster summary is assembled; null is always returned.
return null;
}

@SuppressWarnings("unused")
public TopologyInfo getTopologyInfo(final String id) {
// Stub: the given id is ignored and null is always returned.
return null;
}

@SuppressWarnings("unused")
public Map<?, ?> getState() {
// Stub: no state map is built; null is always returned.
return null;
}
Expand Down
Expand Up @@ -17,8 +17,6 @@

package org.apache.flink.stormcompatibility.api;

import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
Expand All @@ -29,11 +27,16 @@
import java.util.List;

/**
* {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a {@link IRichSpout spout} or {@link
* IRichBolt bolt}.<br /> <br /> <strong>CAUTION: Currently, Flink does only support the default output stream.
* Furthermore, direct emit is not supported.</strong>
* {@link FlinkOutputFieldsDeclarer} is used to get the declared output schema of a
* {@link backtype.storm.topology.IRichSpout spout} or {@link backtype.storm.topology.IRichBolt
* bolt}.<br />
* <br />
* <strong>CAUTION: Currently, Flink does only support the default output stream. Furthermore,
* direct emit is not supported.</strong>
*/
final class FlinkOutputFieldsDeclarer implements OutputFieldsDeclarer {

// the declared output schema
private Fields outputSchema;

@Override
Expand All @@ -57,7 +60,7 @@ public void declare(final boolean direct, final Fields fields) {
/**
* {@inheritDoc}
* <p/>
* Currently, Flink only supports the default output stream. Thus, pareamter {@code streamId} must be equals to
* Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
* {@link Utils#DEFAULT_STREAM_ID}.
*
* @throws UnsupportedOperationException
Expand All @@ -71,7 +74,7 @@ public void declareStream(final String streamId, final Fields fields) {
/**
* {@inheritDoc}
* <p/>
* Currently, Flink only supports the default output stream. Thus, pareamter {@code streamId} must be equals to
* Currently, Flink only supports the default output stream. Thus, parameter {@code streamId} must be equal to
* {@link Utils#DEFAULT_STREAM_ID}. Furthermore, direct emit is not supported by Flink and parameter {@code direct}
* must be {@code false}.
*
Expand Down Expand Up @@ -132,15 +135,12 @@ public TypeInformation<?> getOutputType() throws IllegalArgumentException {
* {@link DefaultComparable} is a {@link Comparable} helper class that is used to get the correct {@link
* TypeInformation} from {@link TypeExtractor} within {@link #getOutputType()}. If key fields are not comparable,
* Flink cannot use them and will throw an exception.
*
* @author mjsax
*/
private static class DefaultComparable implements Comparable<DefaultComparable> {

public DefaultComparable() {
}

@SuppressWarnings("NullableProblems")
@Override
public int compareTo(final DefaultComparable o) {
return 0;
Expand Down
Expand Up @@ -23,6 +23,7 @@
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.generated.SubmitOptions;
import backtype.storm.utils.Utils;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.configuration.ConfigConstants;
Expand All @@ -39,7 +40,7 @@
* {@link FlinkSubmitter} mimics a {@link StormSubmitter} to submit Storm topologies to a Flink cluster.
*/
public class FlinkSubmitter {
public static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);
public final static Logger logger = LoggerFactory.getLogger(FlinkSubmitter.class);

/**
* Submits a topology to run on the cluster. A topology runs forever or until explicitly killed.
Expand All @@ -57,7 +58,6 @@ public class FlinkSubmitter {
* @throws InvalidTopologyException
* if an invalid topology was submitted
*/
@SuppressWarnings("unused")
public static void submitTopology(final String name, final Map<?, ?> stormConf, final FlinkTopology topology,
final SubmitOptions opts)
throws AlreadyAliveException, InvalidTopologyException {
Expand Down Expand Up @@ -97,7 +97,8 @@ public static void submitTopology(final String name, final Map stormConf, final
}
if (!stormConf.containsKey(Config.NIMBUS_THRIFT_PORT)) {
stormConf.put(Config.NIMBUS_THRIFT_PORT,
flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, 6123));
new Integer(flinkConfig.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
6123)));
}

final String serConf = JSONValue.toJSONString(stormConf);
Expand Down Expand Up @@ -151,7 +152,6 @@ public static void submitTopology(final String name, final Map stormConf, final
* @throws InvalidTopologyException
* if an invalid topology was submitted
*/
@SuppressWarnings("unused")
public static void submitTopologyWithProgressBar(final String name, final Map<?, ?> stormConf,
final FlinkTopology topology)
throws AlreadyAliveException, InvalidTopologyException {
Expand All @@ -169,7 +169,7 @@ public static void submitTopologyWithProgressBar(final String name, final Map<?,
* file path of the jar file to submit
* @return the value of parameter localJar
*/
@SuppressWarnings({"rawtypes", "unused"})
@SuppressWarnings("rawtypes")
public static String submitJar(final Map conf, final String localJar) {
// The conf argument is not used; the call is forwarded to submitJar(String) unchanged.
return submitJar(localJar);
}
Expand All @@ -187,7 +187,6 @@ public static String submitJar(final Map conf, final String localJar) {
* progress listener to track the jar file upload
* @return the value of parameter localJar
*/
@SuppressWarnings("rawtypes")
public static String submitJar(final String localJar) {
if (localJar == null) {
throw new RuntimeException(
Expand Down
Expand Up @@ -30,7 +30,7 @@
*/
class FlinkTopology extends StreamExecutionEnvironment {

// The corresponding {@link StormTopology} that is mimiced by this {@link FlinkTopology}
// The corresponding {@link StormTopology} that is mimicked by this {@link FlinkTopology}
private final StormTopology stormTopology;
// The number of declared tasks for the whole program (ie, sum over all dops)
private int numberOfTasks = 0;
Expand Down Expand Up @@ -70,7 +70,6 @@ public JobExecutionResult execute(final String jobName) throws Exception {
}

//TODO
@SuppressWarnings("unused")
public String getStormTopologyAsString() {
// Returns whatever toString() of the wrapped stormTopology field produces.
return this.stormTopology.toString();
}
Expand Down
Expand Up @@ -142,7 +142,7 @@ public FlinkTopology createTopology() {
final Grouping grouping = inputStream.getValue();
if (grouping.is_set_shuffle()) {
// Storm uses a round-robin shuffle strategy
inputDataStream = inputDataStream.distribute();
inputDataStream = inputDataStream.rebalance();
} else if (grouping.is_set_fields()) {
// global grouping is emulated in Storm via an empty fields grouping list
final List<String> fields = grouping.get_fields();
Expand Down Expand Up @@ -234,7 +234,6 @@ public BoltDeclarer setBolt(final String id, final IRichBolt bolt, final Number
* the basic bolt
* @return use the returned object to declare the inputs to this component
*/
@SuppressWarnings("unused")
public BoltDeclarer setBolt(final String id, final IBasicBolt bolt) {
// Delegates to the three-argument overload, passing null as the third argument.
return this.setBolt(id, bolt, null);
}
Expand Down
@@ -0,0 +1,111 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.stormcompatibility.wrappers;

import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import java.util.List;

/**
 * An {@link AbstractStormCollector} transforms Storm tuples to Flink tuples and hands them to the
 * subclass-provided {@link #doEmit(Object)}.
 *
 * @param <OUT>
 *            the type of the emitted Flink records (either a Flink {@link Tuple} type or, in raw
 *            mode, the type of the single Storm attribute)
 */
abstract class AbstractStormCollector<OUT> {

    /** Upper bound on the number of attributes that can be mapped to a Flink tuple. */
    private static final int MAX_NUMBER_OF_ATTRIBUTES = 25;

    /**
     * Flink output tuple of concrete type {@link Tuple1} to {@link Tuple25}. The same instance is
     * re-filled and re-emitted for every Storm tuple; it is {@code null} in raw mode.
     */
    protected final Tuple outputTuple;
    /**
     * The number of attributes of the output tuples. (Determines the concrete type of
     * {@link #outputTuple}). If {@link #numberOfAttributes} is zero or negative,
     * {@link #outputTuple} is not used and "raw" data type is used.
     */
    protected final int numberOfAttributes;
    /**
     * Is set to {@code true} each time a tuple is emitted.
     */
    boolean tupleEmitted = false;

    /**
     * Instantiates a new {@link AbstractStormCollector} that emits Flink tuples via
     * {@link #doEmit(Object)}. If the number of attributes is zero (or negative), raw mode is used
     * and any output type is supported. If the number of attributes is between 1 and 25, the
     * output type is {@link Tuple1} to {@link Tuple25}.
     *
     * @param numberOfAttributes
     *            The number of attributes of the emitted tuples.
     * @throws UnsupportedOperationException
     *             if the specified number of attributes is greater than 25
     */
    public AbstractStormCollector(final int numberOfAttributes) throws UnsupportedOperationException {
        this.numberOfAttributes = numberOfAttributes;

        if (numberOfAttributes > MAX_NUMBER_OF_ATTRIBUTES) {
            // This collector is shared by spout and bolt wrappers, so the message must not name
            // one particular wrapper class.
            throw new UnsupportedOperationException(
                    "AbstractStormCollector cannot handle more than " + MAX_NUMBER_OF_ATTRIBUTES
                            + " attributes, but " + numberOfAttributes + " are declared");
        }

        // Raw mode (no attributes declared) does not need a reusable output tuple.
        this.outputTuple = numberOfAttributes <= 0 ? null : createOutputTuple(numberOfAttributes);
    }

    /**
     * Reflectively creates an empty Flink tuple with the given arity.
     *
     * @param arity
     *            the number of fields of the tuple to create
     * @return a new tuple instance of the class returned by {@code Tuple.getTupleClass(arity)}
     * @throws RuntimeException
     *             wrapping the reflective failure if the tuple class cannot be instantiated
     */
    private static Tuple createOutputTuple(final int arity) {
        try {
            return Tuple.getTupleClass(arity).newInstance();
        } catch (final InstantiationException e) {
            throw new RuntimeException(e);
        } catch (final IllegalAccessException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Transforms a Storm tuple into a Flink tuple of type {@code OUT} and emits this tuple via
     * {@link #doEmit(Object)}. In tuple mode every Storm attribute is copied into the field with
     * the same index of the reused {@link #outputTuple}; in raw mode the single Storm attribute is
     * emitted as is. Afterwards {@link #tupleEmitted} is set to {@code true}.
     *
     * @param tuple
     *            The Storm tuple to be emitted.
     * @return the return value of {@link #doEmit(Object)}
     */
    @SuppressWarnings("unchecked")
    protected final List<Integer> tansformAndEmit(final List<Object> tuple) {
        final OUT flinkRecord;
        if (this.numberOfAttributes > 0) {
            // The size check is only active when the JVM runs with assertions enabled.
            assert (tuple.size() == this.numberOfAttributes);
            for (int pos = 0; pos < this.numberOfAttributes; ++pos) {
                this.outputTuple.setField(tuple.get(pos), pos);
            }
            flinkRecord = (OUT) this.outputTuple;
        } else {
            assert (tuple.size() == 1);
            flinkRecord = (OUT) tuple.get(0);
        }

        final List<Integer> taskIds = doEmit(flinkRecord);
        this.tupleEmitted = true;
        return taskIds;
    }

    /**
     * Emits a Flink tuple.
     *
     * @param flinkTuple
     *            The tuple to be emitted.
     * @return the IDs of the tasks this tuple was sent to
     */
    protected abstract List<Integer> doEmit(OUT flinkTuple);

}
Expand Up @@ -19,16 +19,16 @@

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.topology.IRichSpout;

import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple25;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
import org.apache.flink.util.Collector;

/**
* An {@link AbstractStormSpoutWrapper} wraps an {@link IRichSpout} in order to execute the Storm spout within a Flink
* Streaming program. It takes the spout's output tuples and transforms them into Flink tuples of type {@code OUT} (see
* {@link StormCollector} for supported types).<br />
* {@link StormSpoutCollector} for supported types).<br />
* <br />
* <strong>CAUTION: currently, only simple spouts are supported! (ie, spouts that do not use the Storm configuration
* <code>Map</code> or <code>TopologyContext</code> that is provided by the spout's <code>open(..)</code> method.
Expand All @@ -37,13 +37,19 @@
public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceFunction<OUT> {
private static final long serialVersionUID = 4993283609095408765L;

// The wrapped Storm {@link IRichSpout spout}
protected final IRichSpout spout;
// Number of attributes of the bolt's output tuples
// Number of attributes of the spout's output tuples.
private final int numberOfAttributes;
// The wrapper of the given Flink collector
protected StormCollector<OUT> collector;
// Indicates, if the source is still running or was canceled
/**
* The wrapped Storm {@link IRichSpout spout}.
*/
protected final IRichSpout spout;
/**
* The wrapper of the given Flink collector.
*/
protected StormSpoutCollector<OUT> collector;
/**
* Indicates, if the source is still running or was canceled.
*/
protected boolean isRunning = true;

/**
Expand All @@ -56,7 +62,6 @@ public abstract class AbstractStormSpoutWrapper<OUT> extends RichParallelSourceF
* @throws IllegalArgumentException
* If the number of declared output attributes is not within range [1;25].
*/
@SuppressWarnings("unused")
public AbstractStormSpoutWrapper(final IRichSpout spout) throws IllegalArgumentException {
this(spout, false);
}
Expand Down Expand Up @@ -84,8 +89,8 @@ public AbstractStormSpoutWrapper(final IRichSpout spout, final boolean rawOutput
}

@Override
public final void run(final Collector<OUT> collector) throws Exception {
this.collector = new StormCollector<OUT>(this.numberOfAttributes, collector);
public final void run(final SourceContext<OUT> ctx) throws Exception {
this.collector = new StormSpoutCollector<OUT>(this.numberOfAttributes, ctx);
this.spout.open(null,
StormWrapperSetupHelper
.convertToTopologyContext((StreamingRuntimeContext) super.getRuntimeContext(), true),
Expand Down

0 comments on commit e39699f

Please sign in to comment.