From b2b2781fd04a14807849fdc41e4b5ecb8ab75f13 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 8 Oct 2015 11:32:18 +0200 Subject: [PATCH] [hotfix] Change result of WindowedStream ops to SingleOutputStreamOperator --- .../api/datastream/AllWindowedStream.java | 46 +++++++++--------- .../api/datastream/WindowedStream.java | 48 +++++++++---------- 2 files changed, 47 insertions(+), 47 deletions(-) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index c7a70d78393c6..83e7adc6b4357 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -126,14 +126,14 @@ public AllWindowedStream evictor(Evictor evictor) { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - public DataStream reduce(ReduceFunction function) { + public SingleOutputStreamOperator reduce(ReduceFunction function) { //clean the closure function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); String udfName = "Reduce at " + callLocation; - DataStream result = createFastTimeOperatorIfValid(function, input.getType(), udfName); + SingleOutputStreamOperator result = createFastTimeOperatorIfValid(function, input.getType(), udfName); if (result != null) { return result; } @@ -173,7 +173,7 @@ public DataStream reduce(ReduceFunction function) { * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. */ - public DataStream fold(R initialValue, FoldFunction function) { + public SingleOutputStreamOperator fold(R initialValue, FoldFunction function) { //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -191,7 +191,7 @@ public DataStream fold(R initialValue, FoldFunction function) { * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. */ - public DataStream fold(R initialValue, FoldFunction function, TypeInformation resultType) { + public SingleOutputStreamOperator fold(R initialValue, FoldFunction function, TypeInformation resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); return apply(new FoldAllWindowFunction(initialValue, function), resultType); @@ -208,7 +208,7 @@ public DataStream fold(R initialValue, FoldFunction function, TypeI * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public DataStream apply(AllWindowFunction function) { + public SingleOutputStreamOperator apply(AllWindowFunction function) { TypeInformation inType = input.getType(); TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( function, AllWindowFunction.class, true, true, inType, null, false); @@ -227,14 +227,14 @@ public DataStream apply(AllWindowFunction function) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public DataStream apply(AllWindowFunction function, TypeInformation resultType) { + public SingleOutputStreamOperator apply(AllWindowFunction function, TypeInformation resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); String udfName = "MapWindow at " + callLocation; - DataStream result = createFastTimeOperatorIfValid(function, resultType, udfName); + SingleOutputStreamOperator result = createFastTimeOperatorIfValid(function, resultType, udfName); if (result != null) { return result; } @@ -274,7 +274,7 @@ public DataStream apply(AllWindowFunction function, TypeInformat * @param positionToSum The position in the tuple/array to sum * @return The transformed DataStream. */ - public DataStream sum(int positionToSum) { + public SingleOutputStreamOperator sum(int positionToSum) { return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig())); } @@ -291,7 +291,7 @@ public DataStream sum(int positionToSum) { * @param field The field to sum * @return The transformed DataStream. */ - public DataStream sum(String field) { + public SingleOutputStreamOperator sum(String field) { return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig())); } @@ -302,7 +302,7 @@ public DataStream sum(String field) { * @param positionToMin The position to minimize * @return The transformed DataStream. */ - public DataStream min(int positionToMin) { + public SingleOutputStreamOperator min(int positionToMin) { return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig())); } @@ -319,7 +319,7 @@ public DataStream min(int positionToMin) { * @param field The field expression based on which the aggregation will be applied. * @return The transformed DataStream. */ - public DataStream min(String field) { + public SingleOutputStreamOperator min(String field) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig())); } @@ -332,7 +332,7 @@ public DataStream min(String field) { * The position to minimize by * @return The transformed DataStream. */ - public DataStream minBy(int positionToMinBy) { + public SingleOutputStreamOperator minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -344,7 +344,7 @@ public DataStream minBy(int positionToMinBy) { * @param positionToMinBy The position to minimize by * @return The transformed DataStream. */ - public DataStream minBy(String positionToMinBy) { + public SingleOutputStreamOperator minBy(String positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -358,7 +358,7 @@ public DataStream minBy(String positionToMinBy) { * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last * @return The transformed DataStream. */ - public DataStream minBy(int positionToMinBy, boolean first) { + public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); } @@ -373,7 +373,7 @@ public DataStream minBy(int positionToMinBy, boolean first) { * @param first If True then in case of field equality the first object will be returned * @return The transformed DataStream. */ - public DataStream minBy(String field, boolean first) { + public SingleOutputStreamOperator minBy(String field, boolean first) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); } @@ -384,7 +384,7 @@ public DataStream minBy(String field, boolean first) { * @param positionToMax The position to maximize * @return The transformed DataStream. */ - public DataStream max(int positionToMax) { + public SingleOutputStreamOperator max(int positionToMax) { return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig())); } @@ -398,7 +398,7 @@ public DataStream max(int positionToMax) { * @param field The field expression based on which the aggregation will be applied. * @return The transformed DataStream. */ - public DataStream max(String field) { + public SingleOutputStreamOperator max(String field) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig())); } @@ -411,7 +411,7 @@ public DataStream max(String field) { * The position to maximize by * @return The transformed DataStream. */ - public DataStream maxBy(int positionToMaxBy) { + public SingleOutputStreamOperator maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -424,7 +424,7 @@ public DataStream maxBy(int positionToMaxBy) { * The position to maximize by * @return The transformed DataStream. */ - public DataStream maxBy(String positionToMaxBy) { + public SingleOutputStreamOperator maxBy(String positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -438,7 +438,7 @@ public DataStream maxBy(String positionToMaxBy) { * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last * @return The transformed DataStream. */ - public DataStream maxBy(int positionToMaxBy, boolean first) { + public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); } @@ -453,11 +453,11 @@ public DataStream maxBy(int positionToMaxBy, boolean first) { * @param first If True then in case of field equality the first object will be returned * @return The transformed DataStream. */ - public DataStream maxBy(String field, boolean first) { + public SingleOutputStreamOperator maxBy(String field, boolean first) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); } - private DataStream aggregate(AggregationFunction aggregator) { + private SingleOutputStreamOperator aggregate(AggregationFunction aggregator) { return reduce(aggregator); } @@ -466,7 +466,7 @@ private DataStream aggregate(AggregationFunction aggregator) { // ------------------------------------------------------------------------ - private DataStream createFastTimeOperatorIfValid( + private SingleOutputStreamOperator createFastTimeOperatorIfValid( Function function, TypeInformation resultType, String functionName) { diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 42e0bd74fce23..1b511d8cfee13 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -137,14 +137,14 @@ public WindowedStream evictor(Evictor evictor) { * @param function The reduce function. * @return The data stream that is the result of applying the reduce function to the window. */ - public DataStream reduce(ReduceFunction function) { + public SingleOutputStreamOperator reduce(ReduceFunction function) { //clean the closure function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); String udfName = "Reduce at " + callLocation; - DataStream result = createFastTimeOperatorIfValid(function, input.getType(), udfName); + SingleOutputStreamOperator result = createFastTimeOperatorIfValid(function, input.getType(), udfName); if (result != null) { return result; } @@ -187,7 +187,7 @@ public DataStream reduce(ReduceFunction function) { * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. */ - public DataStream fold(R initialValue, FoldFunction function) { + public SingleOutputStreamOperator fold(R initialValue, FoldFunction function) { //clean the closure function = input.getExecutionEnvironment().clean(function); @@ -205,7 +205,7 @@ public DataStream fold(R initialValue, FoldFunction function) { * @param function The fold function. * @return The data stream that is the result of applying the fold function to the window. */ - public DataStream fold(R initialValue, FoldFunction function, TypeInformation resultType) { + public SingleOutputStreamOperator fold(R initialValue, FoldFunction function, TypeInformation resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); return apply(new FoldWindowFunction(initialValue, function), resultType); @@ -223,7 +223,7 @@ public DataStream fold(R initialValue, FoldFunction function, TypeI * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public DataStream apply(WindowFunction function) { + public SingleOutputStreamOperator apply(WindowFunction function) { TypeInformation inType = input.getType(); TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( function, WindowFunction.class, true, true, inType, null, false); @@ -243,14 +243,14 @@ public DataStream apply(WindowFunction function) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public DataStream apply(WindowFunction function, TypeInformation resultType) { + public SingleOutputStreamOperator apply(WindowFunction function, TypeInformation resultType) { //clean the closure function = input.getExecutionEnvironment().clean(function); String callLocation = Utils.getCallLocationName(); String udfName = "MapWindow at " + callLocation; - DataStream result = createFastTimeOperatorIfValid(function, resultType, udfName); + SingleOutputStreamOperator result = createFastTimeOperatorIfValid(function, resultType, udfName); if (result != null) { return result; } @@ -276,7 +276,7 @@ public DataStream apply(WindowFunction function, TypeInformat keySel, new HeapWindowBuffer.Factory(), function, - trigger).enableSetProcessingTime(setProcessingTime);; + trigger).enableSetProcessingTime(setProcessingTime); } return input.transform(opName, resultType, operator); @@ -293,7 +293,7 @@ public DataStream apply(WindowFunction function, TypeInformat * @param positionToSum The position in the tuple/array to sum * @return The transformed DataStream. */ - public DataStream sum(int positionToSum) { + public SingleOutputStreamOperator sum(int positionToSum) { return aggregate(new SumAggregator<>(positionToSum, input.getType(), input.getExecutionConfig())); } @@ -310,7 +310,7 @@ public DataStream sum(int positionToSum) { * @param field The field to sum * @return The transformed DataStream. */ - public DataStream sum(String field) { + public SingleOutputStreamOperator sum(String field) { return aggregate(new SumAggregator<>(field, input.getType(), input.getExecutionConfig())); } @@ -321,7 +321,7 @@ public DataStream sum(String field) { * @param positionToMin The position to minimize * @return The transformed DataStream. */ - public DataStream min(int positionToMin) { + public SingleOutputStreamOperator min(int positionToMin) { return aggregate(new ComparableAggregator<>(positionToMin, input.getType(), AggregationFunction.AggregationType.MIN, input.getExecutionConfig())); } @@ -338,7 +338,7 @@ public DataStream min(int positionToMin) { * @param field The field expression based on which the aggregation will be applied. * @return The transformed DataStream. */ - public DataStream min(String field) { + public SingleOutputStreamOperator min(String field) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MIN, false, input.getExecutionConfig())); } @@ -351,7 +351,7 @@ public DataStream min(String field) { * The position to minimize by * @return The transformed DataStream. */ - public DataStream minBy(int positionToMinBy) { + public SingleOutputStreamOperator minBy(int positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -363,7 +363,7 @@ public DataStream minBy(int positionToMinBy) { * @param positionToMinBy The position to minimize by * @return The transformed DataStream. */ - public DataStream minBy(String positionToMinBy) { + public SingleOutputStreamOperator minBy(String positionToMinBy) { return this.minBy(positionToMinBy, true); } @@ -377,7 +377,7 @@ public DataStream minBy(String positionToMinBy) { * @param first If true, then the operator return the first element with the minimum value, otherwise returns the last * @return The transformed DataStream. */ - public DataStream minBy(int positionToMinBy, boolean first) { + public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { return aggregate(new ComparableAggregator<>(positionToMinBy, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); } @@ -392,7 +392,7 @@ public DataStream minBy(int positionToMinBy, boolean first) { * @param first If True then in case of field equality the first object will be returned * @return The transformed DataStream. */ - public DataStream minBy(String field, boolean first) { + public SingleOutputStreamOperator minBy(String field, boolean first) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MINBY, first, input.getExecutionConfig())); } @@ -403,7 +403,7 @@ public DataStream minBy(String field, boolean first) { * @param positionToMax The position to maximize * @return The transformed DataStream. */ - public DataStream max(int positionToMax) { + public SingleOutputStreamOperator max(int positionToMax) { return aggregate(new ComparableAggregator<>(positionToMax, input.getType(), AggregationFunction.AggregationType.MAX, input.getExecutionConfig())); } @@ -417,7 +417,7 @@ public DataStream max(int positionToMax) { * @param field The field expression based on which the aggregation will be applied. * @return The transformed DataStream. */ - public DataStream max(String field) { + public SingleOutputStreamOperator max(String field) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAX, false, input.getExecutionConfig())); } @@ -430,7 +430,7 @@ public DataStream max(String field) { * The position to maximize by * @return The transformed DataStream. */ - public DataStream maxBy(int positionToMaxBy) { + public SingleOutputStreamOperator maxBy(int positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -443,7 +443,7 @@ public DataStream maxBy(int positionToMaxBy) { * The position to maximize by * @return The transformed DataStream. */ - public DataStream maxBy(String positionToMaxBy) { + public SingleOutputStreamOperator maxBy(String positionToMaxBy) { return this.maxBy(positionToMaxBy, true); } @@ -457,7 +457,7 @@ public DataStream maxBy(String positionToMaxBy) { * @param first If true, then the operator return the first element with the maximum value, otherwise returns the last * @return The transformed DataStream. */ - public DataStream maxBy(int positionToMaxBy, boolean first) { + public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { return aggregate(new ComparableAggregator<>(positionToMaxBy, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); } @@ -472,11 +472,11 @@ public DataStream maxBy(int positionToMaxBy, boolean first) { * @param first If True then in case of field equality the first object will be returned * @return The transformed DataStream. */ - public DataStream maxBy(String field, boolean first) { + public SingleOutputStreamOperator maxBy(String field, boolean first) { return aggregate(new ComparableAggregator<>(field, input.getType(), AggregationFunction.AggregationType.MAXBY, first, input.getExecutionConfig())); } - private DataStream aggregate(AggregationFunction aggregator) { + private SingleOutputStreamOperator aggregate(AggregationFunction aggregator) { return reduce(aggregator); } @@ -484,7 +484,7 @@ private DataStream aggregate(AggregationFunction aggregator) { // Utilities // ------------------------------------------------------------------------ - private DataStream createFastTimeOperatorIfValid( + private SingleOutputStreamOperator createFastTimeOperatorIfValid( Function function, TypeInformation resultType, String functionName) {