Skip to content

Commit

Permalink
[FLINK-1986] [streaming] Iterative stream creation bugfix
Browse files Browse the repository at this point in the history
  • Loading branch information
gyfora committed May 19, 2015
1 parent 6edf31a commit 2d3e69a
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 40 deletions.
Expand Up @@ -113,6 +113,9 @@ public class DataStream<OUT> {
@SuppressWarnings("rawtypes")
protected TypeInformation typeInfo;
protected List<DataStream<OUT>> mergedStreams;

// Id of the iteration this stream feeds into. Stays null by default; a
// non-null value marks the stream as an input to some iteration.
protected Integer iterationID = null;
// Wait time handed to the stream graph together with iterationID when
// iteration heads and tails are registered. Null by default.
protected Long iterationWaitTime = null;

protected final StreamGraph streamGraph;
private boolean typeUsed;
Expand Down Expand Up @@ -160,6 +163,8 @@ public DataStream(DataStream<OUT> dataStream) {
this.partitioner = dataStream.partitioner.copy();
this.streamGraph = dataStream.streamGraph;
this.typeInfo = dataStream.typeInfo;
this.iterationID = dataStream.iterationID;
this.iterationWaitTime = dataStream.iterationWaitTime;
this.mergedStreams = new ArrayList<DataStream<OUT>>();
this.mergedStreams.add(this);
if (dataStream.mergedStreams.size() > 1) {
Expand Down Expand Up @@ -1224,9 +1229,20 @@ private DataStreamSink<OUT> writeToFile(OutputFormat<OUT> format, long millis) {
operatorName);

connectGraph(inputStream, returnStream.getId(), 0);

if (iterationID != null) {
//This data stream is an input to some iteration
addIterationSource(returnStream);
}

return returnStream;
}

/**
 * Registers an iteration head in the stream graph for the given stream.
 *
 * <p>The head receives the next value of {@code counter} as its id and is
 * registered with the id of {@code dataStream} plus this stream's
 * {@code iterationID} and {@code iterationWaitTime}. Afterwards the head is
 * given the same parallelism as {@code dataStream}.
 *
 * @param dataStream stream whose id and parallelism are used for the new head
 * @param <X> element type of {@code dataStream}
 */
private <X> void addIterationSource(DataStream<X> dataStream) {
    // Advance the counter first; the incremented value identifies the head.
    final Integer headId = ++counter;

    streamGraph.addIterationHead(
            headId,
            dataStream.getId(),
            iterationID,
            iterationWaitTime);

    // The head runs with the parallelism of the stream it is attached to.
    streamGraph.setParallelism(headId, dataStream.getParallelism());
}

/**
* Internal function for setting the partitioner for the DataStream
Expand Down
Expand Up @@ -17,9 +17,6 @@

package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;

/**
* The iterative data stream represents the start of an iteration in a
* {@link DataStream}.
Expand All @@ -31,21 +28,13 @@ public class IterativeDataStream<IN> extends
SingleOutputStreamOperator<IN, IterativeDataStream<IN>> {

static Integer iterationCount = 0;
protected Integer iterationID;
protected long waitTime;


/**
 * Creates an iterative stream on top of the given stream and assigns it the
 * next iteration id.
 *
 * @param dataStream
 *            stream handed to the superclass constructor; the buffer timeout
 *            of its environment is applied to this stream
 * @param maxWaitTime
 *            value stored as the wait time of this iteration
 */
protected IterativeDataStream(DataStream<IN> dataStream, long maxWaitTime) {
super(dataStream);
setBufferTimeout(dataStream.environment.getBufferTimeout());
// Use the current value of the static counter as this iteration's id, then
// advance the counter so the next iterative stream gets a different id.
iterationID = iterationCount;
iterationCount++;
waitTime = maxWaitTime;
}

/**
 * Creates an iterative stream with an explicitly supplied iteration id and
 * wait time instead of drawing a new id from the static counter.
 *
 * @param dataStream
 *            stream handed to the superclass constructor
 * @param iterationID
 *            iteration id stored on the new stream
 * @param waitTime
 *            wait time stored on the new stream
 */
protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID, long waitTime) {
super(dataStream);
this.iterationID = iterationID;
this.waitTime = waitTime;
// NOTE(review): maxWaitTime is not a parameter of this constructor, so the
// next line cannot resolve here as written. It looks like it belongs to the
// (DataStream, long maxWaitTime) constructor -- confirm against the real file.
iterationWaitTime = maxWaitTime;
}

/**
Expand All @@ -70,35 +59,9 @@ public DataStream<IN> closeWith(DataStream<IN> iterationTail) {
// We add an iteration sink to the tail which will send tuples to the
// iteration head
streamGraph.addIterationTail(iterationSink.getId(), iterationTail.getId(), iterationID,
waitTime);
iterationWaitTime);

connectGraph(iterationTail.forward(), iterationSink.getId(), 0);
return iterationTail;
}

/**
 * Applies the operator through the superclass {@code transform} and then
 * attaches an iteration source to the resulting stream, so that it can
 * receive the feedback tuples sent from the iteration tail.
 *
 * @param operatorName
 *            name forwarded to the superclass transform
 * @param outTypeInfo
 *            type information of the transformed stream
 * @param operator
 *            operator forwarded to the superclass transform
 * @param <R>
 *            element type of the transformed stream
 * @return the stream produced by the superclass transform
 */
@Override
public <R> SingleOutputStreamOperator<R, ?> transform(String operatorName,
        TypeInformation<R> outTypeInfo, OneInputStreamOperator<IN, R> operator) {

    // Delegate the actual transformation to the superclass.
    final SingleOutputStreamOperator<R, ?> transformed =
            super.transform(operatorName, outTypeInfo, operator);

    // Attach the source that receives the feedback tuples from the tail.
    addIterationSource(transformed);
    return transformed;
}

/**
 * Registers an iteration head in the stream graph for the given stream.
 *
 * <p>The head receives the next value of {@code counter} as its id and is
 * registered with the id of {@code dataStream} plus this stream's
 * {@code iterationID} and {@code waitTime}. Afterwards the head is given the
 * same parallelism as {@code dataStream}.
 *
 * @param dataStream stream whose id and parallelism are used for the new head
 * @param <X> element type of {@code dataStream}
 */
private <X> void addIterationSource(DataStream<X> dataStream) {
    // Advance the counter first; the incremented value identifies the head.
    final Integer headId = ++counter;

    streamGraph.addIterationHead(
            headId,
            dataStream.getId(),
            iterationID,
            waitTime);

    // The head runs with the parallelism of the stream it is attached to.
    streamGraph.setParallelism(headId, dataStream.getParallelism());
}

/**
 * Creates a new {@code IterativeDataStream} from this one, handing over the
 * same iteration id and wait time.
 *
 * @return the newly constructed iterative stream
 */
@Override
public IterativeDataStream<IN> copy() {
    final IterativeDataStream<IN> duplicate =
            new IterativeDataStream<IN>(this, iterationID, waitTime);
    return duplicate;
}
}

0 comments on commit 2d3e69a

Please sign in to comment.