Skip to content

Commit

Permalink
[FLINK-1594] [streaming] Fixed co-tasks input handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Gábor Hermann authored and gyfora committed Mar 20, 2015
1 parent a8ba72b commit 3158d1d
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 45 deletions.
Expand Up @@ -60,7 +60,11 @@ public class StreamConfig implements Serializable {
private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
private static final String ITERATON_WAIT = "iterationWait";
private static final String OUTPUTS = "outvertexIDs";
private static final String NONCHAINED_OUTPUTS = "NONCHAINED_OUTPUTS";
private static final String CHAINED_OUTPUT_EDGES = "CHAINED_OUTPUTS";
private static final String EDGES_IN_ORDER = "rwOrder";
private static final String OUT_STREAM_EDGES = "out stream edges";
private static final String IN_STREAM_EDGES = "out stream edges";

// DEFAULT VALUES

Expand Down Expand Up @@ -281,19 +285,60 @@ public int getNumberOfOutputs() {
return config.getInteger(NUMBER_OF_OUTPUTS, 0);
}

public void setOutputs(List<Integer> outputvertexIDs) {
config.setBytes(OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
public void setNonChainedOutputs(List<StreamEdge> outputvertexIDs) {
config.setBytes(NONCHAINED_OUTPUTS, SerializationUtils.serialize((Serializable) outputvertexIDs));
}

@SuppressWarnings("unchecked")
public List<Integer> getOutputs(ClassLoader cl) {
public List<StreamEdge> getNonChainedOutputs(ClassLoader cl) {
try {
return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config, OUTPUTS, cl);
return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config, NONCHAINED_OUTPUTS, cl);
} catch (Exception e) {
throw new RuntimeException("Could not instantiate outputs.");
}
}

public void setChainedOutputs(List<StreamEdge> chainedOutputs) {
config.setBytes(CHAINED_OUTPUTS,
SerializationUtils.serialize((Serializable) chainedOutputs));
}

@SuppressWarnings("unchecked")
public List<StreamEdge> getChainedOutputs(ClassLoader cl) {
try {
return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(this.config,
CHAINED_OUTPUTS, cl);
} catch (Exception e) {
throw new RuntimeException("Could not instantiate chained outputs.");
}
}

public void setOutEdges(List<StreamEdge> outEdges) {
config.setBytes(OUT_STREAM_EDGES, SerializationUtils.serialize((Serializable) outEdges));
}

public List<StreamEdge> getOutEdges(ClassLoader cl) {
try {
return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
this.config, OUT_STREAM_EDGES, cl);
} catch (Exception e) {
throw new RuntimeException("Could not instantiate outputs.");
}
}

public void setInEdges(List<StreamEdge> inEdges) {
config.setBytes(IN_STREAM_EDGES, SerializationUtils.serialize((Serializable) inEdges));
}

public List<StreamEdge> getInEdges(ClassLoader cl) {
try {
return (List<StreamEdge>) InstantiationUtil.readObjectFromConfig(
this.config, IN_STREAM_EDGES, cl);
} catch (Exception e) {
throw new RuntimeException("Could not instantiate inputs.");
}
}

public void setOutEdgesInOrder(List<Tuple2<Integer, Integer>> outEdgeList) {

config.setBytes(EDGES_IN_ORDER, SerializationUtils.serialize((Serializable) outEdgeList));
Expand Down Expand Up @@ -329,21 +374,6 @@ public int getInputIndex(int inputNumber) {
return config.getInteger(INPUT_TYPE + inputNumber, 0);
}

public void setChainedOutputs(List<Integer> chainedOutputs) {
config.setBytes(CHAINED_OUTPUTS,
SerializationUtils.serialize((Serializable) chainedOutputs));
}

@SuppressWarnings("unchecked")
public List<Integer> getChainedOutputs(ClassLoader cl) {
try {
return (List<Integer>) InstantiationUtil.readObjectFromConfig(this.config,
CHAINED_OUTPUTS, cl);
} catch (Exception e) {
throw new RuntimeException("Could not instantiate chained outputs.");
}
}

public void setTransitiveChainedTaskConfigs(Map<Integer, StreamConfig> chainedTaskConfigs) {
config.setBytes(CHAINED_TASK_CONFIG,
SerializationUtils.serialize((Serializable) chainedTaskConfigs));
Expand Down Expand Up @@ -382,9 +412,10 @@ public String toString() {
builder.append("\nTask name: " + getVertexID());
builder.append("\nNumber of non-chained inputs: " + getNumberOfInputs());
builder.append("\nNumber of non-chained outputs: " + getNumberOfOutputs());
builder.append("\nOutput names: " + getOutputs(cl));
builder.append("\nOutput names: " + getNonChainedOutputs(cl));
builder.append("\nPartitioning:");
for (Integer outputname : getOutputs(cl)) {
for (StreamEdge output : getNonChainedOutputs(cl)) {
int outputname = output.getTargetVertex();
builder.append("\n\t" + outputname + ": " + getPartitioner(cl, outputname));
}

Expand Down
Expand Up @@ -17,11 +17,12 @@

package org.apache.flink.streaming.api;

import java.io.Serializable;
import java.util.List;

import org.apache.flink.streaming.partitioner.StreamPartitioner;

public class StreamEdge {
public class StreamEdge implements Serializable {

final private int sourceVertex;
final private int targetVertex;
Expand Down
Expand Up @@ -101,25 +101,24 @@ private List<Tuple2<Integer, Integer>> createChain(Integer startNode, Integer cu
if (!builtVertices.contains(startNode)) {

List<Tuple2<Integer, Integer>> transitiveOutEdges = new ArrayList<Tuple2<Integer, Integer>>();
List<Integer> chainableOutputs = new ArrayList<Integer>();
List<Integer> nonChainableOutputs = new ArrayList<Integer>();
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

for (StreamEdge outEdge : streamGraph.getOutEdges(current)) {
Integer outID = outEdge.getTargetVertex();
if (isChainable(current, outID)) {
chainableOutputs.add(outID);
if (isChainable(outEdge)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outID);
nonChainableOutputs.add(outEdge);
}
}

for (Integer chainable : chainableOutputs) {
transitiveOutEdges.addAll(createChain(startNode, chainable));
for (StreamEdge chainable : chainableOutputs) {
transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetVertex()));
}

for (Integer nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, nonChainable));
createChain(nonChainable, nonChainable);
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(new Tuple2<Integer, Integer>(current, nonChainable.getTargetVertex()));
createChain(nonChainable.getTargetVertex(), nonChainable.getTargetVertex());
}

chainedNames.put(current, createChainedName(current, chainableOutputs));
Expand All @@ -133,6 +132,8 @@ private List<Tuple2<Integer, Integer>> createChain(Integer startNode, Integer cu

config.setChainStart();
config.setOutEdgesInOrder(transitiveOutEdges);
config.setOutEdges(streamGraph.getOutEdges(current));
config.setInEdges(streamGraph.getInEdges(current));

for (Tuple2<Integer, Integer> edge : transitiveOutEdges) {
connect(startNode, edge);
Expand All @@ -157,12 +158,12 @@ private List<Tuple2<Integer, Integer>> createChain(Integer startNode, Integer cu
}
}

private String createChainedName(Integer vertexID, List<Integer> chainedOutputs) {
private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
String operatorName = streamGraph.getOperatorName(vertexID);
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<String>();
for (Integer chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable));
for (StreamEdge chainable : chainedOutputs) {
outputChainedNames.add(chainedNames.get(chainable.getTargetVertex()));
}
return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
} else if (chainedOutputs.size() == 1) {
Expand Down Expand Up @@ -201,7 +202,7 @@ private StreamConfig createProcessingVertex(Integer vertexID) {
}

private void setVertexConfig(Integer vertexID, StreamConfig config,
List<Integer> chainableOutputs, List<Integer> nonChainableOutputs) {
List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {

config.setVertexID(vertexID);
config.setBufferTimeout(streamGraph.getBufferTimeout(vertexID));
Expand All @@ -215,7 +216,7 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
config.setOutputSelectors(streamGraph.getOutputSelector(vertexID));

config.setNumberOfOutputs(nonChainableOutputs.size());
config.setOutputs(nonChainableOutputs);
config.setNonChainedOutputs(nonChainableOutputs);
config.setChainedOutputs(chainableOutputs);
config.setStateMonitoring(streamGraph.isMonitoringEnabled());

Expand All @@ -227,11 +228,11 @@ private void setVertexConfig(Integer vertexID, StreamConfig config,
config.setIterationWaitTime(streamGraph.getIterationTimeout(vertexID));
}

List<Integer> allOutputs = new ArrayList<Integer>(chainableOutputs);
List<StreamEdge> allOutputs = new ArrayList<StreamEdge>(chainableOutputs);
allOutputs.addAll(nonChainableOutputs);

for (Integer output : allOutputs) {
config.setSelectedNames(output, streamGraph.getEdge(vertexID, output).getSelectedNames());
for (StreamEdge output : allOutputs) {
config.setSelectedNames(output.getTargetVertex(), streamGraph.getEdge(vertexID, output.getTargetVertex()).getSelectedNames());
}

vertexConfigs.put(vertexID, config);
Expand Down Expand Up @@ -274,7 +275,9 @@ private <T> void connect(Integer headOfChain, Tuple2<Integer, Integer> edge) {
}
}

private boolean isChainable(Integer vertexID, Integer outName) {
private boolean isChainable(StreamEdge edge) {
int vertexID = edge.getSourceVertex();
int outName = edge.getTargetVertex();

StreamInvokable<?, ?> headInvokable = streamGraph.getInvokable(vertexID);
StreamInvokable<?, ?> outInvokable = streamGraph.getInvokable(outName);
Expand Down
Expand Up @@ -20,6 +20,8 @@
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.streaming.api.StreamEdge;
import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
import org.apache.flink.streaming.api.streamrecord.StreamRecord;
import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.io.CoReaderIterator;
Expand All @@ -29,6 +31,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {

Expand Down Expand Up @@ -78,8 +81,10 @@ protected void setConfigInputs() throws StreamVertexException {
ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();

List<StreamEdge> inEdges = configuration.getInEdges(userClassLoader);

for (int i = 0; i < numberOfInputs; i++) {
int inputType = configuration.getInputIndex(i);
int inputType = inEdges.get(i).getTypeNumber();
InputGate reader = getEnvironment().getInputGate(i);
switch (inputType) {
case 1:
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.StreamConfig;
import org.apache.flink.streaming.api.StreamEdge;
import org.apache.flink.streaming.api.collector.CollectorWrapper;
import org.apache.flink.streaming.api.collector.DirectedCollectorWrapper;
import org.apache.flink.streaming.api.collector.StreamOutput;
Expand Down Expand Up @@ -117,7 +118,8 @@ private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
chainedTaskConfig.getOutputSelectors(cl)) : new CollectorWrapper<OUT>();

// Create collectors for the network outputs
for (Integer output : chainedTaskConfig.getOutputs(cl)) {
for (StreamEdge outputEdge : chainedTaskConfig.getNonChainedOutputs(cl)) {
Integer output = outputEdge.getTargetVertex();

Collector<?> outCollector = outputMap.get(output);

Expand All @@ -130,7 +132,9 @@ private Collector<OUT> createChainedCollector(StreamConfig chainedTaskConfig) {
}

// Create collectors for the chained outputs
for (Integer output : chainedTaskConfig.getChainedOutputs(cl)) {
for (StreamEdge outputEdge : chainedTaskConfig.getChainedOutputs(cl)) {
Integer output = outputEdge.getTargetVertex();

Collector<?> outCollector = createChainedCollector(chainedConfigs.get(output));
if (isDirectEmit) {
((DirectedCollectorWrapper<OUT>) wrapper).addCollector(outCollector,
Expand Down

0 comments on commit 3158d1d

Please sign in to comment.