From 9b537f8936cbc24a43d0322c7def4304515d95e1 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 1 Sep 2017 12:20:18 +0200 Subject: [PATCH 1/5] [FLINK-7568] Change role of ProcessWindowFunction and WindowFunction in doc --- docs/dev/stream/operators/windows.md | 252 +++++++++++++-------------- 1 file changed, 126 insertions(+), 126 deletions(-) diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index c2d557f444cfa..a2669ea2405ee 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -460,118 +460,14 @@ The above example appends all input `Long` values to an initially empty `String` Attention `fold()` cannot be used with session windows or other mergeable windows. -### WindowFunction - The Generic Case +### ProcessWindowFunction -A `WindowFunction` gets an `Iterable` containing all the elements of the window and provides +A `ProcessWindowFunction` gets an `Iterable` containing all the elements of the window and provides the most flexibility of all window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing. -The signature of a `WindowFunction` looks as follows: - -
-
-{% highlight java %} -public interface WindowFunction extends Function, Serializable { - - /** - * Evaluates the window and outputs none or several elements. - * - * @param key The key for which this window is evaluated. - * @param window The window that is being evaluated. - * @param input The elements in the window being evaluated. - * @param out A collector for emitting elements. - * - * @throws Exception The function may throw exceptions to fail the program and trigger recovery. - */ - void apply(KEY key, W window, Iterable input, Collector out) throws Exception; -} -{% endhighlight %} -
- -
-{% highlight scala %} -trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { - - /** - * Evaluates the window and outputs none or several elements. - * - * @param key The key for which this window is evaluated. - * @param window The window that is being evaluated. - * @param input The elements in the window being evaluated. - * @param out A collector for emitting elements. - * @throws Exception The function may throw exceptions to fail the program and trigger recovery. - */ - def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) -} -{% endhighlight %} -
-
- -A `WindowFunction` can be defined and used like this: - -
-
-{% highlight java %} -DataStream> input = ...; - -input - .keyBy() - .window() - .apply(new MyWindowFunction()); - -/* ... */ - -public class MyWindowFunction implements WindowFunction, String, String, TimeWindow> { - - void apply(String key, TimeWindow window, Iterable> input, Collector out) { - long count = 0; - for (Tuple in: input) { - count++; - } - out.collect("Window: " + window + "count: " + count); - } -} - -{% endhighlight %} -
- -
-{% highlight scala %} -val input: DataStream[(String, Long)] = ... - -input - .keyBy() - .window() - .apply(new MyWindowFunction()) - -/* ... */ - -class MyWindowFunction extends WindowFunction[(String, Long), String, String, TimeWindow] { - - def apply(key: String, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): () = { - var count = 0L - for (in <- input) { - count = count + 1 - } - out.collect(s"Window $window count: $count") - } -} -{% endhighlight %} -
-
- -The example shows a `WindowFunction` to count the elements in a window. In addition, the window function adds information about the window to the output. - -Attention Note that using `WindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `WindowFunction` to get both incremental aggregation and the added information of a `WindowFunction`. - -### ProcessWindowFunction - -In places where a `WindowFunction` can be used you can also use a `ProcessWindowFunction`. This -is very similar to `WindowFunction`, except that the interface allows to query more information -about the context in which the window evaluation happens. - -This is the `ProcessWindowFunction` interface: +The signature of `ProcessWindowFunction` looks as follows:
@@ -620,7 +516,6 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function * @param out A collector for emitting elements. * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ - @throws[Exception] def process( key: KEY, context: Context, @@ -641,7 +536,7 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
-It can be used like this: +A `ProcessWindowFunction` can be defined and used like this:
@@ -652,6 +547,20 @@ input .keyBy() .window() .process(new MyProcessWindowFunction()); + +/* ... */ + +public class MyProcessWindowFunction extends ProcessWindowFunction, String, String, TimeWindow> { + + public void process(String key, Context context, Iterable> input, Collector out) { + long count = 0; + for (Tuple in: input) { + count++; + } + out.collect("Window: " + context.window() + "count: " + count); + } +} + {% endhighlight %}
@@ -663,25 +572,42 @@ input .keyBy() .window() .process(new MyProcessWindowFunction()) + +/* ... */ + +class MyProcessWindowFunction extends ProcessWindowFunction[(String, Long), String, String, TimeWindow] { + + def process(key: String, context: Context, input: Iterable[(String, Long)], out: Collector[String]): Unit = { + var count = 0L + for (in <- input) { + count = count + 1 + } + out.collect(s"Window ${context.window} count: $count") + } +} {% endhighlight %}
-### WindowFunction with Incremental Aggregation +The example shows a `ProcessWindowFunction` that counts the elements in a window. In addition, the window function adds information about the window to the output. -A `WindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to +Attention Note that using `ProcessWindowFunction` for simple aggregates such as count is quite inefficient. The next section shows how a `ReduceFunction` can be combined with a `ProcessWindowFunction` to get both incremental aggregation and the added information of a `ProcessWindowFunction`. + +### ProcessWindowFunction with Incremental Aggregation + +A `ProcessWindowFunction` can be combined with either a `ReduceFunction` or a `FoldFunction` to incrementally aggregate elements as they arrive in the window. -When the window is closed, the `WindowFunction` will be provided with the aggregated result. +When the window is closed, the `ProcessWindowFunction` will be provided with the aggregated result. This allows to incrementally compute windows while having access to the -additional window meta information of the `WindowFunction`. +additional window meta information of the `ProcessWindowFunction`. -Note You can also `ProcessWindowFunction` instead of -`WindowFunction` for incremental window aggregation. +Note You can also use the legacy `WindowFunction` instead of +`ProcessWindowFunction` for incremental window aggregation. #### Incremental Window Aggregation with FoldFunction The following example shows how an incremental `FoldFunction` can be combined with -a `WindowFunction` to extract the number of events in the window and return also +a `ProcessWindowFunction` to extract the number of events in the window and return also the key and end time of the window.
@@ -692,7 +618,7 @@ DataStream input = ...; input .keyBy() .timeWindow() - .fold(new Tuple3("",0L, 0), new MyFoldFunction(), new MyWindowFunction()) + .fold(new Tuple3("",0L, 0), new MyFoldFunction(), new MyProcessWindowFunction()) // Function definitions @@ -706,15 +632,15 @@ private static class MyFoldFunction } } -private static class MyWindowFunction - implements WindowFunction, Tuple3, String, TimeWindow> { +private static class MyProcessWindowFunction + extends ProcessWindowFunction, Tuple3, String, TimeWindow> { - public void apply(String key, - TimeWindow window, + public void process(String key, + Context context, Iterable> counts, Collector> out) { Integer count = counts.iterator().next().getField(2); - out.collect(new Tuple3(key, window.getEnd(),count)); + out.collect(new Tuple3(key, context.window().getEnd(),count)); } } @@ -759,7 +685,7 @@ DataStream input = ...; input .keyBy() .timeWindow() - .reduce(new MyReduceFunction(), new MyWindowFunction()); + .reduce(new MyReduceFunction(), new MyProcessWindowFunction()); // Function definitions @@ -770,11 +696,11 @@ private static class MyReduceFunction implements ReduceFunction { } } -private static class MyWindowFunction - implements WindowFunction, String, TimeWindow> { +private static class MyProcessWindowFunction + extends ProcessWindowFunction, String, TimeWindow> { public void apply(String key, - TimeWindow window, + Context context, Iterable minReadings, Collector> out) { SensorReading min = minReadings.iterator().next(); @@ -808,6 +734,80 @@ input
+### WindowFunction (Legacy) + +In some places where a `ProcessWindowFunction` can be used you can also use a `WindowFunction`. This +is an older version of `ProcessWindowFunction` that provides less contextual information and does +not have some advanced features, such as per-window keyed state. This interface will be deprecated +at some point. + +The signature of a `WindowFunction` looks as follows: + +
+
+{% highlight java %} +public interface WindowFunction extends Function, Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + void apply(KEY key, W window, Iterable input, Collector out) throws Exception; +} +{% endhighlight %} +
+ +
+{% highlight scala %} +trait WindowFunction[IN, OUT, KEY, W <: Window] extends Function with Serializable { + + /** + * Evaluates the window and outputs none or several elements. + * + * @param key The key for which this window is evaluated. + * @param window The window that is being evaluated. + * @param input The elements in the window being evaluated. + * @param out A collector for emitting elements. + * @throws Exception The function may throw exceptions to fail the program and trigger recovery. + */ + def apply(key: KEY, window: W, input: Iterable[IN], out: Collector[OUT]) +} +{% endhighlight %} +
+
+ +It can be used like this: + +
+
+{% highlight java %} +DataStream> input = ...; + +input + .keyBy() + .window() + .apply(new MyWindowFunction()); +{% endhighlight %} +
+ +
+{% highlight scala %} +val input: DataStream[(String, Long)] = ... + +input + .keyBy() + .window() + .apply(new MyWindowFunction()) +{% endhighlight %} +
+
+ ## Triggers A `Trigger` determines when a window (as formed by the *window assigner*) is ready to be From 2ef1f31e06e57831ffaa99004bdb92edf347f589 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 1 Sep 2017 12:28:13 +0200 Subject: [PATCH 2/5] [FLINK-7568] Update ProcessFunction.Context in window documentation --- docs/dev/stream/operators/windows.md | 63 +++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 10 deletions(-) diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index a2669ea2405ee..bd216c26111fa 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -490,15 +490,35 @@ public abstract class ProcessWindowFunction impl Iterable elements, Collector out) throws Exception; - /** - * The context holding window metadata - */ - public abstract class Context { - /** - * @return The window that is being evaluated. - */ - public abstract W window(); - } + /** + * The context holding window metadata. + */ + public abstract class Context implements java.io.Serializable { + /** + * Returns the window that is being evaluated. + */ + public abstract W window(); + + /** Returns the current processing time. */ + public abstract long currentProcessingTime(); + + /** Returns the current event-time watermark. */ + public abstract long currentWatermark(); + + /** + * State accessor for per-key and per-window state. + * + *

NOTE:If you use per-window state you have to ensure that you clean it up + * by implementing {@link ProcessWindowFunction#clear(Context)}. + */ + public abstract KeyedStateStore windowState(); + + /** + * State accessor for per-key global state. + */ + public abstract KeyedStateStore globalState(); + } + } {% endhighlight %} @@ -527,15 +547,38 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function */ abstract class Context { /** - * @return The window that is being evaluated. + * Returns the window that is being evaluated. */ def window: W + + /** + * Returns the current processing time. + */ + def currentProcessingTime: Long + + /** + * Returns the current event-time watermark. + */ + def currentWatermark: Long + + /** + * State accessor for per-key and per-window state. + */ + def windowState: KeyedStateStore + + /** + * State accessor for per-key global state. + */ + def globalState: KeyedStateStore } + } {% endhighlight %} + + A `ProcessWindowFunction` can be defined and used like this:

From 6b521e16f58fbab36b2de27f9739d8beec69823b Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 1 Sep 2017 12:33:40 +0200 Subject: [PATCH 3/5] [FLINK-7568] Add note about 'key' parameter to window doc --- docs/dev/stream/operators/windows.md | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index bd216c26111fa..d1de150b75d60 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -577,7 +577,10 @@ abstract class ProcessWindowFunction[IN, OUT, KEY, W <: Window] extends Function
- +Note The `key` parameter is the key that is extracted +via the `KeySelector` that was specified for the `keyBy()` invocation. In case of tuple-index +keys or string-field references this key type is always `Tuple` and you have to manually cast +it to a tuple of the correct size to extract the key fields. A `ProcessWindowFunction` can be defined and used like this: From f1ea8c52c3f13aaf8e05b9e08111b538d1af609c Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 1 Sep 2017 13:45:01 +0200 Subject: [PATCH 4/5] [FLINK-7568] Add note about start/end timestamps in window doc --- docs/dev/stream/operators/windows.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index d1de150b75d60..a0243783be2c2 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -111,6 +111,11 @@ windows) assign elements to windows based on time, which can either be processin time. Please take a look at our section on [event time]({{ site.baseurl }}/dev/event_time.html) to learn about the difference between processing time and event time and how timestamps and watermarks are generated. +Time-based windows have a *start timestamp* (inclusive) and an *end timestamp* (exclusive) +that together describe the size of the window. In code, Flink uses `TimeWindow` when working with +time-based windows; this class has methods for querying the start- and end-timestamp and also an +additional method `maxTimestamp()` that returns the largest allowed timestamp for a given window. + In the following, we show how Flink's pre-defined window assigners work and how they are used in a DataStream program. The following figures visualize the workings of each assigner. The purple circles represent elements of the stream, which are partitioned by some key (in this case *user 1*, *user 2* and *user 3*).
From 99742001abb4fe73d14330edc918a47aa08090cb Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 1 Sep 2017 15:29:21 +0200 Subject: [PATCH 5/5] [FLINK-7568] Add section about consecutive windows to window doc --- docs/dev/stream/operators/windows.md | 74 ++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md index a0243783be2c2..c689259696d0b 100644 --- a/docs/dev/stream/operators/windows.md +++ b/docs/dev/stream/operators/windows.md @@ -1079,6 +1079,80 @@ as they may "bridge" the gap between two pre-existing, unmerged windows. Attention You should be aware that the elements emitted by a late firing should be treated as updated results of a previous computation, i.e., your data stream will contain multiple results for the same computation. Depending on your application, you need to take these duplicated results into account or deduplicate them. +## Working with window results + +The result of a windowed operation is again a `DataStream`, no information about the windowed +operations is retained in the result elements so if you want to keep meta-information about the +window you have to manually encode that information in the result elements in your +`ProcessWindowFunction`. The only relevant information that is set on the result elements is the +element *timestamp*. This is set to the maximum allowed timestamp of the processed window, which +is *end timestamp - 1*, since the window-end timestamp is exclusive. Note that this is true for both +event-time windows and processing-time windows, i.e., after a windowed operation, elements always +have a timestamp, but this can be an event-time timestamp or a processing-time timestamp.
For +processing-time windows this has no special implications but for event-time windows this together +with how watermarks interact with windows enables +[consecutive windowed operations](#consecutive-windowed-operations) with the same window sizes. We +will cover this after taking a look at how watermarks interact with windows. + +### Interaction of watermarks and windows + +Before continuing in this section you might want to take a look at our section about +[event time and watermarks]({{ site.baseurl }}/dev/event_time.html). + +When watermarks arrive at the window operator this triggers two things: + - the watermark triggers computation of all windows where the maximum timestamp (which is + *end-timestamp - 1*) is smaller than the new watermark + - the watermark is forwarded (as is) to downstream operations + +Intuitively, a watermark "flushes" out any windows that would be considered late in downstream +operations once they receive that watermark. + +### Consecutive windowed operations + +As mentioned before, the way the timestamp of windowed results is computed and how watermarks +interact with windows allows stringing together consecutive windowed operations. This can be useful +when you want to do two consecutive windowed operations where you want to use different keys but +still want elements from the same upstream window to end up in the same downstream window. Consider +this example: +
+
+{% highlight java %} +DataStream input = ...; + +DataStream resultsPerKey = input + .keyBy() + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + .reduce(new Summer()); + +DataStream globalResults = resultsPerKey + .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) + .process(new TopKWindowFunction()); + +{% endhighlight %} +
+ +
+{% highlight scala %} +val input: DataStream[Int] = ... + +val resultsPerKey = input + .keyBy() + .window(TumblingEventTimeWindows.of(Time.seconds(5))) + .reduce(new Summer()) + +val globalResults = resultsPerKey + .windowAll(TumblingEventTimeWindows.of(Time.seconds(5))) + .process(new TopKWindowFunction()) +{% endhighlight %} +
+
+ +In this example, the results for time window `[0, 5)` from the first operation will also end up in +time window `[0, 5)` in the subsequent windowed operation. This allows calculating a sum per key +and then calculating the top-k elements within the same window in the second operation. + ## Useful state size considerations Windows can be defined over long periods of time (such as days, weeks, or months) and therefore accumulate very large state. There are a couple of rules to keep in mind when estimating the storage requirements of your windowing computation: