[streaming] Streaming scala api fix for window changes + example cleanup
gyfora authored and mbalassi committed Feb 16, 2015
1 parent f32990f commit aef52e8
Showing 6 changed files with 77 additions and 54 deletions.
@@ -67,6 +67,7 @@ protected DiscretizedStream(SingleOutputStreamOperator<StreamWindow<OUT>, ?> dis
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
+ @Override
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {

DiscretizedStream<OUT> out = partition(false).transform("Window Reduce", getType(),
@@ -93,20 +94,26 @@ public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
+ @Override
public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction) {

TypeInformation<R> retType = TypeExtractor.getGroupReduceReturnTypes(reduceFunction,
getType());

- DiscretizedStream<R> out = partition(true).transform("Window Reduce", retType,
+ return mapWindow(reduceFunction, retType);
+ }

+ @Override
+ public <R> DiscretizedStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFunction,
+ TypeInformation<R> returnType) {
+ DiscretizedStream<R> out = partition(true).transform("Window Reduce", returnType,
new WindowMapper<OUT, R>(reduceFunction));

if (isGrouped()) {
return out.merge();
} else {
return out;
}

}

private <R> DiscretizedStream<R> transform(String operatorName, TypeInformation<R> retType,
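The DiscretizedStream change above splits mapWindow in two: the single-argument variant now only extracts the return type and delegates to a new overload taking an explicit TypeInformation, which is what the Scala wrapper further below passes through from its context bounds. A minimal sketch of how a caller can drive the new overload, assuming a hypothetical WindowedDataStream[String] named windowed:

    import org.apache.flink.api.common.functions.GroupReduceFunction
    import org.apache.flink.util.Collector

    // Hypothetical reducer: emits the number of elements in each window.
    val countPerWindow = new GroupReduceFunction[String, Int] {
      def reduce(in: java.lang.Iterable[String], out: Collector[Int]) {
        var n = 0
        val it = in.iterator()
        while (it.hasNext) { it.next(); n += 1 }
        out.collect(n)
      }
    }

    // The Int type information is resolved from mapWindow's context bound and
    // forwarded to the new two-argument overload mapWindow(reducer, returnType).
    val counts: WindowedDataStream[Int] = windowed.mapWindow(countPerWindow)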
@@ -240,7 +240,6 @@ private DiscretizedStream<OUT> discretize(boolean isMap) {
* @return The transformed DataStream
*/
public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
-
return discretize(false).reduceWindow(reduceFunction);
}

@@ -261,10 +260,6 @@ public <R> WindowedDataStream<R> mapWindow(GroupReduceFunction<OUT, R> reduceFun
return discretize(true).mapWindow(reduceFunction);
}

- public DataStream<OUT> flatten() {
- return dataStream;
- }
-
/**
* Applies a reduceGroup transformation on the windowed data stream by
* reducing the current window at every trigger. In contrast with the
@@ -281,10 +276,14 @@ public DataStream<OUT> flatten()
* The reduce function that will be applied to the windows.
* @return The transformed DataStream
*/
- public <R> SingleOutputStreamOperator<R, ?> mapWindow(
+ public <R> WindowedDataStream<R> mapWindow(
GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) {

- throw new RuntimeException("Not implemented yet");
+ return discretize(true).mapWindow(reduceFunction, outType);
}

+ public DataStream<OUT> flatten() {
+ return dataStream;
+ }

protected Class<?> getClassAtPos(int pos) {
@@ -17,13 +17,9 @@

package org.apache.flink.streaming.examples.socket;

- import java.util.concurrent.TimeUnit;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
- import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
- import org.apache.flink.streaming.api.windowing.helper.Time;
import org.apache.flink.streaming.examples.wordcount.WordCount.Tokenizer;

/**
@@ -55,19 +51,29 @@
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {

+ if (!parseParameters(args)) {
+ return;
+ }

// set up the execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment
+ .getExecutionEnvironment();

// get input data
- DataStream<String> text = env.socketTextStream("localhost", 9999);
+ DataStream<String> text = env.socketTextStream(hostName, port);

- WindowedDataStream<Tuple2<String, Integer>> counts =
+ DataStream<Tuple2<String, Integer>> counts =
// split up the lines in pairs (2-tuples) containing: (word,1)
text.flatMap(new Tokenizer())
// group by the tuple field "0" and sum up tuple field "1"
- .window(Time.of(5, TimeUnit.SECONDS)).groupBy(0).sum(1);
+ .groupBy(0)
+ .sum(1);

- counts.print();
+ if (fileOutput) {
+ counts.writeAsText(outputPath, 1);
+ } else {
+ counts.print();
+ }

// execute program
env.execute("WordCount from SocketTextStream Example");
@@ -94,8 +100,7 @@ private static boolean parseParameters(String[] args) {
hostName = args[0];
port = Integer.valueOf(args[1]);
} else {
- System.err
- .println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
+ System.err.println("Usage: SocketTextStreamWordCount <hostname> <port> [<output path>]");
return false;
}
return true;
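The example cleanup turns SocketTextStreamWordCount into a plain, window-free streaming job that reads host and port from the command line and can optionally write to a file. For comparison, a rough equivalent of the resulting pipeline in the Scala API touched by this commit; hostName, port, and the mirrored flatMap/map/groupBy/sum methods on the Scala DataStream are assumptions, not shown in this diff:

    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Continuously updated word counts over the socket text stream.
    val counts = env.socketTextStream(hostName, port)
      .flatMap { line => line.toLowerCase.split("\\W+").filter(_.nonEmpty) }
      .map { word => (word, 1) }
      .groupBy(0)
      .sum(1)

    counts.print()
    env.execute("WordCount from SocketTextStream Example")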
@@ -52,7 +52,7 @@ object TopSpeedWindowing {
.local
.maxBy("speed")

- cars print
+ cars.flatten print

StreamExecutionEnvironment.getExecutionEnvironment.execute("TopSpeedWindowing")

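The one-line TopSpeedWindowing change follows from the API fix: windowed transformations such as maxBy now return a WindowedDataStream rather than a DataStream, so the result must be flattened back into an element-wise stream before printing. A sketch of what the call now produces, with CarEvent standing in for the example's record type (hypothetical name):

    // cars is the grouped, windowed maxBy("speed") stream from the example.
    val fastest: WindowedDataStream[CarEvent] = cars

    // flatten emits each window's contents element-wise, yielding a DataStream again.
    val asStream: DataStream[CarEvent] = fastest.flatten
    asStream print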
@@ -451,8 +451,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* the trigger and eviction policies please use
* window(List(triggers), List(evicters))
*/
- def window(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
- javaStream.window(windowingHelper: _*)
+ def window(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] =
+ javaStream.window(windowingHelper)

/**
* Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
Expand All @@ -461,8 +461,8 @@ class DataStream[T](javaStream: JavaStream[T]) {
* use-cases please refer to window(WindowingHelper[_]*)
*
*/
- def window(triggers: List[TriggerPolicy[T]], evicters: List[EvictionPolicy[T]]):
- WindowedDataStream[T] = javaStream.window(triggers, evicters)
+ def window(trigger: TriggerPolicy[T], evicter: EvictionPolicy[T]):
+ WindowedDataStream[T] = javaStream.window(trigger, evicter)

/**
*
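Both Scala window signatures above are narrowed to match the Java API: window(...) now takes exactly one WindowingHelper, or one TriggerPolicy plus one EvictionPolicy, instead of varargs and policy lists. Slide size is expressed by chaining the equally narrowed every(...). A minimal sketch, assuming a DataStream[Int] named stream and the Time helper used in the Java example above:

    import java.util.concurrent.TimeUnit
    import org.apache.flink.streaming.api.windowing.helper.Time

    // One helper per call: a 5-second window, evaluated every 1 second.
    val windowed: WindowedDataStream[Int] = stream
      .window(Time.of(5, TimeUnit.SECONDS))
      .every(Time.of(1, TimeUnit.SECONDS))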
@@ -35,17 +35,18 @@ import org.apache.flink.streaming.api.function.aggregation.SumFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.clean
import org.apache.flink.streaming.api.windowing.helper.WindowingHelper
import org.apache.flink.streaming.api.windowing.helper._
+ import org.apache.flink.streaming.api.invokable.operator.windowing.StreamWindow
import org.apache.flink.util.Collector

- class WindowedDataStream[T](javaStream: JavaWStream[WindowedDataStreamOld]) {
+ class WindowedDataStream[T](javaStream: JavaWStream[T]) {

/**
* Defines the slide size (trigger frequency) for the windowed data stream.
* This controls how often the user defined function will be triggered on
* the window.
*/
- def every(windowingHelper: WindowingHelper[_]*): WindowedDataStream[T] =
- javaStream.every(windowingHelper: _*)
+ def every(windowingHelper: WindowingHelper[_]): WindowedDataStream[T] =
+ javaStream.every(windowingHelper)

/**
* Groups the elements of the WindowedDataStream using the given
@@ -94,36 +95,45 @@ class WindowedDataStream[T](javaStream: JavaWStream[WindowedDataStreamOld]) {
* operator
*
*/
- def local(): WindowedDataStream[T]= {
- javaStream.local
- }
+ def local(): WindowedDataStream[T] = javaStream.local

+ /**
+ * Flattens the result of a window transformation returning the stream of window
+ * contents elementwise
+ */
+ def flatten(): DataStream[T] = javaStream.flatten()
+
+ /**
+ * Returns the stream of StreamWindows created by the window transformation
+ */
+ def getDiscretizedStream(): DataStream[StreamWindow[T]] = javaStream.getDiscretizedStream()

/**
* Applies a reduce transformation on the windowed data stream by reducing
* the current window at every trigger.
*
*/
- def reduce(reducer: ReduceFunction[T]): DataStream[T] = {
+ def reduceWindow(reducer: ReduceFunction[T]): WindowedDataStream[T] = {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.")
}
- javaStream.reduce(reducer)
+ javaStream.reduceWindow(reducer)
}

/**
* Applies a reduce transformation on the windowed data stream by reducing
* the current window at every trigger.
*
*/
- def reduce(fun: (T, T) => T): DataStream[T] = {
+ def reduceWindow(fun: (T, T) => T): WindowedDataStream[T] = {
if (fun == null) {
throw new NullPointerException("Reduce function must not be null.")
}
val reducer = new ReduceFunction[T] {
val cleanFun = clean(fun)
def reduce(v1: T, v2: T) = { cleanFun(v1, v2) }
}
- reduce(reducer)
+ reduceWindow(reducer)
}

/**
@@ -134,12 +144,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[WindowedDataStreamOld]) {
* </br>
* Whenever possible try to use reduce instead of groupReduce for increased efficiency
*/
- def reduceGroup[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
- DataStream[R] = {
+ def mapWindow[R: ClassTag: TypeInformation](reducer: GroupReduceFunction[T, R]):
+ WindowedDataStream[R] = {
if (reducer == null) {
throw new NullPointerException("GroupReduce function must not be null.")
}
- javaStream.reduceGroup(reducer, implicitly[TypeInformation[R]])
+ javaStream.mapWindow(reducer, implicitly[TypeInformation[R]])
}

/**
@@ -150,97 +160,98 @@ class WindowedDataStream[T](javaStream: JavaWStream[WindowedDataStreamOld]) {
* </br>
* Whenever possible try to use reduce instead of groupReduce for increased efficiency
*/
- def reduceGroup[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
- DataStream[R] = {
+ def mapWindow[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
+ WindowedDataStream[R] = {
if (fun == null) {
throw new NullPointerException("GroupReduce function must not be null.")
}
val reducer = new GroupReduceFunction[T, R] {
val cleanFun = clean(fun)
def reduce(in: java.lang.Iterable[T], out: Collector[R]) = { cleanFun(in, out) }
}
- reduceGroup(reducer)
+ mapWindow(reducer)
}

/**
* Applies an aggregation that gives the maximum of the elements in the window at
* the given position.
*
*/
- def max(position: Int): DataStream[T] = aggregate(AggregationType.MAX, position)
+ def max(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MAX, position)

/**
* Applies an aggregation that gives the maximum of the elements in the window at
* the given field.
*
*/
- def max(field: String): DataStream[T] = aggregate(AggregationType.MAX, field)
+ def max(field: String): WindowedDataStream[T] = aggregate(AggregationType.MAX, field)

/**
* Applies an aggregation that gives the minimum of the elements in the window at
* the given position.
*
*/
- def min(position: Int): DataStream[T] = aggregate(AggregationType.MIN, position)
+ def min(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MIN, position)

/**
* Applies an aggregation that gives the minimum of the elements in the window at
* the given field.
*
*/
- def min(field: String): DataStream[T] = aggregate(AggregationType.MIN, field)
+ def min(field: String): WindowedDataStream[T] = aggregate(AggregationType.MIN, field)

/**
* Applies an aggregation that sums the elements in the window at the given position.
*
*/
- def sum(position: Int): DataStream[T] = aggregate(AggregationType.SUM, position)
+ def sum(position: Int): WindowedDataStream[T] = aggregate(AggregationType.SUM, position)

/**
* Applies an aggregation that sums the elements in the window at the given field.
*
*/
- def sum(field: String): DataStream[T] = aggregate(AggregationType.SUM, field)
+ def sum(field: String): WindowedDataStream[T] = aggregate(AggregationType.SUM, field)

/**
* Applies an aggregation that gives the maximum element of the window by
* the given position. In case of equality, returns the first.
*
*/
- def maxBy(position: Int): DataStream[T] = aggregate(AggregationType.MAXBY,
+ def maxBy(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MAXBY,
position)

/**
* Applies an aggregation that gives the maximum element of the window by
* the given field. In case of equality, returns the first.
*
*/
- def maxBy(field: String): DataStream[T] = aggregate(AggregationType.MAXBY,
+ def maxBy(field: String): WindowedDataStream[T] = aggregate(AggregationType.MAXBY,
field)

/**
* Applies an aggregation that gives the minimum element of the window by
* the given position. In case of equality, returns the first.
*
*/
- def minBy(position: Int): DataStream[T] = aggregate(AggregationType.MINBY,
+ def minBy(position: Int): WindowedDataStream[T] = aggregate(AggregationType.MINBY,
position)

/**
* Applies an aggregation that gives the minimum element of the window by
* the given field. In case of equality, returns the first.
*
*/
- def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY,
+ def minBy(field: String): WindowedDataStream[T] = aggregate(AggregationType.MINBY,
field)

- private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
+ private def aggregate(aggregationType: AggregationType, field: String):
+ WindowedDataStream[T] = {
val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
aggregate(aggregationType, position)
}

def aggregate(aggregationType: AggregationType, position: Int):
- DataStream[T] = {
+ WindowedDataStream[T] = {

val jStream = javaStream.asInstanceOf[JavaWStream[Product]]
val outType = jStream.getType().asInstanceOf[TupleTypeInfoBase[_]]
@@ -256,7 +267,8 @@ class WindowedDataStream[T](javaStream: JavaWStream[WindowedDataStreamOld]) {
case _ => new agg.ProductComparableAggregator(aggregationType, true)
}

- new DataStream[Product](jStream.reduce(reducer)).asInstanceOf[DataStream[T]]
+ new WindowedDataStream[Product](
+ jStream.reduceWindow(reducer)).asInstanceOf[WindowedDataStream[T]]
}

}
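Taken together, the Scala WindowedDataStream now mirrors the renamed Java API: reduce and reduceGroup become reduceWindow and mapWindow, the aggregations stay windowed instead of collapsing to a DataStream, and flatten or getDiscretizedStream convert the result back to an element-wise or window-level stream. A usage sketch, assuming a DataStream[(String, Int)] named pairs (hypothetical) and the Time helper from the examples:

    val windowed = pairs
      .window(Time.of(5, TimeUnit.SECONDS))
      .groupBy(0)

    // reduceWindow replaces reduce; the result is still a WindowedDataStream.
    val summed = windowed.reduceWindow((a, b) => (a._1, a._2 + b._2))

    // Aggregations are windowed as well now.
    val maxima = windowed.maxBy(1)

    // Back to element-wise data, or to the raw StreamWindow objects.
    val elements: DataStream[(String, Int)] = summed.flatten
    val windows: DataStream[StreamWindow[(String, Int)]] = summed.getDiscretizedStream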
