Skip to content

Commit

Permalink
[FLINK-1948] [streaming] Manual task slot sharing settings added for …
Browse files Browse the repository at this point in the history
…stream operators

Closes #634
  • Loading branch information
gyfora authored and mbalassi committed Apr 29, 2015
1 parent 7f0ce14 commit ca82b0c
Show file tree
Hide file tree
Showing 15 changed files with 327 additions and 58 deletions.
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator.ChainingStrategy;

Expand Down Expand Up @@ -57,8 +58,7 @@ protected SingleOutputStreamOperator(DataStream<OUT> dataStream) {
}

/**
* Sets the parallelism for this operator. The degree must be 1 or
* more.
* Sets the parallelism for this operator. The degree must be 1 or more.
*
* @param parallelism
* The parallelism for this operator.
Expand Down Expand Up @@ -118,11 +118,43 @@ public SingleOutputStreamOperator<OUT, O> copy() {
return new SingleOutputStreamOperator<OUT, O>(this);
}

public SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) {
/**
* Sets the {@link ChainingStrategy} for the given operator affecting the
* way operators will possibly be co-located on the same thread for
* increased performance.
*
* @param strategy
* The selected {@link ChainingStrategy}
* @return The operator with the modified chaining strategy
*/
private SingleOutputStreamOperator<OUT, O> setChainingStrategy(ChainingStrategy strategy) {
this.operator.setChainingStrategy(strategy);
return this;
}


/**
* Turns off chaining for this operator so thread co-location will not be
* used as an optimization. </p> Chaining can be turned off for the whole
* job by {@link StreamExecutionEnvironment#disableOperatorChaning()}
* however it is not advised for performance considerations.
*
* @return The operator with chaining disabled
*/
public SingleOutputStreamOperator<OUT, O> disableChaining() {
return setChainingStrategy(ChainingStrategy.NEVER);
}

/**
* Starts a new task chain beginning at this operator. This operator will
* not be chained (thread co-located for increased performance) to any
* previous tasks even if possible.
*
* @return The operator with chaining set.
*/
public SingleOutputStreamOperator<OUT, O> startNewChain() {
return setChainingStrategy(ChainingStrategy.HEAD);
}

/**
* Adds a type information hint about the return type of this operator.
*
Expand Down Expand Up @@ -237,4 +269,37 @@ public O returns(Class<OUT> typeClass) {
}
}

/**
* By default all operators in a streaming job share the same resource
* group. Each resource group takes as many task manager slots as the
* maximum parallelism operator in that group. Task chaining is only
* possible within one resource group. By calling this method, this
* operators starts a new resource group and all subsequent operators will
* be added to this group unless specified otherwise. </p> Please note that
* local executions have by default as many available task slots as the
* environment parallelism, so in order to start a new resource group the
* degree of parallelism for the operators must be decreased from the
* default.
*
* @return The operator as a part of a new resource group.
*/
public SingleOutputStreamOperator<OUT, O> startNewResourceGroup() {
streamGraph.setResourceStrategy(getId(), ResourceStrategy.NEWGROUP);
return this;
}

/**
* Isolates the operator in its own resource group. This will cause the
* operator to grab as many task slots as its degree of parallelism. If
* there are no free resources available, the job will fail to start. It
* also disables chaining for this operator </p>All subsequent operators are
* assigned to the default resource group.
*
* @return The operator with isolated resource group.
*/
public SingleOutputStreamOperator<OUT, O> isolateResources() {
streamGraph.setResourceStrategy(getId(), ResourceStrategy.ISOLATE);
return this;
}

}
Expand Up @@ -76,7 +76,7 @@ public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) {

@SuppressWarnings("unchecked")
public CrossWindow<I1, I2> every(long length) {
((CoStreamWindow<I1, I2, ?>) streamGraph.getVertex(id).getOperator())
((CoStreamWindow<I1, I2, ?>) streamGraph.getStreamNode(id).getOperator())
.setSlideSize(length);
return this;
}
Expand Down
Expand Up @@ -189,6 +189,18 @@ public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
return this;
}

/**
* Disables operator chaining for streaming operators. Operator chaining
* allows non-shuffle operations to be co-located in the same thread fully
* avoiding serialization and de-serialization.
*
* @return StreamExecutionEnvironment with chaining disabled.
*/
public StreamExecutionEnvironment disableOperatorChaning() {
streamGraph.setChaining(false);
return this;
}

/**
* Method for enabling fault-tolerance. Activates monitoring and backup of
* streaming operator states.
Expand Down
Expand Up @@ -60,7 +60,7 @@ private void visit(JSONArray jsonArray, List<Integer> toVisit,
Map<Integer, Integer> edgeRemapings) throws JSONException {

Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getVertex(vertexID);
StreamNode vertex = streamGraph.getStreamNode(vertexID);

if (streamGraph.getSourceIDs().contains(vertexID)
|| Collections.disjoint(vertex.getInEdges(), toVisit)) {
Expand Down Expand Up @@ -97,7 +97,7 @@ private void visit(JSONArray jsonArray, List<Integer> toVisit,
obj.put(STEPS, iterationSteps);
obj.put(ID, iterationHead);
obj.put(PACT, "IterativeDataStream");
obj.put(PARALLELISM, streamGraph.getVertex(iterationHead).getParallelism());
obj.put(PARALLELISM, streamGraph.getStreamNode(iterationHead).getParallelism());
obj.put(CONTENTS, "Stream Iteration");
JSONArray iterationInputs = new JSONArray();
obj.put(PREDECESSORS, iterationInputs);
Expand All @@ -115,7 +115,7 @@ private void visitIteration(JSONArray jsonArray, List<Integer> toVisit, int head
Map<Integer, Integer> edgeRemapings, JSONArray iterationInEdges) throws JSONException {

Integer vertexID = toVisit.get(0);
StreamNode vertex = streamGraph.getVertex(vertexID);
StreamNode vertex = streamGraph.getStreamNode(vertexID);
toVisit.remove(vertexID);

// Ignoring head and tail to avoid redundancy
Expand Down Expand Up @@ -154,7 +154,7 @@ private void decorateEdge(JSONArray inputArray, int vertexID, int mappedInputID,

private void decorateNode(Integer vertexID, JSONObject node) throws JSONException {

StreamNode vertex = streamGraph.getVertex(vertexID);
StreamNode vertex = streamGraph.getStreamNode(vertexID);

node.put(ID, vertexID);
node.put(TYPE, vertex.getOperatorName());
Expand All @@ -165,7 +165,7 @@ private void decorateNode(Integer vertexID, JSONObject node) throws JSONExceptio
node.put(PACT, "Data Stream");
}

StreamOperator<?, ?> operator = streamGraph.getVertex(vertexID).getOperator();
StreamOperator<?, ?> operator = streamGraph.getStreamNode(vertexID).getOperator();

if (operator != null && operator.getUserFunction() != null) {
node.put(CONTENTS, vertex.getOperatorName() + " at "
Expand All @@ -174,7 +174,7 @@ private void decorateNode(Integer vertexID, JSONObject node) throws JSONExceptio
node.put(CONTENTS, vertex.getOperatorName());
}

node.put(PARALLELISM, streamGraph.getVertex(vertexID).getParallelism());
node.put(PARALLELISM, streamGraph.getStreamNode(vertexID).getParallelism());
}

}
Expand Up @@ -173,15 +173,15 @@ public void addIterationHead(Integer vertexID, Integer iterationHead, Integer it

chaining = false;

StreamLoop iteration = new StreamLoop(iterationID, getVertex(iterationHead), timeOut);
StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(iterationHead), timeOut);
streamLoops.put(iterationID, iteration);
vertexIDtoLoop.put(vertexID, iteration);

setSerializersFrom(iterationHead, vertexID);
getVertex(vertexID).setOperatorName("IterationHead-" + iterationHead);
getStreamNode(vertexID).setOperatorName("IterationHead-" + iterationHead);

int outpartitionerIndex = getVertex(iterationHead).getInEdgeIndices().get(0);
StreamPartitioner<?> outputPartitioner = getVertex(outpartitionerIndex).getOutEdges()
int outpartitionerIndex = getStreamNode(iterationHead).getInEdgeIndices().get(0);
StreamPartitioner<?> outputPartitioner = getStreamNode(outpartitionerIndex).getOutEdges()
.get(0).getPartitioner();

addEdge(vertexID, iterationHead, outputPartitioner, 0, new ArrayList<String>());
Expand All @@ -196,22 +196,23 @@ public void addIterationHead(Integer vertexID, Integer iterationHead, Integer it
public void addIterationTail(Integer vertexID, Integer iterationTail, Integer iterationID,
long waitTime) {

if (getVertex(iterationTail).getBufferTimeout() == 0) {
if (getStreamNode(iterationTail).getBufferTimeout() == 0) {
throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
}

addNode(vertexID, StreamIterationTail.class, null, null).setParallelism(
getVertex(iterationTail).getParallelism());
getStreamNode(iterationTail).getParallelism());

StreamLoop iteration = streamLoops.get(iterationID);
iteration.setTail(getVertex(iterationTail));
iteration.setTail(getStreamNode(iterationTail));
vertexIDtoLoop.put(vertexID, iteration);

setSerializersFrom(iterationTail, vertexID);
getVertex(vertexID).setOperatorName("IterationTail-" + iterationTail);
getStreamNode(vertexID).setOperatorName("IterationTail-" + iterationTail);

setParallelism(iteration.getHead().getID(), getVertex(iterationTail).getParallelism());
setBufferTimeout(iteration.getHead().getID(), getVertex(iterationTail).getBufferTimeout());
setParallelism(iteration.getHead().getID(), getStreamNode(iterationTail).getParallelism());
setBufferTimeout(iteration.getHead().getID(), getStreamNode(iterationTail)
.getBufferTimeout());

if (LOG.isDebugEnabled()) {
LOG.debug("ITERATION SINK: {}", vertexID);
Expand All @@ -233,14 +234,14 @@ protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable
public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID,
StreamPartitioner<?> partitionerObject, int typeNumber, List<String> outputNames) {

StreamEdge edge = new StreamEdge(getVertex(upStreamVertexID),
getVertex(downStreamVertexID), typeNumber, outputNames, partitionerObject);
getVertex(edge.getSourceID()).addOutEdge(edge);
getVertex(edge.getTargetID()).addInEdge(edge);
StreamEdge edge = new StreamEdge(getStreamNode(upStreamVertexID),
getStreamNode(downStreamVertexID), typeNumber, outputNames, partitionerObject);
getStreamNode(edge.getSourceID()).addOutEdge(edge);
getStreamNode(edge.getTargetID()).addInEdge(edge);
}

public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSelector) {
getVertex(vertexID).addOutputSelector(outputSelector);
getStreamNode(vertexID).addOutputSelector(outputSelector);

if (LOG.isDebugEnabled()) {
LOG.debug("Outputselector set for {}", vertexID);
Expand All @@ -249,24 +250,24 @@ public <T> void addOutputSelector(Integer vertexID, OutputSelector<T> outputSele
}

public void setParallelism(Integer vertexID, int parallelism) {
getVertex(vertexID).setParallelism(parallelism);
getStreamNode(vertexID).setParallelism(parallelism);
}

public void setBufferTimeout(Integer vertexID, long bufferTimeout) {
getVertex(vertexID).setBufferTimeout(bufferTimeout);
getStreamNode(vertexID).setBufferTimeout(bufferTimeout);
}

private void setSerializers(Integer vertexID, StreamRecordSerializer<?> in1,
StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out) {
StreamNode vertex = getVertex(vertexID);
StreamNode vertex = getStreamNode(vertexID);
vertex.setSerializerIn1(in1);
vertex.setSerializerIn2(in2);
vertex.setSerializerOut(out);
}

private void setSerializersFrom(Integer from, Integer to) {
StreamNode fromVertex = getVertex(from);
StreamNode toVertex = getVertex(to);
StreamNode fromVertex = getStreamNode(from);
StreamNode toVertex = getStreamNode(to);

toVertex.setSerializerIn1(fromVertex.getTypeSerializerOut());
toVertex.setSerializerOut(fromVertex.getTypeSerializerIn1());
Expand All @@ -275,18 +276,32 @@ private void setSerializersFrom(Integer from, Integer to) {
public <OUT> void setOutType(Integer vertexID, TypeInformation<OUT> outType) {
StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
executionConfig);
getVertex(vertexID).setSerializerOut(serializer);
getStreamNode(vertexID).setSerializerOut(serializer);
}

public <IN, OUT> void setOperator(Integer vertexID, StreamOperator<IN, OUT> operatorObject) {
getVertex(vertexID).setOperator(operatorObject);
getStreamNode(vertexID).setOperator(operatorObject);
}

public void setInputFormat(Integer vertexID, InputFormat<String, ?> inputFormat) {
getVertex(vertexID).setInputFormat(inputFormat);
getStreamNode(vertexID).setInputFormat(inputFormat);
}

public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
StreamNode node = getStreamNode(vertexID);
switch (strategy) {
case ISOLATE:
node.isolateSlot();
break;
case NEWGROUP:
node.startNewSlotSharingGroup();
break;
default:
throw new IllegalArgumentException("Unknown resource strategy");
}
}

public StreamNode getVertex(Integer vertexID) {
public StreamNode getStreamNode(Integer vertexID) {
return streamNodes.get(vertexID);
}

Expand All @@ -295,7 +310,7 @@ protected Collection<? extends Integer> getVertexIDs() {
}

protected StreamEdge getEdge(int sourceId, int targetId) {
Iterator<StreamEdge> outIterator = getVertex(sourceId).getOutEdges().iterator();
Iterator<StreamEdge> outIterator = getStreamNode(sourceId).getOutEdges().iterator();
while (outIterator.hasNext()) {
StreamEdge edge = outIterator.next();

Expand All @@ -311,6 +326,10 @@ public Collection<Integer> getSourceIDs() {
return sources;
}

public Collection<StreamNode> getStreamNodes() {
return streamNodes.values();
}

public Set<Tuple2<Integer, StreamOperator<?, ?>>> getOperators() {
Set<Tuple2<Integer, StreamOperator<?, ?>>> operatorSet = new HashSet<Tuple2<Integer, StreamOperator<?, ?>>>();
for (StreamNode vertex : streamNodes.values()) {
Expand Down Expand Up @@ -414,6 +433,10 @@ public void dumpStreamingPlanAsJSON(File file) throws IOException {
}
}

public static enum ResourceStrategy {
DEFAULT, ISOLATE, NEWGROUP
}

/**
* Object for representing loops in streaming programs.
*
Expand Down
Expand Up @@ -38,13 +38,16 @@
public class StreamNode implements Serializable {

private static final long serialVersionUID = 1L;
private static int currentSlotSharingIndex = 1;

transient private StreamExecutionEnvironment env;

private Integer ID;
private Integer parallelism = null;
private Long bufferTimeout = null;
private String operatorName;
private Integer slotSharingID;
private boolean isolatedSlot = false;

private transient StreamOperator<?, ?> operator;
private List<OutputSelector<?>> outputSelectors;
Expand All @@ -68,6 +71,7 @@ public StreamNode(StreamExecutionEnvironment env, Integer ID, StreamOperator<?,
this.operator = operator;
this.outputSelectors = outputSelector;
this.jobVertexClass = jobVertexClass;
this.slotSharingID = currentSlotSharingIndex;
}

public void addInEdge(StreamEdge inEdge) {
Expand Down Expand Up @@ -198,4 +202,16 @@ public void setInputFormat(InputFormat<String, ?> inputFormat) {
this.inputFormat = inputFormat;
}

public int getSlotSharingID() {
return isolatedSlot ? -1 : slotSharingID;
}

public void startNewSlotSharingGroup() {
this.slotSharingID = ++currentSlotSharingIndex;
}

public void isolateSlot() {
isolatedSlot = true;
}

}

0 comments on commit ca82b0c

Please sign in to comment.