Skip to content

Commit

Permalink
[streaming] WindowMapFunction added + Streaming package structure cle…
Browse files Browse the repository at this point in the history
…anup
  • Loading branch information
gyfora authored and mbalassi committed Feb 16, 2015
1 parent 3885823 commit bb5dc7e
Show file tree
Hide file tree
Showing 41 changed files with 332 additions and 249 deletions.
Expand Up @@ -212,7 +212,7 @@ public static <IN> TypeInformation<IN> getInputFormatTypes(InputFormat<IN, ?> in
// --------------------------------------------------------------------------------------------

@SuppressWarnings("unchecked")
private static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass,
public static <IN, OUT> TypeInformation<OUT> getUnaryOperatorReturnType(Function function, Class<?> baseClass,
boolean hasIterable, boolean hasCollector, TypeInformation<IN> inType,
String functionName, boolean allowMissing)
{
Expand Down
Expand Up @@ -208,7 +208,7 @@ public StreamExecutionEnvironment getExecutionEnvironment() {
return environment;
}

public ExecutionConfig getExecutionConfig() {
protected ExecutionConfig getExecutionConfig() {
return environment.getConfig();
}

Expand Down Expand Up @@ -254,11 +254,12 @@ public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {

/**
* Creates a new {@link ConnectedDataStream} by connecting
* {@link DataStream} outputs of different type with each other. The
* DataStreams connected using this operators can be used with CoFunctions.
* {@link DataStream} outputs of (possible) different typea with each other.
* The DataStreams connected using this operator can be used with
* CoFunctions to apply joint transformations.
*
* @param dataStream
* The DataStream with which this stream will be joined.
* The DataStream with which this stream will be connected.
* @return The {@link ConnectedDataStream}.
*/
public <R> ConnectedDataStream<OUT, R> connect(DataStream<R> dataStream) {
Expand Down Expand Up @@ -502,9 +503,10 @@ public IterativeDataStream<OUT> iterate(long maxWaitTimeMillis) {
}

/**
* Applies a reduce transformation on the data stream. The user can also
* extend the {@link RichReduceFunction} to gain access to other features
* provided by the
* Applies a reduce transformation on the data stream. The returned stream
* contains all the intermediate values of the reduce transformation. The
* user can also extend the {@link RichReduceFunction} to gain access to
* other features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param reducer
Expand Down
Expand Up @@ -18,31 +18,30 @@
package org.apache.flink.streaming.api.datastream;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.Function;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RichGroupReduceFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.invokable.StreamInvokable;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow;
import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindowTypeInfo;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowFlattener;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMapper;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowMerger;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowPartitioner;
import org.apache.flink.streaming.api.invokable.operator.windowing.WindowReducer;
import org.apache.flink.streaming.api.windowing.StreamWindow;
import org.apache.flink.streaming.api.windowing.StreamWindowTypeInfo;

/**
* A {@link DiscretizedStream} represents a data stream that has been divided
* into windows (predefined chunks). User defined function such as
* {@link #reduceWindow(ReduceFunction)},
* {@link #mapWindow(GroupReduceFunction)} or aggregations can be applied to the
* windows.
* {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations
* can be applied to the windows.
*
* @param <OUT>
* The output type of the {@link DiscretizedStream}
Expand All @@ -60,6 +59,14 @@ protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> dis
this.transformation = tranformation;
}

public DataStream<OUT> flatten() {
return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>());
}

public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
return discretizedStream;
}

/**
* Applies a reduce transformation on the windowed data stream by reducing
* the current window at every trigger.The user can also extend the
Expand All @@ -79,7 +86,7 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {

if (!isGrouped()) {
return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
new WindowReducer<OUT>(reduceFunction));
new WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
} else {
return out;
}
Expand All @@ -90,29 +97,27 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
* reducing the current window at every trigger. In contrast with the
* standard binary reducer, with reduceGroup the user can access all
* elements of the window at the same time through the iterable interface.
* The user can also extend the {@link RichGroupReduceFunction} to gain
* access to other features provided by the
* {@link org.apache.flink.api.common.functions.RichFunction} interface.
* The user can also extend the to gain access to other features provided by
* the {@link org.apache.flink.api.common.functions.RichFunction} interface.
*
* @param reduceFunction
* @param windowMapFunction
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
@Override
public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) {
public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {

TypeInformation<R> retType = TypeExtractor.getGroupReduceReturnTypes(reduceFunction,
getType());
TypeInformation<R> retType = getWindowMapReturnTypes(windowMapFunction, getType());

return mapWindow(reduceFunction, retType);
return mapWindow(windowMapFunction, retType);
}

@Override
public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction,
public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,
TypeInformation<R> returnType) {
DiscretizedStream<R> out = partition(transformation).transform(
WindowTransformation.REDUCEWINDOW, "Window Reduce", returnType,
new WindowMapper<OUT, R>(reduceFunction));
new WindowMapper<OUT, R>(discretizedStream.clean(windowMapFunction)));

if (isGrouped()) {
return out.merge();
Expand Down Expand Up @@ -159,10 +164,6 @@ private DiscretizedStream<OUT> merge() {
type, new WindowMerger<OUT>()));
}

public DataStream<OUT> flatten() {
return discretizedStream.transform("Window Flatten", getType(), new WindowFlattener<OUT>());
}

@SuppressWarnings("rawtypes")
private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream) {
return wrap(stream, transformation);
Expand All @@ -174,10 +175,6 @@ private <R> DiscretizedStream<R> wrap(SingleOutputStreamOperator stream,
return new DiscretizedStream<R>(stream, (KeySelector<R, ?>) this.groupByKey, transformation);
}

public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
return discretizedStream;
}

@SuppressWarnings("rawtypes")
protected Class<?> getClassAtPos(int pos) {
Class<?> type;
Expand Down Expand Up @@ -231,6 +228,12 @@ public TypeInformation<OUT> getType() {
return ((StreamWindowTypeInfo<OUT>) discretizedStream.getType()).getInnerType();
}

private static <IN, OUT> TypeInformation<OUT> getWindowMapReturnTypes(
WindowMapFunction<IN, OUT> windowMapInterface, TypeInformation<IN> inType) {
return TypeExtractor.getUnaryOperatorReturnType((Function) windowMapInterface,
WindowMapFunction.class, true, true, inType, null, false);
}

protected DiscretizedStream<OUT> copy() {
return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation);
}
Expand Down

0 comments on commit bb5dc7e

Please sign in to comment.