[FLINK-2550] Rename IterativeDataStream to IterativeStream
aljoscha committed Oct 5, 2015
1 parent 0de9d2e commit 7b6e762
Showing 5 changed files with 49 additions and 49 deletions.
@@ -467,22 +467,22 @@ public DataStream<T> global() {
/**
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
* {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
* this IterativeDataStream will be the iteration head. The data stream
* given to the {@link IterativeDataStream#closeWith(DataStream)} method is
* {@link IterativeStream#closeWith(DataStream)}. The transformation of
* this IterativeStream will be the iteration head. The data stream
* given to the {@link IterativeStream#closeWith(DataStream)} method is
* the data stream that will be fed back and used as the input for the
* iteration head. The user can also use a different feedback type than the
* input of the iteration and treat the input and feedback streams as a
* {@link ConnectedStreams} by calling
* {@link IterativeDataStream#withFeedbackType(TypeInformation)}
* {@link IterativeStream#withFeedbackType(TypeInformation)}
* <p>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
* {@link #split(OutputSelector)} for more information.
* <p>
* The iteration edge will be partitioned the same way as the first input of
* the iteration head unless it is changed in the
* {@link IterativeDataStream#closeWith(DataStream)} call.
* {@link IterativeStream#closeWith(DataStream)} call.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the
@@ -491,29 +491,29 @@ public DataStream<T> global() {
*
* @return The iterative data stream created.
*/
public IterativeDataStream<T> iterate() {
return new IterativeDataStream<T>(this, 0);
public IterativeStream<T> iterate() {
return new IterativeStream<T>(this, 0);
}
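
To illustrate the API being renamed here, a minimal usage sketch of the no-timeout variant, assuming the 0.10-era DataStream API shown in this diff; the decrement step and the "iterate"/"output" tags are made up for the example:

import java.util.Collections;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class IterateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1); // keep the iteration head and the feedback at the same parallelism

		DataStream<Long> input = env.fromElements(5L, 10L);

		// iteration head; everything between iterate() and closeWith() is the loop body
		IterativeStream<Long> iteration = input.iterate();

		// step function: decrement each element once per pass through the loop
		DataStream<Long> minusOne = iteration.map(new MapFunction<Long, Long>() {
			@Override
			public Long map(Long value) {
				return value - 1;
			}
		});

		// split the loop output: positive values are fed back, the rest leave the loop
		SplitDataStream<Long> split = minusOne.split(new OutputSelector<Long>() {
			@Override
			public Iterable<String> select(Long value) {
				return Collections.singletonList(value > 0 ? "iterate" : "output");
			}
		});

		iteration.closeWith(split.select("iterate"));
		split.select("output").print();

		// note: with no timeout the iteration never terminates on its own,
		// so this job runs until it is cancelled (see the timed overload below)
		env.execute("IterativeStream sketch");
	}
}
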

/**
* Initiates an iterative part of the program that feeds back data streams.
* The iterative part needs to be closed by calling
* {@link IterativeDataStream#closeWith(DataStream)}. The transformation of
* this IterativeDataStream will be the iteration head. The data stream
* given to the {@link IterativeDataStream#closeWith(DataStream)} method is
* {@link IterativeStream#closeWith(DataStream)}. The transformation of
* this IterativeStream will be the iteration head. The data stream
* given to the {@link IterativeStream#closeWith(DataStream)} method is
* the data stream that will be fed back and used as the input for the
* iteration head. The user can also use a different feedback type than the
* input of the iteration and treat the input and feedback streams as a
* {@link ConnectedStreams} by calling
* {@link IterativeDataStream#withFeedbackType(TypeInformation)}
* {@link IterativeStream#withFeedbackType(TypeInformation)}
* <p>
* A common usage pattern for streaming iterations is to use output
* splitting to send a part of the closing data stream to the head. Refer to
* {@link #split(OutputSelector)} for more information.
* <p>
* The iteration edge will be partitioned the same way as the first input of
* the iteration head unless it is changed in the
* {@link IterativeDataStream#closeWith(DataStream)} call.
* {@link IterativeStream#closeWith(DataStream)} call.
* <p>
* By default a DataStream with iteration will never terminate, but the user
* can use the maxWaitTime parameter to set a max waiting time for the
@@ -526,8 +526,8 @@ public IterativeDataStream<T> iterate() {
*
* @return The iterative data stream created.
*/
public IterativeDataStream<T> iterate(long maxWaitTimeMillis) {
return new IterativeDataStream<T>(this, maxWaitTimeMillis);
public IterativeStream<T> iterate(long maxWaitTimeMillis) {
return new IterativeStream<T>(this, maxWaitTimeMillis);
}
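
A companion sketch for the timed variant, again hypothetical: with iterate(5000) the iteration sources shut down once no feedback element has arrived for five seconds, so the job can finish on its own:

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TimedIterateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1); // keep the iteration head and the feedback at the same parallelism

		DataStream<Long> input = env.fromElements(5L, 10L);

		// once no feedback element arrives for 5000 ms, the iteration terminates
		IterativeStream<Long> iteration = input.iterate(5000);

		DataStream<Long> decremented = iteration.map(new MapFunction<Long, Long>() {
			@Override
			public Long map(Long value) {
				return value - 1;
			}
		});

		// only strictly positive values are fed back; when they run out, the timeout closes the loop
		iteration.closeWith(decremented.filter(new FilterFunction<Long>() {
			@Override
			public boolean filter(Long value) {
				return value > 0;
			}
		}));

		decremented.print();
		env.execute("Timed IterativeStream sketch");
	}
}
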

/**
@@ -32,13 +32,13 @@
*
* @param <T> Type of the elements in this Stream
*/
public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>> {
public class IterativeStream<T> extends SingleOutputStreamOperator<T, IterativeStream<T>> {

// We store these so that we can create a co-iteration if we need to
private DataStream<T> originalInput;
private long maxWaitTime;

protected IterativeDataStream(DataStream<T> dataStream, long maxWaitTime) {
protected IterativeStream(DataStream<T> dataStream, long maxWaitTime) {
super(dataStream.getExecutionEnvironment(),
new FeedbackTransformation<T>(dataStream.getTransformation(), maxWaitTime));
this.originalInput = dataStream;
@@ -88,9 +88,9 @@ public DataStream<T> closeWith(DataStream<T> feedbackStream) {
*
* @param feedbackTypeString
* String describing the type information of the feedback stream.
* @return A {@link ConnectedIterativeDataStreams}.
* @return A {@link ConnectedIterativeStreams}.
*/
public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackTypeString) {
public <F> ConnectedIterativeStreams<T, F> withFeedbackType(String feedbackTypeString) {
return withFeedbackType(TypeInfoParser.<F> parse(feedbackTypeString));
}

@@ -104,9 +104,9 @@ public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(String feedbackT
*
* @param feedbackTypeClass
* Class of the elements in the feedback stream.
* @return A {@link ConnectedIterativeDataStreams}.
* @return A {@link ConnectedIterativeStreams}.
*/
public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
public <F> ConnectedIterativeStreams<T, F> withFeedbackType(Class<F> feedbackTypeClass) {
return withFeedbackType(TypeExtractor.getForClass(feedbackTypeClass));
}

@@ -120,14 +120,14 @@ public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(Class<F> feedbac
*
* @param feedbackType
* The type information of the feedback stream.
* @return A {@link ConnectedIterativeDataStreams}.
* @return A {@link ConnectedIterativeStreams}.
*/
public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
return new ConnectedIterativeDataStreams<T, F>(originalInput, feedbackType, maxWaitTime);
public <F> ConnectedIterativeStreams<T, F> withFeedbackType(TypeInformation<F> feedbackType) {
return new ConnectedIterativeStreams<T, F>(originalInput, feedbackType, maxWaitTime);
}
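
A sketch of the co-iteration path, loosely modeled on the tests further down; the Integer-to-String conversion and the 2000 ms timeout are illustrative only:

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.util.Collector;

public class CoIterateSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(1); // keep the iteration head and the feedback at the same parallelism

		// the head sees the Integer input on the first side and the String feedback on the second
		ConnectedIterativeStreams<Integer, String> coIteration = env.fromElements(1, 2, 3)
				.iterate(2000)
				.withFeedbackType(String.class);

		DataStream<String> head = coIteration.flatMap(new CoFlatMapFunction<Integer, String, String>() {
			@Override
			public void flatMap1(Integer value, Collector<String> out) {
				out.collect(String.valueOf(value)); // original input enters the loop here
			}

			@Override
			public void flatMap2(String value, Collector<String> out) {
				// feedback elements arrive here; dropped in this sketch so the loop drains
			}
		});

		coIteration.closeWith(head);
		head.print();

		env.execute("ConnectedIterativeStreams sketch");
	}
}
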

/**
* The {@link ConnectedIterativeDataStreams} represent a start of an
* The {@link ConnectedIterativeStreams} represent a start of an
* iterative part of a streaming program, where the original input of the
* iteration and the feedback of the iteration are connected as in a
* {@link ConnectedStreams}.
@@ -142,11 +142,11 @@ public <F> ConnectedIterativeDataStreams<T, F> withFeedbackType(TypeInformation<
* @param <F>
* Type of the feedback of the iteration
*/
public static class ConnectedIterativeDataStreams<I, F> extends ConnectedStreams<I, F> {
public static class ConnectedIterativeStreams<I, F> extends ConnectedStreams<I, F> {

private CoFeedbackTransformation<F> coFeedbackTransformation;

public ConnectedIterativeDataStreams(DataStream<I> input,
public ConnectedIterativeStreams(DataStream<I> input,
TypeInformation<F> feedbackType,
long waitTime) {
super(input.getExecutionEnvironment(),
@@ -31,8 +31,8 @@
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream.ConnectedIterativeDataStreams;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.IterativeStream.ConnectedIterativeStreams;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -67,7 +67,7 @@ public void testIncorrectParallelism() throws Exception {

DataStream<Integer> source = env.fromElements(1, 10);

IterativeDataStream<Integer> iter1 = source.iterate();
IterativeStream<Integer> iter1 = source.iterate();
SingleOutputStreamOperator<Integer, ?> map1 = iter1.map(NoOpIntMap);
iter1.closeWith(map1).print();
}
@@ -80,7 +80,7 @@ public void testDoubleClosing() throws Exception {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);

IterativeDataStream<Integer> iter1 = source.iterate();
IterativeStream<Integer> iter1 = source.iterate();

iter1.closeWith(iter1.map(NoOpIntMap));
iter1.closeWith(iter1.map(NoOpIntMap));
@@ -96,7 +96,7 @@ public void testDifferingParallelism() throws Exception {
DataStream<Integer> source = env.fromElements(1, 10)
.map(NoOpIntMap);

IterativeDataStream<Integer> iter1 = source.iterate();
IterativeStream<Integer> iter1 = source.iterate();


iter1.closeWith(iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2));
@@ -112,7 +112,7 @@ public void testCoDifferingParallelism() throws Exception {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);

ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);


@@ -131,8 +131,8 @@ public void testClosingFromOutOfLoop() throws Exception {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);

IterativeDataStream<Integer> iter1 = source.iterate();
IterativeDataStream<Integer> iter2 = source.iterate();
IterativeStream<Integer> iter1 = source.iterate();
IterativeStream<Integer> iter2 = source.iterate();


iter2.closeWith(iter1.map(NoOpIntMap));
@@ -150,8 +150,8 @@ public void testCoIterClosingFromOutOfLoop() throws Exception {
// introduce dummy mapper to get to correct parallelism
DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);

IterativeDataStream<Integer> iter1 = source.iterate();
ConnectedIterativeDataStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
IterativeStream<Integer> iter1 = source.iterate();
ConnectedIterativeStreams<Integer, Integer> coIter = source.iterate().withFeedbackType(
Integer.class);


@@ -166,7 +166,7 @@ public void testExecutionWithEmptyIteration() throws Exception {

DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap);

IterativeDataStream<Integer> iter1 = source.iterate();
IterativeStream<Integer> iter1 = source.iterate();

iter1.map(NoOpIntMap).print();

@@ -179,9 +179,9 @@ public void testImmutabilityWithCoiteration() {

DataStream<Integer> source = env.fromElements(1, 10).map(NoOpIntMap); // for rebalance

IterativeDataStream<Integer> iter1 = source.iterate();
IterativeStream<Integer> iter1 = source.iterate();
// Calling withFeedbackType should create a new iteration
ConnectedIterativeDataStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);
ConnectedIterativeStreams<Integer, String> iter2 = iter1.withFeedbackType(String.class);

iter1.closeWith(iter1.map(NoOpIntMap)).print();
iter2.closeWith(iter2.map(NoOpCoMap)).print();
@@ -205,7 +205,7 @@ public void testmultipleHeadsTailsSimple() {
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
.map(NoOpIntMap).name("ParallelizeMapRebalance");

IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
IterativeStream<Integer> iter1 = source1.union(source2).iterate();

DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("IterRebalanceMap").setParallelism(DEFAULT_PARALLELISM / 2);
DataStream<Integer> head2 = iter1.map(NoOpIntMap).name("IterForwardMap");
@@ -286,7 +286,7 @@ public void testmultipleHeadsTailsWithTailPartitioning() {
DataStream<Integer> source2 = env.fromElements(1, 2, 3, 4, 5)
.map(NoOpIntMap);

IterativeDataStream<Integer> iter1 = source1.union(source2).iterate();
IterativeStream<Integer> iter1 = source1.union(source2).iterate();

DataStream<Integer> head1 = iter1.map(NoOpIntMap).name("map1");
DataStream<Integer> head2 = iter1.map(NoOpIntMap).setParallelism(DEFAULT_PARALLELISM / 2).rebalance().name(
@@ -370,7 +370,7 @@ public void testSimpleIteration() throws Exception {
DataStream<Boolean> source = env.fromCollection(Collections.nCopies(DEFAULT_PARALLELISM * 2, false))
.map(NoOpBoolMap).name("ParallelizeMap");

IterativeDataStream<Boolean> iteration = source.iterate(3000);
IterativeStream<Boolean> iteration = source.iterate(3000);

DataStream<Boolean> increment = iteration.flatMap(new IterationHead()).map(NoOpBoolMap);

@@ -395,7 +395,7 @@ public void testCoIteration() throws Exception {
.map(NoOpStrMap).name("ParallelizeMap");


ConnectedIterativeDataStreams<Integer, String> coIt = env.fromElements(0, 0)
ConnectedIterativeStreams<Integer, String> coIt = env.fromElements(0, 0)
.map(NoOpIntMap).name("ParallelizeMap")
.iterate(2000)
.withFeedbackType("String");
@@ -476,7 +476,7 @@ public Integer getKey(Integer value) throws Exception {
DataStream<Integer> source = env.fromElements(1, 2, 3)
.map(NoOpIntMap).name("ParallelizeMap");

IterativeDataStream<Integer> it = source.keyBy(key).iterate(3000);
IterativeStream<Integer> it = source.keyBy(key).iterate(3000);

DataStream<Integer> head = it.flatMap(new RichFlatMapFunction<Integer, Integer>() {

@@ -518,7 +518,7 @@ public void testWithCheckPointing() throws Exception {
.map(NoOpBoolMap).name("ParallelizeMap");


IterativeDataStream<Boolean> iteration = source.iterate(3000);
IterativeStream<Boolean> iteration = source.iterate(3000);

iteration.closeWith(iteration.flatMap(new IterationHead())).addSink(new ReceiveCheckNoOpSink<Boolean>());

@@ -29,7 +29,7 @@
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -115,7 +115,7 @@ public void complexIntegrationTest1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Long, Tuple2<String, Long>>> sourceStream1 = env.addSource(new TupleSource()).setParallelism(1);

IterativeDataStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){
IterativeStream<Tuple2<Long, Tuple2<String, Long>>> it = sourceStream1.map(new MapFunction<Tuple2<Long, Tuple2<String, Long>>,Tuple2<Long, Tuple2<String, Long>>>(){

Tuple2<Long, Tuple2<String, Long>> result = new Tuple2<Long, Tuple2<String, Long>>(
0L, new Tuple2<String, Long>("", 0L));
@@ -22,7 +22,7 @@
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.datastream.IterativeStream;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -69,7 +69,7 @@ public static void main(String[] args) throws Exception {
}

// create an iterative data stream from the input with 5 second timeout
IterativeDataStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
IterativeStream<Tuple5<Integer, Integer, Integer, Integer, Integer>> it = inputStream.map(new InputMap())
.iterate(5000);

// apply the step function to get the next Fibonacci number
