From 74945cfc3a4d3b0afa9b3d0110ab0e329d099a83 Mon Sep 17 00:00:00 2001
From: Gyula Fora
Date: Tue, 28 Apr 2015 23:39:32 +0200
Subject: [PATCH] [FLINK-1948] [streaming] Manual task slot sharing settings
added for stream operators
---
.../SingleOutputStreamOperator.java | 73 ++++++++++++++++++-
.../temporal/StreamCrossOperator.java | 2 +-
.../StreamExecutionEnvironment.java | 12 +++
.../streaming/api/graph/JSONGenerator.java | 12 +--
.../streaming/api/graph/StreamGraph.java | 73 ++++++++++++-------
.../flink/streaming/api/graph/StreamNode.java | 16 ++++
.../api/graph/StreamingJobGraphGenerator.java | 30 ++++++--
.../api/graph/WindowingOptimizer.java | 10 +--
.../api/operators/StreamOperator.java | 12 ++-
.../flink/streaming/api/CoStreamTest.java | 3 +-
.../api/graph/SlotAllocationTest.java | 62 ++++++++++++++++
.../api/operators/co/SelfConnectionTest.java | 5 +-
.../streaming/api/scala/DataStream.scala | 62 +++++++++++++++-
.../api/scala/StreamCrossOperator.scala | 2 +-
.../scala/StreamExecutionEnvironment.scala | 11 +++
15 files changed, 327 insertions(+), 58 deletions(-)
create mode 100644 flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index aa70e3fcf6641..12a3c7eef3777 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -22,6 +22,7 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;
@@ -57,8 +58,7 @@ protected SingleOutputStreamOperator(DataStream dataStream) {
}
/**
- * Sets the parallelism for this operator. The degree must be 1 or
- * more.
+ * Sets the parallelism for this operator. The degree must be 1 or more.
*
* @param parallelism
* The parallelism for this operator.
@@ -118,11 +118,43 @@ public SingleOutputStreamOperator copy() {
return new SingleOutputStreamOperator(this);
}
- public SingleOutputStreamOperator setChainingStrategy(ChainingStrategy strategy) {
+ /**
+ * Sets the {@link ChainingStrategy} for the given operator affecting the
+ * way operators will possibly be co-located on the same thread for
+ * increased performance.
+ *
+ * @param strategy
+ * The selected {@link ChainingStrategy}
+ * @return The operator with the modified chaining strategy
+ */
+ private SingleOutputStreamOperator setChainingStrategy(ChainingStrategy strategy) {
this.operator.setChainingStrategy(strategy);
return this;
}
-
+
+ /**
+ * Turns off chaining for this operator so thread co-location will not be
+ * used as an optimization. Chaining can be turned off for the whole
+ * job by {@link StreamExecutionEnvironment#disableOperatorChaning()},
+ * however it is not advised for performance considerations.
+ *
+ * @return The operator with chaining disabled
+ */
+ public SingleOutputStreamOperator disableChaining() {
+ return setChainingStrategy(ChainingStrategy.NEVER);
+ }
+
+ /**
+ * Starts a new task chain beginning at this operator. This operator will
+ * not be chained (thread co-located for increased performance) to any
+ * previous tasks even if possible.
+ *
+ * @return The operator starting a new chain.
+ */
+ public SingleOutputStreamOperator startNewChain() {
+ return setChainingStrategy(ChainingStrategy.HEAD);
+ }
+
/**
* Adds a type information hint about the return type of this operator.
*
@@ -237,4 +269,37 @@ public O returns(Class typeClass) {
}
}
+ /**
+ * By default all operators in a streaming job share the same resource
+ * group. Each resource group takes as many task manager slots as the
+ * operator with the highest parallelism in that group. Task chaining is
+ * only possible within one resource group. By calling this method, this
+ * operator starts a new resource group and all subsequent operators will
+ * be added to this group unless specified otherwise. Please note that
+ * local executions have by default as many available task slots as the
+ * environment parallelism, so in order to start a new resource group the
+ * degree of parallelism for the operators must be decreased from the
+ * default.
+ *
+ * @return The operator as a part of a new resource group.
+ */
+ public SingleOutputStreamOperator startNewResourceGroup() {
+ streamGraph.setResourceStrategy(getId(), ResourceStrategy.NEWGROUP);
+ return this;
+ }
+
+ /**
+ * Isolates the operator in its own resource group. This will cause the
+ * operator to grab as many task slots as its degree of parallelism. If
+ * there are no free resources available, the job will fail to start. It
+ * also disables chaining for this operator. All subsequent operators are
+ * assigned to the default resource group.
+ *
+ * @return The operator with isolated resource group.
+ */
+ public SingleOutputStreamOperator isolateResources() {
+ streamGraph.setResourceStrategy(getId(), ResourceStrategy.ISOLATE);
+ return this;
+ }
+
}
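Illustrative sketch only (not part of the diff): how the new per-operator settings above are meant to be used from the DataStream API. Class and variable names are made up; the calls are the ones introduced in this file and exercised by SlotAllocationTest further down, assuming the usual execute() call at the end.

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SlotSharingUsageSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);

            // Placeholder user function; it only exists to create a few operators.
            FilterFunction<Long> pass = new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return true;
                }
            };

            env.generateSequence(1, 100)
                    .filter(pass).isolateResources()      // own resource group, chaining disabled for this operator
                    .filter(pass).startNewResourceGroup() // subsequent operators join a fresh sharing group
                    .filter(pass).startNewChain()         // same group, but a new task chain starts here
                    .filter(pass).disableChaining()       // never thread co-located with its neighbours
                    .print();

            env.execute();
        }
    }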
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
index dbd295f0ed6e6..3dd02a3fa79f6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporal/StreamCrossOperator.java
@@ -76,7 +76,7 @@ public CrossWindow every(long length, TimeUnit timeUnit) {
@SuppressWarnings("unchecked")
public CrossWindow every(long length) {
- ((CoStreamWindow) streamGraph.getVertex(id).getOperator())
+ ((CoStreamWindow) streamGraph.getStreamNode(id).getOperator())
.setSlideSize(length);
return this;
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 3e935f5f388fe..783480d461932 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -189,6 +189,18 @@ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
return this;
}
+ /**
+ * Disables operator chaining for streaming operators. Operator chaining
+ * allows non-shuffle operations to be co-located in the same thread fully
+ * avoiding serialization/de-serialization.
+ *
+ * @return StreamExecutionEnvironment with chaining disabled.
+ */
+ public StreamExecutionEnvironment disableOperatorChaning() {
+ streamGraph.setChaining(false);
+ return this;
+ }
+
/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
* streaming operator states.
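Again purely illustrative: the job-wide switch added here, as opposed to the per-operator disableChaining() above. The identity map and the class name are placeholders.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class GlobalChainingSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);

            // Every operator of this job is now kept in its own task,
            // at the cost of extra serialization between operators.
            env.disableOperatorChaning();

            env.generateSequence(1, 10)
                    .map(new MapFunction<Long, Long>() {
                        @Override
                        public Long map(Long value) {
                            return value;
                        }
                    })
                    .print();

            env.execute();
        }
    }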
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
index 8d8ded9bfd549..a0e0a3693d01d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/JSONGenerator.java
@@ -60,7 +60,7 @@ private void visit(JSONArray jsonArray, List toVisit,
Map edgeRemapings) throws JSONException {
Integer vertexID = toVisit.get(0);
- StreamNode vertex = streamGraph.getVertex(vertexID);
+ StreamNode vertex = streamGraph.getStreamNode(vertexID);
if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
@@ -97,7 +97,7 @@ private void visit(JSONArray jsonArray, List toVisit,
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
- obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism());
+ obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
JSONArray iterationInputs = new JSONArray();
obj.put(PREDECESSORS, iterationInputs);
@@ -115,7 +115,7 @@ private void visitIteration(JSONArray jsonArray, List toVisit, int head
Map edgeRemapings, JSONArray iterationInEdges) throws JSONException {
Integer vertexID = toVisit.get(0);
- StreamNode vertex = streamGraph.getVertex(vertexID);
+ StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);
// Ignoring head and tail to avoid redundancy
@@ -154,7 +154,7 @@ private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID,
private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {
- StreamNode vertex = streamGraph.getVertex(vertexID);
+ StreamNode vertex = streamGraph.getStreamNode(vertexID);
node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
@@ -165,7 +165,7 @@ private void decorateNode(Integer vertexID, JSONObject node) throws JSONExceptio
node.put(PACT, "Data Stream");
}
- StreamOperator<?, ?> operator = streamGraph.getVertex(vertexID).getOperator();
+ StreamOperator<?, ?> operator = streamGraph.getStreamNode(vertexID).getOperator();
if (operator != null && operator.getUserFunction() != null) {
node.put(CONTENTS, vertex.getOperatorName() + " at "
@@ -174,7 +174,7 @@ private void decorateNode(Integer vertexID, JSONObject node) throws JSONExceptio
node.put(CONTENTS, vertex.getOperatorName());
}
- node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism());
+ node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}
}
\ No newline at end of file
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index bfeed28147cb8..93bf8ebe8121f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -173,15 +173,15 @@ public void addIterationHead(Integer vertexID, Integer iterationHead, Integer it
chaining = false;
- StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut);
+ StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(iterationHead), timeOut);
streamLoops.put(iterationID, iteration);
vertexIDtoLoop.put(vertexID, iteration);
setSerializersFrom(iterationHead, vertexID);
- getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead);
+ getStreamNode(vertexID).setOperatorName("IterationHead-" + iterationHead);
- int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0);
- StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges()
+ int outpartitionerIndex = getStreamNode(iterationHead).getInEdgeIndices().get(0);
+ StreamPartitioner<?> outputPartitioner = getStreamNode(outpartitionerIndex).getOutEdges()
.get(0).getPartitioner();
addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
@@ -196,22 +196,23 @@ public void addIterationHead(Integer vertexID, Integer iterationHead, Integer it
public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
long waitTime) {
- if (getVertex(iterationTail).getBufferTimeout() == 0) {
+ if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
}
addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
- getVertex(iterationTail).getParallelism());
+ getStreamNode(iterationTail).getParallelism());
StreamLoop iteration = streamLoops.get(iterationID);
- iteration.setTail(getVertex(iterationTail));
+ iteration.setTail(getStreamNode(iterationTail));
vertexIDtoLoop.put(vertexID, iteration);
setSerializersFrom(iterationTail, vertexID);
- getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail);
+ getStreamNode(vertexID).setOperatorName("IterationTail-" + iterationTail);
- setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism());
- setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout());
+ setParallelism(iteration.getHead().getID(), getStreamNode(iterationTail).getParallelism());
+ setBufferTimeout(iteration.getHead().getID(), getStreamNode(iterationTail)
+ .getBufferTimeout());
if (LOG.isDebugEnabled()) {
LOG.debug("ITERATION SINK: {}", vertexID);
@@ -233,14 +234,14 @@ protected StreamNode addNode(Integer vertexID, Class extends AbstractInvokable
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {
- StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
- getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject);
- getVertex(edge.getSourceID()).addOutEdge(edge);
- getVertex(edge.getTargetID()).addInEdge(edge);
+ StreamEdge edge = new StreamEdge(getStreamNode(upStreamVertexID),
+ getStreamNode(downStreamVertexID), typeNumber, outputNames, partitionerObject);
+ getStreamNode(edge.getSourceID()).addOutEdge(edge);
+ getStreamNode(edge.getTargetID()).addInEdge(edge);
}
public void addOutputSelector(Integer vertexID, OutputSelector outputSelector) {
- getVertex(vertexID).addOutputSelector(outputSelector);
+ getStreamNode(vertexID).addOutputSelector(outputSelector);
if (LOG.isDebugEnabled()) {
LOG.debug("Outputselector set for {}", vertexID);
@@ -249,24 +250,24 @@ public void addOutputSelector(Integer vertexID, OutputSelector outputSele
}
public void setParallelism(Integer vertexID, int parallelism) {
- getVertex(vertexID).setParallelism(parallelism);
+ getStreamNode(vertexID).setParallelism(parallelism);
}
public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
- getVertex(vertexID).setBufferTimeout(bufferTimeout);
+ getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
}
private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
- StreamNode vertex = getVertex(vertexID);
+ StreamNode vertex = getStreamNode(vertexID);
vertex.setSerializerIn1(in1);
vertex.setSerializerIn2(in2);
vertex.setSerializerOut(out);
}
private void setSerializersFrom(Integer from, Integer to) {
- StreamNode fromVertex = getVertex(from);
- StreamNode toVertex = getVertex(to);
+ StreamNode fromVertex = getStreamNode(from);
+ StreamNode toVertex = getStreamNode(to);
toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
@@ -275,18 +276,32 @@ private void setSerializersFrom(Integer from, Integer to) {
public void setOutType(Integer vertexID, TypeInformation outType) {
StreamRecordSerializer serializer = new StreamRecordSerializer(outType,
executionConfig);
- getVertex(vertexID).setSerializerOut(serializer);
+ getStreamNode(vertexID).setSerializerOut(serializer);
}
public void setOperator(Integer vertexID, StreamOperator<?, ?> operatorObject) {
- getVertex(vertexID).setOperator(operatorObject);
+ getStreamNode(vertexID).setOperator(operatorObject);
}
public void setInputFormat(Integer vertexID, InputFormat inputFormat) {
- getVertex(vertexID).setInputFormat(inputFormat);
+ getStreamNode(vertexID).setInputFormat(inputFormat);
+ }
+
+ public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
+ StreamNode node = getStreamNode(vertexID);
+ switch (strategy) {
+ case ISOLATE:
+ node.isolateSlot();
+ break;
+ case NEWGROUP:
+ node.startNewSlotSharingGroup();
+ break;
+ default:
+ throw new IllegalArgumentException("Unknown resource strategy");
+ }
}
- public StreamNode getVertex(Integer vertexID) {
+ public StreamNode getStreamNode(Integer vertexID) {
return streamNodes.get(vertexID);
}
@@ -295,7 +310,7 @@ protected Collection extends Integer> getVertexIDs() {
}
protected StreamEdge getEdge(int sourceId, int targetId) {
- Iterator outIterator = getVertex(sourceId).getOutEdges().iterator();
+ Iterator outIterator = getStreamNode(sourceId).getOutEdges().iterator();
while (outIterator.hasNext()) {
StreamEdge edge = outIterator.next();
@@ -311,6 +326,10 @@ public Collection getSourceIDs() {
return sources;
}
+ public Collection<StreamNode> getStreamNodes() {
+ return streamNodes.values();
+ }
+
public Set>> getOperators() {
Set>> operatorSet = new HashSet>>();
for (StreamNode vertex : streamNodes.values()) {
@@ -414,6 +433,10 @@ public void dumpStreamingPlanAsJSON(File file) throws IOException {
}
}
+ public static enum ResourceStrategy {
+ DEFAULT, ISOLATE, NEWGROUP
+ }
+
/**
* Object for representing loops in streaming programs.
*
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
index 576150e20c99c..e44b99941899e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
@@ -38,6 +38,7 @@
public class StreamNode implements Serializable {
private static final long serialVersionUID = 1L;
+ private static int currentSlotSharingIndex = 1;
transient private StreamExecutionEnvironment env;
@@ -45,6 +46,8 @@ public class StreamNode implements Serializable {
private Integer parallelism = null;
private Long bufferTimeout = null;
private String operatorName;
+ private Integer slotSharingID;
+ private boolean isolatedSlot = false;
private transient StreamOperator<?, ?> operator;
private List<OutputSelector<?>> outputSelectors;
@@ -68,6 +71,7 @@ public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator,
this.operator = operator;
this.outputSelectors = outputSelector;
this.jobVertexClass = jobVertexClass;
+ this.slotSharingID = currentSlotSharingIndex;
}
public void addInEdge(StreamEdge inEdge) {
@@ -198,4 +202,16 @@ public void setInputFormat(InputFormat inputFormat) {
this.inputFormat = inputFormat;
}
+ public int getSlotSharingID() {
+ return isolatedSlot ? -1 : slotSharingID;
+ }
+
+ public void startNewSlotSharingGroup() {
+ this.slotSharingID = ++currentSlotSharingIndex;
+ }
+
+ public void isolateSlot() {
+ isolatedSlot = true;
+ }
+
}
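A small inspection sketch (not part of the diff) tying the new StreamNode fields to the API above: isolated nodes report a slot sharing ID of -1, while nodes created after startNewResourceGroup() carry the incremented ID. Names are illustrative, and it assumes the existing DataStream accessors getId() and getStreamGraph() used elsewhere in this patch are accessible from user code.

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.graph.StreamGraph;

    public class SlotSharingIdSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(4);

            FilterFunction<Long> pass = new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return true;
                }
            };

            SingleOutputStreamOperator<Long, ?> isolated =
                    env.generateSequence(1, 10).filter(pass).isolateResources();
            SingleOutputStreamOperator<Long, ?> grouped =
                    isolated.filter(pass).startNewResourceGroup();

            StreamGraph graph = env.getStreamGraph();
            // -1: the node was isolated and will not join any sharing group.
            System.out.println(graph.getStreamNode(isolated.getId()).getSlotSharingID());
            // The new group's ID, shared by all operators added after startNewResourceGroup().
            System.out.println(graph.getStreamNode(grouped.getId()).getSlotSharingID());
        }
    }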
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 7b856a1d6cf33..29d3f288a7eae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -23,6 +23,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.configuration.Configuration;
@@ -141,7 +142,7 @@ private List createChain(Integer startNode, Integer current) {
List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();
- for (StreamEdge outEdge : streamGraph.getVertex(current).getOutEdges()) {
+ for (StreamEdge outEdge : streamGraph.getStreamNode(current).getOutEdges()) {
if (isChainable(outEdge)) {
chainableOutputs.add(outEdge);
} else {
@@ -169,7 +170,7 @@ private List createChain(Integer startNode, Integer current) {
config.setChainStart();
config.setOutEdgesInOrder(transitiveOutEdges);
- config.setOutEdges(streamGraph.getVertex(current).getOutEdges());
+ config.setOutEdges(streamGraph.getStreamNode(current).getOutEdges());
for (StreamEdge edge : transitiveOutEdges) {
connect(startNode, edge);
@@ -195,7 +196,7 @@ private List createChain(Integer startNode, Integer current) {
}
private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
- String operatorName = streamGraph.getVertex(vertexID).getOperatorName();
+ String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();
if (chainedOutputs.size() > 1) {
List<String> outputChainedNames = new ArrayList<String>();
for (StreamEdge chainable : chainedOutputs) {
@@ -217,7 +218,7 @@ private String createChainedName(Integer vertexID, List chainedOutpu
private StreamConfig createProcessingVertex(Integer vertexID) {
AbstractJobVertex jobVertex = new AbstractJobVertex(chainedNames.get(vertexID));
- StreamNode vertex = streamGraph.getVertex(vertexID);
+ StreamNode vertex = streamGraph.getStreamNode(vertexID);
jobVertex.setInvokableClass(vertex.getJobVertexClass());
@@ -247,7 +248,7 @@ private StreamConfig createProcessingVertex(Integer vertexID) {
private void setVertexConfig(Integer vertexID, StreamConfig config,
List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
- StreamNode vertex = streamGraph.getVertex(vertexID);
+ StreamNode vertex = streamGraph.getStreamNode(vertexID);
config.setVertexID(vertexID);
config.setBufferTimeout(vertex.getBufferTimeout());
@@ -318,6 +319,8 @@ private boolean isChainable(StreamEdge edge) {
return downStreamVertex.getInEdges().size() == 1
&& outOperator != null
+ && upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
+ && upStreamVertex.getSlotSharingID() != -1
&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || headOperator
.getChainingStrategy() == ChainingStrategy.ALWAYS)
@@ -328,10 +331,21 @@ private boolean isChainable(StreamEdge edge) {
}
private void setSlotSharing() {
- SlotSharingGroup shareGroup = new SlotSharingGroup();
- for (AbstractJobVertex vertex : jobVertices.values()) {
- vertex.setSlotSharingGroup(shareGroup);
+ Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>();
+
+ for (Entry<Integer, AbstractJobVertex> entry : jobVertices.entrySet()) {
+
+ int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
+
+ if (slotSharingID != -1) {
+ SlotSharingGroup group = slotSharingGroups.get(slotSharingID);
+ if (group == null) {
+ group = new SlotSharingGroup();
+ slotSharingGroups.put(slotSharingID, group);
+ }
+ entry.getValue().setSlotSharingGroup(group);
+ }
}
for (StreamLoop loop : streamGraph.getStreamLoops()) {
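To see what the reworked setSlotSharing() produces, the generated JobGraph can be inspected the same way the new SlotAllocationTest does. A rough sketch with an arbitrary pipeline and illustrative names: vertices of isolated operators are simply left without a sharing group here, while each resource group maps to one shared SlotSharingGroup instance.

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
    import org.apache.flink.runtime.jobgraph.JobGraph;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SlotSharingGroupSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);

            FilterFunction<Long> pass = new FilterFunction<Long>() {
                @Override
                public boolean filter(Long value) {
                    return true;
                }
            };

            env.generateSequence(1, 10)
                    .filter(pass).isolateResources()
                    .filter(pass).startNewResourceGroup()
                    .print();

            JobGraph jobGraph = env.getStreamGraph().getJobGraph();
            for (AbstractJobVertex vertex : jobGraph.getVerticesSortedTopologicallyFromSources()) {
                System.out.println(vertex.getName() + " -> " + vertex.getSlotSharingGroup());
            }
        }
    }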
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
index b688ea43a2ffd..dfcdc8d871356 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/WindowingOptimizer.java
@@ -53,7 +53,7 @@ private static void removeMergeBeforeFlatten(StreamGraph streamGraph) {
for (Integer flattenerID : flatteners) {
// Flatteners should have exactly one input
- StreamNode input = streamGraph.getVertex(flattenerID).getInEdges().get(0)
+ StreamNode input = streamGraph.getStreamNode(flattenerID).getInEdges().get(0)
.getSourceVertex();
// Check whether the flatten is applied after a merge
@@ -98,9 +98,9 @@ private static void setDiscretizerReuse(StreamGraph streamGraph) {
for (Tuple2> discretizer : discretizers) {
boolean inMatching = false;
for (Tuple2, List> matching : matchingDiscretizers) {
- Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getVertex(
+ Set<Integer> discretizerInEdges = new HashSet<Integer>(streamGraph.getStreamNode(
discretizer.f0).getInEdgeIndices());
- Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getVertex(
+ Set<Integer> matchingInEdges = new HashSet<Integer>(streamGraph.getStreamNode(
matching.f1.get(0)).getInEdgeIndices());
if (discretizer.f1.equals(matching.f0)
@@ -132,7 +132,7 @@ private static void setDiscretizerReuse(StreamGraph streamGraph) {
private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplaceID,
Integer replaceWithID) {
// Convert to array to create a copy
- List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getVertex(toReplaceID)
+ List<StreamEdge> outEdges = new ArrayList<StreamEdge>(streamGraph.getStreamNode(toReplaceID)
.getOutEdges());
int numOutputs = outEdges.size();
@@ -146,6 +146,6 @@ private static void replaceDiscretizer(StreamGraph streamGraph, Integer toReplac
}
// Remove the other discretizer
- streamGraph.removeVertex(streamGraph.getVertex(toReplaceID));
+ streamGraph.removeVertex(streamGraph.getStreamNode(toReplaceID));
}
}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 5cb3ec9d5cba6..a361b7a2b3ef5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -113,7 +113,7 @@ protected StreamRecord readNext() throws IOException {
// Task already cancelled do nothing
return null;
}
- } catch (IllegalStateException e) {
+ } catch (IllegalStateException e) {
if (isRunning) {
throw new RuntimeException("Could not read next record due to: "
+ StringUtils.stringifyException(e));
@@ -198,6 +198,16 @@ public ChainingStrategy getChainingStrategy() {
return chainingStrategy;
}
+ /**
+ * Defines the chaining scheme for the operator. By default ALWAYS is used,
+ * which means operators will be eagerly chained whenever possible, for
+ * maximal performance. It is generally a good practice to allow maximal
+ * chaining and increase operator parallelism. When the strategy is set
+ * to NEVER, the operator will not be chained to the preceding or succeeding
+ * operators. The HEAD strategy marks the start of a new chain, so that the
+ * operator will not be chained to preceding operators, only to succeeding ones.
+ *
+ */
public static enum ChainingStrategy {
ALWAYS, NEVER, HEAD
}
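For the strategy part alone, the checks in StreamingJobGraphGenerator#isChainable above reduce to the small predicate sketched below. This is only an illustration of the ALWAYS/NEVER/HEAD semantics; it deliberately ignores the partitioner, parallelism and slot sharing conditions that the real method also verifies, and the class name is made up.

    import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;

    public final class ChainingStrategySketch {

        // Downstream must accept chaining (ALWAYS); upstream must be a valid
        // chain head or chainable itself (HEAD or ALWAYS).
        static boolean strategiesAllowChaining(ChainingStrategy upstream, ChainingStrategy downstream) {
            return downstream == ChainingStrategy.ALWAYS
                    && (upstream == ChainingStrategy.ALWAYS || upstream == ChainingStrategy.HEAD);
        }

        public static void main(String[] args) {
            System.out.println(strategiesAllowChaining(ChainingStrategy.HEAD, ChainingStrategy.ALWAYS));   // true
            System.out.println(strategiesAllowChaining(ChainingStrategy.ALWAYS, ChainingStrategy.NEVER));  // false
            System.out.println(strategiesAllowChaining(ChainingStrategy.NEVER, ChainingStrategy.ALWAYS));  // false
        }
    }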
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index 4228314e04ac1..4b3543b75bad3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -31,7 +31,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
@@ -89,7 +88,7 @@ public Tuple2 map(Integer value) throws Exception {
public boolean filter(Tuple2 value) throws Exception {
return true;
}
- }).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER).groupBy(new KeySelector, Integer>() {
+ }).disableChaining().groupBy(new KeySelector, Integer>() {
private static final long serialVersionUID = 1L;
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
new file mode 100644
index 0000000000000..94636f767b80c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/graph/SlotAllocationTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import static org.junit.Assert.*;
+
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.junit.Test;
+
+public class SlotAllocationTest {
+
+ @SuppressWarnings("serial")
+ @Test
+ public void test() {
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(8);
+
+ FilterFunction<Long> dummyFilter = new FilterFunction<Long>() {
+
+ @Override
+ public boolean filter(Long value) throws Exception {
+
+ return false;
+ }
+ };
+
+ env.generateSequence(1, 10).filter(dummyFilter).isolateResources().filter(dummyFilter)
+ .disableChaining().filter(dummyFilter).startNewResourceGroup().filter(dummyFilter)
+ .startNewChain().print();
+
+ JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+
+ List<AbstractJobVertex> vertices = jobGraph.getVerticesSortedTopologicallyFromSources();
+
+ assertEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(2).getSlotSharingGroup());
+ assertNotEquals(vertices.get(0).getSlotSharingGroup(), vertices.get(1)
+ .getSlotSharingGroup());
+ assertNotEquals(vertices.get(2).getSlotSharingGroup(), vertices.get(3)
+ .getSlotSharingGroup());
+ assertEquals(vertices.get(3).getSlotSharingGroup(), vertices.get(4).getSlotSharingGroup());
+
+ }
+}
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
index ca0c1dc7bb0bf..5986a30929c79 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/SelfConnectionTest.java
@@ -33,7 +33,6 @@
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.windowing.helper.Timestamp;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
@@ -132,7 +131,7 @@ public void differentDataStreamSameChain() {
public String map(Integer value) throws Exception {
return "x " + value;
}
- }).setChainingStrategy(StreamOperator.ChainingStrategy.ALWAYS);
+ });
stringMap.connect(src).map(new CoMapFunction() {
@@ -178,7 +177,7 @@ public void differentDataStreamDifferentChain() {
StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
- DataStream<Integer> src = env.fromElements(1, 3, 5).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER);
+ DataStream<Integer> src = env.fromElements(1, 3, 5).disableChaining();
DataStream stringMap = src.flatMap(new FlatMapFunction() {
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index 41be9d4ffc625..9994479ec1e79 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -95,9 +95,67 @@ class DataStream[T](javaStream: JavaStream[T]) {
"parallelism.")
}
- def setChainingStrategy(strategy: ChainingStrategy): DataStream[T] = {
+ /**
+ * Turns off chaining for this operator so thread co-location will not be
+ * used as an optimization. Chaining can be turned off for the whole
+ * job by {@link StreamExecutionEnvironment#disableOperatorChaning()},
+ * however it is not advised for performance considerations.
+ *
+ */
+ def disableChaining(): DataStream[T] = {
+ javaStream match {
+ case ds: SingleOutputStreamOperator[_, _] => ds.disableChaining();
+ case _ =>
+ throw new UnsupportedOperationException("Only supported for operators.")
+ }
+ this
+ }
+
+ /**
+ * Starts a new task chain beginning at this operator. This operator will
+ * not be chained (thread co-located for increased performance) to any
+ * previous tasks even if possible.
+ *
+ */
+ def startNewChain(): DataStream[T] = {
+ javaStream match {
+ case ds: SingleOutputStreamOperator[_, _] => ds.startNewChain();
+ case _ =>
+ throw new UnsupportedOperationException("Only supported for operators.")
+ }
+ this
+ }
+
+ /**
+ * Isolates the operator in its own resource group. This will cause the
+ * operator to grab as many task slots as its degree of parallelism. If
+ * there are no free resources available, the job will fail to start.
+ * All subsequent operators are assigned to the default resource group.
+ *
+ */
+ def isolateResources(): DataStream[T] = {
+ javaStream match {
+ case ds: SingleOutputStreamOperator[_, _] => ds.isolateResources();
+ case _ =>
+ throw new UnsupportedOperationException("Only supported for operators.")
+ }
+ this
+ }
+
+ /**
+ * By default all operators in a streaming job share the same resource
+ * group. Each resource group takes as many task manager slots as the
+ * operator with the highest parallelism in that group. By calling this
+ * method, this operator starts a new resource group and all subsequent operators will
+ * be added to this group unless specified otherwise. Please note that
+ * local executions have by default as many available task slots as the
+ * environment parallelism, so in order to start a new resource group the
+ * degree of parallelism for the operators must be decreased from the
+ * default.
+ */
+ def startNewResourceGroup(): DataStream[T] = {
javaStream match {
- case ds: SingleOutputStreamOperator[_, _] => ds.setChainingStrategy(strategy)
+ case ds: SingleOutputStreamOperator[_, _] => ds.startNewResourceGroup();
case _ =>
throw new UnsupportedOperationException("Only supported for operators.")
}
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index d14787c428ff2..699c38da57aee 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -100,7 +100,7 @@ object StreamCrossOperator {
override def every(length: Long): CrossWindow[I1, I2] = {
val graph = javaStream.getExecutionEnvironment().getStreamGraph()
- val operator = graph.getVertex(javaStream.getId()).getOperator()
+ val operator = graph.getStreamNode(javaStream.getId()).getOperator()
operator.asInstanceOf[CoStreamWindow[_,_,_]].setSlideSize(length)
this
}
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index c7716ed0c86c9..ce5f91b846c9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -122,6 +122,17 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.enableCheckpointing()
this
}
+
+ /**
+ * Disables operator chaining for streaming operators. Operator chaining
+ * allows non-shuffle operations to be co-located in the same thread fully
+ * avoiding serialization/de-serialization.
+ *
+ */
+ def disableOperatorChaning(): StreamExecutionEnvironment = {
+ javaEnv.disableOperatorChaning()
+ this
+ }
/**
* Sets the number of times that failed tasks are re-executed. A value of zero