[FLINK-1381] Allow multiple output splitters for single stream operator
Closes #332

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
	flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
qmlmoon authored and gyfora committed Jan 25, 2015
1 parent 095dc4a commit 7ce9a8f
Showing 10 changed files with 257 additions and 70 deletions.
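For context, a minimal sketch of what this change enables: the same operator can now be split by several OutputSelectors, where previously a second split() failed with "Currently operators can only be split once". Only split(), select() and the OutputSelector.select(value) signature are taken from the diff below; fromElements, print and the output names are illustrative assumptions.

// Illustrative sketch only; fromElements/print and the output names are assumptions.
import java.util.Collections;

import org.apache.flink.streaming.api.collector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MultiSplitSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        // First splitter: route by parity.
        SplitDataStream<Integer> byParity = numbers.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                return Collections.singletonList(value % 2 == 0 ? "even" : "odd");
            }
        });

        // Second splitter on the same operator: route by magnitude.
        // Before this commit this second call failed at graph construction time.
        SplitDataStream<Integer> bySize = numbers.split(new OutputSelector<Integer>() {
            @Override
            public Iterable<String> select(Integer value) {
                return Collections.singletonList(value > 3 ? "large" : "small");
            }
        });

        byParity.select("even").print();
        bySize.select("large").print();

        env.execute();
    }
}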
@@ -186,17 +186,22 @@ public boolean isDirectedEmit() {
         return config.getBoolean(DIRECTED_EMIT, false);
     }
 
-    public void setOutputSelector(OutputSelector<?> outputSelector) {
-        if (outputSelector != null) {
-            setDirectedEmit(true);
-            config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize(outputSelector));
+    public void setOutputSelectors(List<OutputSelector<?>> outputSelector) {
+        try {
+            if (outputSelector != null) {
+                setDirectedEmit(true);
+                config.setBytes(OUTPUT_SELECTOR, SerializationUtils.serialize((Serializable) outputSelector));
+            }
+        } catch (SerializationException e) {
+            throw new RuntimeException("Cannot serialize OutputSelector");
         }
     }
 
     @SuppressWarnings("unchecked")
-    public <T> OutputSelector<T> getOutputSelector(ClassLoader cl) {
+    public <T> List<OutputSelector<T>> getOutputSelectors(ClassLoader cl) {
         try {
-            return (OutputSelector<T>) InstantiationUtil.readObjectFromConfig(this.config,
+            return (List<OutputSelector<T>>) InstantiationUtil.readObjectFromConfig(this.config,
                     OUTPUT_SELECTOR, cl);
         } catch (Exception e) {
             throw new StreamVertexException("Cannot deserialize and instantiate OutputSelector", e);
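A side note on the new setOutputSelectors: the selector list is serialized as a whole, and the cast to Serializable is what makes that compile, since the static type List is not Serializable even when the runtime ArrayList is. A minimal round-trip sketch, assuming the commons-lang3 SerializationUtils (a stand-in for whichever SerializationUtils this class imports):

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.lang3.SerializationUtils;

public class SelectorListRoundTrip {
    public static void main(String[] args) {
        // Stand-in for List<OutputSelector<?>>: any list of Serializable elements.
        List<String> selectors = new ArrayList<String>();
        selectors.add("selector-1");
        selectors.add("selector-2");

        // serialize(...) takes a Serializable argument; List itself does not
        // implement Serializable, hence the explicit cast as in the hunk above.
        byte[] bytes = SerializationUtils.serialize((Serializable) selectors);

        @SuppressWarnings("unchecked")
        List<String> restored = (List<String>) SerializationUtils.deserialize(bytes);
        System.out.println(restored); // [selector-1, selector-2]
    }
}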
@@ -66,8 +66,8 @@ public class StreamGraph {
     private Map<String, StreamRecordSerializer<?>> typeSerializersIn2;
     private Map<String, StreamRecordSerializer<?>> typeSerializersOut1;
     private Map<String, StreamRecordSerializer<?>> typeSerializersOut2;
-    private Map<String, OutputSelector<?>> outputSelectors;
     private Map<String, Class<? extends AbstractInvokable>> jobVertexClasses;
+    private Map<String, List<OutputSelector<?>>> outputSelectors;
     private Map<String, Integer> iterationIds;
     private Map<Integer, String> iterationIDtoHeadName;
     private Map<Integer, String> iterationIDtoTailName;
@@ -101,7 +101,7 @@ public void initGraph() {
         typeSerializersIn2 = new HashMap<String, StreamRecordSerializer<?>>();
         typeSerializersOut1 = new HashMap<String, StreamRecordSerializer<?>>();
         typeSerializersOut2 = new HashMap<String, StreamRecordSerializer<?>>();
-        outputSelectors = new HashMap<String, OutputSelector<?>>();
+        outputSelectors = new HashMap<String, List<OutputSelector<?>>>();
         jobVertexClasses = new HashMap<String, Class<? extends AbstractInvokable>>();
         iterationIds = new HashMap<String, Integer>();
         iterationIDtoHeadName = new HashMap<Integer, String>();
@@ -272,6 +272,7 @@ private void addVertex(String vertexName, Class<? extends AbstractInvokable> ver
         outEdgeLists.put(vertexName, new ArrayList<String>());
         outEdgeTypes.put(vertexName, new ArrayList<Integer>());
         selectedNames.put(vertexName, new ArrayList<List<String>>());
+        outputSelectors.put(vertexName, new ArrayList<OutputSelector<?>>());
         inEdgeLists.put(vertexName, new ArrayList<String>());
         outputPartitioners.put(vertexName, new ArrayList<StreamPartitioner<?>>());
         iterationTailCount.put(vertexName, 0);
@@ -385,10 +386,10 @@ public void setIterationSourceSettings(String iterationID, String iterationTail)
      * @param vertexName
      *            Name of the vertex for which the output selector will be set
      * @param outputSelector
-     *            The outputselector object
+     *            The user defined output selector.
      */
-    public void setOutputSelector(String vertexName, OutputSelector<?> outputSelector) {
-        outputSelectors.put(vertexName, outputSelector);
+    public <T> void setOutputSelector(String vertexName, OutputSelector<T> outputSelector) {
+        outputSelectors.get(vertexName).add(outputSelector);
 
         if (LOG.isDebugEnabled()) {
             LOG.debug("Outputselector set for {}", vertexName);
@@ -520,7 +521,7 @@ public Class<? extends AbstractInvokable> getJobVertexClass(String vertexName) {
         return inputFormatLists.get(vertexName);
     }
 
-    public OutputSelector<?> getOutputSelector(String vertexName) {
+    public List<OutputSelector<?>> getOutputSelector(String vertexName) {
         return outputSelectors.get(vertexName);
     }
 
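The StreamGraph change boils down to keeping a list of selectors per vertex: addVertex() pre-creates an empty list so setOutputSelector() can append instead of overwrite. A stripped-down sketch of that bookkeeping (String stands in for OutputSelector<?>; this is not the actual Flink class):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SelectorRegistry {
    private final Map<String, List<String>> outputSelectors = new HashMap<String, List<String>>();

    public void addVertex(String vertexName) {
        // Pre-populate with an empty list so later appends never hit null.
        outputSelectors.put(vertexName, new ArrayList<String>());
    }

    public void setOutputSelector(String vertexName, String selector) {
        // Append rather than replace, so split() may be called more than once.
        outputSelectors.get(vertexName).add(selector);
    }

    public List<String> getOutputSelectors(String vertexName) {
        return outputSelectors.get(vertexName);
    }
}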
@@ -191,7 +191,7 @@ private void setVertexConfig(String vertexName, StreamConfig config,
         config.setTypeSerializerOut2(streamGraph.getOutSerializer2(vertexName));
 
         config.setUserInvokable(streamGraph.getInvokable(vertexName));
-        config.setOutputSelector(streamGraph.getOutputSelector(vertexName));
+        config.setOutputSelectors(streamGraph.getOutputSelector(vertexName));
         config.setOperatorStates(streamGraph.getState(vertexName));
 
         config.setNumberOfOutputs(nonChainableOutputs.size());
@@ -40,7 +40,7 @@ public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
 
     private static final Logger LOG = LoggerFactory.getLogger(DirectedCollectorWrapper.class);
 
-    OutputSelector<OUT> outputSelector;
+    List<OutputSelector<OUT>> outputSelectors;
 
     protected Map<String, List<Collector<OUT>>> outputMap;
 
@@ -53,8 +53,8 @@ public class DirectedCollectorWrapper<OUT> extends CollectorWrapper<OUT> {
      * @param outputSelector
      *            User defined {@link OutputSelector}
      */
-    public DirectedCollectorWrapper(OutputSelector<OUT> outputSelector) {
-        this.outputSelector = outputSelector;
+    public DirectedCollectorWrapper(List<OutputSelector<OUT>> outputSelectors) {
+        this.outputSelectors = outputSelectors;
         this.emitted = new HashSet<Collector<OUT>>();
         this.selectAllOutputs = new LinkedList<Collector<OUT>>();
         this.outputMap = new HashMap<String, List<Collector<OUT>>>();
@@ -91,34 +91,37 @@ public void addCollector(Collector<?> output, List<String> selectedNames) {
     public void collect(OUT record) {
         emitted.clear();
 
-        Iterable<String> outputNames = outputSelector.select(record);
-
         for (Collector<OUT> output : selectAllOutputs) {
             output.collect(record);
             emitted.add(output);
         }
 
-        for (String outputName : outputNames) {
-            List<Collector<OUT>> outputList = outputMap.get(outputName);
-            if (outputList == null) {
-                if (LOG.isErrorEnabled()) {
-                    String format = String.format(
-                            "Cannot emit because no output is selected with the name: %s",
-                            outputName);
-                    LOG.error(format);
-                }
-            } else {
-                for (Collector<OUT> output : outputList) {
-                    if (!emitted.contains(output)) {
-                        output.collect(record);
-                        emitted.add(output);
-                    }
-                }
-            }
-        }
+        for (OutputSelector<OUT> outputSelector : outputSelectors) {
+            Iterable<String> outputNames = outputSelector.select(record);
+
+            for (String outputName : outputNames) {
+                List<Collector<OUT>> outputList = outputMap.get(outputName);
+                if (outputList == null) {
+                    if (LOG.isErrorEnabled()) {
+                        String format = String.format(
+                                "Cannot emit because no output is selected with the name: %s",
+                                outputName);
+                        LOG.error(format);
+                    }
+                } else {
+                    for (Collector<OUT> output : outputList) {
+                        if (!emitted.contains(output)) {
+                            output.collect(record);
+                            emitted.add(output);
+                        }
+                    }
+                }
+            }
+        }
     }
 
     @Override
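The reworked collect() consults every selector and relies on the emitted set so that an output chosen by several selectors (or by the select-all path) still receives the record at most once. A self-contained sketch of that routing rule (plain JDK code, not the Flink classes):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MultiSelectorRouting {
    public static void main(String[] args) {
        // Named outputs and the downstream collectors registered under each name.
        Map<String, List<String>> outputMap = new HashMap<String, List<String>>();
        outputMap.put("even", Arrays.asList("sink-A"));
        outputMap.put("large", Arrays.asList("sink-A", "sink-B"));

        int record = 4;
        // Two "selectors" pick output names for the same record.
        List<String> parity = Arrays.asList(record % 2 == 0 ? "even" : "odd");
        List<String> size = Arrays.asList(record > 2 ? "large" : "small");

        Set<String> emitted = new HashSet<String>();
        for (List<String> selected : Arrays.asList(parity, size)) {
            for (String name : selected) {
                List<String> outputs = outputMap.get(name);
                if (outputs == null) {
                    continue; // the real wrapper logs an error here
                }
                for (String output : outputs) {
                    if (emitted.add(output)) { // each collector receives the record once
                        System.out.println("record " + record + " -> " + output);
                    }
                }
            }
        }
        // Prints sink-A and sink-B exactly once each.
    }
}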
@@ -44,6 +44,7 @@
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.streaming.api.StreamGraph;
+import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamCrossOperator;
 import org.apache.flink.streaming.api.datastream.temporaloperator.StreamJoinOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -224,6 +225,25 @@ public DataStream<OUT> merge(DataStream<OUT>... streams) {
         return returnStream;
     }
 
+    /**
+     * Operator used for directing tuples to specific named outputs using an
+     * {@link org.apache.flink.streaming.api.collector.OutputSelector}. Calling
+     * this method on an operator creates a new {@link SplitDataStream}.
+     * 
+     * @param outputSelector
+     *            The user defined
+     *            {@link org.apache.flink.streaming.api.collector.OutputSelector}
+     *            for directing the tuples.
+     * @return The {@link SplitDataStream}
+     */
+    public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+        for (DataStream<OUT> ds : this.mergedStreams) {
+            streamGraph.setOutputSelector(ds.getId(), clean(outputSelector));
+        }
+
+        return new SplitDataStream<OUT>(this);
+    }
+
     /**
      * Creates a new {@link ConnectedDataStream} by connecting
      * {@link DataStream} outputs of different type with each other. The
@@ -382,8 +402,7 @@ public DataStream<OUT> global() {
      * the data stream that will be fed back and used as the input for the
      * iteration head. A common usage pattern for streaming iterations is to use
      * output splitting to send a part of the closing data stream to the head.
-     * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
-     * more information.
+     * Refer to {@link #split(OutputSelector)} for more information.
      * <p>
      * The iteration edge will be partitioned the same way as the first input of
      * the iteration head.
@@ -408,8 +427,7 @@ public IterativeDataStream<OUT> iterate() {
      * the data stream that will be fed back and used as the input for the
      * iteration head. A common usage pattern for streaming iterations is to use
      * output splitting to send a part of the closing data stream to the head.
-     * Refer to {@link SingleOutputStreamOperator#split(outputSelector)} for
-     * more information.
+     * Refer to {@link #split(OutputSelector)} for more information.
     * <p>
      * The iteration edge will be partitioned the same way as the first input of
      * the iteration head.
@@ -1176,8 +1194,8 @@ public DataStreamSink<OUT> addSink(SinkFunction<OUT> sinkFunction) {
         DataStreamSink<OUT> returnStream = new DataStreamSink<OUT>(environment, "sink", getType(),
                 sinkInvokable);
 
-        streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null,
-                "sink", degreeOfParallelism);
+        streamGraph.addStreamVertex(returnStream.getId(), sinkInvokable, getType(), null, "sink",
+                degreeOfParallelism);
 
         this.connectGraph(this.copy(), returnStream.getId(), 0);
 
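The iterate() javadoc above points to split(OutputSelector) as the usual way to feed only part of the loop tail back to the head. A hedged fragment of that pattern — generateSequence, closeWith, print and the branch names are assumptions, not taken from this diff:

// Fragment, not a complete program: assumes a StreamExecutionEnvironment `env`
// and IterativeDataStream#closeWith(DataStream) as available on this branch.
IterativeDataStream<Long> iteration = env.generateSequence(1, 100).iterate();

DataStream<Long> step = iteration.map(new MapFunction<Long, Long>() {
    @Override
    public Long map(Long value) {
        return value / 2;
    }
});

SplitDataStream<Long> tail = step.split(new OutputSelector<Long>() {
    @Override
    public Iterable<String> select(Long value) {
        // Keep iterating while the value is still large.
        return Collections.singletonList(value > 1 ? "iterate" : "output");
    }
});

iteration.closeWith(tail.select("iterate")); // feed only the "iterate" branch back
tail.select("output").print();               // emit the rest downstream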
@@ -22,7 +22,6 @@
 
 import org.apache.flink.api.common.functions.RichFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
@@ -100,28 +99,6 @@ public SingleOutputStreamOperator<OUT, O> setBufferTimeout(long timeoutMillis) {
         return this;
     }
 
-    /**
-     * Operator used for directing tuples to specific named outputs using an
-     * {@link OutputSelector}. Calling this method on an operator creates a new
-     * {@link SplitDataStream}.
-     * 
-     * @param outputSelector
-     *            The user defined {@link OutputSelector} for directing the
-     *            tuples.
-     * @return The {@link SplitDataStream}
-     */
-    public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
-        if (!isSplit) {
-            this.isSplit = true;
-            streamGraph.setOutputSelector(id, clean(outputSelector));
-
-            return new SplitDataStream<OUT>(this);
-        } else {
-            throw new RuntimeException("Currently operators can only be split once");
-        }
-    }
-
     /**
      * This is a beta feature </br></br> Register an operator state for this
      * operator by the given name. This name can be used to retrieve the state
@@ -56,7 +56,10 @@ private DataStream<OUT> selectOutput(String[] outputNames) {
         }
 
         DataStream<OUT> returnStream = copy();
-        returnStream.userDefinedNames = Arrays.asList(outputNames);
+        for (DataStream<OUT> ds : returnStream.mergedStreams) {
+            ds.userDefinedNames = Arrays.asList(outputNames);
+        }
         return returnStream;
     }
 
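Iterating over mergedStreams in selectOutput matters because split() can now be applied to a merged stream, so a later select() has to tag every underlying stream with the chosen names. A hedged fragment (streamA, streamB and paritySelector are assumed to exist):

// Fragment: splitting a merged stream, then selecting one named output.
DataStream<Integer> merged = streamA.merge(streamB);
SplitDataStream<Integer> split = merged.split(paritySelector);

// Without the loop above, only the first of the merged streams would be
// restricted to the "even" output; with it, both streamA and streamB are.
DataStream<Integer> evens = split.select("even");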
@@ -108,7 +108,7 @@ private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
         // We create a wrapper that will encapsulate the chained operators and
         // network outputs
         CollectorWrapper<OUT> wrapper = isDirectEmit ? new DirectedCollectorWrapper(
-                chainedTaskConfig.getOutputSelector(cl)) : new CollectorWrapper<OUT>();
+                chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();
 
         // Create collectors for the network outputs
         for (String output : chainedTaskConfig.getOutputs(cl)) {
