Skip to content

Commit

Permalink
[FLINK-19434][DataStream API] Add source input chaining to StreamingJ…
Browse files Browse the repository at this point in the history
…obGraphGenerator (part 2)

This fixes issues with InputGate numbering and with Operators that have a mix of chained sources
and network inputs.
  • Loading branch information
tsreaper authored and StephanEwen committed Oct 2, 2020
1 parent 52a83cc commit ce4f67b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;

import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -138,7 +139,7 @@ public static JobGraph createJobGraph(StreamGraph streamGraph, @Nullable JobID j
private final StreamGraphHasher defaultStreamGraphHasher;
private final List<StreamGraphHasher> legacyStreamGraphHashers;

private final boolean chainSourceInputs = true;
private final Map<Integer, OperatorChainInfo> chainInfos;

private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobID) {
this.streamGraph = streamGraph;
Expand All @@ -154,6 +155,7 @@ private StreamingJobGraphGenerator(StreamGraph streamGraph, @Nullable JobID jobI
this.chainedPreferredResources = new HashMap<>();
this.chainedInputOutputFormats = new HashMap<>();
this.physicalEdgesInOrder = new ArrayList<>();
this.chainInfos = new HashMap<>();

jobGraph = new JobGraph(jobID, streamGraph.getJobName());
}
Expand Down Expand Up @@ -268,38 +270,39 @@ private Collection<OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
for (Integer sourceNodeId : streamGraph.getSourceIDs()) {
final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId);

if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory) {
checkState(sourceNode.getOutEdges().size() == 1);

if (sourceNode.getOperatorFactory() instanceof SourceOperatorFactory && sourceNode.getOutEdges().size() == 1) {
// as long as only NAry ops support this chaining, we need to skip the other parts
final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0);
final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId());
final StreamOperatorFactory<?> targetFactory = target.getOperatorFactory();

// CHECK WHETHER TARGET IS NARY INPUT

final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
final StreamConfig.SourceInputConfig inputConfig = new StreamConfig.SourceInputConfig(sourceOutEdge);
final StreamConfig operatorConfig = new StreamConfig(new Configuration());
setVertexConfig(sourceNodeId, operatorConfig, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
operatorConfig.setChainIndex(0); // sources are always first
operatorConfig.setOperatorID(opId);
operatorConfig.setOperatorName(sourceNode.getOperatorName());
chainedSources.put(sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));
if (isMultipleInput(target) && isChainableIgnoringInEdgesSize(sourceOutEdge, streamGraph)) {
final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
final StreamConfig.SourceInputConfig inputConfig = new StreamConfig.SourceInputConfig(sourceOutEdge);
final StreamConfig operatorConfig = new StreamConfig(new Configuration());
setVertexConfig(sourceNodeId, operatorConfig, Collections.emptyList(), Collections.emptyList(), Collections.emptyMap());
operatorConfig.setChainIndex(0); // sources are always first
operatorConfig.setOperatorID(opId);
operatorConfig.setOperatorName(sourceNode.getOperatorName());
chainedSources.put(sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig));

final SourceOperatorFactory<?> sourceOpFact = (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
final OperatorCoordinator.Provider coord = sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);
final SourceOperatorFactory<?> sourceOpFact = (SourceOperatorFactory<?>) sourceNode.getOperatorFactory();
final OperatorCoordinator.Provider coord = sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId);

final OperatorChainInfo chainInfo = chainEntryPoints.computeIfAbsent(
final OperatorChainInfo chainInfo = chainEntryPoints.computeIfAbsent(
sourceOutEdge.getTargetId(),
(k) -> new OperatorChainInfo(sourceOutEdge.getTargetId(), hashes, legacyHashes, chainedSources, streamGraph));
chainInfo.addCoordinatorProvider(coord);
}
else {
chainEntryPoints.computeIfAbsent(
sourceNodeId,
(k) -> new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
chainInfo.addCoordinatorProvider(coord);
continue;
}
}

chainEntryPoints.computeIfAbsent(
sourceNodeId,
(k) -> new OperatorChainInfo(sourceNodeId, hashes, legacyHashes, chainedSources, streamGraph));
}

for (Map.Entry<Integer, OperatorChainInfo> entry : chainEntryPoints.entrySet()) {
chainInfos.put(entry.getKey(), entry.getValue());
}
return chainEntryPoints.values();
}
Expand Down Expand Up @@ -351,7 +354,9 @@ private List<StreamEdge> createChain(Integer currentNodeId, int chainIndex, Oper
createChain(
nonChainable.getTargetId(),
1, // operators start at position 1 because 0 is for chained source inputs
chainInfo.newChain(nonChainable.getTargetId()));
chainInfos.computeIfAbsent(
nonChainable.getTargetId(),
(k) -> chainInfo.newChain(nonChainable.getTargetId())));
}

chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs));
Expand Down Expand Up @@ -537,6 +542,7 @@ private void setVertexConfig(
final List<StreamEdge> inEdges = vertex.getInEdges();
final StreamConfig.InputConfig[] inputConfigs = new StreamConfig.InputConfig[inEdges.size()];

int inputGateCount = 0;
for (int i = 0; i < inEdges.size(); i++) {
final StreamEdge inEdge = inEdges.get(i);
final ChainedSourceInfo chainedSource = chainedSources.get(inEdge.getSourceId());
Expand All @@ -546,7 +552,8 @@ private void setVertexConfig(
.computeIfAbsent(vertexID, (key) -> new HashMap<>())
.put(inEdge.getSourceId(), chainedSource.getOperatorConfig());
} else {
inputConfigs[i] = new StreamConfig.NetworkInputConfig(vertex.getTypeSerializerIn(i), i);
inputConfigs[i] = new StreamConfig.NetworkInputConfig(
vertex.getTypeSerializerIn(Math.max(0, inEdge.getTypeNumber() - 1)), inputGateCount++);
}
}
config.setInputs(inputConfigs);
Expand Down Expand Up @@ -712,16 +719,22 @@ private ResultPartitionType determineResultPartitionType(StreamPartitioner<?> pa
}

public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

return downStreamVertex.getInEdges().size() == 1
&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
&& isChainableIgnoringInEdgesSize(edge, streamGraph);
}

private static boolean isChainableIgnoringInEdgesSize(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);

return upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
&& areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph)
&& (edge.getPartitioner() instanceof ForwardPartitioner)
&& edge.getShuffleMode() != ShuffleMode.BATCH
&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
&& streamGraph.isChainingEnabled();
}

@VisibleForTesting
Expand Down Expand Up @@ -1080,6 +1093,15 @@ private void configureCheckpointing() {
jobGraph.setSnapshotSettings(settings);
}

private static boolean isMultipleInput(StreamNode vertex) {
StreamOperatorFactory<?> factory = vertex.getOperatorFactory();
// FIXME super hack!
return factory.getClass().getName().equals(
"org.apache.flink.table.runtime.operators.multipleinput.BatchMultipleInputStreamOperatorFactory") ||
factory.getClass().getName().equals(
"org.apache.flink.table.runtime.operators.multipleinput.StreamMultipleInputStreamOperatorFactory");
}

/**
* A private class to help maintain the information of an operator chain during the recursive call in
* {@link #createChain(Integer, int, OperatorChainInfo)}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public void init() throws Exception {
.filter(input -> (input instanceof StreamConfig.NetworkInputConfig))
.count();

ArrayList[] inputLists = new ArrayList[numberOfLogicalNetworkInputs];
ArrayList[] inputLists = new ArrayList[inputs.length];
for (int i = 0; i < inputLists.length; i++) {
inputLists[i] = new ArrayList<>();
}
Expand All @@ -105,7 +105,13 @@ public void init() throws Exception {
inputLists[inputType - 1].add(reader);
}

createInputProcessor(inputLists, inputs, watermarkGauges);
ArrayList<ArrayList<?>> networkInputLists = new ArrayList<>();
for (ArrayList<?> inputList : inputLists) {
if (!inputList.isEmpty()) {
networkInputLists.add(inputList);
}
}
createInputProcessor(networkInputLists.toArray(new ArrayList[0]), inputs, watermarkGauges);

// wrap watermark gauge since registered metrics must be unique
getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
Expand Down

0 comments on commit ce4f67b

Please sign in to comment.