Skip to content

Commit

Permalink
Harmonize generic parameter names in Stream API classes
Browse files Browse the repository at this point in the history
  • Loading branch information
aljoscha committed Sep 28, 2015
1 parent 05d2138 commit 86c45bf
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 174 deletions.
Expand Up @@ -35,11 +35,11 @@
* partitioned by the given {@link KeySelector}. Operators like {@link #reduce}, * partitioned by the given {@link KeySelector}. Operators like {@link #reduce},
* {@link #fold} etc. can be applied on the {@link GroupedDataStream} to * {@link #fold} etc. can be applied on the {@link GroupedDataStream} to
* get additional functionality by the grouping. * get additional functionality by the grouping.
* *
* @param <OUT> * @param <T> The type of the elements in the Grouped Stream.
* The output type of the {@link GroupedDataStream}. * @param <KEY> The type of the key in the Keyed Stream.
*/ */
public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> { public class GroupedDataStream<T, KEY> extends KeyedDataStream<T, KEY> {


/** /**
* Creates a new {@link GroupedDataStream}, group inclusion is determined using * Creates a new {@link GroupedDataStream}, group inclusion is determined using
Expand All @@ -48,7 +48,7 @@ public class GroupedDataStream<OUT, KEY> extends KeyedDataStream<OUT, KEY> {
* @param dataStream Base stream of data * @param dataStream Base stream of data
* @param keySelector Function for determining group inclusion * @param keySelector Function for determining group inclusion
*/ */
public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySelector) { public GroupedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
super(dataStream, keySelector); super(dataStream, keySelector);
} }


Expand All @@ -64,8 +64,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* element of the input values with the same key. * element of the input values with the same key.
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reducer) { public SingleOutputStreamOperator<T, ?> reduce(ReduceFunction<T> reducer) {
return transform("Grouped Reduce", getType(), new StreamGroupedReduce<OUT>( return transform("Grouped Reduce", getType(), new StreamGroupedReduce<T>(
clean(reducer), keySelector)); clean(reducer), keySelector));
} }


Expand All @@ -82,12 +82,12 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The initialValue passed to the folders for each key. * The initialValue passed to the folders for each key.
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<OUT, R> folder) { public <R> SingleOutputStreamOperator<R, ?> fold(R initialValue, FoldFunction<T, R> folder) {


TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(), TypeInformation<R> outType = TypeExtractor.getFoldReturnTypes(clean(folder), getType(),
Utils.getCallLocationName(), true); Utils.getCallLocationName(), true);


return transform("Grouped Fold", outType, new StreamGroupedFold<OUT, R>(clean(folder), return transform("Grouped Fold", outType, new StreamGroupedFold<T, R>(clean(folder),
keySelector, initialValue)); keySelector, initialValue));
} }


Expand All @@ -100,8 +100,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to sum * The position in the data point to sum
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) { public SingleOutputStreamOperator<T, ?> sum(int positionToSum) {
return aggregate(new SumAggregator<OUT>(positionToSum, getType(), getExecutionConfig())); return aggregate(new SumAggregator<T>(positionToSum, getType(), getExecutionConfig()));
} }


/** /**
Expand All @@ -117,8 +117,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* applied. * applied.
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> sum(String field) { public SingleOutputStreamOperator<T, ?> sum(String field) {
return aggregate(new SumAggregator<OUT>(field, getType(), getExecutionConfig())); return aggregate(new SumAggregator<T>(field, getType(), getExecutionConfig()));
} }


/** /**
Expand All @@ -130,8 +130,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to minimize * The position in the data point to minimize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) { public SingleOutputStreamOperator<T, ?> min(int positionToMin) {
return aggregate(new ComparableAggregator<OUT>(positionToMin, getType(), AggregationType.MIN, return aggregate(new ComparableAggregator<T>(positionToMin, getType(), AggregationType.MIN,
getExecutionConfig())); getExecutionConfig()));
} }


Expand All @@ -148,8 +148,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* applied. * applied.
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> min(String field) { public SingleOutputStreamOperator<T, ?> min(String field) {
return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MIN, return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MIN,
false, getExecutionConfig())); false, getExecutionConfig()));
} }


Expand All @@ -162,8 +162,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to maximize * The position in the data point to maximize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) { public SingleOutputStreamOperator<T, ?> max(int positionToMax) {
return aggregate(new ComparableAggregator<OUT>(positionToMax, getType(), AggregationType.MAX, return aggregate(new ComparableAggregator<T>(positionToMax, getType(), AggregationType.MAX,
getExecutionConfig())); getExecutionConfig()));
} }


Expand All @@ -180,8 +180,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* applied. * applied.
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> max(String field) { public SingleOutputStreamOperator<T, ?> max(String field) {
return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAX, return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAX,
false, getExecutionConfig())); false, getExecutionConfig()));
} }


Expand All @@ -202,7 +202,7 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
@SuppressWarnings({ "rawtypes", "unchecked" }) @SuppressWarnings({ "rawtypes", "unchecked" })
public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) { public SingleOutputStreamOperator<T, ?> minBy(String field, boolean first) {
return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY, return aggregate(new ComparableAggregator(field, getType(), AggregationType.MINBY,
first, getExecutionConfig())); first, getExecutionConfig()));
} }
Expand All @@ -223,8 +223,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* be returned * be returned
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) { public SingleOutputStreamOperator<T, ?> maxBy(String field, boolean first) {
return aggregate(new ComparableAggregator<OUT>(field, getType(), AggregationType.MAXBY, return aggregate(new ComparableAggregator<T>(field, getType(), AggregationType.MAXBY,
first, getExecutionConfig())); first, getExecutionConfig()));
} }


Expand All @@ -238,7 +238,7 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to minimize * The position in the data point to minimize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) { public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy) {
return this.minBy(positionToMinBy, true); return this.minBy(positionToMinBy, true);
} }


Expand All @@ -252,7 +252,7 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to minimize * The position in the data point to minimize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) { public SingleOutputStreamOperator<T, ?> minBy(String positionToMinBy) {
return this.minBy(positionToMinBy, true); return this.minBy(positionToMinBy, true);
} }


Expand All @@ -270,8 +270,8 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* minimal value, otherwise returns the last * minimal value, otherwise returns the last
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) { public SingleOutputStreamOperator<T, ?> minBy(int positionToMinBy, boolean first) {
return aggregate(new ComparableAggregator<OUT>(positionToMinBy, getType(), AggregationType.MINBY, first, return aggregate(new ComparableAggregator<T>(positionToMinBy, getType(), AggregationType.MINBY, first,
getExecutionConfig())); getExecutionConfig()));
} }


Expand All @@ -285,7 +285,7 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to maximize * The position in the data point to maximize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) { public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy) {
return this.maxBy(positionToMaxBy, true); return this.maxBy(positionToMaxBy, true);
} }


Expand All @@ -299,7 +299,7 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* The position in the data point to maximize * The position in the data point to maximize
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) { public SingleOutputStreamOperator<T, ?> maxBy(String positionToMaxBy) {
return this.maxBy(positionToMaxBy, true); return this.maxBy(positionToMaxBy, true);
} }


Expand All @@ -317,13 +317,13 @@ public GroupedDataStream(DataStream<OUT> dataStream, KeySelector<OUT, KEY> keySe
* maximum value, otherwise returns the last * maximum value, otherwise returns the last
* @return The transformed DataStream. * @return The transformed DataStream.
*/ */
public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) { public SingleOutputStreamOperator<T, ?> maxBy(int positionToMaxBy, boolean first) {
return aggregate(new ComparableAggregator<OUT>(positionToMaxBy, getType(), AggregationType.MAXBY, first, return aggregate(new ComparableAggregator<T>(positionToMaxBy, getType(), AggregationType.MAXBY, first,
getExecutionConfig())); getExecutionConfig()));
} }


protected SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregate) { protected SingleOutputStreamOperator<T, ?> aggregate(AggregationFunction<T> aggregate) {
StreamGroupedReduce<OUT> operator = new StreamGroupedReduce<OUT>(clean(aggregate), keySelector); StreamGroupedReduce<T> operator = new StreamGroupedReduce<T>(clean(aggregate), keySelector);
return transform("Grouped Aggregation", getType(), operator); return transform("Grouped Aggregation", getType(), operator);
} }
} }
Expand Up @@ -34,11 +34,11 @@
* *
* *
* @param <T> The type of the elements in the Keyed Stream. * @param <T> The type of the elements in the Keyed Stream.
* @param <K> The type of the key in the Keyed Stream. * @param <KEY> The type of the key in the Keyed Stream.
*/ */
public class KeyedDataStream<T, K> extends DataStream<T> { public class KeyedDataStream<T, KEY> extends DataStream<T> {


protected final KeySelector<T, K> keySelector; protected final KeySelector<T, KEY> keySelector;


/** /**
* Creates a new {@link KeyedDataStream} using the given {@link KeySelector} * Creates a new {@link KeyedDataStream} using the given {@link KeySelector}
Expand All @@ -49,13 +49,13 @@ public class KeyedDataStream<T, K> extends DataStream<T> {
* @param keySelector * @param keySelector
* Function for determining state partitions * Function for determining state partitions
*/ */
public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, K> keySelector) { public KeyedDataStream(DataStream<T> dataStream, KeySelector<T, KEY> keySelector) {
super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector))); super(dataStream.getExecutionEnvironment(), new PartitionTransformation<T>(dataStream.getTransformation(), new HashPartitioner<T>(keySelector)));
this.keySelector = keySelector; this.keySelector = keySelector;
} }




public KeySelector<T, K> getKeySelector() { public KeySelector<T, KEY> getKeySelector() {
return this.keySelector; return this.keySelector;
} }


Expand Down Expand Up @@ -98,8 +98,8 @@ public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
* @param policy The policy that defines the window. * @param policy The policy that defines the window.
* @return The windows data stream. * @return The windows data stream.
*/ */
public KeyedWindowDataStream<T, K> window(WindowPolicy policy) { public KeyedWindowDataStream<T, KEY> window(WindowPolicy policy) {
return new KeyedWindowDataStream<T, K>(this, policy); return new KeyedWindowDataStream<T, KEY>(this, policy);
} }


/** /**
Expand All @@ -112,7 +112,7 @@ public KeyedWindowDataStream<T, K> window(WindowPolicy policy) {
* @param slide The additional policy defining the slide of the window. * @param slide The additional policy defining the slide of the window.
* @return The windows data stream. * @return The windows data stream.
*/ */
public KeyedWindowDataStream<T, K> window(WindowPolicy window, WindowPolicy slide) { public KeyedWindowDataStream<T, KEY> window(WindowPolicy window, WindowPolicy slide) {
return new KeyedWindowDataStream<T, K>(this, window, slide); return new KeyedWindowDataStream<T, KEY>(this, window, slide);
} }
} }
Expand Up @@ -43,13 +43,13 @@
* KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation * KeyedWindowDataStream will be collapsed together with the KeyedDataStream and the operation
* over the window into one single operation. * over the window into one single operation.
* *
* @param <Type> The type of elements in the stream. * @param <T> The type of elements in the stream.
* @param <Key> The type of the key by which elements are grouped. * @param <K> The type of the key by which elements are grouped.
*/ */
public class KeyedWindowDataStream<Type, Key> { public class KeyedWindowDataStream<T, K> {


/** The keyed data stream that is windowed by this stream */ /** The keyed data stream that is windowed by this stream */
private final KeyedDataStream<Type, Key> input; private final KeyedDataStream<T, K> input;


/** The core window policy */ /** The core window policy */
private final WindowPolicy windowPolicy; private final WindowPolicy windowPolicy;
Expand All @@ -58,11 +58,11 @@ public class KeyedWindowDataStream<Type, Key> {
private final WindowPolicy slidePolicy; private final WindowPolicy slidePolicy;




public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, WindowPolicy windowPolicy) { public KeyedWindowDataStream(KeyedDataStream<T, K> input, WindowPolicy windowPolicy) {
this(input, windowPolicy, null); this(input, windowPolicy, null);
} }


public KeyedWindowDataStream(KeyedDataStream<Type, Key> input, public KeyedWindowDataStream(KeyedDataStream<T, K> input,
WindowPolicy windowPolicy, WindowPolicy slidePolicy) WindowPolicy windowPolicy, WindowPolicy slidePolicy)
{ {
TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic(); TimeCharacteristic time = input.getExecutionEnvironment().getStreamTimeCharacteristic();
Expand Down Expand Up @@ -91,7 +91,7 @@ public KeyedWindowDataStream(KeyedDataStream<Type, Key> input,
* @param function The reduce function. * @param function The reduce function.
* @return The data stream that is the result of applying the reduce function to the window. * @return The data stream that is the result of applying the reduce function to the window.
*/ */
public DataStream<Type> reduceWindow(ReduceFunction<Type> function) { public DataStream<T> reduceWindow(ReduceFunction<T> function) {
String callLocation = Utils.getCallLocationName(); String callLocation = Utils.getCallLocationName();
return createWindowOperator(function, input.getType(), "Reduce at " + callLocation); return createWindowOperator(function, input.getType(), "Reduce at " + callLocation);
} }
Expand All @@ -107,10 +107,10 @@ public DataStream<Type> reduceWindow(ReduceFunction<Type> function) {
* @param function The window function. * @param function The window function.
* @return The data stream that is the result of applying the window function to the window. * @return The data stream that is the result of applying the window function to the window.
*/ */
public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<Type, Result, Key> function) { public <Result> DataStream<Result> mapWindow(KeyedWindowFunction<T, Result, K> function) {
String callLocation = Utils.getCallLocationName(); String callLocation = Utils.getCallLocationName();


TypeInformation<Type> inType = input.getType(); TypeInformation<T> inType = input.getType();
TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType( TypeInformation<Result> resultType = TypeExtractor.getUnaryOperatorReturnType(
function, KeyedWindowFunction.class, true, true, inType, null, false); function, KeyedWindowFunction.class, true, true, inType, null, false);


Expand All @@ -125,9 +125,9 @@ private <Result> DataStream<Result> createWindowOperator(
Function function, TypeInformation<Result> resultType, String functionName) { Function function, TypeInformation<Result> resultType, String functionName) {


String opName = windowPolicy.toString(slidePolicy) + " of " + functionName; String opName = windowPolicy.toString(slidePolicy) + " of " + functionName;
KeySelector<Type, Key> keySel = input.getKeySelector(); KeySelector<T, K> keySel = input.getKeySelector();


OneInputStreamOperator<Type, Result> operator = OneInputStreamOperator<T, Result> operator =
PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel); PolicyToOperator.createOperatorForPolicies(windowPolicy, slidePolicy, function, keySel);


return input.transform(opName, resultType, operator); return input.transform(opName, resultType, operator);
Expand Down

0 comments on commit 86c45bf

Please sign in to comment.