From 0e0d9b79b770d9708935920014314e4021e9751e Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Thu, 5 May 2016 08:34:44 +0200 Subject: [PATCH 1/2] [FLINK-3869] Relax window fold generic parameters Before, the WindowFunction in a fold/apply could not emit a type that was different from the accumulator type of the fold. This restriction is unnecessary. --- .../api/datastream/AllWindowedStream.java | 24 +++++--- .../api/datastream/WindowedStream.java | 56 +++++++++++-------- .../windowing/FoldApplyAllWindowFunction.java | 27 ++++++--- .../windowing/FoldApplyWindowFunction.java | 31 +++++++--- .../FoldApplyWindowFunctionTest.java | 41 +++++++------- .../api/scala/AllWindowedStream.scala | 26 +++++---- .../streaming/api/scala/WindowedStream.scala | 26 +++++---- 7 files changed, 140 insertions(+), 91 deletions(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java index 5a1b56df79f27..2d852f54031b9 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java @@ -38,6 +38,7 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction; import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction; +import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; @@ -197,7 +198,7 @@ public SingleOutputStreamOperator fold(R initialValue, FoldFunction "Please use apply(FoldFunction, WindowFunction) instead."); } - return apply(initialValue, function, new PassThroughAllWindowFunction(), resultType); + return apply(initialValue, function, new PassThroughAllWindowFunction(), resultType, resultType); } /** @@ -376,12 +377,15 @@ public SingleOutputStreamOperator apply(ReduceFunction reduceFunction, * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction, AllWindowFunction function) { + public SingleOutputStreamOperator apply(ACC initialValue, FoldFunction foldFunction, AllWindowFunction function) { - TypeInformation resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + TypeInformation foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), Utils.getCallLocationName(), true); - return apply(initialValue, foldFunction, function, resultType); + TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( + function, WindowFunction.class, true, true, getInputType(), null, false); + + return apply(initialValue, foldFunction, function, foldAccumulatorType, resultType); } /** @@ -398,7 +402,11 @@ public SingleOutputStreamOperator apply(R initialValue, FoldFunction SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction, AllWindowFunction function, TypeInformation resultType) { + public SingleOutputStreamOperator apply(ACC initialValue, + FoldFunction foldFunction, + AllWindowFunction function, + TypeInformation foldAccumulatorType, + TypeInformation resultType) { if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction."); } @@ -430,15 +438,15 @@ public SingleOutputStreamOperator apply(R initialValue, FoldFunction(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function)), + new InternalIterableAllWindowFunction<>(new FoldApplyAllWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)), trigger, evictor); } else { - FoldingStateDescriptor stateDesc = new FoldingStateDescriptor<>("window-contents", + FoldingStateDescriptor stateDesc = new FoldingStateDescriptor<>("window-contents", initialValue, foldFunction, - resultType); + foldAccumulatorType); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 84290b2414ac4..4fcc7ce59609c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -212,7 +212,7 @@ public SingleOutputStreamOperator fold(R initialValue, FoldFunction "Please use apply(FoldFunction, WindowFunction) instead."); } - return apply(initialValue, function, new PassThroughWindowFunction(), resultType); + return apply(initialValue, function, new PassThroughWindowFunction(), resultType, resultType); } /** @@ -397,12 +397,16 @@ public SingleOutputStreamOperator apply(ReduceFunction reduceFunction, * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - public SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction, WindowFunction function) { + public SingleOutputStreamOperator apply(ACC initialValue, FoldFunction foldFunction, WindowFunction function) { + + TypeInformation foldAccumulatorType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), + Utils.getCallLocationName(), true); + + TypeInformation resultType = TypeExtractor.getUnaryOperatorReturnType( + function, WindowFunction.class, true, true, getInputType(), null, false); - TypeInformation resultType = TypeExtractor.getFoldReturnTypes(foldFunction, input.getType(), - Utils.getCallLocationName(), true); - return apply(initialValue, foldFunction, function, resultType); + return apply(initialValue, foldFunction, function, foldAccumulatorType, resultType); } /** @@ -419,7 +423,11 @@ public SingleOutputStreamOperator apply(R initialValue, FoldFunction SingleOutputStreamOperator apply(R initialValue, FoldFunction foldFunction, WindowFunction function, TypeInformation resultType) { + public SingleOutputStreamOperator apply(ACC initialValue, + FoldFunction foldFunction, + WindowFunction function, + TypeInformation foldAccumulatorType, + TypeInformation resultType) { if (foldFunction instanceof RichFunction) { throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction."); } @@ -442,34 +450,34 @@ public SingleOutputStreamOperator apply(R initialValue, FoldFunction> stateDesc = new ListStateDescriptor<>("window-contents", - new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); + new StreamRecordSerializer<>(input.getType().createSerializer(getExecutionEnvironment().getConfig()))); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + evictor + ", " + udfName + ")"; operator = new EvictingWindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - stateDesc, - new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function)), - trigger, - evictor); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalIterableWindowFunction<>(new FoldApplyWindowFunction<>(initialValue, foldFunction, function, foldAccumulatorType)), + trigger, + evictor); } else { - FoldingStateDescriptor stateDesc = new FoldingStateDescriptor<>("window-contents", - initialValue, - foldFunction, - resultType); + FoldingStateDescriptor stateDesc = new FoldingStateDescriptor<>("window-contents", + initialValue, + foldFunction, + foldAccumulatorType); opName = "TriggerWindow(" + windowAssigner + ", " + stateDesc + ", " + trigger + ", " + udfName + ")"; operator = new WindowOperator<>(windowAssigner, - windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), - keySel, - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - stateDesc, - new InternalSingleValueWindowFunction<>(function), - trigger); + windowAssigner.getWindowSerializer(getExecutionEnvironment().getConfig()), + keySel, + input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), + stateDesc, + new InternalSingleValueWindowFunction<>(function), + trigger); } return input.transform(opName, resultType, operator); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java index a5bc0a1b2126f..8910392448818 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyAllWindowFunction.java @@ -36,20 +36,26 @@ import java.util.Collections; @Internal -public class FoldApplyAllWindowFunction - extends WrappingFunction> - implements AllWindowFunction, OutputTypeConfigurable { +public class FoldApplyAllWindowFunction + extends WrappingFunction> + implements AllWindowFunction, OutputTypeConfigurable { private static final long serialVersionUID = 1L; private final FoldFunction foldFunction; private byte[] serializedInitialValue; + + private transient TypeInformation accTypeInformation; private TypeSerializer accSerializer; private transient ACC initialValue; - public FoldApplyAllWindowFunction(ACC initialValue, FoldFunction foldFunction, AllWindowFunction windowFunction) { + public FoldApplyAllWindowFunction(ACC initialValue, + FoldFunction foldFunction, + AllWindowFunction windowFunction, + TypeInformation accTypeInformation) { super(windowFunction); + this.accTypeInformation = accTypeInformation; this.foldFunction = foldFunction; this.initialValue = initialValue; } @@ -58,6 +64,11 @@ public FoldApplyAllWindowFunction(ACC initialValue, FoldFunction foldFun public void open(Configuration configuration) throws Exception { super.open(configuration); + if (accSerializer == null) { + throw new RuntimeException("No serializer set for the fold accumulator type. " + + "Probably the setOutputType method was not called."); + } + if (serializedInitialValue == null) { throw new RuntimeException("No initial value was serialized for the fold " + "window function. Probably the setOutputType method was not called."); @@ -69,7 +80,7 @@ public void open(Configuration configuration) throws Exception { } @Override - public void apply(W window, Iterable values, Collector out) throws Exception { + public void apply(W window, Iterable values, Collector out) throws Exception { ACC result = accSerializer.copy(initialValue); for (T val: values) { @@ -80,8 +91,10 @@ public void apply(W window, Iterable values, Collector out) throws Excep } @Override - public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { - accSerializer = outTypeInfo.createSerializer(executionConfig); + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + // out type is not used, just use this for the execution config + + accSerializer = accTypeInformation.createSerializer(executionConfig); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java index 756a6833566cd..fae1e99d2f952 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/windowing/FoldApplyWindowFunction.java @@ -36,20 +36,28 @@ import java.util.Collections; @Internal -public class FoldApplyWindowFunction - extends WrappingFunction> - implements WindowFunction, OutputTypeConfigurable { +public class FoldApplyWindowFunction + extends WrappingFunction> + implements WindowFunction, OutputTypeConfigurable { private static final long serialVersionUID = 1L; private final FoldFunction foldFunction; private byte[] serializedInitialValue; + + private transient TypeInformation accTypeInformation; private TypeSerializer accSerializer; private transient ACC initialValue; - public FoldApplyWindowFunction(ACC initialValue, FoldFunction foldFunction, WindowFunction windowFunction) { + + + public FoldApplyWindowFunction(ACC initialValue, + FoldFunction foldFunction, + WindowFunction windowFunction, + TypeInformation accTypeInformation) { super(windowFunction); + this.accTypeInformation = accTypeInformation; this.foldFunction = foldFunction; this.initialValue = initialValue; } @@ -58,9 +66,14 @@ public FoldApplyWindowFunction(ACC initialValue, FoldFunction foldFuncti public void open(Configuration configuration) throws Exception { super.open(configuration); + if (accSerializer == null) { + throw new RuntimeException("No serializer set for the fold accumulator type. " + + "Probably the setOutputType method was not called."); + } + if (serializedInitialValue == null) { throw new RuntimeException("No initial value was serialized for the fold " + - "window function. Probably the setOutputType method was not called."); + "window function. Probably the setOutputType method was not called."); } ByteArrayInputStream bais = new ByteArrayInputStream(serializedInitialValue); @@ -69,7 +82,7 @@ public void open(Configuration configuration) throws Exception { } @Override - public void apply(K key, W window, Iterable values, Collector out) throws Exception { + public void apply(K key, W window, Iterable values, Collector out) throws Exception { ACC result = accSerializer.copy(initialValue); for (T val: values) { @@ -80,8 +93,10 @@ public void apply(K key, W window, Iterable values, Collector out) throw } @Override - public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { - accSerializer = outTypeInfo.createSerializer(executionConfig); + public void setOutputType(TypeInformation outTypeInfo, ExecutionConfig executionConfig) { + // out type is not used, just use this for the execution config + + accSerializer = accTypeInformation.createSerializer(executionConfig); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java index 0b0ab9ee3cb63..6e45f991acda3 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/FoldApplyWindowFunctionTest.java @@ -56,27 +56,28 @@ public void testFoldWindowFunctionOutputTypeConfigurable() throws Exception{ int initValue = 1; - FoldApplyWindowFunction foldWindowFunction = new FoldApplyWindowFunction<>( - initValue, - new FoldFunction() { - private static final long serialVersionUID = -4849549768529720587L; - - @Override - public Integer fold(Integer accumulator, Integer value) throws Exception { - return accumulator + value; - } - }, - new WindowFunction() { - @Override - public void apply(Integer integer, - TimeWindow window, - Iterable input, - Collector out) throws Exception { - for (Integer in: input) { - out.collect(in); + FoldApplyWindowFunction foldWindowFunction = new FoldApplyWindowFunction<>( + initValue, + new FoldFunction() { + private static final long serialVersionUID = -4849549768529720587L; + + @Override + public Integer fold(Integer accumulator, Integer value) throws Exception { + return accumulator + value; } - } - } + }, + new WindowFunction() { + @Override + public void apply(Integer integer, + TimeWindow window, + Iterable input, + Collector out) throws Exception { + for (Integer in: input) { + out.collect(in); + } + } + }, + BasicTypeInfo.INT_TYPE_INFO ); AccumulatingProcessingTimeWindowOperator windowOperator = new AccumulatingProcessingTimeWindowOperator<>( diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala index 020c619a0e222..74d9b787e65cd 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/AllWindowedStream.scala @@ -270,20 +270,21 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def apply[R: TypeInformation]( - initialValue: R, - preAggregator: FoldFunction[T, R], - windowFunction: AllWindowFunction[R, R, W]): DataStream[R] = { + def apply[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + preAggregator: FoldFunction[T, ACC], + windowFunction: AllWindowFunction[ACC, R, W]): DataStream[R] = { val cleanFolder = clean(preAggregator) val cleanWindowFunction = clean(windowFunction) - val applyFunction = new ScalaAllWindowFunctionWrapper[R, R, W](cleanWindowFunction) + val applyFunction = new ScalaAllWindowFunctionWrapper[ACC, R, W](cleanWindowFunction) asScalaStream(javaStream.apply( initialValue, cleanFolder, applyFunction, + implicitly[TypeInformation[ACC]], implicitly[TypeInformation[R]])) } @@ -299,10 +300,10 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def apply[R: TypeInformation]( - initialValue: R, - preAggregator: (R, T) => R, - windowFunction: (W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { + def apply[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + preAggregator: (ACC, T) => ACC, + windowFunction: (W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] = { if (preAggregator == null) { throw new NullPointerException("Reduce function must not be null.") @@ -314,11 +315,12 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) { val cleanFolder = clean(preAggregator) val cleanWindowFunction = clean(windowFunction) - val folder = new ScalaFoldFunction[T, R](cleanFolder) - val applyFunction = new ScalaAllWindowFunction[R, R, W](cleanWindowFunction) + val folder = new ScalaFoldFunction[T, ACC](cleanFolder) + val applyFunction = new ScalaAllWindowFunction[ACC, R, W](cleanWindowFunction) + val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] val returnType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(initialValue, folder, applyFunction, returnType)) + asScalaStream(javaStream.apply(initialValue, folder, applyFunction, accType, returnType)) } // ------------------------------------------------------------------------ diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala index 773829e1f3436..8f94fa5db89a2 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedStream.scala @@ -272,20 +272,21 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param function The window function. * @return The data stream that is the result of applying the window function to the window. */ - def apply[R: TypeInformation]( - initialValue: R, - foldFunction: FoldFunction[T, R], - function: WindowFunction[R, R, K, W]): DataStream[R] = { + def apply[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + foldFunction: FoldFunction[T, ACC], + function: WindowFunction[ACC, R, K, W]): DataStream[R] = { val cleanedFunction = clean(function) val cleanedFoldFunction = clean(foldFunction) - val applyFunction = new ScalaWindowFunctionWrapper[R, R, K, W](cleanedFunction) + val applyFunction = new ScalaWindowFunctionWrapper[ACC, R, K, W](cleanedFunction) asScalaStream(javaStream.apply( initialValue, cleanedFoldFunction, applyFunction, + implicitly[TypeInformation[ACC]], implicitly[TypeInformation[R]])) } @@ -300,10 +301,10 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { * @param windowFunction The window function. * @return The data stream that is the result of applying the window function to the window. */ - def apply[R: TypeInformation]( - initialValue: R, - foldFunction: (R, T) => R, - windowFunction: (K, W, Iterable[R], Collector[R]) => Unit): DataStream[R] = { + def apply[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC, + foldFunction: (ACC, T) => ACC, + windowFunction: (K, W, Iterable[ACC], Collector[R]) => Unit): DataStream[R] = { if (foldFunction == null) { throw new NullPointerException("Fold function must not be null.") @@ -315,11 +316,12 @@ class WindowedStream[T, K, W <: Window](javaStream: JavaWStream[T, K, W]) { val cleanFolder = clean(foldFunction) val cleanWindowFunction = clean(windowFunction) - val folder = new ScalaFoldFunction[T, R](cleanFolder) - val applyFunction = new ScalaWindowFunction[R, R, K, W](cleanWindowFunction) + val folder = new ScalaFoldFunction[T, ACC](cleanFolder) + val applyFunction = new ScalaWindowFunction[ACC, R, K, W](cleanWindowFunction) val resultType: TypeInformation[R] = implicitly[TypeInformation[R]] - asScalaStream(javaStream.apply(initialValue, folder, applyFunction, resultType)) + val accType: TypeInformation[ACC] = implicitly[TypeInformation[ACC]] + asScalaStream(javaStream.apply(initialValue, folder, applyFunction, accType, resultType)) } // ------------------------------------------------------------------------ From 5b214cffff33c578928bd455e97967e957a4b7e4 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Mon, 9 May 2016 12:01:36 +0200 Subject: [PATCH 2/2] fixup: Fix OnWindowedStream.scala --- .../acceptPartialFunctions/OnWindowedStream.scala | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala index f7a5923c2894c..16405bc783c0a 100644 --- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala +++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/extensions/impl/acceptPartialFunctions/OnWindowedStream.scala @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.streaming.api.scala.{DataStream, WindowedStream} import org.apache.flink.streaming.api.windowing.windows.Window +import org.apache.flink.util.Collector /** * Wraps a joined data stream, allowing to use anonymous partial functions to @@ -77,13 +78,14 @@ class OnWindowedStream[T, K, W <: Window](stream: WindowedStream[T, K, W]) { * @return The data stream that is the result of applying the window function to the window. */ @PublicEvolving - def applyWith[R: TypeInformation]( - initialValue: R)( - foldFunction: (R, T) => R, - windowFunction: (K, W, Stream[R]) => TraversableOnce[R]) + def applyWith[ACC: TypeInformation, R: TypeInformation]( + initialValue: ACC)( + foldFunction: (ACC, T) => ACC, + windowFunction: (K, W, Stream[ACC]) => TraversableOnce[R]) : DataStream[R] = + stream.apply(initialValue, foldFunction, { - (key, window, items, out) => + (key: K, window: W, items: Iterable[ACC], out: Collector[R]) => windowFunction(key, window, items.toStream).foreach(out.collect) })