[FLINK-3174] Add merging WindowAssigner #1460

Closed
wants to merge 1 commit into
from

Projects

None yet

2 participants

@aljoscha
Contributor

After triggering and before emitting the window contents the window
assigner is given the change to merge existing windows. The trigger is
then given a chance to react in the new Trigger.onMerge() call.

This adds new method WindowAssigner.isMerging() that allows window
assigners to specify whether they can merge windows. All existing
assigners are now derived from NonMergingWindowAssigner that returns
false for isMerging(). Only of a WindowAssigners announces that it can
merge is the more costly merging logic used in the WindowOperator.

For triggers there is new method Trigger.onMerge() that notifies the
trigger of the new merged window as well as the old windows and old
trigger contexts. This allows the trigger to set a timer for the newly
merged window.

This enables proper support for session windows.

This also adds the SessionWindows window assigner and adapts an existing
session example and adds test cases.

@aljoscha aljoscha [FLINK-3174] Add merging WindowAssigner
After triggering and before emitting the window contents the window
assigner is given the change to merge existing windows. The trigger is
then given a chance to react in the new Trigger.onMerge() call.

This adds new method WindowAssigner.isMerging() that allows window
assigners to specify whether they can merge windows. All existing
assigners are now derived from NonMergingWindowAssigner that returns
false for isMerging(). Only of a WindowAssigners announces that it can
merge is the more costly merging logic used in the WindowOperator.

For triggers there is new method Trigger.onMerge() that notifies the
trigger of the new merged window as well as the old windows and old
trigger contexts. This allows the trigger to set a timer for the newly
merged window.

This enables proper support for session windows.

This also adds the SessionWindows window assigner and adapts an existing
session example and adds test cases.
04f7e95
@rmetzger rmetzger commented on the diff Jan 7, 2016
...g/runtime/operators/windowing/WindowOperatorTest.java
@@ -459,12 +638,29 @@ public int compare(Object o1, Object o2) {
}
}
- private static class TupleKeySelector implements KeySelector<Tuple2<String, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
+ @SuppressWarnings("unchecked")
+ private static class Tuple3ResultSortComparator implements Comparator<Object> {
@rmetzger
rmetzger Jan 7, 2016 Contributor

duplicate code with NonKeyedWindowOperatorTest ?

@rmetzger rmetzger commented on the diff Jan 7, 2016
...g/runtime/operators/windowing/WindowOperatorTest.java
@@ -438,7 +617,7 @@ public void close() throws Exception {
}
@SuppressWarnings("unchecked")
- private static class ResultSortComparator implements Comparator<Object> {
+ private static class Tuple2ResultSortComparator implements Comparator<Object> {
@rmetzger
rmetzger Jan 7, 2016 Contributor

this is also a duplicate with NonKeyedWindowOperatorTest according to IntelliJ

@rmetzger
Contributor
rmetzger commented Jan 7, 2016

Thank you for working on this! Session windows are something many people asked for!

@aljoscha
Contributor

This will have to be reworked now that windows use the keyed state abstraction.

@aljoscha aljoscha closed this Feb 22, 2016
@aljoscha aljoscha deleted the aljoscha:window-sessions branch Mar 16, 2016
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment