6 changes: 5 additions & 1 deletion docs/apis/streaming_guide.md
@@ -1123,7 +1123,9 @@ output.map(…).project(…);
In this case, all values passing the `isFeedback` filter will be fed back to the iteration head, and the values passing the `isOutput` filter will produce the output of the iteration that can be transformed further (here with a `map` and a `projection`) outside the iteration.

Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence, iterative streaming programs run until the user manually stops them. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
To use this functionality, the user needs to add the `maxWaitTimeMillis` parameter to the `dataStream.iterate(…)` call to control the maximum wait time.

By default, the partitioning of the feedback stream is automatically set to match the input of the iteration head. To override this, the user can set an optional boolean flag in the `closeWith` method.
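
A minimal sketch of how these pieces fit together, assuming a `DataStream<Long>` named `someLongs` and placeholder step and filter functions (`MyStepFunction`, `IsFeedback`, and `IsOutput` are illustrative names, not part of the guide's running example):

```java
// Let the iteration shut down automatically after 5000 ms without new input.
IterativeDataStream<Long> iteration = someLongs.iterate(5000);

DataStream<Long> step = iteration.map(new MyStepFunction());

// Values passing the feedback filter are fed back to the iteration head; the
// broadcast partitioning survives because the optional flag is set to true.
iteration.closeWith(step.filter(new IsFeedback()).broadcast(), true);

// Values passing the output filter leave the loop for further transformation.
DataStream<Long> output = step.filter(new IsOutput());
```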
</div>
<div data-lang="scala" markdown="1">
The Flink Streaming API supports implementing iterative stream processing dataflows similarly to the batch Flink API. Iterative streaming programs also implement a step function and embed it into an `IterativeDataStream`.
@@ -1143,6 +1145,8 @@ val iteratedStream = someDataStream.iterate(maxWaitTime) {

Because iterative streaming programs do not have a set number of iterations for each data element, the streaming program has no information on the end of its input. As a consequence, iterative streaming programs run until the user manually stops them. While this is acceptable under normal circumstances, a method is provided to allow iterative programs to shut down automatically if no input is received by the iteration head for a predefined number of milliseconds.
To use this functionality, the user needs to add the `maxWaitTimeMillis` parameter to the `dataStream.iterate(…)` call to control the maximum wait time.

By default, the partitioning of the feedback stream is automatically set to match the input of the iteration head. To override this, the user can set an optional boolean flag in the `iterate` method.
</div>

</div>
IterativeDataStream.java
@@ -46,13 +46,18 @@ protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
* for more information.
*
* @param iterationTail
* The data stream that is fed back to the next iteration head.
* @param keepPartitioning
* If true the feedback partitioning will be kept as it is (not
* changed to match the input of the iteration head)
* @return Returns the stream that was fed back to the iteration. In most
* cases no further transformations are applied on this stream.
*
*/
public DataStream<IN> closeWith(DataStream<IN> iterationTail, boolean keepPartitioning) {
DataStream<IN> iterationSink = new DataStreamSink<IN>(environment, "Iteration Sink", null,
null);

@@ -61,7 +66,30 @@ public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
iterationWaitTime);

if (keepPartitioning) {
connectGraph(iterationTail, iterationSink.getId(), 0);
} else {
connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
}
return iterationTail;
}

/**
* Closes the iteration. This method defines the end of the iterative
* program part that will be fed back to the start of the iteration. <br>
* <br>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
* {@link DataStream#split(org.apache.flink.streaming.api.collector.selector.OutputSelector)}
* for more information.
*
* @param iterationTail
* The data stream that is fed back to the next iteration head.
* @return Returns the stream that was fed back to the iteration. In most
* cases no further transformations are applied on this stream.
*
*/
public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
return closeWith(iterationTail, false);
}
}
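
For context, a hedged sketch of the output-splitting pattern the Javadoc above refers to; the selector logic, the `iteration` variable, and the stream names are illustrative, not taken from this patch:

```java
// Split the step output into a feedback part and an output part.
SplitDataStream<Long> step = iteration.map(new MyStepFunction())
    .split(new OutputSelector<Long>() {
      @Override
      public Iterable<String> select(Long value) {
        // Route positive values back to the head, the rest downstream.
        return Collections.singletonList(value > 0 ? "feedback" : "output");
      }
    });

// With keepPartitioning = true the shuffle partitioning of the feedback stream
// is preserved; with false it would be reset to match the iteration head input.
iteration.closeWith(step.select("feedback").shuffle(), true);
DataStream<Long> output = step.select("output");
```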
StreamGraph.java
@@ -44,6 +44,7 @@
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -189,23 +190,22 @@ public <IN1, IN2, OUT> void addCoOperator(Integer vertexID,
}
}

@SuppressWarnings("rawtypes")
public void addIterationHead(Integer sourceID, Integer iterationHead, Integer iterationID,
long timeOut) {

StreamNode itSource = addNode(sourceID, StreamIterationHead.class, null, null);

StreamLoop iteration = new StreamLoop(iterationID, getStreamNode(sourceID), timeOut);
streamLoops.put(iterationID, iteration);
vertexIDtoLoop.put(sourceID, iteration);

setSerializersFrom(iterationHead, sourceID);
itSource.setOperatorName("IterationSource-" + sourceID);
itSource.setParallelism(getStreamNode(iterationHead).getParallelism());

addEdge(sourceID, iterationHead, new RebalancePartitioner(true), 0, new ArrayList<String>());

if (LOG.isDebugEnabled()) {
LOG.debug("ITERATION SOURCE: {}", sourceID);
@@ -221,19 +221,18 @@ public void addIterationTail(Integer sinkID, Integer iterationTail, Integer iter
throw new RuntimeException("Buffer timeout 0 at iteration tail is not supported.");
}

StreamNode itSink = addNode(sinkID, StreamIterationTail.class, null, null);

StreamLoop iteration = streamLoops.get(iterationID);
iteration.setSink(getStreamNode(sinkID));
vertexIDtoLoop.put(sinkID, iteration);

itSink.setParallelism(iteration.getSource().getParallelism());

setSerializersFrom(iterationTail, sinkID);
getStreamNode(sinkID).setOperatorName("IterationTail-" + iterationTail);
getStreamNode(sinkID).setOperatorName("IterationSink-" + sinkID);

setBufferTimeout(iteration.getSource().getId(), getStreamNode(iterationTail).getBufferTimeout());

if (LOG.isDebugEnabled()) {
LOG.debug("ITERATION SINK: {}", sinkID);
IterateTest.java
@@ -31,6 +31,12 @@
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraph.StreamLoop;
import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -41,7 +47,7 @@ public class IterateTest {
private static boolean iterated[];
private static int PARALLELISM = 2;

public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {

private static final long serialVersionUID = 1L;

@@ -58,7 +64,7 @@ public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {

}

public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {

private static final long serialVersionUID = 1L;

@@ -78,19 +84,19 @@ public static final class MySink implements SinkFunction<Boolean> {
public void invoke(Boolean tuple) {
}
}

public static final class NoOpMap implements MapFunction<Boolean, Boolean> {

private static final long serialVersionUID = 1L;

@Override
public Boolean map(Boolean value) throws Exception {
return value;
}

}

public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env) {
env.setBufferTimeout(10);

DataStream<Boolean> source = env.fromCollection(Collections.nCopies(PARALLELISM, false));
@@ -103,7 +109,7 @@ public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironme
iteration.closeWith(increment).addSink(new MySink());
return env;
}

@Test
public void testColocation() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
@@ -123,9 +129,9 @@ public void testColocation() throws Exception {
AbstractJobVertex tailOp = null;

for (AbstractJobVertex vertex : graph.getVertices()) {
if (vertex.getName().contains("IterationHead")) {
if (vertex.getName().contains("IterationSource")) {
itSource = vertex;
} else if (vertex.getName().contains("IterationTail")) {
} else if (vertex.getName().contains("IterationSink")) {
itSink = vertex;
} else if (vertex.getName().contains("HeadOperator")) {
headOp = vertex;
@@ -141,6 +147,36 @@ public void testColocation() throws Exception {
assertEquals(itSource.getParallelism(), itSink.getParallelism());
}

@SuppressWarnings("unchecked")
@Test
public void testPartitioning() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);

IterativeDataStream<Boolean> it = env.fromElements(true).iterate();

IterativeDataStream<Boolean> it2 = env.fromElements(true).iterate();

DataStream<Boolean> head = it.map(new NoOpMap()).name("Head1").broadcast();
DataStream<Boolean> head2 = it2.map(new NoOpMap()).name("Head2").broadcast();

it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true);
it2.closeWith(head2, false);

System.out.println(env.getExecutionPlan());
StreamGraph graph = env.getStreamGraph();

for (StreamLoop loop : graph.getStreamLoops()) {
StreamEdge tailToSink = loop.getSink().getInEdges().get(0);
if (tailToSink.getSourceVertex().getOperatorName().contains("Head1")) {
assertTrue(tailToSink.getPartitioner() instanceof BroadcastPartitioner);
assertTrue(loop.getSink().getInEdges().get(1).getPartitioner() instanceof ShufflePartitioner);
} else {
assertTrue(tailToSink.getPartitioner() instanceof RebalancePartitioner);
}
}

}

@Test
public void test() throws Exception {
StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
DataStream.scala
@@ -317,12 +317,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the iteration head.
* If no data is received within the set time, the stream terminates.
*
* <p>
* By default the feedback partitioning is set to match the input; to override this,
* set the keepPartitioning flag to true.
*
*/
def iterate[R](stepFunction: DataStream[T] => (DataStream[T], DataStream[R])): DataStream[R] =
iterate(0)(stepFunction)


/**
* Initiates an iterative part of the program that creates a loop by feeding
@@ -339,15 +341,18 @@ class DataStream[T](javaStream: JavaStream[T]) {
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the iteration head.
* If no data is received within the set time, the stream terminates.
*
* <p>
* By default the feedback partitioning is set to match the input; to override this,
* set the keepPartitioning flag to true.
*
*/
def iterate[R](maxWaitTimeMillis: Long = 0)
(stepFunction: DataStream[T] => (DataStream[T], DataStream[R]),
keepPartitioning: Boolean = false): DataStream[R] = {
val iterativeStream = javaStream.iterate(maxWaitTimeMillis)

val (feedback, output) = stepFunction(new DataStream[T](iterativeStream))
iterativeStream.closeWith(feedback.getJavaStream, keepPartitioning)
output
}
