Skip to content

Commit

Permalink
[FLINK-2861] Fields grouping on split streams fails
Browse files Browse the repository at this point in the history
This closes #1387
  • Loading branch information
mjsax authored and mjsax committed Nov 24, 2015
1 parent 85e7b28 commit 4c46b2f
Show file tree
Hide file tree
Showing 9 changed files with 295 additions and 28 deletions.
@@ -0,0 +1,72 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.tests;

import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;

import org.apache.flink.storm.api.FlinkLocalCluster;
import org.apache.flink.storm.api.FlinkTopologyBuilder;
import org.apache.flink.storm.tests.operators.FiniteRandomSpout;
import org.apache.flink.storm.tests.operators.TaskIdBolt;
import org.apache.flink.storm.util.BoltFileSink;
import org.apache.flink.storm.util.StormTestBase;

public class StormFieldsGroupingITCase extends StormTestBase {

private final static String topologyId = "FieldsGrouping Test";
private final static String spoutId = "spout";
private final static String boltId = "bolt";
private final static String sinkId = "sink";
private String resultPath;

@Override
protected void preSubmit() throws Exception {
this.resultPath = this.getTempDirPath("result");
}

@Override
protected void postSubmit() throws Exception {
compareResultsByLinesInMemory("4> -1930858313\n" + "4> 1431162155\n" + "4> 1654374947\n"
+ "4> -65105105\n" + "3> -1155484576\n" + "3> 1033096058\n" + "3> -1557280266\n"
+ "3> -1728529858\n" + "3> -518907128\n" + "3> -252332814", this.resultPath);
}

@Override
protected void testProgram() throws Exception {
final String[] tokens = this.resultPath.split(":");
final String outputFile = tokens[tokens.length - 1];

final FlinkTopologyBuilder builder = new FlinkTopologyBuilder();

builder.setSpout(spoutId, new FiniteRandomSpout(0, 10, 2));
builder.setBolt(boltId, new TaskIdBolt(), 2).fieldsGrouping(
spoutId, FiniteRandomSpout.STREAM_PREFIX + 0, new Fields("number"));
builder.setBolt(sinkId, new BoltFileSink(outputFile)).shuffleGrouping(boltId);

final FlinkLocalCluster cluster = FlinkLocalCluster.getLocalCluster();
cluster.submitTopology(topologyId, null, builder.createTopology());

Utils.sleep(10 * 1000);

// TODO kill does no do anything so far
cluster.killTopology(topologyId);
cluster.shutdown();
}

}
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.tests.operators;

import java.util.Map;
import java.util.Random;

import org.apache.flink.storm.util.FiniteSpout;

import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;

public class FiniteRandomSpout extends BaseRichSpout implements FiniteSpout {
private static final long serialVersionUID = 6592885571932363239L;

public static final String STREAM_PREFIX = "stream_";

private final Random r;
private SpoutOutputCollector collector;
private int counter;
private final String[] outputStreams;

public FiniteRandomSpout(long seed, int counter, int numberOfOutputStreams) {
this.r = new Random(seed);
this.counter = counter;
if (numberOfOutputStreams < 1) {
this.outputStreams = new String[] { Utils.DEFAULT_STREAM_ID };
} else {
this.outputStreams = new String[numberOfOutputStreams];
for (int i = 0; i < this.outputStreams.length; ++i) {
this.outputStreams[i] = STREAM_PREFIX + i;
}
}
}

public FiniteRandomSpout(long seed, int counter) {
this(seed, counter, 1);
}

@SuppressWarnings("rawtypes")
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}

@Override
public void nextTuple() {
for (String s : this.outputStreams) {
this.collector.emit(s, new Values(this.r.nextInt()));
}
--this.counter;
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
for (String s : this.outputStreams) {
declarer.declareStream(s, new Fields("number"));
}
}

@Override
public boolean reachedEnd() {
return this.counter <= 0;
}

}
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.storm.tests.operators;

import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class TaskIdBolt extends BaseRichBolt {
private static final long serialVersionUID = -7966475984592762720L;

private OutputCollector collector;
private int thisTaskId;

@SuppressWarnings("rawtypes")
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
this.thisTaskId = context.getThisTaskId();
}

@Override
public void execute(Tuple input) {
this.collector.emit(new Values(this.thisTaskId + "> " + input.getValue(0)));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("number"));
}

}
Expand Up @@ -101,7 +101,7 @@ public FlinkTopology createTopology() {
final DataStreamSource<?> source;

if (sourceStreams.size() == 1) {
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout);
final SpoutWrapper<Tuple> spoutWrapperSingleOutput = new SpoutWrapper<Tuple>(userSpout, spoutId, null, null);
spoutWrapperSingleOutput.setStormTopology(stormTopology);

final String outputStreamId = (String) sourceStreams.keySet().toArray()[0];
Expand All @@ -113,7 +113,7 @@ public FlinkTopology createTopology() {
source = src;
} else {
final SpoutWrapper<SplitStreamType<Tuple>> spoutWrapperMultipleOutputs = new SpoutWrapper<SplitStreamType<Tuple>>(
userSpout);
userSpout, spoutId, null, null);
spoutWrapperMultipleOutputs.setStormTopology(stormTopology);

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -124,7 +124,10 @@ public FlinkTopology createTopology() {
SplitStream<SplitStreamType<Tuple>> splitSource = multiSource
.split(new StormStreamSelector<Tuple>());
for (String streamId : sourceStreams.keySet()) {
outputStreams.put(streamId, splitSource.select(streamId).map(new SplitStreamMapper<Tuple>()));
SingleOutputStreamOperator<Tuple, ?> outStream = splitSource.select(streamId)
.map(new SplitStreamMapper<Tuple>());
outStream.getTransformation().setOutputType(declarer.getOutputType(streamId));
outputStreams.put(streamId, outStream);
}
source = multiSource;
}
Expand Down Expand Up @@ -230,8 +233,8 @@ public FlinkTopology createTopology() {
.getOutputType(outputStreamId);

final BoltWrapper<Tuple, Tuple> boltWrapperSingleOutput = new BoltWrapper<Tuple, Tuple>(
userBolt, this.outputStreams.get(producerId).get(
inputStreamId));
userBolt, boltId, this.outputStreams.get(producerId).get(
inputStreamId), null);
boltWrapperSingleOutput.setStormTopology(stormTopology);

final SingleOutputStreamOperator<Tuple, ?> outStream = inputStream
Expand All @@ -246,8 +249,8 @@ public FlinkTopology createTopology() {
outputStream = outStream;
} else {
final BoltWrapper<Tuple, SplitStreamType<Tuple>> boltWrapperMultipleOutputs = new BoltWrapper<Tuple, SplitStreamType<Tuple>>(
userBolt, this.outputStreams.get(producerId).get(
inputStreamId));
userBolt, boltId, this.outputStreams.get(producerId).get(
inputStreamId), null);
boltWrapperMultipleOutputs.setStormTopology(stormTopology);

@SuppressWarnings({ "unchecked", "rawtypes" })
Expand All @@ -262,9 +265,12 @@ public FlinkTopology createTopology() {

final HashMap<String, DataStream<Tuple>> op = new HashMap<String, DataStream<Tuple>>();
for (String outputStreamId : boltOutputStreams.keySet()) {
op.put(outputStreamId,
splitStream.select(outputStreamId).map(
new SplitStreamMapper<Tuple>()));
SingleOutputStreamOperator<Tuple, ?> outStream = splitStream
.select(outputStreamId).map(
new SplitStreamMapper<Tuple>());
outStream.getTransformation().setOutputType(
declarer.getOutputType(outputStreamId));
op.put(outputStreamId, outStream);
}
availableInputs.put(boltId, op);
outputStream = multiStream;
Expand Down
Expand Up @@ -54,6 +54,8 @@ public class BoltWrapper<IN, OUT> extends AbstractStreamOperator<OUT> implements

/** The wrapped Storm {@link IRichBolt bolt}. */
private final IRichBolt bolt;
/** The name of the bolt. */
private final String name;
/** Number of attributes of the bolt's output tuples per stream. */
private final HashMap<String, Integer> numberOfAttributes;
/** The schema (ie, ordered field names) of the input stream. */
Expand Down Expand Up @@ -189,7 +191,34 @@ public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
*/
public BoltWrapper(final IRichBolt bolt, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this(bolt, null, inputSchema, rawOutputs);
}

/**
* Instantiates a new {@link BoltWrapper} that wraps the given Storm {@link IRichBolt bolt} such that it can be used
* within a Flink streaming program. The given input schema enable attribute-by-name access for input types
* {@link Tuple0} to {@link Tuple25}. The output type can be any type if parameter {@code rawOutput} is {@code true}
* and the bolt's number of declared output tuples is 1. If {@code rawOutput} is {@code false} the output type will
* be one of {@link Tuple0} to {@link Tuple25} depending on the bolt's declared number of attributes.
*
* @param bolt
* The Storm {@link IRichBolt bolt} to be used.
* @param name
* The name of the bolt.
* @param inputSchema
* The schema (ie, ordered field names) of the input stream.
* @param rawOutputs
* Contains stream names if a single attribute output stream, should not be of type {@link Tuple1} but be
* of a raw type.
* @throws IllegalArgumentException
* If {@code rawOuput} is {@code true} and the number of declared output attributes is not 1 or if
* {@code rawOuput} is {@code false} and the number of declared output attributes is not with range
* [0;25].
*/
public BoltWrapper(final IRichBolt bolt, final String name, final Fields inputSchema,
final Collection<String> rawOutputs) throws IllegalArgumentException {
this.bolt = bolt;
this.name = name;
this.inputSchema = inputSchema;
this.numberOfAttributes = WrapperSetupHelper.getNumberOfAttributes(bolt, rawOutputs);
}
Expand All @@ -209,12 +238,8 @@ public void open() throws Exception {
super.open();

this.flinkCollector = new TimestampedCollector<OUT>(output);
OutputCollector stormCollector = null;

if (this.numberOfAttributes.size() > 0) {
stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));
}
final OutputCollector stormCollector = new OutputCollector(new BoltCollector<OUT>(
this.numberOfAttributes, flinkCollector));

GlobalJobParameters config = getExecutionConfig().getGlobalJobParameters();
StormConfig stormConfig = new StormConfig();
Expand All @@ -228,7 +253,7 @@ public void open() throws Exception {
}

final TopologyContext topologyContext = WrapperSetupHelper.createTopologyContext(
getRuntimeContext(), this.bolt, this.stormTopology, stormConfig);
getRuntimeContext(), this.bolt, this.name, this.stormTopology, stormConfig);

this.bolt.prepare(stormConfig, topologyContext, stormCollector);
}
Expand Down

0 comments on commit 4c46b2f

Please sign in to comment.