From 0eae0607a6522ecf5f9562841691bcc77ddb3d93 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Fri, 13 Jan 2017 14:39:05 +0800 Subject: [PATCH] [GEARPUMP-262] Add setup and teardown to user defined functions --- .../examples/kafka/dsl/KafkaReadWrite.scala | 4 +- .../examples/wordcountjava/dsl/WordCount.java | 47 +++- .../wordcount/dsl/WindowedWordCount.scala | 2 +- .../examples/wordcount/dsl/WordCount.scala | 4 +- .../external/hbase/dsl/HBaseDSLSink.scala | 5 +- .../streaming/kafka/dsl/KafkaDSL.scala | 12 +- .../javaapi/dsl/functions/ReduceFunction.java | 30 --- .../dsl/api/functions/FilterFunction.scala} | 25 ++- .../dsl/api/functions/MapFunction.scala | 43 ++++ .../dsl/api/functions/ReduceFunction.scala | 42 ++++ .../streaming/dsl/javaapi/JavaStream.scala | 18 +- .../streaming/dsl/javaapi/JavaStreamApp.scala | 5 +- .../javaapi/functions/FlatMapFunction.scala} | 17 +- .../javaapi/functions/GroupByFunction.scala} | 14 +- .../gearpump/streaming/dsl/plan/OP.scala | 4 +- .../plan/functions/SingleInputFunction.scala | 66 +++--- .../streaming/dsl/{ => scalaapi}/Stream.scala | 96 ++++++--- .../dsl/{ => scalaapi}/StreamApp.scala | 4 +- .../scalaapi/functions/FlatMapFunction.scala | 103 +++++++++ .../functions/SerializableFunction.scala} | 22 +- .../streaming/dsl/task/TransformTask.scala | 5 +- .../dsl/window/impl/WindowRunner.scala | 16 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 13 +- .../streaming/dsl/plan/PlannerSpec.scala | 15 +- .../functions/SingleInputFunctionSpec.scala | 202 +++++++++--------- .../dsl/{ => scalaapi}/StreamAppSpec.scala | 9 +- .../dsl/{ => scalaapi}/StreamSpec.scala | 13 +- 27 files changed, 549 insertions(+), 287 deletions(-) delete mode 100644 streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java rename streaming/src/main/{java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java => scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala} (62%) create mode 100644 
streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala rename streaming/src/main/{java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java => scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala} (64%) rename streaming/src/main/{java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java => scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala} (73%) rename streaming/src/main/scala/org/apache/gearpump/streaming/dsl/{ => scalaapi}/Stream.scala (72%) rename streaming/src/main/scala/org/apache/gearpump/streaming/dsl/{ => scalaapi}/StreamApp.scala (98%) create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala rename streaming/src/main/{java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java => scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala} (65%) rename streaming/src/test/scala/org/apache/gearpump/streaming/dsl/{ => scalaapi}/StreamAppSpec.scala (91%) rename streaming/src/test/scala/org/apache/gearpump/streaming/dsl/{ => scalaapi}/StreamSpec.scala (94%) diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala index 49d36194d..cbfe57a1b 100644 --- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala +++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala @@ -21,8 +21,8 @@ package org.apache.gearpump.streaming.examples.kafka.dsl import java.util.Properties import org.apache.gearpump.cluster.client.ClientContext -import 
org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser} -import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp import org.apache.gearpump.streaming.kafka.KafkaStoreFactory import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._ diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java index d4866ed5f..2942861d0 100644 --- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java +++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java @@ -25,12 +25,17 @@ import org.apache.gearpump.cluster.client.ClientContext; import org.apache.gearpump.streaming.dsl.javaapi.JavaStream; import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction; +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction; +import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction; +import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction; import org.apache.gearpump.streaming.source.DataSource; import org.apache.gearpump.streaming.task.TaskContext; import scala.Tuple2; import java.time.Instant; import java.util.Arrays; +import java.util.Iterator; /** Java version of WordCount with high level DSL API */ public class WordCount { @@ -46,15 +51,13 @@ public static void main(Config akkaConf, String[] args) throws InterruptedExcept JavaStream sentence = app.source(new StringSource("This is a good start, bingo!! 
bingo!!"), 1, UserConfig.empty(), "source"); - JavaStream words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(), - "flatMap"); + JavaStream words = sentence.flatMap(new Split(), "flatMap"); - JavaStream> ones = words.map(s -> new Tuple2<>(s, 1), "map"); + JavaStream> ones = words.map(new Ones(), "map"); - JavaStream> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy"); + JavaStream> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy"); - JavaStream> wordcount = groupedOnes.reduce( - (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce"); + JavaStream> wordcount = groupedOnes.reduce(new Count(), "reduce"); wordcount.log(); @@ -88,4 +91,36 @@ public Instant getWatermark() { return Instant.now(); } } + + private static class Split extends FlatMapFunction { + + @Override + public Iterator apply(String s) { + return Arrays.asList(s.split("\\s+")).iterator(); + } + } + + private static class Ones extends MapFunction> { + + @Override + public Tuple2 apply(String s) { + return new Tuple2<>(s, 1); + } + } + + private static class Count extends ReduceFunction> { + + @Override + public Tuple2 apply(Tuple2 t1, Tuple2 t2) { + return new Tuple2<>(t1._1(), t1._2() + t2._2()); + } + } + + private static class TupleKey extends GroupByFunction, String> { + + @Override + public String apply(Tuple2 tuple) { + return tuple._1(); + } + } } diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index 4f43fd466..401eac048 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -22,7 +22,7 @@ import java.time.{Duration, Instant} import 
org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp} +import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow} import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.TaskContext diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala index 22f597c78..1cbfb22c4 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala @@ -20,8 +20,8 @@ package org.apache.gearpump.streaming.examples.wordcount.dsl import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} -import org.apache.gearpump.streaming.dsl.StreamApp -import org.apache.gearpump.streaming.dsl.StreamApp._ +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._ import org.apache.gearpump.util.AkkaApp /** Same WordCount with High level DSL syntax */ diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala index 2417763ee..22efa89e3 100644 --- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala +++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala @@ -18,13 +18,10 @@ package 
org.apache.gearpump.external.hbase.dsl import scala.language.implicitConversions - import org.apache.hadoop.conf.Configuration - import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.external.hbase.HBaseSink -import org.apache.gearpump.streaming.dsl.Stream -import org.apache.gearpump.streaming.dsl.Stream.Sink +import org.apache.gearpump.streaming.dsl.scalaapi.Stream /** Create a HBase DSL Sink */ class HBaseDSLSink[T](stream: Stream[T]) { diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala index f1bb26a3a..996ae0ba5 100644 --- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala +++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala @@ -21,7 +21,7 @@ import java.util.Properties import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl -import org.apache.gearpump.streaming.dsl.StreamApp +import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp} import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource} import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory @@ -44,7 +44,7 @@ object KafkaDSL { parallelism: Int = 1, config: UserConfig = UserConfig.empty, description: String = "KafkaSource" - ): dsl.Stream[T] = { + ): Stream[T] = { app.source[T](new KafkaSource(topics, properties), parallelism, config, description) } @@ -66,19 +66,19 @@ object KafkaDSL { properties: Properties, parallelism: Int = 1, config: UserConfig = UserConfig.empty, - description: String = "KafkaSource"): dsl.Stream[T] = { + description: String = "KafkaSource"): Stream[T] = { val source = new KafkaSource(topics, properties) source.setCheckpointStore(checkpointStoreFactory) app.source[T](source, parallelism, config, description) } import scala.language.implicitConversions - implicit def 
streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = { + implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = { new KafkaDSL[T](stream) } } -class KafkaDSL[T](stream: dsl.Stream[T]) { +class KafkaDSL[T](stream: Stream[T]) { /** * Sinks data to Kafka @@ -94,7 +94,7 @@ class KafkaDSL[T](stream: dsl.Stream[T]) { properties: Properties, parallelism: Int = 1, userConfig: UserConfig = UserConfig.empty, - description: String = "KafkaSink"): dsl.Stream[T] = { + description: String = "KafkaSink"): Stream[T] = { stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description) } } \ No newline at end of file diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java deleted file mode 100644 index 2bcac6013..000000000 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; - -/** - * Function that applies reduce operation - * - * @param Input value type - */ -public interface ReduceFunction extends Serializable { - T apply(T t1, T t2); -} diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala similarity index 62% rename from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala index e1fc82129..e4e7309f6 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala @@ -15,17 +15,28 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.gearpump.streaming.dsl.api.functions -package org.apache.gearpump.streaming.javaapi.dsl.functions; +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction -import java.io.Serializable; +object FilterFunction { + + def apply[T](fn: T => Boolean): FilterFunction[T] = { + new FilterFunction[T] { + override def apply(t: T): Boolean = { + fn(t) + } + } + } +} /** - * Function that map a value of type T to value of type R + * Returns true to keep the input and false otherwise. 
* - * @param Input value type - * @param Output value type + * @param T Input value type */ -public interface MapFunction extends Serializable { - R apply(T t); +abstract class FilterFunction[T] extends SerializableFunction { + + def apply(t: T): Boolean + } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala new file mode 100644 index 000000000..70fe9d46b --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object MapFunction { + + def apply[T, R](fn: T => R): MapFunction[T, R] = { + new MapFunction[T, R] { + override def apply(t: T): R = { + fn(t) + } + } + } +} + +/** + * Transforms an input into an output of possibly different types. 
+ * + * @param T Input value type + * @param R Output value type + */ +abstract class MapFunction[T, R] extends SerializableFunction { + + def apply(t: T): R + +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala new file mode 100644 index 000000000..25b12be2d --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.api.functions + +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction + +object ReduceFunction { + + def apply[T](fn: (T, T) => T): ReduceFunction[T] = { + new ReduceFunction[T] { + override def apply(t1: T, t2: T): T = { + fn(t1, t2) + } + } + } +} + +/** + * Combines two inputs into one output of the same type. 
+ * + * @param T Type of both inputs and output + */ +abstract class ReduceFunction[T] extends SerializableFunction { + + def apply(t1: T, t2: T): T + +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index f2654ea19..7f3c250fa 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -15,14 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.gearpump.streaming.dsl.javaapi -import scala.collection.JavaConverters._ import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction +import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream} import org.apache.gearpump.streaming.dsl.window.api.Window -import org.apache.gearpump.streaming.dsl.{Stream, WindowStream} -import org.apache.gearpump.streaming.javaapi.dsl.functions._ import org.apache.gearpump.streaming.task.Task /** @@ -31,23 +31,23 @@ import org.apache.gearpump.streaming.task.Task class JavaStream[T](val stream: Stream[T]) { /** FlatMap on stream */ - def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = { - new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description)) + def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = { + new JavaStream[R](stream.flatMap(FlatMapFunction(fn), "flatMap")) } /** Map on stream */ def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = { - new 
JavaStream[R](stream.map({ t: T => fn(t) }, description)) + new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description)) } /** Only keep the messages that FilterFunction returns true. */ def filter(fn: FilterFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.filter({ t: T => fn(t) }, description)) + new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description)) } /** Does aggregation on the stream */ def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = { - new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description)) + new JavaStream[T](stream.reduce(fn, description)) } def log(): Unit = { diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala index 82a284e37..b8d1f4c32 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala @@ -19,13 +19,14 @@ package org.apache.gearpump.streaming.dsl.javaapi import java.util.Collection -import scala.collection.JavaConverters._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext -import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp} +import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp} import org.apache.gearpump.streaming.source.DataSource +import scala.collection.JavaConverters._ + class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) { private val streamApp = StreamApp(name, context, userConfig) diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala similarity index 64% rename from 
streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala index 6c7128055..85d597da7 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala @@ -15,17 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.gearpump.streaming.dsl.javaapi.functions -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; +import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction /** - * GroupBy function which assign value of type T to groups + * Transforms one input into zero or more outputs of possibly different types. + * This Java version of FlatMapFunction returns a java.util.Iterator. 
* - * @param Input value type - * @param Group Type + * @param T Input value type + * @param R Output value type */ -public interface GroupByFunction extends Serializable { - Group apply(T t); +abstract class FlatMapFunction[T, R] extends SerializableFunction { + + def apply(t: T): java.util.Iterator[R] } diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala similarity index 73% rename from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala index f07ceffe1..7656cba4c 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala @@ -15,16 +15,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.gearpump.streaming.dsl.javaapi.functions -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; +import org.apache.gearpump.streaming.dsl.api.functions.MapFunction /** - * Filter function + * Assigns the input value into a group. 
* - * @param Message of type T + * @param T Input value type + * @param GROUP Group value type */ -public interface FilterFunction extends Serializable { - boolean apply(T t); -} +abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP] diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index f15d87544..82ea7c7ad 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -22,7 +22,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.TransformTask import org.apache.gearpump.streaming.dsl.window.api.GroupByFn @@ -134,7 +134,7 @@ case class ChainableOp[IN, OUT]( other match { case op: ChainableOp[OUT, _] => // TODO: preserve type info - ChainableOp(fn.andThen(op.fn)) + ChainableOp(AndThen(fn, op.fn)) case _ => throw new OpChainException(this, other) } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala index 53226483e..687fd2e0d 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -17,23 +17,35 @@ */ package org.apache.gearpump.streaming.dsl.plan.functions -trait SingleInputFunction[IN, OUT] extends 
Serializable { +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction + +/** + * Internal function to process single input + * + * @param IN input value type + * @param OUT output value type + */ +sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable { + + def setup(): Unit = {} + def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - AndThen(this, other) - } + def finish(): TraversableOnce[OUT] = None - def clearState(): Unit = {} + + def teardown(): Unit = {} + def description: String } -case class AndThen[IN, MIDDLE, OUT]( - first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) +case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE], + second: SingleInputFunction[MIDDLE, OUT]) extends SingleInputFunction[IN, OUT] { - override def andThen[OUTER]( - other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - first.andThen(second.andThen(other)) + override def setup(): Unit = { + first.setup() + second.setup() } override def process(value: IN): TraversableOnce[OUT] = { @@ -49,9 +61,9 @@ case class AndThen[IN, MIDDLE, OUT]( } } - override def clearState(): Unit = { - first.clearState() - second.clearState() + override def teardown(): Unit = { + first.teardown() + second.teardown() } override def description: String = { @@ -61,22 +73,31 @@ case class AndThen[IN, MIDDLE, OUT]( } } -class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String) +class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String) extends SingleInputFunction[IN, OUT] { + override def setup(): Unit = { + fn.setup() + } + override def process(value: IN): TraversableOnce[OUT] = { fn(value) } - override def description: String = descriptionMessage + override def teardown(): Unit = { + 
fn.teardown() + } } - -class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) +class Reducer[T](fn: ReduceFunction[T], val description: String) extends SingleInputFunction[T, T] { private var state: Option[T] = None + override def setup(): Unit = { + fn.setup() + } + override def process(value: T): TraversableOnce[T] = { if (state.isEmpty) { state = Option(value) @@ -90,23 +111,18 @@ class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) state } - override def clearState(): Unit = { + override def teardown(): Unit = { state = None + fn.teardown() } - - override def description: String = descriptionMessage } -class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { +class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { override def process(value: T): TraversableOnce[Unit] = { emit(value) None } - override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = { - throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction") - } - override def description: String = "" } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala similarity index 72% rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala index 440a45ed8..430d79596 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala @@ -16,14 +16,16 @@ * limitations under the License. 
*/ -package org.apache.gearpump.streaming.dsl +package org.apache.gearpump.streaming.dsl.scalaapi import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.plan._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.plan.functions._ import org.apache.gearpump.streaming.dsl.window.api._ -import org.apache.gearpump.streaming.dsl.window.impl._ +import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -36,55 +38,95 @@ class Stream[T]( private val edge: Option[OpEdge] = None) { /** - * converts a value[T] to a list of value[R] + * Returns a new stream by applying a flatMap function to each element + * and flatten the results. * - * @param fn FlatMap function + * @param fn flatMap function * @param description The description message for this operation * @return A new stream with type [R] */ def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { - val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description)) - graph.addVertex(flatMapOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) - new Stream[R](graph, flatMapOp) + this.flatMap(FlatMapFunction(fn), description) } /** - * Maps message of type T message of type R + * Returns a new stream by applying a flatMap function to each element + * and flatten the results. 
* - * @param fn Function + * @param fn flatMap function + * @param description The description message for this operation + * @return A new stream with type [R] + */ + def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = { + transform(new FlatMapper[T, R](fn, description)) + } + + /** + * Returns a new stream by applying a map function to each element. + * + * @param fn map function * @return A new stream with type [R] */ def map[R](fn: T => R, description: String = "map"): Stream[R] = { - this.flatMap({ data => - Option(fn(data)) - }, description) + this.map(MapFunction(fn), description) + } + + /** + * Returns a new stream by applying a map function to each element. + * + * @param fn map function + * @return A new stream with type [R] + */ + def map[R](fn: MapFunction[T, R], description: String): Stream[R] = { + this.flatMap(FlatMapFunction(fn), description) } /** - * Keeps records when fun(T) == true + * Returns a new Stream keeping the elements that satisfy the filter function. * - * @param fn the filter - * @return a new stream after filter + * @param fn filter function + * @return a new stream after filter */ def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { - this.flatMap({ data => - if (fn(data)) Option(data) else None - }, description) + this.filter(FilterFunction(fn), description) } /** - * Reduces operations. + * Returns a new Stream keeping the elements that satisfy the filter function. * - * @param fn reduction function + * @param fn filter function + * @return a new stream after filter + */ + def filter(fn: FilterFunction[T], description: String): Stream[T] = { + this.flatMap(FlatMapFunction(fn), description) + } + /** + * Returns a new stream by applying a reduce function over all the elements. 
+ * + * @param fn reduce function * @param description description message for this operator - * @return a new stream after reduction + * @return a new stream after reduce */ def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { - val reduceOp = ChainableOp(new ReduceFunction(fn, description)) - graph.addVertex(reduceOp) - graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp) - new Stream(graph, reduceOp) + reduce(ReduceFunction(fn), description) + } + + /** + * Returns a new stream by applying a reduce function over all the elements. + * + * @param fn reduce function + * @param description description message for this operator + * @return a new stream after reduce + */ + def reduce(fn: ReduceFunction[T], description: String): Stream[T] = { + transform(new Reducer[T](fn, description)) + } + + private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = { + val op = ChainableOp(fn) + graph.addVertex(op) + graph.addEdge(thisNode, edge.getOrElse(Direct), op) + new Stream(graph, op) } /** diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala similarity index 98% rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala index 81161466c..d6eed2e5a 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala @@ -16,11 +16,12 @@ * limitations under the License. 
*/ -package org.apache.gearpump.streaming.dsl +package org.apache.gearpump.streaming.dsl.scalaapi import java.time.Instant import akka.actor.ActorSystem +import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.streaming.StreamApplication @@ -28,7 +29,6 @@ import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.Graph -import org.apache.gearpump.Message import scala.language.implicitConversions diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala new file mode 100644 index 000000000..f10a3dbba --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.scalaapi.functions + +import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction} +import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction} + +import scala.collection.JavaConverters._ + +object FlatMapFunction { + + def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[R] = { + fn.apply(t).asScala + } + + + override def teardown(): Unit = { + fn.teardown() + } + } + } + + def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + override def apply(t: T): TraversableOnce[R] = { + fn(t) + } + } + } + + def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = { + new FlatMapFunction[T, R] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[R] = { + Option(fn(t)) + } + + override def teardown(): Unit = { + fn.teardown() + } + } + } + + def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = { + new FlatMapFunction[T, T] { + + override def setup(): Unit = { + fn.setup() + } + + override def apply(t: T): TraversableOnce[T] = { + if (fn(t)) { + Option(t) + } else { + None + } + } + + override def teardown(): Unit = { + fn.teardown() + } + } + } +} + +/** + * Transforms one input into zero or more outputs of possibly different types. + * This Scala version of FlatMapFunction returns a TraversableOnce. 
+ * + * @tparam T Input value type + * @tparam R Output value type + */ +abstract class FlatMapFunction[T, R] extends SerializableFunction { + + def apply(t: T): TraversableOnce[R] + +} diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala similarity index 65% rename from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala index 9788dd21e..ab88bf1b6 100644 --- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala @@ -15,18 +15,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - -package org.apache.gearpump.streaming.javaapi.dsl.functions; - -import java.io.Serializable; -import java.util.Iterator; +package org.apache.gearpump.streaming.dsl.scalaapi.functions /** - * Function that converts a value of type T to a iterator of values of type R. - * - * @param Input value type - * @param Return value type + * Superclass for all user defined function interfaces. + * This ensures all functions are serializable and provides common methods + * like setup and teardown. Users should not extend this class directly + * but should extend subclasses like [[FlatMapFunction]].
*/ -public interface FlatMapFunction extends Serializable { - Iterator apply(T t); +abstract class SerializableFunction extends java.io.Serializable { + + def setup(): Unit = {} + + def teardown(): Unit = {} + } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index e35f08577..c13a4fb39 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -23,9 +23,8 @@ import org.apache.gearpump.streaming.Constants._ import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.apache.gearpump.streaming.task.{Task, TaskContext} -class TransformTask[IN, OUT]( - operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: UserConfig) extends Task(taskContext, userConf) { +class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]], + taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(taskContext: TaskContext, userConf: UserConfig) = { this(userConf.getValue[SingleInputFunction[IN, OUT]]( diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala index d87a9e46e..223a4afad 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -28,7 +28,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap import com.gs.collections.impl.set.mutable.UnifiedSet import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, 
SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction} import org.apache.gearpump.streaming.dsl.window.api.Discarding import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.LogUtil @@ -39,7 +39,6 @@ trait WindowRunner { def process(message: Message): Unit def trigger(time: Instant): Unit - } object DefaultWindowRunner { @@ -59,7 +58,6 @@ class DefaultWindowRunner[IN, GROUP, OUT]( private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]] private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]] - override def process(message: Message): Unit = { val (group, buckets) = groupBy.groupBy(message) buckets.foreach { bucket => @@ -72,8 +70,11 @@ class DefaultWindowRunner[IN, GROUP, OUT]( inputs.add(message.msg.asInstanceOf[IN]) windowGroups.put(wg, inputs) } - groupFns.putIfAbsent(group, - userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get) + if (!groupFns.containsKey(group)) { + val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get + fn.setup() + groupFns.put(group, fn) + } } override def trigger(time: Instant): Unit = { @@ -88,8 +89,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( wgs.forEach(new Procedure[WindowGroup[GROUP]] { override def value(each: WindowGroup[GROUP]): Unit = { val inputs = windowGroups.remove(each) - val reduceFn = groupFns.get(each.group) - .andThen[Unit](new EmitFunction[OUT](emitResult(_, time))) + val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time))) inputs.forEach(new Procedure[IN] { override def value(t: IN): Unit = { // .toList forces eager evaluation @@ -99,7 +99,7 @@ class DefaultWindowRunner[IN, GROUP, OUT]( // .toList forces eager evaluation reduceFn.finish().toList if (groupBy.window.accumulationMode == Discarding) { - reduceFn.clearState() + reduceFn.teardown() } } }) diff --git 
a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index 98bf24f07..f0920de17 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -25,7 +25,8 @@ import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.Processor import org.apache.gearpump.streaming.Processor.DefaultProcessor import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} -import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource @@ -145,7 +146,6 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val chainedOp = chainableOp1.chain(chainableOp2) - verify(fn1).andThen(fn2) chainedOp shouldBe a[ChainableOp[_, _]] unchainableOps.foreach { op => @@ -156,12 +156,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS } "get Processor" in { - val fn = new SingleInputFunction[Any, Any] { - override def process(value: Any): TraversableOnce[Any] = null - - override def description: String = null - } - val chainableOp = ChainableOp[Any, Any](fn) + val fn = mock[FlatMapFunction[Any, Any]] + val flatMapper = new FlatMapper(fn, "flatMap") + val chainableOp = ChainableOp[Any, Any](flatMapper) val processor = chainableOp.getProcessor processor shouldBe a[Processor[_]] diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala 
b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index 1610f0ee2..3f23fa988 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -23,10 +23,12 @@ import java.time.Instant import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ -import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer} +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GroupByFn import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource @@ -56,8 +58,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc val graph = Graph.empty[Op, OpEdge] val sourceOp = DataSourceOp(new AnySource) val groupByOp = GroupByOp(new AnyGroupByFn) - val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction) - val reduceOp = ChainableOp[Any, Any](anyReduceFunction) + val flatMapOp = ChainableOp[Any, Any](anyFlatMapper) + val reduceOp = ChainableOp[Any, Any](anyReducer) val processorOp = new ProcessorOp[AnyTask] val sinkOp = DataSinkOp(new AnySink) val directEdge = Direct @@ -92,9 +94,10 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc object PlannerSpec { private val anyParallelism = 1 - private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap") - private val anyReduceFunction = new ReduceFunction[Any]( 
- (left: Any, right: Any) => (left, right), "reduce") + private val anyFlatMapper = new FlatMapper[Any, Any]( + FlatMapFunction(Option(_)), "flatMap") + private val anyReducer = new Reducer[Any]( + ReduceFunction((left: Any, right: Any) => (left, right)), "reduce") class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala index ad12e3342..2c03e1c53 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -23,9 +23,11 @@ import akka.actor.ActorSystem import org.apache.gearpump.Message import org.apache.gearpump.cluster.{TestUtil, UserConfig} import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.CollectionDataSource import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction +import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource +import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.CountWindow import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow @@ -77,165 +79,164 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { andThen.finish().toList shouldBe List(secondResult) } - "clear both states on clearState" in { - andThen.clearState() + "set up both functions on setup" in { + andThen.setup() - verify(first).clearState() - verify(second).clearState() + verify(first).setup() + 
verify(second).setup() } - "return AndThen on andThen" in { - val third = mock[SingleInputFunction[T, Any]] - when(second.andThen(third)).thenReturn(AndThen(second, third)) + "tear down both functions on teardown" in { + andThen.teardown() - andThen.andThen[Any](third) + verify(first).teardown() + verify(second).teardown() + } + + "chain multiple single input function" in { + val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split") + + val filter = new FlatMapper[String, String]( + FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter") + + val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map") + + val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum") + + val all = AndThen(split, AndThen(filter, AndThen(map, sum))) - verify(first).andThen(AndThen(second, third)) + assert(all.description == "split.filter.map.sum") + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + // force eager evaluation + all.process(data).toList + val result = all.finish().toList + assert(result.nonEmpty) + assert(result.last == 15) } } - "FlatMapFunction" should { + "FlatMapper" should { - val flatMap = mock[R => TraversableOnce[S]] - val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap") + val flatMapFunction = mock[FlatMapFunction[R, S]] + val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap") "call flatMap function when processing input value" in { val input = mock[R] - flatMapFunction.process(input) - verify(flatMap).apply(input) + flatMapper.process(input) + verify(flatMapFunction).apply(input) } "return passed in description" in { - flatMapFunction.description shouldBe "flatMap" + flatMapper.description shouldBe "flatMap" } "return None on finish" in { - flatMapFunction.finish() shouldBe List.empty[S] + flatMapper.finish() shouldBe List.empty[S] } - "do nothing on clearState" in { - 
flatMapFunction.clearState() - verifyZeroInteractions(flatMap) + "set up FlatMapFunction on setup" in { + flatMapper.setup() + + verify(flatMapFunction).setup() } - "return AndThen on andThen" in { - val other = mock[SingleInputFunction[S, T]] - flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]] + "tear down FlatMapFunction on teardown" in { + flatMapper.teardown() + + verify(flatMapFunction).teardown() } } "ReduceFunction" should { - "call reduce function when processing input value" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") val input1 = mock[T] val input2 = mock[T] val output = mock[T] - when(reduce.apply(input1, input2)).thenReturn(output, output) + when(reduceFunction.apply(input1, input2)).thenReturn(output, output) - reduceFunction.process(input1) shouldBe List.empty[T] - reduceFunction.process(input2) shouldBe List.empty[T] - reduceFunction.finish() shouldBe List(output) + reducer.process(input1) shouldBe List.empty[T] + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(output) - reduceFunction.clearState() - reduceFunction.process(input1) shouldBe List.empty[T] - reduceFunction.clearState() - reduceFunction.process(input2) shouldBe List.empty[T] - reduceFunction.finish() shouldBe List(input2) + reducer.teardown() + reducer.process(input1) shouldBe List.empty[T] + reducer.teardown() + reducer.process(input2) shouldBe List.empty[T] + reducer.finish() shouldBe List(input2) } "return passed in description" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.description shouldBe "reduce" + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.description shouldBe "reduce" } "return None on finish" in { - val reduce = mock[(T, T) => T] - 
val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.finish() shouldBe List.empty[T] + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.finish() shouldBe List.empty[T] } - "do nothing on clearState" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - reduceFunction.clearState() - verifyZeroInteractions(reduce) + "set up reduce function on setup" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.setup() + + verify(reduceFunction).setup() } - "return AndThen on andThen" in { - val reduce = mock[(T, T) => T] - val reduceFunction = new ReduceFunction[T](reduce, "reduce") - val other = mock[SingleInputFunction[T, Any]] - reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]] + "tear down reduce function on teardown" in { + val reduceFunction = mock[ReduceFunction[T]] + val reducer = new Reducer[T](reduceFunction, "reduce") + reducer.teardown() + + verify(reduceFunction).teardown() } } - "EmitFunction" should { + "Emit" should { - val emit = mock[T => Unit] - val emitFunction = new EmitFunction[T](emit) + val emitFunction = mock[T => Unit] + val emit = new Emit[T](emitFunction) "emit input value when processing input value" in { val input = mock[T] - emitFunction.process(input) shouldBe List.empty[Unit] + emit.process(input) shouldBe List.empty[Unit] - verify(emit).apply(input) + verify(emitFunction).apply(input) } "return empty description" in { - emitFunction.description shouldBe "" + emit.description shouldBe "" } "return None on finish" in { - emitFunction.finish() shouldBe List.empty[Unit] + emit.finish() shouldBe List.empty[Unit] } - "do nothing on clearState" in { - emitFunction.clearState() - verifyZeroInteractions(emit) - } + "do nothing on setup" in { + emit.setup() - "throw exception on andThen" in { - val other = mock[SingleInputFunction[Unit, 
Any]] - intercept[UnsupportedOperationException] { - emitFunction.andThen(other) - } + verifyZeroInteractions(emitFunction) } - } - - "andThen" should { - "chain multiple single input function" in { - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") + "do nothing on teardown" in { + emit.teardown() - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = split.andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - // force eager evaluation - all.process(data).toList - val result = all.finish().toList - assert(result.nonEmpty) - assert(result.last == 15) + verifyZeroInteractions(emitFunction) } } @@ -261,7 +262,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { // Source with transformer val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") val another = new DataSourceTask(anotherTaskContext, conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) another.onStart(Instant.EPOCH) @@ -279,9 +281,8 @@ class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { val data = "1 2 2 3 3 3" - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") + val concat = new Reducer[String](ReduceFunction({ (left, right) => + left + right}), "concat") implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( @@ -315,7 +316,8 @@ class 
SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { // Source with transformer val taskContext = MockUtil.mockTaskContext val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val double = new FlatMapper[String, String](FlatMapFunction( + word => List(word, word)), "double") val task = new TransformTask[String, String](Some(double), taskContext, conf) task.onStart(Instant.EPOCH) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala similarity index 91% rename from streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala rename to streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala index db4db9315..5b90a3e4b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -16,14 +16,15 @@ * limitations under the License. 
*/ -package org.apache.gearpump.streaming.dsl +package org.apache.gearpump.streaming.dsl.scalaapi import akka.actor.ActorSystem import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.streaming.dsl.scalaapi import org.apache.gearpump.streaming.partitioner.PartitionerDescription -import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.util.Graph import org.mockito.Mockito.when import org.scalatest._ @@ -49,8 +50,8 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M when(context.system).thenReturn(system) val dsl = StreamApp("dsl", context) - dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]] - dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]] + dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]] + dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]] val application = dsl.plan() application shouldBe a [StreamApplication] diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala similarity index 94% rename from streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala rename to streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala index 8def61e1f..62a3bcb2b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala @@ -16,19 +16,19 @@ * limitations under the License. 
*/ -package org.apache.gearpump.streaming.dsl +package org.apache.gearpump.streaming.dsl.scalaapi import akka.actor._ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} -import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} -import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} +import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.util.Graph import org.apache.gearpump.util.Graph._ import org.mockito.Mockito.when @@ -71,9 +71,10 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock map(word => (word, 1)). groupBy(_._1, parallelism = 2). reduce((left, right) => (left._1, left._2 + right._2)). - map[Either[(String, Int), String]](Left(_)) + map[Either[(String, Int), String]]({t: (String, Int) => Left(t)}) - val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) + val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]]( + {s: String => Right(s)}) stream.merge(query).process[(String, Int)](classOf[Join], 1) val app: StreamApplication = dsl.plan()