Skip to content

Commit

Permalink
[hotfix] Change result of WindowedStream ops to SingleOutputStreamOpe…
Browse files Browse the repository at this point in the history
…rator
  • Loading branch information
aljoscha committed Oct 9, 2015
1 parent 0ee0c1f commit b2b2781
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 47 deletions.
Expand Up @@ -126,14 +126,14 @@ public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
* @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window.
*/
public DataStream<T> reduce(ReduceFunction<T> function) {
public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> function) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);

String callLocation = Utils.getCallLocationName();
String udfName = "Reduce at " + callLocation;

DataStream<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
SingleOutputStreamOperator<T, ?> result = createFastTimeOperatorIfValid(function, input.getType(), udfName);
if (result != null) {
return result;
}
Expand Down Expand Up @@ -173,7 +173,7 @@ public DataStream<T> reduce(ReduceFunction<T> function) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function) {
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);

Expand All @@ -191,7 +191,7 @@ public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function) {
* @param function The fold function.
* @return The data stream that is the result of applying the fold function to the window.
*/
public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> function, TypeInformation<R> resultType) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);
return apply(new FoldAllWindowFunction<W, T, R>(initialValue, function), resultType);
Expand All @@ -208,7 +208,7 @@ public <R> DataStream<R> fold(R initialValue, FoldFunction<T, R> function, TypeI
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function) {
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function) {
TypeInformation<T> inType = input.getType();
TypeInformation<R> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, AllWindowFunction.class, true, true, inType, null, false);
Expand All @@ -227,14 +227,14 @@ public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function) {
* @param function The window function.
* @return The data stream that is the result of applying the window function to the window.
*/
public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
public <R> SingleOutputStreamOperator<R, ?> apply(AllWindowFunction<T, R, W> function, TypeInformation<R> resultType) {
//clean the closure
function = input.getExecutionEnvironment().clean(function);

String callLocation = Utils.getCallLocationName();
String udfName = "MapWindow at " + callLocation;

DataStream<R> result = createFastTimeOperatorIfValid(function, resultType, udfName);
SingleOutputStreamOperator<R, ?> result = createFastTimeOperatorIfValid(function, resultType, udfName);
if (result != null) {
return result;
}
Expand Down Expand Up @@ -274,7 +274,7 @@ public <R> DataStream<R> apply(AllWindowFunction<T, R, W> function, TypeInformat
* @param positionToSum The position in the tuple/array to sum
* @return The transformed DataStream.
*/
public DataStream<T> sum(int positionToSum) {
public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig()));
}

Expand All @@ -291,7 +291,7 @@ public DataStream<T> sum(int positionToSum) {
* @param field The field to sum
* @return The transformed DataStream.
*/
public DataStream<T> sum(String field) {
public SingleOutputStreamOperator<T, ?> sum(String field) {
return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig()));
}

Expand All @@ -302,7 +302,7 @@ public DataStream<T> sum(String field) {
* @param positionToMin The position to minimize
* @return The transformed DataStream.
*/
public DataStream<T> min(int positionToMin) {
public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig()));
}

Expand All @@ -319,7 +319,7 @@ public DataStream<T> min(int positionToMin) {
* @param field The field expression based on which the aggregation will be applied.
* @return The transformed DataStream.
*/
public DataStream<T> min(String field) {
public SingleOutputStreamOperator<T, ?> min(String field) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig()));
}

Expand All @@ -332,7 +332,7 @@ public DataStream<T> min(String field) {
* The position to minimize by
* @return The transformed DataStream.
*/
public DataStream<T> minBy(int positionToMinBy) {
public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true);
}

Expand All @@ -344,7 +344,7 @@ public DataStream<T> minBy(int positionToMinBy) {
* @param positionToMinBy The position to minimize by
* @return The transformed DataStream.
*/
public DataStream<T> minBy(String positionToMinBy) {
public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true);
}

Expand All @@ -358,7 +358,7 @@ public DataStream<T> minBy(String positionToMinBy) {
* @param first If true, then the operator return the first element with the minimum value, otherwise returns the last
* @return The transformed DataStream.
*/
public DataStream<T> minBy(int positionToMinBy, boolean first) {
public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
}

Expand All @@ -373,7 +373,7 @@ public DataStream<T> minBy(int positionToMinBy, boolean first) {
* @param first If True then in case of field equality the first object will be returned
* @return The transformed DataStream.
*/
public DataStream<T> minBy(String field, boolean first) {
public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig()));
}

Expand All @@ -384,7 +384,7 @@ public DataStream<T> minBy(String field, boolean first) {
* @param positionToMax The position to maximize
* @return The transformed DataStream.
*/
public DataStream<T> max(int positionToMax) {
public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig()));
}

Expand All @@ -398,7 +398,7 @@ public DataStream<T> max(int positionToMax) {
* @param field The field expression based on which the aggregation will be applied.
* @return The transformed DataStream.
*/
public DataStream<T> max(String field) {
public SingleOutputStreamOperator<T, ?> max(String field) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig()));
}

Expand All @@ -411,7 +411,7 @@ public DataStream<T> max(String field) {
* The position to maximize by
* @return The transformed DataStream.
*/
public DataStream<T> maxBy(int positionToMaxBy) {
public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}

Expand All @@ -424,7 +424,7 @@ public DataStream<T> maxBy(int positionToMaxBy) {
* The position to maximize by
* @return The transformed DataStream.
*/
public DataStream<T> maxBy(String positionToMaxBy) {
public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true);
}

Expand All @@ -438,7 +438,7 @@ public DataStream<T> maxBy(String positionToMaxBy) {
* @param first If true, then the operator return the first element with the maximum value, otherwise returns the last
* @return The transformed DataStream.
*/
public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
}

Expand All @@ -453,11 +453,11 @@ public DataStream<T> maxBy(int positionToMaxBy, boolean first) {
* @param first If True then in case of field equality the first object will be returned
* @return The transformed DataStream.
*/
public DataStream<T> maxBy(String field, boolean first) {
public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig()));
}

private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
private SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregator) {
return reduce(aggregator);
}

Expand All @@ -466,7 +466,7 @@ private DataStream<T> aggregate(AggregationFunction<T> aggregator) {
// ------------------------------------------------------------------------


private <R> DataStream<R> createFastTimeOperatorIfValid(
private <R> SingleOutputStreamOperator<R, ?> createFastTimeOperatorIfValid(
Function function,
TypeInformation<R> resultType,
String functionName) {
Expand Down

0 comments on commit b2b2781

Please sign in to comment.