Skip to content

Commit

Permalink
[FLINK-3315] Fix Slot Sharing in Streaming API
Browse files Browse the repository at this point in the history
This changes the slot sharing settings to a single method,
slotSharingGroup(String), on DataStream.

Operations inherit the slot sharing group of the input if all input
operations are in the same slot sharing group.

The default slot sharing group is "default"; it can also be set
explicitly using slotSharingGroup("default"), which overrides the
inheriting behaviour.

This closes #1641.
  • Loading branch information
aljoscha authored and tillrohrmann committed Feb 23, 2016
1 parent dc4d147 commit c40cfd3
Show file tree
Hide file tree
Showing 13 changed files with 454 additions and 187 deletions.
Expand Up @@ -103,4 +103,23 @@ public DataStreamSink<T> disableChaining() {
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
return this;
}

/**
 * Assigns this sink to the slot sharing group with the given name.
 *
 * <p>Parallel instances of operations that belong to the same slot sharing group are
 * co-located in the same TaskManager slot, if possible.
 *
 * <p>When no slot sharing group is specified explicitly, an operation inherits the group of
 * its input operations, provided that all of them are in the same group. Operations start
 * out in the default slot sharing group; passing {@code "default"} puts an operation into
 * that group explicitly.
 *
 * @param slotSharingGroup The slot sharing group name.
 * @return This sink, so that further calls can be chained.
 */
@PublicEvolving
public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
    this.transformation.setSlotSharingGroup(slotSharingGroup);
    return this;
}
}
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.api.java.typeutils.TypeInfoParser;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph.ResourceStrategy;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.transformations.StreamTransformation;
Expand Down Expand Up @@ -313,40 +312,21 @@ protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner) {
}

/**
 * Starts a new resource group at this operator.
 *
 * <p>By default, all operators of a streaming job share the same resource group. Each
 * resource group takes as many task manager slots as the highest parallelism among the
 * operators in that group, and task chaining is only possible within one resource group.
 * After this call, this operator starts a new resource group and all subsequent operators
 * are added to that group unless specified otherwise.
 *
 * <p>Please note that local executions have, by default, as many available task slots as
 * the environment parallelism. In order to start a new resource group there, the degree of
 * parallelism of the operators must be decreased from the default.
 *
 * @return The operator as a part of a new resource group.
 */
@PublicEvolving
public SingleOutputStreamOperator<T, O> startNewResourceGroup() {
    this.transformation.setResourceStrategy(ResourceStrategy.NEWGROUP);
    return this;
}

/**
* Isolates the operator in its own resource group. This will cause the
* operator to grab as many task slots as its degree of parallelism. If
* there are no free resources available, the job will fail to start. It
* also disables chaining for this operator.
* <p>All subsequent operators are
* assigned to the default resource group.
*
* @return The operator with isolated resource group.
* Sets the slot sharing group of this operation. Parallel instances of
* operations that are in the same slot sharing group will be co-located in the same
* TaskManager slot, if possible.
*
* <p>Operations inherit the slot sharing group of input operations if all input operations
* are in the same slot sharing group and no slot sharing group was explicitly specified.
*
* <p>Initially an operation is in the default slot sharing group. An operation can be put into
* the default group explicitly by setting the slot sharing group to {@code "default"}.
*
* @param slotSharingGroup The slot sharing group name.
*/
@PublicEvolving
public SingleOutputStreamOperator<T, O> isolateResources() {
transformation.setResourceStrategy(ResourceStrategy.ISOLATE);
public SingleOutputStreamOperator<T, O> slotSharingGroup(String slotSharingGroup) {
transformation.setSlotSharingGroup(slotSharingGroup);
return this;
}

}
Expand Up @@ -154,31 +154,40 @@ public boolean isIterative() {
return!vertexIDtoLoopTimeout.isEmpty();
}

public <IN, OUT> void addSource(Integer vertexID, StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
public <IN, OUT> void addSource(Integer vertexID,
String slotSharingGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
sources.add(vertexID);
}

public <IN, OUT> void addSink(Integer vertexID, StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo, TypeInformation<OUT> outTypeInfo, String operatorName) {
addOperator(vertexID, operatorObject, inTypeInfo, outTypeInfo, operatorName);
public <IN, OUT> void addSink(Integer vertexID,
String slotSharingGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {
addOperator(vertexID, slotSharingGroup, operatorObject, inTypeInfo, outTypeInfo, operatorName);
sinks.add(vertexID);
}

public <IN, OUT> void addOperator(
Integer vertexID,
String slotSharingGroup,
StreamOperator<OUT> operatorObject,
TypeInformation<IN> inTypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {

if (operatorObject instanceof StoppableStreamSource) {
addNode(vertexID, StoppableSourceStreamTask.class, operatorObject, operatorName);
addNode(vertexID, slotSharingGroup, StoppableSourceStreamTask.class, operatorObject, operatorName);
} else if (operatorObject instanceof StreamSource) {
addNode(vertexID, SourceStreamTask.class, operatorObject, operatorName);
addNode(vertexID, slotSharingGroup, SourceStreamTask.class, operatorObject, operatorName);
} else {
addNode(vertexID, OneInputStreamTask.class, operatorObject, operatorName);
addNode(vertexID, slotSharingGroup, OneInputStreamTask.class, operatorObject, operatorName);
}

TypeSerializer<IN> inSerializer = inTypeInfo != null && !(inTypeInfo instanceof MissingTypeInfo) ? inTypeInfo.createSerializer(executionConfig) : null;
Expand Down Expand Up @@ -206,13 +215,14 @@ public <IN, OUT> void addOperator(

public <IN1, IN2, OUT> void addCoOperator(
Integer vertexID,
String slotSharingGroup,
TwoInputStreamOperator<IN1, IN2, OUT> taskOperatorObject,
TypeInformation<IN1> in1TypeInfo,
TypeInformation<IN2> in2TypeInfo,
TypeInformation<OUT> outTypeInfo,
String operatorName) {

addNode(vertexID, TwoInputStreamTask.class, taskOperatorObject, operatorName);
addNode(vertexID, slotSharingGroup, TwoInputStreamTask.class, taskOperatorObject, operatorName);

TypeSerializer<OUT> outSerializer = (outTypeInfo != null) && !(outTypeInfo instanceof MissingTypeInfo) ?
outTypeInfo.createSerializer(executionConfig) : null;
Expand All @@ -231,15 +241,23 @@ public <IN1, IN2, OUT> void addCoOperator(
}
}

protected StreamNode addNode(Integer vertexID, Class<? extends AbstractInvokable> vertexClass,
StreamOperator<?> operatorObject, String operatorName) {
protected StreamNode addNode(Integer vertexID,
String slotSharingGroup,
Class<? extends AbstractInvokable> vertexClass,
StreamOperator<?> operatorObject,
String operatorName) {

if (streamNodes.containsKey(vertexID)) {
throw new RuntimeException("Duplicate vertexID " + vertexID);
}

StreamNode vertex = new StreamNode(environemnt, vertexID, operatorObject, operatorName,
new ArrayList<OutputSelector<?>>(), vertexClass);
StreamNode vertex = new StreamNode(environemnt,
vertexID,
slotSharingGroup,
operatorObject,
operatorName,
new ArrayList<OutputSelector<?>>(),
vertexClass);

streamNodes.put(vertexID, vertex);

Expand Down Expand Up @@ -288,6 +306,22 @@ public void addVirtualPartitionNode(Integer originalId, Integer virtualId, Strea
new Tuple2<Integer, StreamPartitioner<?>>(originalId, partitioner));
}

/**
 * Determines the slot sharing group of an operation, looking through virtual nodes.
 *
 * <p>If {@code id} refers to a virtual select node or a virtual partition node, the lookup
 * continues with the ID stored in field {@code f0} of that virtual node's entry. This
 * repeats until an ID is reached that is not registered as a virtual node; the slot sharing
 * group of the stream node with that ID is then returned.
 *
 * @param id ID of a stream node, a virtual select node or a virtual partition node.
 * @return The slot sharing group reported by the resolved stream node.
 */
public String getSlotSharingGroup(Integer id) {
    // Virtual select node: continue with the ID it maps to.
    if (virtualSelectNodes.containsKey(id)) {
        return getSlotSharingGroup(virtualSelectNodes.get(id).f0);
    }

    // Virtual partition node: continue with the ID it maps to.
    if (virtuaPartitionNodes.containsKey(id)) {
        return getSlotSharingGroup(virtuaPartitionNodes.get(id).f0);
    }

    // Not a virtual node: ask the stream node itself.
    return getStreamNode(id).getSlotSharingGroup();
}

public void addEdge(Integer upStreamVertexID, Integer downStreamVertexID, int typeNumber) {
addEdgeInternal(upStreamVertexID,
downStreamVertexID,
Expand Down Expand Up @@ -414,24 +448,6 @@ public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
getStreamNode(vertexID).setInputFormat(inputFormat);
}

/**
 * Applies the given resource strategy to the stream node with the given ID.
 *
 * <p>If no stream node is found for {@code vertexID}, the call has no effect.
 *
 * @param vertexID ID of the stream node to configure.
 * @param strategy {@code ISOLATE} calls {@code isolateSlot()} on the node;
 *                 {@code NEWGROUP} calls {@code startNewSlotSharingGroup()} on it.
 * @throws IllegalArgumentException if {@code strategy} is any other constant.
 */
public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
    StreamNode target = getStreamNode(vertexID);
    if (target != null) {
        switch (strategy) {
            case NEWGROUP:
                target.startNewSlotSharingGroup();
                break;
            case ISOLATE:
                target.isolateSlot();
                break;
            default:
                throw new IllegalArgumentException("Unknown resource strategy");
        }
    }
}

void setTransformationId(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
Expand Down Expand Up @@ -495,23 +511,23 @@ public long getLoopTimeout(Integer vertexID) {

public Tuple2<StreamNode, StreamNode> createIterationSourceAndSink(int loopId, int sourceId, int sinkId, long timeout, int parallelism) {
StreamNode source = this.addNode(sourceId,
StreamIterationHead.class,
null,
null);
null,
StreamIterationHead.class,
null,
"IterationSource-" + loopId);
sources.add(source.getId());
setParallelism(source.getId(), parallelism);

StreamNode sink = this.addNode(sinkId,
StreamIterationTail.class,
null,
null);
null,
StreamIterationTail.class,
null,
"IterationSink-" + loopId);
sinks.add(sink.getId());
setParallelism(sink.getId(), parallelism);

iterationSourceSinkPairs.add(new Tuple2<>(source, sink));

source.setOperatorName("IterationSource-" + loopId);
sink.setOperatorName("IterationSink-" + loopId);
this.vertexIDtoBrokerID.put(source.getId(), "broker-" + loopId);
this.vertexIDtoBrokerID.put(sink.getId(), "broker-" + loopId);
this.vertexIDtoLoopTimeout.put(source.getId(), timeout);
Expand Down

0 comments on commit c40cfd3

Please sign in to comment.