Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 69 additions & 31 deletions docs/streaming_guide.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> in
// --------------------------------------------------------------------------------------------

@SuppressWarnings("unchecked")
private static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass,
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass,
boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.List;

import org.apache.commons.lang3.Validate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
Expand Down Expand Up @@ -207,6 +208,10 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}

protected ExecutionConfig getExecutionConfig() {
return environment.getConfig();
}

/**
* Creates a new {@link DataStream} by merging {@link DataStream} outputs of
* the same type with each other. The DataStreams merged using this operator
Expand Down Expand Up @@ -249,11 +254,12 @@ public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {

/**
* Creates a new {@link ConnectedDataStream} by connecting
* {@link DataStream} outputs of different type with each other. The
* DataStreams connected using this operators can be used with CoFunctions.
* {@link DataStream} outputs of (possible) different typea with each other.
* The DataStreams connected using this operator can be used with
* CoFunctions to apply joint transformations.
*
* @param dataStream
* The DataStream with which this stream will be joined.
* The DataStream with which this stream will be connected.
* @return The {@link ConnectedDataStream}.
*/
public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
Expand Down Expand Up @@ -319,7 +325,7 @@ public GroupedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {

private GroupedDataStream<OUT> groupBy(Keys<OUT> keys) {
return new GroupedDataStream<OUT>(this, clean(KeySelectorUtil.getSelectorForKeys(keys,
getType(), environment.getConfig())));
getType(), getExecutionConfig())));
}

/**
Expand Down Expand Up @@ -497,9 +503,10 @@ public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
}

/**
* Applies a reduce transformation on the data stream. The user can also
* extend the {@link RichReduceFunction} to gain access to other features
* provided by the
* Applies a reduce transformation on the data stream. The returned stream
* contains all the intermediate values of the reduce transformation. The
* user can also extend the {@link RichReduceFunction} to gain access to
* other features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param reducer
Expand Down Expand Up @@ -629,7 +636,8 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
* @return The transformed DataStream.
*/
public SingleOutputStreamOperator<OUT, ?> sum(String field) {
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType(),
getExecutionConfig()));
}

/**
Expand Down Expand Up @@ -660,7 +668,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
*/
public SingleOutputStreamOperator<OUT, ?> min(String field) {
return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
false));
false, getExecutionConfig()));
}

/**
Expand Down Expand Up @@ -691,7 +699,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
*/
public SingleOutputStreamOperator<OUT, ?> max(String field) {
return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
false));
false, getExecutionConfig()));
}

/**
Expand All @@ -711,7 +719,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
*/
public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
return aggregate(ComparableAggregator.getAggregator(field, getType(),
AggregationType.MINBY, first));
AggregationType.MINBY, first, getExecutionConfig()));
}

/**
Expand All @@ -731,7 +739,7 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
*/
public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
return aggregate(ComparableAggregator.getAggregator(field, getType(),
AggregationType.MAXBY, first));
AggregationType.MAXBY, first, getExecutionConfig()));
}

/**
Expand Down Expand Up @@ -842,14 +850,15 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)

/**
* Create a {@link WindowedDataStream} that can be used to apply
* transformation like {@link WindowedDataStream#reduce} or aggregations on
* preset chunks(windows) of the data stream. To define the windows one or
* more {@link WindowingHelper} such as {@link Time}, {@link Count} and
* transformation like {@link WindowedDataStream#reduceWindow},
* {@link WindowedDataStream#mapWindow} or aggregations on preset
* chunks(windows) of the data stream. To define windows a
* {@link WindowingHelper} such as {@link Time}, {@link Count} and
* {@link Delta} can be used.</br></br> When applied to a grouped data
* stream, the windows (evictions) and slide sizes (triggers) will be
* computed on a per group basis. </br></br> For more advanced control over
* the trigger and eviction policies please refer to
* {@link #window(triggers, evicters)} </br> </br> For example to create a
* {@link #window(trigger, eviction)} </br> </br> For example to create a
* sum every 5 seconds in a tumbling fashion:</br>
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
* create sliding windows use the
Expand All @@ -859,34 +868,34 @@ public <IN2> StreamJoinOperator<OUT, IN2> join(DataStream<IN2> dataStreamToJoin)
* {@code ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3,
* TimeUnit.SECONDS)).sum(field)}
*
* @param policyHelpers
* @param policyHelper
* Any {@link WindowingHelper} such as {@link Time},
* {@link Count} and {@link Delta} to define the window.
* {@link Count} and {@link Delta} to define the window size.
* @return A {@link WindowedDataStream} providing further operations.
*/
@SuppressWarnings({ "rawtypes", "unchecked" })
public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
return new WindowedDataStream<OUT>(this, policyHelpers);
public WindowedDataStream<OUT> window(WindowingHelper policyHelper) {
return new WindowedDataStream<OUT>(this, policyHelper);
}

/**
* Create a {@link WindowedDataStream} using the given {@link TriggerPolicy}
* s and {@link EvictionPolicy}s. Windowing can be used to apply
* transformation like {@link WindowedDataStream#reduce} or aggregations on
* preset chunks(windows) of the data stream.</br></br>For most common
* use-cases please refer to {@link #window(WindowingHelper...)}
*
* @param triggers
* The list of {@link TriggerPolicy}s that will determine how
* often the user function is called on the window.
* @param evicters
* The list of {@link EvictionPolicy}s that will determine the
* number of elements in each time window.
* and {@link EvictionPolicy}. Windowing can be used to apply transformation
* like {@link WindowedDataStream#reduceWindow},
* {@link WindowedDataStream#mapWindow} or aggregations on preset
* chunks(windows) of the data stream.</br></br>For most common use-cases
* please refer to {@link #window(WindowingHelper)}
*
* @param trigger
* The {@link TriggerPolicy} that will determine how often the
* user function is called on the window.
* @param eviction
* The {@link EvictionPolicy} that will determine the number of
* elements in each time window.
* @return A {@link WindowedDataStream} providing further operations.
*/
public WindowedDataStream<OUT> window(List<TriggerPolicy<OUT>> triggers,
List<EvictionPolicy<OUT>> evicters) {
return new WindowedDataStream<OUT>(this, triggers, evicters);
public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction) {
return new WindowedDataStream<OUT>(this, trigger, eviction);
}

/**
Expand Down
Loading