Skip to content

Commit

Permalink
[streaming] Documentations updated to match the reworked windowing se…
Browse files Browse the repository at this point in the history
…mantics
  • Loading branch information
gyfora authored and mbalassi committed Feb 16, 2015
1 parent 4470207 commit f571ece
Show file tree
Hide file tree
Showing 22 changed files with 322 additions and 133 deletions.
100 changes: 69 additions & 31 deletions docs/streaming_guide.md

Large diffs are not rendered by default.

Expand Up @@ -850,14 +850,15 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)

/**
* Create a {@link WindowedDataStream} that can be used to apply
* transformation like {@link WindowedDataStream#reduce} or aggregations on
* preset chunks(windows) of the data stream. To define the windows one or
* more {@link WindowingHelper} such as {@link Time}, {@link Count} and
* transformation like {@link WindowedDataStream#reduceWindow},
* {@link WindowedDataStream#mapWindow} or aggregations on preset
* chunks(windows) of the data stream. To define windows a
* {@link WindowingHelper} such as {@link Time}, {@link Count} and
* {@link Delta} can be used.</br></br> When applied to a grouped data
* stream, the windows (evictions) and slide sizes (triggers) will be
* computed on a per group basis. </br></br> For more advanced control over
* the trigger and eviction policies please refer to
* {@link #window(triggers, evicters)} </br> </br> For example to create a
* {@link #window(trigger, eviction)} </br> </br> For example to create a
* sum every 5 seconds in a tumbling fashion:</br>
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
* create sliding windows use the
Expand All @@ -867,33 +868,34 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3,
* TimeUnit.SECONDS)).sum(field)}
*
* @param policyHelpers
* @param policyHelper
* Any {@link WindowingHelper} such as {@link Time},
* {@link Count} and {@link Delta} to define the window.
* {@link Count} and {@link Delta} to define the window size.
* @return A {@link WindowedDataStream} providing further operations.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public WindowedDataStream<OUT> window(WindowingHelper policyHelpers) {
return new WindowedDataStream<OUT>(this, policyHelpers);
public WindowedDataStream<OUT> window(WindowingHelper policyHelper) {
return new WindowedDataStream<OUT>(this, policyHelper);
}

/**
* Create a {@link WindowedDataStream} using the given {@link TriggerPolicy}
* s and {@link EvictionPolicy}s. Windowing can be used to apply
* transformation like {@link WindowedDataStream#reduce} or aggregations on
* preset chunks(windows) of the data stream.</br></br>For most common
* use-cases please refer to {@link #window(WindowingHelper...)}
*
* @param triggers
* The list of {@link TriggerPolicy}s that will determine how
* often the user function is called on the window.
* @param evicters
* The list of {@link EvictionPolicy}s that will determine the
* number of elements in each time window.
* and {@link EvictionPolicy}. Windowing can be used to apply transformation
* like {@link WindowedDataStream#reduceWindow},
* {@link WindowedDataStream#mapWindow} or aggregations on preset
* chunks(windows) of the data stream.</br></br>For most common use-cases
* please refer to {@link #window(WindowingHelper)}
*
* @param trigger
* The {@link TriggerPolicy} that will determine how often the
* user function is called on the window.
* @param eviction
* The {@link EvictionPolicy} that will determine the number of
* elements in each time window.
* @return A {@link WindowedDataStream} providing further operations.
*/
public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> evicter) {
return new WindowedDataStream<OUT>(this, trigger, evicter);
public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction) {
return new WindowedDataStream<OUT>(this, trigger, eviction);
}

/**
Expand Down
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
Expand Down Expand Up @@ -67,16 +66,6 @@ public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
return discretizedStream;
}

/**
* Applies a reduce transformation on the windowed data stream by reducing
* the current window at every trigger.The user can also extend the
* {@link RichReduceFunction} to gain access to other features provided by
* the {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param reduceFunction
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
@Override
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {

Expand All @@ -92,18 +81,6 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
}
}

/**
* Applies a reduceGroup transformation on the windowed data stream by
* reducing the current window at every trigger. In contrast with the
* standard binary reducer, with reduceGroup the user can access all
* elements of the window at the same time through the iterable interface.
* The user can also extend the to gain access to other features provided by
* the {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param windowMapFunction
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
@Override
public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {

Expand Down Expand Up @@ -238,4 +215,10 @@ protected DiscretizedStream<OUT> copy() {
return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation);
}

@Override
public WindowedDataStream<OUT> local() {
throw new UnsupportedOperationException(
"Local discretisation can only be applied after defining the discretisation logic");
}

}
Expand Up @@ -26,6 +26,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.streaming.api.function.RichWindowMapFunction;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
Expand Down Expand Up @@ -53,20 +54,19 @@
import org.apache.flink.streaming.util.keys.KeySelectorUtil;

/**
* A {@link WindowedDataStream} represents a data stream that has been divided
* into windows (predefined chunks). User defined function such as
* A {@link WindowedDataStream} represents a data stream that has been
* discretised into windows. User defined function such as
* {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations
* can be applied to the windows.
* can be applied to the windows. The results of these transformations are also
* WindowedDataStreams of the same discretisation unit.
*
* @param <OUT>
* The output type of the {@link WindowedDataStream}
*/
public class WindowedDataStream<OUT> {

protected enum WindowTransformation {

REDUCEWINDOW, MAPWINDOW, NONE;

private Function UDF;

public WindowTransformation with(Function UDF) {
Expand Down Expand Up @@ -132,8 +132,8 @@ public WindowedDataStream() {
* </br></br> The user function in this case will be called on the 5 most
* recent elements every 2 seconds
*
* @param policyHelpers
* The policies that define the triggering frequency
* @param policyHelper
* The policy that define the triggering frequency
*
* @return The windowed data stream with triggering set
*/
Expand All @@ -151,7 +151,7 @@ public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
/**
* Groups the elements of the {@link WindowedDataStream} by the given key
* positions. The window sizes (evictions) and slide sizes (triggers) will
* be calculated on the whole stream (in a central fashion), but the user
* be calculated on the whole stream (in a global fashion), but the user
* defined functions will be applied on a per group basis. </br></br> To get
* windows and triggers on a per group basis apply the
* {@link DataStream#window} operator on an already grouped data stream.
Expand All @@ -171,7 +171,7 @@ public WindowedDataStream<OUT> groupBy(int... fields) {
/**
* Groups the elements of the {@link WindowedDataStream} by the given field
* expressions. The window sizes (evictions) and slide sizes (triggers) will
* be calculated on the whole stream (in a central fashion), but the user
* be calculated on the whole stream (in a global fashion), but the user
* defined functions will be applied on a per group basis. </br></br> To get
* windows and triggers on a per group basis apply the
* {@link DataStream#window} operator on an already grouped data stream.
Expand All @@ -191,7 +191,7 @@ public WindowedDataStream<OUT> groupBy(String... fields) {
/**
* Groups the elements of the {@link WindowedDataStream} using the given
* {@link KeySelector}. The window sizes (evictions) and slide sizes
* (triggers) will be calculated on the whole stream (in a central fashion),
* (triggers) will be calculated on the whole stream (in a global fashion),
* but the user defined functions will be applied on a per group basis.
* </br></br> To get windows and triggers on a per group basis apply the
* {@link DataStream#window} operator on an already grouped data stream.
Expand All @@ -212,23 +212,35 @@ private WindowedDataStream<OUT> groupBy(Keys<OUT> keys) {
}

/**
* Sets the windowed computations local, so that the windowing and reduce or
* aggregation logic will be computed for each parallel instance of this
* operator
* Sets the window discretisation local, meaning that windows will be
* created in parallel at environment parallelism.
*
* @return The local windowed data stream
* @return The WindowedDataStream with local discretisation
*/
public WindowedDataStream<OUT> local() {
WindowedDataStream<OUT> out = copy();
out.isLocal = true;
return out;
}

/**
* Returns the {@link DataStream} of {@link StreamWindow}s which represent
* the discretised stream. There is no ordering guarantee for the received
* windows.
*
* @return The discretised stream
*/
public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>())
.getDiscretizedStream();
}

/**
* Flattens the results of the window computations and streams out the
* window elements.
*
* @return The data stream consisting of the individual records.
*/
public DataStream<OUT> flatten() {
return dataStream;
}
Expand Down Expand Up @@ -259,15 +271,16 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
}

/**
* Applies a reduceGroup transformation on the windowed data stream by
* reducing the current window at every trigger. In contrast with the
* standard binary reducer, with reduceGroup the user can access all
* Applies a mapWindow transformation on the windowed data stream by calling
* the mapWindow function on the window at every trigger. In contrast with
* the standard binary reducer, with mapWindow allows the user to access all
* elements of the window at the same time through the iterable interface.
* The user can also extend the to gain access to other features provided by
* the {@link org.apache.flink.api.common.functions.RichFunction} interface.
* The user can also extend the {@link RichWindowMapFunction} to gain access
* to other features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param windowMapFunction
* The reduce function that will be applied to the windows.
* The function that will be applied to the windows.
* @return The transformed DataStream
*/
public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
Expand All @@ -276,18 +289,21 @@ public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFu
}

/**
* Applies a reduceGroup transformation on the windowed data stream by
* reducing the current window at every trigger. In contrast with the
* standard binary reducer, with reduceGroup the user can access all
* Applies a mapWindow transformation on the windowed data stream by calling
* the mapWindow function on the window at every trigger. In contrast with
* the standard binary reducer, with mapWindow allows the user to access all
* elements of the window at the same time through the iterable interface.
* The user can also extend the to gain access to other features provided by
* the {@link org.apache.flink.api.common.functions.RichFunction} interface.
* </br> </br> This version of reduceGroup uses user supplied
* typeinformation for serializaton. Use this only when the system is unable
* to detect type information using: {@link #mapWindow()}
* The user can also extend the {@link RichWindowMapFunction} to gain access
* to other features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
* </br> </br> This version of mapWindow uses user supplied typeinformation
* for serializaton. Use this only when the system is unable to detect type
* information.
*
* @param windowMapFunction
* The reduce function that will be applied to the windows.
* The function that will be applied to the windows.
* @param outType
* The output type of the operator.
* @return The transformed DataStream
*/
public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
Expand Down
Expand Up @@ -18,8 +18,17 @@
package org.apache.flink.streaming.api.function;

import org.apache.flink.api.common.functions.AbstractRichFunction;
import org.apache.flink.api.common.functions.RichFunction;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;

/**
* Abstract class for defining rich mapWindow transformation to be applied on
* {@link WindowedDataStream}s. The mapWindow function will be called on each
* {@link StreamWindow}.</p> In addition the user can access the functionality
* provided by the {@link RichFunction} interface.
*/
public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
WindowMapFunction<IN, OUT> {

Expand Down
Expand Up @@ -20,8 +20,15 @@
import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.util.Collector;

/**
* Interface for defining mapWindow transformation to be applied on
* {@link WindowedDataStream}s. The mapWindow function will be called on each
* {@link StreamWindow}.
*/
public interface WindowMapFunction<T, O> extends Function, Serializable {

void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;
Expand Down
Expand Up @@ -28,11 +28,15 @@
import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;

/**
* This invokable represents the grouped discretization step of a window
* transformation. The user supplied eviction and trigger policies are applied
* on a per group basis to create the {@link StreamWindow} that will be further
* transformed in the next stages. </p> To allow pre-aggregations supply an
* appropriate {@link WindowBuffer}
*/
public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {

/**
* Auto-generated serial version UID
*/
private static final long serialVersionUID = -3469545957144404137L;

protected KeySelector<IN, ?> keySelector;
Expand Down
Expand Up @@ -22,6 +22,10 @@
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;

/**
* A specialized {@link GroupedStreamDiscretizer} to be used with time only
* policies
*/
public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {

private static final long serialVersionUID = -3469545957144404137L;
Expand Down
Expand Up @@ -26,6 +26,12 @@
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;

/**
* This invokable represents the discretization step of a window transformation.
* The user supplied eviction and trigger policies are applied to create the
* {@link StreamWindow} that will be further transformed in the next stages.
* </p> To allow pre-aggregations supply an appropriate {@link WindowBuffer}
*/
public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {

/**
Expand Down
Expand Up @@ -20,6 +20,10 @@
import org.apache.flink.streaming.api.invokable.ChainableInvokable;
import org.apache.flink.streaming.api.windowing.StreamWindow;

/**
* This invokable flattens the results of the window transformations by
* outputing the elements of the {@link StreamWindow} one-by-one
*/
public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {

public WindowFlattener() {
Expand Down
Expand Up @@ -18,10 +18,15 @@
package org.apache.flink.streaming.api.invokable.operator.windowing;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
import org.apache.flink.streaming.api.windowing.StreamWindow;

/**
* This invokable is used to apply mapWindow transformations on
* {@link WindowedDataStream}s.
*/
public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {

private static final long serialVersionUID = 1L;
Expand Down

0 comments on commit f571ece

Please sign in to comment.