Skip to content

Commit

Permalink
[hotfix] Simplify new windowing API
Browse files Browse the repository at this point in the history
Before, there would be three different window() methods on
KeyedDataStream: one that takes two policies, one that takes one policy
and one that takes a window assigner.

Now, there is only one window() method that takes a window assigner and
creates a KeyedWindowDataStream.

For convenience, there are two timeWindow() methods that take either one
argument (tumbling windows) or two arguments (sliding windows). These
create a KeyedWindowDataStream with either a SlidingWindows or
TumblingWindows assigner.

When the window operator is created we pick the optimized aligned time
windows operator if the combination of window assigner/trigger/evictor
allows it.

All of this behaviour is verified in tests.

This closes #1195
  • Loading branch information
aljoscha authored and StephanEwen committed Oct 1, 2015
1 parent 937793e commit 5623c15
Show file tree
Hide file tree
Showing 21 changed files with 446 additions and 1,005 deletions.
Expand Up @@ -23,8 +23,14 @@
import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.transformations.OneInputTransformation; import org.apache.flink.streaming.api.transformations.OneInputTransformation;
import org.apache.flink.streaming.api.transformations.PartitionTransformation; import org.apache.flink.streaming.api.transformations.PartitionTransformation;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.windowpolicy.WindowPolicy; import org.apache.flink.streaming.api.windowing.time.AbstractTime;
import org.apache.flink.streaming.api.windowing.time.EventTime;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.partitioner.HashPartitioner; import org.apache.flink.streaming.runtime.partitioner.HashPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
Expand Down Expand Up @@ -92,34 +98,50 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
// ------------------------------------------------------------------------ // ------------------------------------------------------------------------


/** /**
* Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key * Windows this {@code KeyedDataStream} into tumbling time windows.
* grouped stream. The window is defined by a single policy. *
* <p> * <p>
* For time windows, these single-policy windows result in tumbling time windows. * This is a shortcut for either {@code .window(TumblingTimeWindows.of(size))} or
* * {@code .window(TumblingProcessingTimeWindows.of(size))} depending on the time characteristic
* @param policy The policy that defines the window. * set using
* @return The windows data stream. * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
*
* @param size The size of the window.
*/ */
public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) { public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size) {
return new KeyedWindowDataStream<T, KEY>(this, policy); AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());

if (actualSize instanceof EventTime) {
return window(TumblingTimeWindows.of(actualSize.toMilliseconds()));
} else {
return window(TumblingProcessingTimeWindows.of(actualSize.toMilliseconds()));
}
} }


/** /**
* Windows this data stream to a KeyedWindowDataStream, which evaluates windows over a key * Windows this {@code KeyedDataStream} into sliding time windows.
* grouped stream. The window is defined by a window policy, plus a slide policy. *
* <p> * <p>
* For time windows, these slide policy windows result in sliding time windows. * This is a shortcut for either {@code .window(SlidingTimeWindows.of(size, slide))} or
* * {@code .window(SlidingProcessingTimeWindows.of(size, slide))} depending on the time characteristic
* @param window The policy that defines the window. * set using
* @param slide The additional policy defining the slide of the window. * {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#setStreamTimeCharacteristic(org.apache.flink.streaming.api.TimeCharacteristic)}
* @return The windows data stream. *
* @param size The size of the window.
* @param slide The slide interval of the window.
*/ */
public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) { public KeyedWindowDataStream<T, KEY, TimeWindow> timeWindow(AbstractTime size, AbstractTime slide) {
return new KeyedWindowDataStream<T, KEY>(this, window, slide); AbstractTime actualSize = size.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());
AbstractTime actualSlide = slide.makeSpecificBasedOnTimeCharacteristic(environment.getStreamTimeCharacteristic());

if (actualSize instanceof EventTime) {
return window(SlidingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
} else {
return window(SlidingProcessingTimeWindows.of(actualSize.toMilliseconds(), actualSlide.toMilliseconds()));
}
} }


/** /**
* Windows this data stream to a {@code KeyedTriggerWindowDataStream}, which evaluates windows * Windows this data stream to a {@code KeyedWindowDataStream}, which evaluates windows
* over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The * over a key grouped stream. Elements are put into windows by a {@link WindowAssigner}. The
* grouping of elements is done both by key and by window. * grouping of elements is done both by key and by window.
* *
Expand All @@ -131,7 +153,7 @@ public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy sl
* @param assigner The {@code WindowAssigner} that assigns elements to windows. * @param assigner The {@code WindowAssigner} that assigns elements to windows.
* @return The trigger windows data stream. * @return The trigger windows data stream.
*/ */
public <W extends Window> KeyedTriggerWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) { public <W extends Window> KeyedWindowDataStream<T, KEY, W> window(WindowAssigner<? super T, W> assigner) {
return new KeyedTriggerWindowDataStream<T, KEY, W>(this, assigner); return new KeyedWindowDataStream<>(this, assigner);
} }
} }

This file was deleted.

0 comments on commit 5623c15

Please sign in to comment.