From 66017ab7bf5166ec312684f0e3e49e7219b4c24d Mon Sep 17 00:00:00 2001 From: manuzhang Date: Sun, 9 Oct 2016 08:52:00 +0800 Subject: [PATCH 1/3] [GEARPUMP-23] add window dsl This PR is opened for early review; the work is in progress with the following TODOs. - [x] basic window dsl support with `WindowedWordCount` example - [x] improve `ReduceFunction` to not emit intermediate results - [x] add unit tests - [ ] add comments and update documentation - [ ] support different types of computation (e.g. a monoid, which doesn't require input elements to be held in the window) Author: manuzhang Closes #85 from manuzhang/window_dsl. --- .../wordcount/dsl/WindowedWordCount.scala | 87 +++++ .../apache/gearpump/streaming/Constants.scala | 1 + .../streaming/StreamApplication.scala | 2 +- .../gearpump/streaming/dsl/Stream.scala | 106 +++--- .../gearpump/streaming/dsl/StreamApp.scala | 34 +- .../streaming/dsl/javaapi/JavaStream.scala | 22 +- .../apache/gearpump/streaming/dsl/op/OP.scala | 109 ------ ...itioner.scala => GroupByPartitioner.scala} | 13 +- .../gearpump/streaming/dsl/plan/OP.scala | 214 +++++++++++ .../streaming/dsl/plan/OpTranslator.scala | 222 ------------ .../gearpump/streaming/dsl/plan/Planner.scala | 65 ++-- .../plan/functions/SingleInputFunction.scala | 107 ++++++ .../streaming/dsl/task/CountTriggerTask.scala | 63 ++++ .../dsl/task/EventTimeTriggerTask.scala | 59 ++++ .../dsl/task/ProcessingTimeTriggerTask.scala | 82 +++++ .../streaming/dsl/task/TransformTask.scala | 47 +++ .../dsl/window/api/AccumulationMode.scala | 24 ++ .../streaming/dsl/window/api/GroupByFn.scala | 47 +++ .../streaming/dsl/window/api/Trigger.scala | 27 ++ .../streaming/dsl/window/api/Window.scala | 77 ++++ .../streaming/dsl/window/api/WindowFn.scala | 63 ++++ .../dsl/window/impl/ReduceFnRunner.scala | 29 ++ .../streaming/dsl/window/impl/Window.scala | 75 ++++ .../dsl/window/impl/WindowRunner.scala | 114 ++++++ .../streaming/source/DataSourceTask.scala | 15 +- .../gearpump/streaming/task/TaskActor.scala | 4 +- .../streaming/dsl/StreamAppSpec.scala | 67 ++-- .../gearpump/streaming/dsl/StreamSpec.scala | 24 +- .../partitioner/GroupByPartitionerSpec.scala | 23 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 244 +++++++++++++ .../streaming/dsl/plan/OpTranslatorSpec.scala | 148 -------- .../streaming/dsl/plan/PlannerSpec.scala | 132 +++++++ .../functions/SingleInputFunctionSpec.scala | 333 ++++++++++++++++++ .../dsl/task/CountTriggerTaskSpec.scala | 61 ++++ .../dsl/task/EventTimeTriggerTaskSpec.scala | 66 ++++ .../task/ProcessingTimeTriggerTaskSpec.scala | 69 ++++ 36 files changed, 2205 insertions(+), 670 deletions(-) create mode 100644 examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala delete mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala rename streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/{GroupbyPartitioner.scala => GroupByPartitioner.scala} (77%) create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala delete mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala create mode 100644 
streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala create mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala delete mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala create mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala create mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala create mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala create mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala create mode 100644 streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala new file mode 100644 index 000000000..4f43fd466 --- /dev/null +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.examples.wordcount.dsl + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} +import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp} +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow} +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.AkkaApp + +object WindowedWordCount extends AkkaApp with ArgumentsParser { + + override val options: Array[(String, CLIOption[Any])] = Array.empty + + override def main(akkaConf: Config, args: Array[String]): Unit = { + val context = ClientContext(akkaConf) + val app = StreamApp("dsl", context) + app.source[String](new TimedDataSource). + // word => (word, 1) + flatMap(line => line.split("[\\s]+")).map((_, 1)). + // fixed window + window(FixedWindow.apply(Duration.ofMillis(5L)) + .triggering(EventTimeTrigger)). + // (word, count1), (word, count2) => (word, count1 + count2) + groupBy(_._1). + sum.sink(new LoggerSink) + + context.submit(app) + context.close() + } + + private class TimedDataSource extends DataSource { + + private var data = List( + Message("foo", 1L), + Message("bar", 2L), + Message("foo", 3L), + Message("foo", 5L), + Message("bar", 7L), + Message("bar", 8L) + ) + + private var watermark: Instant = Instant.ofEpochMilli(0) + + override def read(): Message = { + if (data.nonEmpty) { + val msg = data.head + data = data.tail + watermark = Instant.ofEpochMilli(msg.timestamp) + msg + } else { + null + } + } + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def close(): Unit = {} + + override def getWatermark: Instant = { + if (data.isEmpty) { + watermark = watermark.plusMillis(1) + } + watermark + } + } +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala index cd33b507a..f99a43649 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/Constants.scala @@ -22,6 +22,7 @@ object Constants { val GEARPUMP_STREAMING_OPERATOR = "gearpump.streaming.dsl.operator" val GEARPUMP_STREAMING_SOURCE = "gearpump.streaming.source" val GEARPUMP_STREAMING_GROUPBY_FUNCTION = "gearpump.streaming.dsl.groupby-function" + val GEARPUMP_STREAMING_WINDOW_FUNCTION = "gearpump.streaming.dsl.window-function" val GEARPUMP_STREAMING_LOCALITIES = "gearpump.streaming.localities" diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala index 66ec87303..a6588a14e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/StreamApplication.scala @@ -123,7 +123,7 @@ object LifeTime { */ class StreamApplication( override val name: String, val inputUserConfig: UserConfig, - val dag: Graph[ProcessorDescription, PartitionerDescription]) + dag: Graph[ProcessorDescription, PartitionerDescription]) extends Application { require(!dag.hasDuplicatedEdge(), "Graph should not have duplicated edges") diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala 
b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala index 786d496dc..440a45ed8 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala @@ -20,7 +20,10 @@ package org.apache.gearpump.streaming.dsl import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.op._ +import org.apache.gearpump.streaming.dsl.plan._ +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.window.api._ +import org.apache.gearpump.streaming.dsl.window.impl._ import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -35,12 +38,12 @@ class Stream[T]( /** * Converts a value[T] to a list of value[R] * - * @param fun FlatMap function + * @param fn FlatMap function * @param description The description message for this operation * @return A new stream with type [R] */ - def flatMap[R](fun: T => TraversableOnce[R], description: String = null): Stream[R] = { - val flatMapOp = FlatMapOp(fun, Option(description).getOrElse("flatmap")) + def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = { + val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description)) graph.addVertex(flatMapOp) graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp) new Stream[R](graph, flatMapOp) @@ -49,36 +52,36 @@ class Stream[T]( /** * Maps message of type T to message of type R * - * @param fun Function + * @param fn Function * @return A new stream with type [R] */ - def map[R](fun: T => R, description: String = null): Stream[R] = { + def map[R](fn: T => R, description: String = "map"): Stream[R] = { this.flatMap({ data => - Option(fun(data)) - }, Option(description).getOrElse("map")) + Option(fn(data)) + }, description) } /** * Keeps records when fn(T) == true * - * @param fun the filter + * @param fn the filter * @return a new stream after filter */ - def filter(fun: T => Boolean, description: String = null): Stream[T] = { + def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = { this.flatMap({ data => - if (fun(data)) Option(data) else None - }, Option(description).getOrElse("filter")) + if (fn(data)) Option(data) else None + }, description) } /** * Reduce operation. 
* - * @param fun reduction function + * @param fn reduction function * @param description description message for this operator * @return a new stream after reduction */ - def reduce(fun: (T, T) => T, description: String = null): Stream[T] = { - val reduceOp = ReduceOp(fun, Option(description).getOrElse("reduce")) + def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = { + val reduceOp = ChainableOp(new ReduceFunction(fn, description)) graph.addVertex(reduceOp) graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp) new Stream(graph, reduceOp) @@ -88,7 +91,10 @@ class Stream[T]( * Log to task log file */ def log(): Unit = { - this.map(msg => LoggerFactory.getLogger("dsl").info(msg.toString), "log") + this.map(msg => { + LoggerFactory.getLogger("dsl").info(msg.toString) + msg + }, "log") } /** @@ -97,8 +103,8 @@ class Stream[T]( * @param other the other stream * @return the merged stream */ - def merge(other: Stream[T], description: String = null): Stream[T] = { - val mergeOp = MergeOp(Option(description).getOrElse("merge")) + def merge(other: Stream[T], description: String = "merge"): Stream[T] = { + val mergeOp = MergeOp(description, UserConfig.empty) graph.addVertex(mergeOp) graph.addEdge(thisNode, edge.getOrElse(Direct), mergeOp) graph.addEdge(other.thisNode, other.edge.getOrElse(Shuffle), mergeOp) @@ -115,20 +121,29 @@ class Stream[T]( * * For example, * {{{ - * Stream[People].groupBy(_.gender).flatmap(..).filter.(..).reduce(..) + * Stream[People].groupBy(_.gender).flatMap(..).filter(..).reduce(..) * }}} * - * @param fun Group by function + * @param fn Group by function * @param parallelism Parallelism level * @param description The description * @return the grouped stream */ - def groupBy[Group](fun: T => Group, parallelism: Int = 1, description: String = null) - : Stream[T] = { - val groupOp = GroupByOp(fun, parallelism, Option(description).getOrElse("groupBy")) - graph.addVertex(groupOp) - graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) - new Stream[T](graph, groupOp) + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + window(CountWindow.apply(1).accumulating) + .groupBy[GROUP](fn, parallelism, description) + } + + /** + * Window function + * + * @param win window definition + * @param description window description + * @return [[WindowStream]] where groupBy could be applied + */ + def window(win: Window, description: String = "window"): WindowStream[T] = { + new WindowStream[T](graph, edge, thisNode, win, description) } /** @@ -140,15 +155,28 @@ class Stream[T]( */ def process[R]( processor: Class[_ <: Task], parallelism: Int, conf: UserConfig = UserConfig.empty, - description: String = null): Stream[R] = { - val processorOp = ProcessorOp(processor, parallelism, conf, - Option(description).getOrElse("process")) + description: String = "process"): Stream[R] = { + val processorOp = ProcessorOp(processor, parallelism, conf, description) graph.addVertex(processorOp) graph.addEdge(thisNode, edge.getOrElse(Shuffle), processorOp) new Stream[R](graph, processorOp, Some(Shuffle)) } } +class WindowStream[T](graph: Graph[Op, OpEdge], edge: Option[OpEdge], thisNode: Op, + window: Window, winDesc: String) { + + def groupBy[GROUP](fn: T => GROUP, parallelism: Int = 1, + description: String = "groupBy"): Stream[T] = { + val groupBy: GroupByFn[T, (GROUP, List[Bucket])] = GroupAlsoByWindow(fn, window) + val groupOp = GroupByOp[T, (GROUP, List[Bucket])](groupBy, parallelism, + s"$winDesc.$description") + 
graph.addVertex(groupOp) + graph.addEdge(thisNode, edge.getOrElse(Shuffle), groupOp) + new Stream[T](graph, groupOp) + } +} + class KVStream[K, V](stream: Stream[Tuple2[K, V]]) { /** * GroupBy key @@ -192,30 +220,18 @@ object Stream { } implicit class Sink[T](stream: Stream[T]) extends java.io.Serializable { - def sink[T](dataSink: DataSink, parallism: Int, conf: UserConfig, description: String) - : Stream[T] = { - implicit val sink = DataSinkOp[T](dataSink, parallism, conf, - Some(description).getOrElse("traversable")) + def sink(dataSink: DataSink, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "sink"): Stream[T] = { + implicit val sink = DataSinkOp(dataSink, parallelism, conf, description) stream.graph.addVertex(sink) stream.graph.addEdge(stream.thisNode, Shuffle, sink) new Stream[T](stream.graph, sink) } - - def sink[T]( - sink: Class[_ <: Task], parallism: Int, conf: UserConfig = UserConfig.empty, - description: String = null): Stream[T] = { - val sinkOp = ProcessorOp(sink, parallism, conf, Option(description).getOrElse("source")) - stream.graph.addVertex(sinkOp) - stream.graph.addEdge(stream.thisNode, Shuffle, sinkOp) - new Stream[T](stream.graph, sinkOp) - } } } class LoggerSink[T] extends DataSink { - var logger: Logger = null - - private var context: TaskContext = null + var logger: Logger = _ override def open(context: TaskContext): Unit = { this.logger = context.logger diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala index d45737b0e..81161466c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala @@ -24,10 +24,9 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.streaming.StreamApplication -import org.apache.gearpump.streaming.dsl.op.{DataSourceOp, Op, OpEdge, ProcessorOp} -import org.apache.gearpump.streaming.dsl.plan.Planner +import org.apache.gearpump.streaming.dsl.plan._ import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.Graph import org.apache.gearpump.Message @@ -50,7 +49,8 @@ import scala.language.implicitConversions * @param name name of app */ class StreamApp( - val name: String, system: ActorSystem, userConfig: UserConfig, val graph: Graph[Op, OpEdge]) { + name: String, system: ActorSystem, userConfig: UserConfig, + private val graph: Graph[Op, OpEdge]) { def this(name: String, system: ActorSystem, userConfig: UserConfig) = { this(name, system, userConfig, Graph.empty[Op, OpEdge]) @@ -76,34 +76,16 @@ object StreamApp { implicit class Source(app: StreamApp) extends java.io.Serializable { - def source[T](dataSource: DataSource, parallelism: Int): Stream[T] = { - source(dataSource, parallelism, UserConfig.empty) - } - - def source[T](dataSource: DataSource, parallelism: Int, description: String): Stream[T] = { - source(dataSource, parallelism, UserConfig.empty, description) - } - - def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig): Stream[T] = { - source(dataSource, parallelism, conf, description = null) - } - - def source[T](dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - : Stream[T] = { + 
def source[T](dataSource: DataSource, parallelism: Int = 1, + conf: UserConfig = UserConfig.empty, description: String = "source"): Stream[T] = { implicit val sourceOp = DataSourceOp(dataSource, parallelism, conf, description) app.graph.addVertex(sourceOp) new Stream[T](app.graph, sourceOp) } + def source[T](seq: Seq[T], parallelism: Int, description: String): Stream[T] = { this.source(new CollectionDataSource[T](seq), parallelism, UserConfig.empty, description) } - - def source[T](source: Class[_ <: Task], parallelism: Int, conf: UserConfig, description: String) - : Stream[T] = { - val sourceOp = ProcessorOp(source, parallelism, conf, Option(description).getOrElse("source")) - app.graph.addVertex(sourceOp) - new Stream[T](app.graph, sourceOp) - } } } @@ -115,7 +97,7 @@ class CollectionDataSource[T](seq: Seq[T]) extends DataSource { override def read(): Message = { if (iterator.hasNext) { - Message(iterator.next()) + Message(iterator.next(), Instant.now().toEpochMilli) } else { null } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala index 6eff20cf2..3003b981f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala @@ -19,9 +19,9 @@ package org.apache.gearpump.streaming.dsl.javaapi import scala.collection.JavaConverters._ - import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.dsl.Stream +import org.apache.gearpump.streaming.dsl.window.api.Window +import org.apache.gearpump.streaming.dsl.{Stream, WindowStream} import org.apache.gearpump.streaming.javaapi.dsl.functions._ import org.apache.gearpump.streaming.task.Task @@ -63,9 +63,13 @@ class JavaStream[T](val stream: Stream[T]) { * Groups a stream into a list of sub-streams. Operations chained after * groupBy apply to the sub-streams. */ - def groupBy[Group](fn: GroupByFunction[T, Group], parallelism: Int, description: String) - : JavaStream[T] = { - new JavaStream[T](stream.groupBy({t: T => fn(t)}, parallelism, description)) + def groupBy[GROUP](fn: GroupByFunction[T, GROUP], + parallelism: Int, description: String): JavaStream[T] = { + new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description)) + } + + def window(win: Window, description: String): JavaWindowStream[T] = { + new JavaWindowStream[T](stream.window(win, description)) } /** Add a low level Processor to process messages */ @@ -75,3 +79,11 @@ class JavaStream[T](val stream: Stream[T]) { new JavaStream[R](stream.process(processor, parallelism, conf, description)) } } + +class JavaWindowStream[T](stream: WindowStream[T]) { + + def groupBy[GROUP](fn: GroupByFunction[T, GROUP], parallelism: Int, + description: String): JavaStream[T] = { + new JavaStream[T](stream.groupBy((t: T) => fn(t), parallelism, description)) + } +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala deleted file mode 100644 index 49d9dec72..000000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/op/OP.scala +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.op - -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.sink.DataSink -import org.apache.gearpump.streaming.source.DataSource -import org.apache.gearpump.streaming.task.Task - -/** - * Operators for the DSL - */ -sealed trait Op { - def description: String - def conf: UserConfig -} - -/** - * When translated to running DAG, SlaveOP can be attach to MasterOP or other SlaveOP - * "Attach" means running in same Actor. - */ -trait SlaveOp[T] extends Op - -case class FlatMapOp[T, R]( - fun: (T) => TraversableOnce[R], description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -case class ReduceOp[T](fun: (T, T) => T, description: String, conf: UserConfig = UserConfig.empty) - extends SlaveOp[T] - -trait MasterOp extends Op - -trait ParameterizedOp[T] extends MasterOp - -case class MergeOp(description: String, override val conf: UserConfig = UserConfig.empty) - extends MasterOp - -case class GroupByOp[T, R]( - fun: T => R, parallelism: Int, description: String, - override val conf: UserConfig = UserConfig.empty) - extends ParameterizedOp[T] - -case class ProcessorOp[T <: Task]( - processor: Class[T], parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSourceOp[T]( - dataSource: DataSource, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -case class DataSinkOp[T]( - dataSink: DataSink, parallelism: Int, conf: UserConfig, description: String) - extends ParameterizedOp[T] - -/** - * Contains operators which can be chained to single one. - * - * For example, flatmap().map().reduce() can be chained to single operator as - * no data shuffling is required. - * @param ops list of operations - */ -case class OpChain(ops: List[Op]) extends Op { - def head: Op = ops.head - def last: Op = ops.last - - def description: String = null - - override def conf: UserConfig = { - // The head's conf has priority - ops.reverse.foldLeft(UserConfig.empty) { (conf, op) => - conf.withConfig(op.conf) - } - } -} - -trait OpEdge - -/** - * The upstream OP and downstream OP doesn't require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. - */ -case object Direct extends OpEdge - -/** - * The upstream OP and downstream OP DOES require network data shuffle. - * - * For example, map, flatmap operation doesn't require network shuffle, we can use Direct - * to represent the relation with upstream operators. 
- */ -case object Shuffle extends OpEdge - diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala similarity index 77% rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala index b2e29328a..2ec881b7e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupbyPartitioner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitioner.scala @@ -20,6 +20,7 @@ package org.apache.gearpump.streaming.dsl.partitioner import org.apache.gearpump.Message import org.apache.gearpump.partitioner.UnicastPartitioner +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn /** * Partition messages by applying group by function first. @@ -35,12 +36,14 @@ import org.apache.gearpump.partitioner.UnicastPartitioner * } * }}} * - * @param groupBy First apply message with groupBy function, then pick the hashCode of the output + * @param fn First apply the groupBy function to the message, then use the hashCode of the output * to do the partitioning. You must define hashCode() for output type of groupBy function. */ -class GroupByPartitioner[T, GROUP](groupBy: T => GROUP = null) extends UnicastPartitioner { - override def getPartition(msg: Message, partitionNum: Int, currentPartitionId: Int): Int = { - val hashCode = groupBy(msg.msg.asInstanceOf[T]).hashCode() +class GroupByPartitioner[T, Group](fn: GroupByFn[T, Group]) + extends UnicastPartitioner { + override def getPartition(message: Message, partitionNum: Int, currentPartitionId: Int): Int = { + val hashCode = fn.groupBy(message).hashCode() (hashCode & Integer.MAX_VALUE) % partitionNum } -} \ No newline at end of file +} + diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala new file mode 100644 index 000000000..744976b7c --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gearpump.streaming.dsl.plan + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.Processor.DefaultProcessor +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.{Constants, Processor} +import org.apache.gearpump.streaming.dsl.task.TransformTask +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} +import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} +import org.apache.gearpump.streaming.task.Task + +import scala.reflect.ClassTag + +/** + * This is a vertex on the logical plan. + */ +sealed trait Op { + + def description: String + + def userConfig: UserConfig + + def chain(op: Op)(implicit system: ActorSystem): Op + + def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] +} + +/** + * This represents a low level Processor. + */ +case class ProcessorOp[T <: Task]( + processor: Class[T], + parallelism: Int, + userConfig: UserConfig, + description: String) + extends Op { + + def this( + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "processor")(implicit classTag: ClassTag[T]) = { + this(classTag.runtimeClass.asInstanceOf[Class[T]], parallelism, userConfig, description) + } + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + throw new OpChainException(this, other) + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + DefaultProcessor(parallelism, description, userConfig, processor) + } +} + +/** + * This represents a DataSource. + */ +case class DataSourceOp( + dataSource: DataSource, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "source") + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + DataSourceOp(dataSource, parallelism, + userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn), + description) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[DataSourceTask[Any, Any]](parallelism, description, + userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) + } +} + +/** + * This represents a DataSink. + */ +case class DataSinkOp( + dataSink: DataSink, + parallelism: Int = 1, + userConfig: UserConfig = UserConfig.empty, + description: String = "sink") + extends Op { + + override def chain(op: Op)(implicit system: ActorSystem): Op = { + throw new OpChainException(this, op) + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + DataSinkProcessor(dataSink, parallelism, description) + } +} + +/** + * This represents operations that can be chained together + * (e.g. 
flatMap, map, filter, reduce) and further chained + * to another Op. + */ +case class ChainableOp[IN, OUT]( + fn: SingleInputFunction[IN, OUT]) extends Op { + + override def description: String = fn.description + + override def userConfig: UserConfig = UserConfig.empty + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[OUT, _] => + // TODO: preserve type info + ChainableOp(fn.andThen(op.fn)) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + throw new UnsupportedOperationException("ChainableOp cannot be translated to a Processor") + } +} + +/** + * This represents a Processor with window aggregation + */ +case class GroupByOp[IN, GROUP]( + groupByFn: GroupByFn[IN, GROUP], + parallelism: Int = 1, + description: String = "groupBy", + override val userConfig: UserConfig = UserConfig.empty) + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + GroupByOp(groupByFn, parallelism, description, + userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + groupByFn.getProcessor(parallelism, description, userConfig) + } +} + +/** + * This represents a Processor transforming merged streams + */ +case class MergeOp(description: String, userConfig: UserConfig = UserConfig.empty) + extends Op { + + override def chain(other: Op)(implicit system: ActorSystem): Op = { + other match { + case op: ChainableOp[_, _] => + MergeOp(description, userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.fn)) + case _ => + throw new OpChainException(this, other) + } + } + + override def getProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[TransformTask[Any, Any]](1, description, userConfig) + } + +} + +/** + * This is an edge on the logical plan. + */ +trait OpEdge + +/** + * The upstream OP and downstream OP don't require network data shuffle. + * e.g. ChainableOp + */ +case object Direct extends OpEdge + +/** + * The upstream OP and downstream OP DO require network data shuffle. + * e.g. GroupByOp + */ +case object Shuffle extends OpEdge + +/** + * Runtime exception thrown when two Ops cannot be chained. + */ +class OpChainException(op1: Op, op2: Op) extends RuntimeException(s"$op1 cannot be chained by $op2") \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala deleted file mode 100644 index 8de291c59..000000000 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslator.scala +++ /dev/null @@ -1,222 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.plan - -import scala.collection.TraversableOnce -import akka.actor.ActorSystem -import org.slf4j.Logger -import org.apache.gearpump._ -import org.apache.gearpump.cluster.UserConfig -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.op._ -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.sink.DataSinkProcessor -import org.apache.gearpump.streaming.source.DataSourceTask -import org.apache.gearpump.streaming.task.{Task, TaskContext} -import org.apache.gearpump.util.LogUtil - -/** - * Translates a OP to a TaskDescription - */ -class OpTranslator extends java.io.Serializable { - val LOG: Logger = LogUtil.getLogger(getClass) - - def translate(ops: OpChain)(implicit system: ActorSystem): Processor[_ <: Task] = { - - val baseConfig = ops.conf - - ops.ops.head match { - case op: MasterOp => - val tail = ops.ops.tail - val func = toFunction(tail) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - op match { - case DataSourceOp(dataSource, parallelism, conf, description) => - Processor[DataSourceTask[Any, Any]](parallelism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_SOURCE, dataSource)) - case groupby@GroupByOp(_, parallelism, description, _) => - Processor[GroupByTask[Object, Object, Object]](parallelism, - description = description + "." + func.description, - userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, groupby)) - case merge: MergeOp => - Processor[TransformTask[Object, Object]](1, - description = op.description + "." + func.description, - userConfig) - case ProcessorOp(processor, parallelism, conf, description) => - DefaultProcessor(parallelism, - description = description + "." 
+ func.description, - userConfig, processor) - case DataSinkOp(dataSink, parallelism, conf, description) => - DataSinkProcessor(dataSink, parallelism, description + func.description) - } - case op: SlaveOp[_] => - val func = toFunction(ops.ops) - val userConfig = baseConfig.withValue(GEARPUMP_STREAMING_OPERATOR, func) - - Processor[TransformTask[Object, Object]](1, - description = func.description, - taskConf = userConfig) - case chain: OpChain => - throw new RuntimeException("Not supposed to be called!") - } - } - - private def toFunction(ops: List[Op]): SingleInputFunction[Object, Object] = { - val func: SingleInputFunction[Object, Object] = new DummyInputFunction[Object]() - val totalFunction = ops.foldLeft(func) { (fun, op) => - - val opFunction = op match { - case flatmap: FlatMapOp[Object @unchecked, Object @unchecked] => - new FlatMapFunction(flatmap.fun, flatmap.description) - case reduce: ReduceOp[Object @unchecked] => - new ReduceFunction(reduce.fun, reduce.description) - case _ => - throw new RuntimeException("Not supposed to be called!") - } - fun.andThen(opFunction.asInstanceOf[SingleInputFunction[Object, Object]]) - } - totalFunction.asInstanceOf[SingleInputFunction[Object, Object]] - } -} - -object OpTranslator { - - trait SingleInputFunction[IN, OUT] extends Serializable { - def process(value: IN): TraversableOnce[OUT] - def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { - new AndThen(this, other) - } - - def description: String - } - - class DummyInputFunction[T] extends SingleInputFunction[T, T] { - override def andThen[OUTER](other: SingleInputFunction[T, OUTER]) - : SingleInputFunction[T, OUTER] = { - other - } - - // Should never be called - override def process(value: T): TraversableOnce[T] = None - - override def description: String = "" - } - - class AndThen[IN, MIDDLE, OUT]( - first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - first.process(value).flatMap(second.process) - } - - override def description: String = { - Option(first.description).flatMap { description => - Option(second.description).map(description + "." 
+ _) - }.orNull - } - } - - class FlatMapFunction[IN, OUT](fun: IN => TraversableOnce[OUT], descriptionMessage: String) - extends SingleInputFunction[IN, OUT] { - - override def process(value: IN): TraversableOnce[OUT] = { - fun(value) - } - - override def description: String = { - this.descriptionMessage - } - } - - class ReduceFunction[T](fun: (T, T) => T, descriptionMessage: String) - extends SingleInputFunction[T, T] { - - private var state: Any = _ - - override def process(value: T): TraversableOnce[T] = { - if (state == null) { - state = value - } else { - state = fun(state.asInstanceOf[T], value) - } - Some(state.asInstanceOf[T]) - } - - override def description: String = descriptionMessage - } - - class GroupByTask[IN, GROUP, OUT]( - groupBy: IN => GROUP, taskContext: TaskContext, userConf: UserConfig) - extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[GroupByOp[IN, GROUP]]( - GEARPUMP_STREAMING_GROUPBY_FUNCTION )(taskContext.system).get.fun, - taskContext, userConf) - } - - private var groups = Map.empty[GROUP, SingleInputFunction[IN, OUT]] - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - val group = groupBy(msg.msg.asInstanceOf[IN]) - if (!groups.contains(group)) { - val operator = - userConf.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get - groups += group -> operator - } - - val operator = groups(group) - - operator.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - } - } - - class TransformTask[IN, OUT]( - operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, - userConf: UserConfig) extends Task(taskContext, userConf) { - - def this(taskContext: TaskContext, userConf: UserConfig) = { - this(userConf.getValue[SingleInputFunction[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) - } - - override def onNext(msg: Message): Unit = { - val time = msg.timestamp - - operator match { - case Some(op) => - op.process(msg.msg.asInstanceOf[IN]).foreach { msg => - taskContext.output(new Message(msg.asInstanceOf[AnyRef], time)) - } - case None => - taskContext.output(new Message(msg.msg, time)) - } - } - } - -} \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala index f5bbd654d..16d5c06ea 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/Planner.scala @@ -22,7 +22,6 @@ import akka.actor.ActorSystem import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, Partitioner} import org.apache.gearpump.streaming.Processor -import org.apache.gearpump.streaming.dsl.op._ import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner import org.apache.gearpump.streaming.task.Task import org.apache.gearpump.util.Graph @@ -33,64 +32,60 @@ class Planner { * Converts Dag of Op to Dag of TaskDescription. TaskDescription is part of the low * level Graph API. 
*/ - def plan(dag: Graph[Op, OpEdge])(implicit system: ActorSystem) - : Graph[Processor[_ <: Task], _ <: Partitioner] = { + def plan(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Processor[_ <: Task], _ <: Partitioner] = { - val opTranslator = new OpTranslator() - - val newDag = optimize(dag) - newDag.mapEdge { (node1, edge, node2) => + val graph = optimize(dag) + graph.mapEdge { (node1, edge, node2) => edge match { case Shuffle => - node2.head match { - case groupBy: GroupByOp[Any @unchecked, Any @unchecked] => - new GroupByPartitioner(groupBy.fun) + node2 match { + case groupBy: GroupByOp[_, _] => + new GroupByPartitioner(groupBy.groupByFn) case _ => new HashPartitioner } case Direct => new CoLocationPartitioner } - }.mapVertex { opChain => - opTranslator.translate(opChain) - } + }.mapVertex(_.getProcessor) } - private def optimize(dag: Graph[Op, OpEdge]): Graph[OpChain, OpEdge] = { - val newGraph = dag.mapVertex(op => OpChain(List(op))) - - val nodes = newGraph.topologicalOrderWithCirclesIterator.toList.reverse + private def optimize(dag: Graph[Op, OpEdge]) + (implicit system: ActorSystem): Graph[Op, OpEdge] = { + val graph = dag.copy + val nodes = graph.topologicalOrderWithCirclesIterator.toList.reverse for (node <- nodes) { - val outGoingEdges = newGraph.outgoingEdgesOf(node) + val outGoingEdges = graph.outgoingEdgesOf(node) for (edge <- outGoingEdges) { - merge(newGraph, edge._1, edge._3) + merge(graph, edge._1, edge._3) } } - newGraph + graph } - private def merge(dag: Graph[OpChain, OpEdge], node1: OpChain, node2: OpChain) - : Graph[OpChain, OpEdge] = { - if (dag.outDegreeOf(node1) == 1 && - dag.inDegreeOf(node2) == 1 && + private def merge(graph: Graph[Op, OpEdge], node1: Op, node2: Op) + (implicit system: ActorSystem): Unit = { + if (graph.outDegreeOf(node1) == 1 && + graph.inDegreeOf(node2) == 1 && // For processor node, we don't allow it to merge with downstream operators - !node1.head.isInstanceOf[ProcessorOp[_ <: Task]]) { - val (_, edge, _) = dag.outgoingEdgesOf(node1).head + !node1.isInstanceOf[ProcessorOp[_ <: Task]] && + !node2.isInstanceOf[ProcessorOp[_ <: Task]]) { + val (_, edge, _) = graph.outgoingEdgesOf(node1).head if (edge == Direct) { - val opList = OpChain(node1.ops ++ node2.ops) - dag.addVertex(opList) - for (incomingEdge <- dag.incomingEdgesOf(node1)) { - dag.addEdge(incomingEdge._1, incomingEdge._2, opList) + val chainedOp = node1.chain(node2) + graph.addVertex(chainedOp) + for (incomingEdge <- graph.incomingEdgesOf(node1)) { + graph.addEdge(incomingEdge._1, incomingEdge._2, chainedOp) } - for (outgoingEdge <- dag.outgoingEdgesOf(node2)) { - dag.addEdge(opList, outgoingEdge._2, outgoingEdge._3) + for (outgoingEdge <- graph.outgoingEdgesOf(node2)) { + graph.addEdge(chainedOp, outgoingEdge._2, outgoingEdge._3) } // Remove the old vertex - dag.removeVertex(node1) - dag.removeVertex(node2) + graph.removeVertex(node1) + graph.removeVertex(node2) } } - dag } } \ No newline at end of file diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala new file mode 100644 index 000000000..609fbb06f --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.plan.functions + +trait SingleInputFunction[IN, OUT] extends Serializable { + def process(value: IN): TraversableOnce[OUT] + def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = { + new AndThen(this, other) + } + def finish(): TraversableOnce[OUT] = None + def clearState(): Unit = {} + def description: String +} + +class AndThen[IN, MIDDLE, OUT]( + first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT]) + extends SingleInputFunction[IN, OUT] { + + override def process(value: IN): TraversableOnce[OUT] = { + first.process(value).flatMap(second.process) + } + + override def finish(): TraversableOnce[OUT] = { + val firstResult = first.finish().flatMap(second.process) + if (firstResult.isEmpty) { + second.finish() + } else { + firstResult + } + } + + override def clearState(): Unit = { + first.clearState() + second.clearState() + } + + override def description: String = { + Option(first.description).flatMap { description => + Option(second.description).map(description + "." + _) + }.orNull + } +} + +class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String) + extends SingleInputFunction[IN, OUT] { + + override def process(value: IN): TraversableOnce[OUT] = { + fn(value) + } + + override def description: String = descriptionMessage +} + + +class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String) + extends SingleInputFunction[T, T] { + + private var state: Option[T] = None + + override def process(value: T): TraversableOnce[T] = { + if (state.isEmpty) { + state = Option(value) + } else { + state = state.map(fn(_, value)) + } + None + } + + override def finish(): TraversableOnce[T] = { + state + } + + override def clearState(): Unit = { + state = None + } + + override def description: String = descriptionMessage +} + +class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] { + + override def process(value: T): TraversableOnce[Unit] = { + emit(value) + None + } + + override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = { + throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction") + } + + override def description: String = "" +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala new file mode 100644 index 000000000..4ee2fa8c1 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTask.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.window.api.CountWindowFn +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +/** + * This task triggers output on number of messages in a window. + */ +class CountTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + private val windowSize = groupBy.window.windowFn.asInstanceOf[CountWindowFn].size + private var num = 0 + + override def onNext(msg: Message): Unit = { + windowRunner.process(msg) + num += 1 + if (windowSize == num) { + windowRunner.trigger(Instant.ofEpochMilli(windowSize)) + num = 0 + } + } +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala new file mode 100644 index 000000000..4b7649f71 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTask.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +/** + * This task triggers output on watermark progress. + */ +class EventTimeTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + override def onNext(message: Message): Unit = { + windowRunner.process(message) + } + + override def onWatermarkProgress(watermark: Instant): Unit = { + windowRunner.trigger(watermark) + } + +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala new file mode 100644 index 000000000..980a54b23 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTask.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant +import java.util.concurrent.TimeUnit + +import akka.actor.Actor.Receive +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering +import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn +import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, GroupAlsoByWindow, WindowRunner} +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +import scala.concurrent.duration.FiniteDuration + +object ProcessingTimeTriggerTask { + case object Triggering +} + +/** + * This task triggers output on scheduled system time interval. 
+ */ +class ProcessingTimeTriggerTask[IN, GROUP]( + groupBy: GroupAlsoByWindow[IN, GROUP], + windowRunner: WindowRunner, + taskContext: TaskContext, + userConfig: UserConfig) + extends Task(taskContext, userConfig) { + + def this(groupBy: GroupAlsoByWindow[IN, GROUP], + taskContext: TaskContext, userConfig: UserConfig) = { + this(groupBy, new DefaultWindowRunner(taskContext, userConfig, groupBy)(taskContext.system), + taskContext, userConfig) + } + + def this(taskContext: TaskContext, userConfig: UserConfig) = { + this(userConfig.getValue[GroupAlsoByWindow[IN, GROUP]]( + GEARPUMP_STREAMING_GROUPBY_FUNCTION)(taskContext.system).get, + taskContext, userConfig) + } + + private val windowFn = groupBy.window.windowFn.asInstanceOf[SlidingWindowFn] + private val windowSizeMs = windowFn.size.toMillis + private val windowStepMs = windowFn.step.toMillis + + override def onStart(startTime: Instant): Unit = { + val initialDelay = windowSizeMs - Instant.now.toEpochMilli % windowSizeMs + taskContext.scheduleOnce( + new FiniteDuration(initialDelay, TimeUnit.MILLISECONDS))(self ! Triggering) + } + + override def onNext(message: Message): Unit = { + windowRunner.process(message) + } + + override def receiveUnManagedMessage: Receive = { + case Triggering => + windowRunner.trigger(Instant.now) + taskContext.scheduleOnce( + new FiniteDuration(windowStepMs, TimeUnit.MILLISECONDS))(self ! Triggering) + } + +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala new file mode 100644 index 000000000..e35f08577 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
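The onStart arithmetic above aligns the first trigger with a window boundary. A worked example with assumed values:

```scala
// Suppose the window size is 5000 ms and the task starts at epoch time 12340 ms.
val windowSizeMs = 5000L
val nowMs = 12340L
val initialDelay = windowSizeMs - nowMs % windowSizeMs
// initialDelay == 2660, so the first Triggering message fires at 15000 ms,
// a multiple of the window size; later triggers repeat every windowStepMs.
```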
+ */ +package org.apache.gearpump.streaming.dsl.task + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.task.{Task, TaskContext} + +class TransformTask[IN, OUT]( + operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext, + userConf: UserConfig) extends Task(taskContext, userConf) { + + def this(taskContext: TaskContext, userConf: UserConfig) = { + this(userConf.getValue[SingleInputFunction[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system), taskContext, userConf) + } + + override def onNext(msg: Message): Unit = { + val time = msg.timestamp + + operator match { + case Some(op) => + op.process(msg.msg.asInstanceOf[IN]).foreach { msg => + taskContext.output(new Message(msg, time)) + } + case None => + taskContext.output(new Message(msg.msg, time)) + } + } +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala new file mode 100644 index 000000000..a4524a8e6 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/AccumulationMode.scala @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.window.api + +sealed trait AccumulationMode + +case object Accumulating extends AccumulationMode + +case object Discarding extends AccumulationMode diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala new file mode 100644 index 000000000..30e68ba39 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/GroupByFn.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
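The two modes above decide what DefaultWindowRunner does with per-group reducer state once a trigger fires. A schematic example with a running sum over two firings (element values assumed):

```scala
import org.apache.gearpump.streaming.dsl.window.api.CountWindow

// Elements 1 and 2 arrive before the first trigger, element 3 before the second.
// Discarding:   first firing emits 3, state is cleared, second firing emits 3.
// Accumulating: first firing emits 3, state is kept,    second firing emits 6.
val accumulating = CountWindow(2).accumulating
val discarding = CountWindow(2).discarding
```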
+ */ +package org.apache.gearpump.streaming.dsl.window.api + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.task.Task + +/** + * Divides messages into groups according to their payload and timestamp. + * See [[org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow]] + * for the default implementation. + */ +trait GroupByFn[T, GROUP] { + + /** + * Used by + * 1. GroupByPartitioner to shuffle messages + * 2. WindowRunner to group messages for time-based aggregation + */ + def groupBy(message: Message): GROUP + + /** + * Returns a Processor according to the window trigger during planning. + */ + def getProcessor(parallelism: Int, description: String, + userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] +} + + diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala new file mode 100644 index 000000000..9865e18d2 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Trigger.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.window.api + +sealed trait Trigger + +case object EventTimeTrigger extends Trigger + +case object ProcessingTimeTrigger extends Trigger + +case object CountTrigger extends Trigger + diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala new file mode 100644 index 000000000..4b94879bc --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/Window.scala @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
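As a concrete illustration of this contract, a minimal non-windowed implementation might look as follows; the class name and the choice of TransformTask are hypothetical:

```scala
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.dsl.task.TransformTask
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
import org.apache.gearpump.streaming.task.Task

// Keys each message by its payload and plans a plain TransformTask,
// i.e. grouping without any time- or count-based windowing.
class ByPayloadFn[T] extends GroupByFn[T, T] {

  override def groupBy(message: Message): T =
    message.msg.asInstanceOf[T]

  override def getProcessor(parallelism: Int, description: String,
      userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = {
    Processor[TransformTask[T, T]](parallelism, description, userConfig)
  }
}
```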
+ */ +package org.apache.gearpump.streaming.dsl.window.api + +import java.time.Duration + +/** + * Defines how messages are assigned to windows and when results are emitted. + * + * @param windowFn divides messages into windows by timestamp + * @param trigger decides when to emit results for a window + * @param accumulationMode whether per-window state is accumulated or discarded across triggers + */ +case class Window( + windowFn: WindowFn, + trigger: Trigger = EventTimeTrigger, + accumulationMode: AccumulationMode = Discarding) { + + def triggering(trigger: Trigger): Window = { + Window(windowFn, trigger, accumulationMode) + } + + def accumulating: Window = { + Window(windowFn, trigger, Accumulating) + } + + def discarding: Window = { + Window(windowFn, trigger, Discarding) + } +} + +object CountWindow { + + /** + * Defines a CountWindow. + * @param size number of elements per window + * @return a Window definition + */ + def apply(size: Int): Window = { + Window(CountWindowFn(size), CountTrigger) + } +} + +object FixedWindow { + + /** + * Defines a FixedWindow. + * @param size window size + * @return a Window definition + */ + def apply(size: Duration): Window = { + Window(SlidingWindowFn(size, size)) + } +} + +object SlidingWindow { + + /** + * Defines a SlidingWindow. + * @param size window size + * @param step window step to slide forward + * @return a Window definition + */ + def apply(size: Duration, step: Duration): Window = { + Window(SlidingWindowFn(size, step)) + } +} + diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala new file mode 100644 index 000000000..0768730c1 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/api/WindowFn.scala @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
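Putting the builders together, a few illustrative window definitions this API is meant to express:

```scala
import java.time.Duration
import org.apache.gearpump.streaming.dsl.window.api._

// 10-second tumbling window, fired by the event-time watermark (the default).
val fixed = FixedWindow(Duration.ofSeconds(10))

// 10-second window sliding every 5 seconds, fired on the wall clock instead.
val sliding = SlidingWindow(Duration.ofSeconds(10), Duration.ofSeconds(5))
  .triggering(ProcessingTimeTrigger)

// Fire after every 100 elements per group and keep state across firings.
val counting = CountWindow(100).accumulating
```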
+ */ +package org.apache.gearpump.streaming.dsl.window.api + +import java.time.{Duration, Instant} + +import org.apache.gearpump.TimeStamp +import org.apache.gearpump.streaming.dsl.window.impl.Bucket + +import scala.collection.mutable.ArrayBuffer + +sealed trait WindowFn { + def apply(timestamp: Instant): List[Bucket] +} + +case class SlidingWindowFn(size: Duration, step: Duration) + extends WindowFn { + + def this(size: Duration) = { + this(size, size) + } + + override def apply(timestamp: Instant): List[Bucket] = { + val sizeMillis = size.toMillis + val stepMillis = step.toMillis + val timeMillis = timestamp.toEpochMilli + val windows = ArrayBuffer.empty[Bucket] + var start = lastStartFor(timeMillis, stepMillis) + windows += Bucket.ofEpochMilli(start, start + sizeMillis) + start -= stepMillis + // step back through every earlier window that still contains the timestamp + while (start + sizeMillis > timeMillis) { + windows += Bucket.ofEpochMilli(start, start + sizeMillis) + start -= stepMillis + } + windows.toList + } + + private def lastStartFor(timestamp: TimeStamp, windowStep: Long): TimeStamp = { + timestamp - (timestamp + windowStep) % windowStep + } +} + +case class CountWindowFn(size: Int) extends WindowFn { + + override def apply(timestamp: Instant): List[Bucket] = { + List(Bucket.ofEpochMilli(0, size)) + } +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala new file mode 100644 index 000000000..e978983ff --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/ReduceFnRunner.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.window.impl + +import org.apache.gearpump.Message +import org.apache.gearpump.streaming.dsl.window.api.Trigger + +trait ReduceFnRunner { + + def process(message: Message): Unit + + def onTrigger(trigger: Trigger): Unit + +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala new file mode 100644 index 000000000..53cf5d07a --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/Window.scala @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
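A worked example of the sliding assignment above, with assumed numbers: for size 10 ms, step 5 ms and a message at 13 ms, lastStartFor(13, 5) yields 10, and stepping back while start + size > 13 also admits the window starting at 5.

```scala
import java.time.{Duration, Instant}
import org.apache.gearpump.streaming.dsl.window.api.SlidingWindowFn

val fn = SlidingWindowFn(Duration.ofMillis(10), Duration.ofMillis(5))
val buckets = fn(Instant.ofEpochMilli(13))
// buckets == List(Bucket.ofEpochMilli(10, 20), Bucket.ofEpochMilli(5, 15)),
// i.e. both windows [10, 20) and [5, 15) contain the 13 ms timestamp.
```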
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.window.impl + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.{Message, TimeStamp} +import org.apache.gearpump.streaming.dsl.window.api._ +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, EventTimeTriggerTask, ProcessingTimeTriggerTask} +import org.apache.gearpump.streaming.task.Task + +object Bucket { + def ofEpochMilli(startTime: TimeStamp, endTime: TimeStamp): Bucket = { + Bucket(Instant.ofEpochMilli(startTime), Instant.ofEpochMilli(endTime)) + } +} + +/** + * A window unit including startTime and excluding endTime. + */ +case class Bucket(startTime: Instant, endTime: Instant) extends Comparable[Bucket] { + override def compareTo(o: Bucket): Int = { + val ret = startTime.compareTo(o.startTime) + if (ret != 0) { + ret + } else { + endTime.compareTo(o.endTime) + } + } +} + +case class GroupAlsoByWindow[T, GROUP](groupByFn: T => GROUP, window: Window) + extends GroupByFn[T, (GROUP, List[Bucket])] { + + override def groupBy(message: Message): (GROUP, List[Bucket]) = { + val group = groupByFn(message.msg.asInstanceOf[T]) + val buckets = window.windowFn(Instant.ofEpochMilli(message.timestamp)) + group -> buckets + } + + override def getProcessor(parallelism: Int, description: String, + userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { + val config = userConfig.withValue(GEARPUMP_STREAMING_GROUPBY_FUNCTION, this) + window.trigger match { + case CountTrigger => + Processor[CountTriggerTask[T, GROUP]](parallelism, description, config) + case ProcessingTimeTrigger => + Processor[ProcessingTimeTriggerTask[T, GROUP]](parallelism, description, config) + case EventTimeTrigger => + Processor[EventTimeTriggerTask[T, GROUP]](parallelism, description, config) + } + } + +} + + diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala new file mode 100644 index 000000000..9af5e6148 --- /dev/null +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
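To make the pairing of group key and window buckets concrete, a small example with an assumed payload and timestamp:

```scala
import java.time.Duration
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.window.api.FixedWindow
import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow

val groupBy = GroupAlsoByWindow[(String, Int), String](_._1,
  FixedWindow(Duration.ofMillis(5)))

// A message keyed "male" with timestamp 7 ms falls into the [5, 10) bucket.
val (key, buckets) = groupBy.groupBy(Message(("male", 1), 7L))
// key == "male", buckets == List(Bucket.ofEpochMilli(5, 10))
```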
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.window.impl + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.gs.collections.api.block.procedure.Procedure +import org.apache.gearpump.gs.collections.impl.list.mutable.FastList +import org.apache.gearpump.gs.collections.impl.map.mutable.UnifiedMap +import org.apache.gearpump.gs.collections.impl.map.sorted.mutable.TreeSortedMap +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.window.api.Discarding +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.LogUtil +import org.slf4j.Logger + +trait WindowRunner { + + def process(message: Message): Unit + + def trigger(time: Instant): Unit + +} + +object DefaultWindowRunner { + + private val LOG: Logger = LogUtil.getLogger(classOf[DefaultWindowRunner[_, _, _]]) + + case class WindowGroup[GROUP](bucket: Bucket, group: GROUP) + extends Comparable[WindowGroup[GROUP]] { + override def compareTo(o: WindowGroup[GROUP]): Int = { + val ret = bucket.compareTo(o.bucket) + if (ret != 0) { + ret + } else if (group.equals(o.group)) { + 0 + } else { + -1 + } + } + } +} + +class DefaultWindowRunner[IN, GROUP, OUT]( + taskContext: TaskContext, userConfig: UserConfig, + groupBy: GroupAlsoByWindow[IN, GROUP])(implicit system: ActorSystem) + extends WindowRunner { + import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner._ + + private val windowGroups = new TreeSortedMap[WindowGroup[GROUP], FastList[IN]] + private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]] + + + override def process(message: Message): Unit = { + val (group, buckets) = groupBy.groupBy(message) + buckets.foreach { bucket => + val wg = WindowGroup(bucket, group) + val inputs = windowGroups.getOrDefault(wg, new FastList[IN](1)) + inputs.add(message.msg.asInstanceOf[IN]) + windowGroups.put(wg, inputs) + } + groupFns.putIfAbsent(group, + userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get) + } + + override def trigger(time: Instant): Unit = { + onTrigger() + + @annotation.tailrec + def onTrigger(): Unit = { + if (windowGroups.notEmpty()) { + val first = windowGroups.firstKey + if (!time.isBefore(first.bucket.endTime)) { + val inputs = windowGroups.remove(first) + val reduceFn = groupFns.get(first.group) + .andThen[Unit](new EmitFunction[OUT](emitResult(_, time))) + inputs.forEach(new Procedure[IN] { + override def value(t: IN): Unit = { + reduceFn.process(t) + } + }) + reduceFn.finish() + if (groupBy.window.accumulationMode == Discarding) { + reduceFn.clearState() + } + onTrigger() + } + } + } + + def emitResult(result: OUT, time: Instant): Unit = { + taskContext.output(Message(result, time.toEpochMilli)) + } + } +} diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index fb2d89817..535497c23 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import 
org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.plan.OpTranslator.{DummyInputFunction, SingleInputFunction} +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction import org.apache.gearpump.streaming.task.{Task, TaskContext} /** @@ -57,15 +57,10 @@ class DataSourceTask[IN, OUT] private[source]( private val processMessage: Message => Unit = operator match { case Some(op) => - op match { - case bad: DummyInputFunction[IN] => - (message: Message) => context.output(message) - case _ => - (message: Message) => { - op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => - context.output(Message(m, message.timestamp)) - } - } + (message: Message) => { + op.process(message.msg.asInstanceOf[IN]).foreach { m: OUT => + context.output(Message(m, message.timestamp)) + } } case None => (message: Message) => context.output(message) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala index c0b6a29c7..9a52cc6b5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala @@ -23,7 +23,7 @@ import java.util import java.util.concurrent.TimeUnit import akka.actor._ -import org.apache.gearpump.streaming.source.{Watermark, DataSourceTask} +import org.apache.gearpump.streaming.source.Watermark import org.slf4j.Logger import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.gs.collections.impl.map.mutable.primitive.IntShortHashMap @@ -308,9 +308,9 @@ class TaskActor( private def updateUpstreamMinClock(upstreamClock: TimeStamp): Unit = { if (upstreamClock > this.upstreamMinClock) { + this.upstreamMinClock = upstreamClock task.onWatermarkProgress(Instant.ofEpochMilli(this.upstreamMinClock)) } - this.upstreamMinClock = upstreamClock val subMinClock = subscriptions.foldLeft(Long.MaxValue) { (min, sub) => val subMin = sub._2.minClock diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala index e919a34cb..e0407ec37 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala @@ -21,7 +21,10 @@ package org.apache.gearpump.streaming.dsl import akka.actor.ActorSystem import org.apache.gearpump.cluster.TestUtil import org.apache.gearpump.cluster.client.ClientContext +import org.apache.gearpump.partitioner.PartitionerDescription +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.util.Graph import org.mockito.Mockito.when import org.scalatest._ import org.scalatest.mock.MockitoSugar @@ -30,7 +33,7 @@ import scala.concurrent.Await import scala.concurrent.duration.Duration class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = null + implicit var system: ActorSystem = _ override def beforeAll(): Unit = { system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) @@ -45,49 +48,25 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) - val app = 
StreamApp("dsl", context) - app.source(List("A"), 1, "") - app.source(List("B"), 1, "") + val dsl = StreamApp("dsl", context) + dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]] + dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]] - assert(app.graph.vertices.size == 2) - } - - it should "plan the dsl to Processsor(TaskDescription) DAG" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - - val app = StreamApp("dsl", context) - val parallism = 3 - app.source(List("A", "B", "C"), parallism, "").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - assert(task.taskClass == classOf[DataSourceTask[_, _]].getName) - assert(task.parallelism == parallism) - } - - it should "produce 3 messages" in { - val context: ClientContext = mock[ClientContext] - when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) - val list = List[String]( - "0", - "1", - "2" - ) - val producer = app.source(list, 1, "producer").flatMap(Array(_)).reduce(_ + _) - val task = app.plan.dag.vertices.iterator.next() - /* - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - val sum = producer.flatMap(msg => { - LOG.info("in flatMap") - assert(msg.msg.isInstanceOf[String]) - val num = msg.msg.asInstanceOf[String].toInt - Array(num) - }).reduce(_+_) - val task = app.plan.dag.vertices.iterator.map(desc => { - LOG.info(s"${desc.taskClass}") - }) - */ + val application = dsl.plan() + application shouldBe a [StreamApplication] + application.name shouldBe "dsl" + val dag = application.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get + dag.vertices.size shouldBe 2 + dag.vertices.foreach { processor => + processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName + if (processor.description == "A") { + processor.parallelism shouldBe 2 + } else if (processor.description == "B") { + processor.parallelism shouldBe 3 + } else { + fail(s"undefined source ${processor.description}") + } + } } } diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala index 816feef91..fdc721b5a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala @@ -22,10 +22,11 @@ import akka.actor._ import org.apache.gearpump.Message import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner} +import org.apache.gearpump.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription} +import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication} import org.apache.gearpump.streaming.dsl.StreamSpec.Join import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} import org.apache.gearpump.streaming.source.DataSourceTask import org.apache.gearpump.streaming.task.{Task, TaskContext} import org.apache.gearpump.util.Graph @@ -40,7 +41,6 @@ import scala.util.{Either, Left, Right} class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { - implicit var system: ActorSystem = _ override def beforeAll(): Unit = 
{ @@ -56,7 +56,7 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock val context: ClientContext = mock[ClientContext] when(context.system).thenReturn(system) - val app = StreamApp("dsl", context) + val dsl = StreamApp("dsl", context) val data = """ @@ -66,30 +66,32 @@ class StreamSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Mock five four five """ - val stream = app.source(data.lines.toList, 1, ""). + val stream = dsl.source(data.lines.toList, 1, ""). flatMap(line => line.split("[\\s]+")).filter(_.nonEmpty). map(word => (word, 1)). groupBy(_._1, parallelism = 2). reduce((left, right) => (left._1, left._2 + right._2)). map[Either[(String, Int), String]](Left(_)) - val query = app.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) + val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_)) stream.merge(query).process[(String, Int)](classOf[Join], 1) - val appDescription = app.plan() + val app: StreamApplication = dsl.plan() + val dag = app.userConfig + .getValue[Graph[ProcessorDescription, PartitionerDescription]](StreamApplication.DAG).get - val dagTopology = appDescription.dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => + val dagTopology = dag.mapVertex(_.taskClass).mapEdge { (node1, edge, node2) => edge.partitionerFactory.partitioner.getClass.getName } val expectedDagTopology = getExpectedDagTopology - assert(dagTopology.vertices.toSet.equals(expectedDagTopology.vertices.toSet)) - assert(dagTopology.edges.toSet.equals(expectedDagTopology.edges.toSet)) + dagTopology.vertices.toSet should contain theSameElementsAs expectedDagTopology.vertices.toSet + dagTopology.edges.toSet should contain theSameElementsAs expectedDagTopology.edges.toSet } private def getExpectedDagTopology: Graph[String, String] = { val source = classOf[DataSourceTask[_, _]].getName - val group = classOf[GroupByTask[_, _, _]].getName + val group = classOf[CountTriggerTask[_, _]].getName val merge = classOf[TransformTask[_, _]].getName val join = classOf[Join].getName diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala index fcc646dd0..f49eb0496 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/partitioner/GroupByPartitionerSpec.scala @@ -18,24 +18,33 @@ package org.apache.gearpump.streaming.dsl.partitioner -import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} +import java.time.Duration +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitionerSpec.People +import org.apache.gearpump.streaming.dsl.window.api.{FixedWindow, GroupByFn} +import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow} class GroupByPartitionerSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - it should "use the outpout of groupBy function to do partition" in { + + it should "group by message payload and window" in { val mark = People("Mark", "male") val tom = People("Tom", "male") val michelle = People("Michelle", "female") val partitionNum = 10 - val groupBy = new GroupByPartitioner[People, String](_.gender) - assert(groupBy.getPartition(Message(mark), partitionNum) - == 
groupBy.getPartition(Message(tom), partitionNum)) + val groupByFn: GroupByFn[People, (String, List[Bucket])] = + GroupAlsoByWindow[People, String](_.gender, FixedWindow.apply(Duration.ofMillis(5))) + val groupBy = new GroupByPartitioner[People, (String, List[Bucket])](groupByFn) + groupBy.getPartition(Message(mark, 1L), partitionNum) shouldBe + groupBy.getPartition(Message(tom, 2L), partitionNum) + + groupBy.getPartition(Message(mark, 1L), partitionNum) should not be + groupBy.getPartition(Message(tom, 6L), partitionNum) - assert(groupBy.getPartition(Message(mark), partitionNum) - != groupBy.getPartition(Message(michelle), partitionNum)) + groupBy.getPartition(Message(mark, 2L), partitionNum) should not be + groupBy.getPartition(Message(michelle, 3L), partitionNum) } } diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala new file mode 100644 index 000000000..bf52abcf0 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
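The property this spec relies on can be stated as a small helper (hypothetical, for illustration): messages with equal groupBy output must land on the same partition, and because the window buckets are part of the group key, the same gender at 1 ms and at 6 ms falls into different 5 ms windows and may be routed differently.

```scala
import org.apache.gearpump.Message
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner

// Hypothetical helper: messages in the same group must share a partition.
def samePartition[T, GROUP](partitioner: GroupByPartitioner[T, GROUP],
    a: Message, b: Message, partitionNum: Int = 10): Boolean = {
  partitioner.getPartition(a, partitionNum) == partitioner.getPartition(b, partitionNum)
}
```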
+ */ +package org.apache.gearpump.streaming.dsl.plan + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.Processor +import org.apache.gearpump.streaming.Processor.DefaultProcessor +import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask} +import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpec} +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + private val unchainableOps: List[Op] = List( + mock[DataSourceOp], + mock[DataSinkOp], + mock[GroupByOp[Any, Any]], + mock[MergeOp], + mock[ProcessorOp[AnyTask]]) + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "DataSourceOp" should { + + "chain ChainableOp" in { + val dataSource = new AnySource + val dataSourceOp = DataSourceOp(dataSource) + val chainableOp = mock[ChainableOp[Any, Any]] + val fn = mock[SingleInputFunction[Any, Any]] + + val chainedOp = dataSourceOp.chain(chainableOp) + + chainedOp shouldBe a[DataSourceOp] + verify(chainableOp).fn + + unchainableOps.foreach { op => + intercept[OpChainException] { + dataSourceOp.chain(op) + } + } + } + + "get Processor of DataSource" in { + val dataSource = new AnySource + val dataSourceOp = DataSourceOp(dataSource) + val processor = dataSourceOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe dataSourceOp.parallelism + processor.description shouldBe dataSourceOp.description + } + } + + "DataSinkOp" should { + + "not chain any Op" in { + val dataSink = new AnySink + val dataSinkOp = DataSinkOp(dataSink) + val chainableOp = mock[ChainableOp[Any, Any]] + val ops = chainableOp +: unchainableOps + ops.foreach { op => + intercept[OpChainException] { + dataSinkOp.chain(op) + } + } + } + + "get Processor of DataSink" in { + val dataSink = new AnySink + val dataSinkOp = DataSinkOp(dataSink) + val processor = dataSinkOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe dataSinkOp.parallelism + processor.description shouldBe dataSinkOp.description + } + } + + "ProcessorOp" should { + + "not chain any Op" in { + val processorOp = new ProcessorOp[AnyTask] + val chainableOp = mock[ChainableOp[Any, Any]] + val ops = chainableOp +: unchainableOps + ops.foreach { op => + intercept[OpChainException] { + processorOp.chain(op) + } + } + } + + "get Processor" in { + val processorOp = new ProcessorOp[AnyTask] + val processor = processorOp.getProcessor + processor shouldBe a [DefaultProcessor[_]] + processor.parallelism shouldBe processorOp.parallelism + processor.description shouldBe processorOp.description + } + } + + "ChainableOp" should { + + "chain ChainableOp" in { + val fn1 = mock[SingleInputFunction[Any, Any]] + val chainableOp1 = ChainableOp[Any, 
Any](fn1) + + val fn2 = mock[SingleInputFunction[Any, Any]] + val chainableOp2 = ChainableOp[Any, Any](fn2) + + val chainedOp = chainableOp1.chain(chainableOp2) + + verify(fn1).andThen(fn2) + chainedOp shouldBe a[ChainableOp[_, _]] + + unchainableOps.foreach { op => + intercept[OpChainException] { + chainableOp1.chain(op) + } + } + } + + "throw exception on getProcessor" in { + val fn1 = mock[SingleInputFunction[Any, Any]] + val chainableOp1 = ChainableOp[Any, Any](fn1) + intercept[UnsupportedOperationException] { + chainableOp1.getProcessor + } + } + } + + "GroupByOp" should { + + "chain ChainableOp" in { + val groupByFn = mock[GroupByFn[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupByFn) + val fn = mock[SingleInputFunction[Any, Any]] + val chainableOp = mock[ChainableOp[Any, Any]] + when(chainableOp.fn).thenReturn(fn) + + val chainedOp = groupByOp.chain(chainableOp) + chainedOp shouldBe a[GroupByOp[_, _]] + + unchainableOps.foreach { op => + intercept[OpChainException] { + groupByOp.chain(op) + } + } + } + + "delegate to groupByFn on getProcessor" in { + val groupByFn = mock[GroupByFn[Any, Any]] + val groupByOp = GroupByOp[Any, Any](groupByFn) + + groupByOp.getProcessor + verify(groupByFn).getProcessor(anyInt, anyString, any[UserConfig])(any[ActorSystem]) + } + } + + "MergeOp" should { + + val mergeOp = MergeOp("merge") + + "chain ChainableOp" in { + val fn = mock[SingleInputFunction[Any, Any]] + val chainableOp = mock[ChainableOp[Any, Any]] + when(chainableOp.fn).thenReturn(fn) + + val chainedOp = mergeOp.chain(chainableOp) + chainedOp shouldBe a [MergeOp] + + unchainableOps.foreach { op => + intercept[OpChainException] { + mergeOp.chain(op) + } + } + } + + "get Processor" in { + val processor = mergeOp.getProcessor + processor shouldBe a[Processor[_]] + processor.parallelism shouldBe 1 + } + } +} + +object OpSpec { + class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) + + class AnySource extends DataSource { + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = Message("any") + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + } + + class AnySink extends DataSink { + + override def open(context: TaskContext): Unit = {} + + override def write(message: Message): Unit = {} + + override def close(): Unit = {} +} +} diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala deleted file mode 100644 index 2112fd082..000000000 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpTranslatorSpec.scala +++ /dev/null @@ -1,148 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
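The chaining rules these specs pin down compose as in the following sketch, reusing the spec's AnySource fixture; the flatMap function itself is illustrative:

```scala
import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction

val sourceOp = DataSourceOp(new AnySource)
val mapOp = ChainableOp[Any, Any](
  new FlatMapFunction[Any, Any](Option(_), "map"))

// Chaining fuses the function into the source: the result is still a
// DataSourceOp, so no separate processor is created for the map step.
val fused = sourceOp.chain(mapOp)
```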
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gearpump.streaming.dsl.plan - -import java.time.Instant - -import scala.concurrent.Await -import scala.concurrent.duration.Duration -import akka.actor.ActorSystem -import org.mockito.ArgumentCaptor -import org.mockito.Matchers._ -import org.mockito.Mockito._ -import org.scalatest._ -import org.apache.gearpump.Message -import org.apache.gearpump.cluster.{TestUtil, UserConfig} -import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.CollectionDataSource -import org.apache.gearpump.streaming.dsl.plan.OpTranslator._ -import org.apache.gearpump.streaming.source.DataSourceTask - -class OpTranslatorSpec extends FlatSpec with Matchers with BeforeAndAfterAll { - - - "andThen" should "chain multiple single input function" in { - val dummy = new DummyInputFunction[String] - val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") - - val filter = new FlatMapFunction[String, String](word => - if (word.isEmpty) None else Some(word), "filter") - - val map = new FlatMapFunction[String, Int](word => Some(1), "map") - - val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") - - val all = dummy.andThen(split).andThen(filter).andThen(map).andThen(sum) - - assert(all.description == "split.filter.map.sum") - - val data = - """ - five four three two one - five four three two - five four three - five four - five - """ - val count = all.process(data).toList.last - assert(count == 15) - } - - "Source" should "iterate over input source and apply attached operator" in { - - val taskContext = MockUtil.mockTaskContext - implicit val actorSystem = MockUtil.system - - val data = "one two three".split("\\s") - val dataSource = new CollectionDataSource[String](data) - val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - - // Source with no transformer - val source = new DataSourceTask[String, String]( - taskContext, conf) - source.onStart(Instant.EPOCH) - source.onNext(Message("next")) - data.foreach { s => - verify(taskContext, times(1)).output(Message(s)) - } - - // Source with transformer - val anotherTaskContext = MockUtil.mockTaskContext - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val another = new DataSourceTask(anotherTaskContext, - conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) - another.onStart(Instant.EPOCH) - another.onNext(Message("next")) - data.foreach { s => - verify(anotherTaskContext, times(2)).output(Message(s)) - } - } - - "GroupByTask" should "group input by groupBy Function and " + - "apply attached operator for each group" in { - - val data = "1 2 2 3 3 3" - - val concat = new ReduceFunction[String]({ (left, right) => - left + right - }, "concat") - - implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) - val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( - GEARPUMP_STREAMING_OPERATOR, concat) - - val taskContext = MockUtil.mockTaskContext - - val task = new GroupByTask[String, String, String](input => input, taskContext, config) - task.onStart(Instant.EPOCH) - - val peopleCaptor = ArgumentCaptor.forClass(classOf[Message]) - - data.split("\\s+").foreach { word => - task.onNext(Message(word)) - } - verify(taskContext, times(6)).output(peopleCaptor.capture()) - - import scala.collection.JavaConverters._ - - val values = 
peopleCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) - assert(values.mkString(",") == "1,2,22,3,33,333") - system.terminate() - Await.result(system.whenTerminated, Duration.Inf) - } - - "MergeTask" should "accept two stream and apply the attached operator" in { - - // Source with transformer - val taskContext = MockUtil.mockTaskContext - val conf = UserConfig.empty - val double = new FlatMapFunction[String, String](word => List(word, word), "double") - val task = new TransformTask[String, String](Some(double), taskContext, conf) - task.onStart(Instant.EPOCH) - - val data = "1 2 2 3 3 3".split("\\s+") - - data.foreach { input => - task.onNext(Message(input)) - } - - verify(taskContext, times(data.length * 2)).output(anyObject()) - } -} \ No newline at end of file diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala new file mode 100644 index 000000000..f8666ba10 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -0,0 +1,132 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
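Before the full spec below, the planner's essential behaviour in miniature (assuming the spec's fixtures and an implicit ActorSystem in scope): ChainableOps reached over Direct edges are fused into their upstream op, so a source followed by a flatMap plans to a single processor.

```scala
import org.apache.gearpump.streaming.dsl.plan.functions.FlatMapFunction
import org.apache.gearpump.util.Graph

val graph = Graph.empty[Op, OpEdge]
val source = DataSourceOp(new AnySource)
val flatMap = ChainableOp[Any, Any](
  new FlatMapFunction[Any, Any](Option(_), "flatMap"))
graph.addVertex(source)
graph.addVertex(flatMap)
graph.addEdge(source, Direct, flatMap)

val plan = new Planner().plan(graph)
// plan contains a single vertex: the source processor with flatMap fused in.
```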
+ */ + +package org.apache.gearpump.streaming.dsl.plan + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.partitioner.CoLocationPartitioner +import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner +import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._ +import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction} +import org.apache.gearpump.streaming.dsl.window.api.GroupByFn +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.{MockUtil, Processor} +import org.apache.gearpump.streaming.task.{Task, TaskContext} +import org.apache.gearpump.util.Graph +import org.scalatest.mock.MockitoSugar +import org.scalatest.{BeforeAndAfterAll, FlatSpec, Matchers} + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with MockitoSugar { + + implicit var system: ActorSystem = _ + + override def beforeAll(): Unit = { + system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + } + + override def afterAll(): Unit = { + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + + "Planner" should "chain operations" in { + val graph = Graph.empty[Op, OpEdge] + val sourceOp = DataSourceOp(new AnySource) + val groupByOp = GroupByOp(new AnyGroupByFn) + val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction) + val reduceOp = ChainableOp[Any, Any](anyReduceFunction) + val processorOp = new ProcessorOp[AnyTask] + val sinkOp = DataSinkOp(new AnySink) + val directEdge = Direct + val shuffleEdge = Shuffle + + graph.addVertex(sourceOp) + graph.addVertex(groupByOp) + graph.addEdge(sourceOp, shuffleEdge, groupByOp) + graph.addVertex(flatMapOp) + graph.addEdge(groupByOp, directEdge, flatMapOp) + graph.addVertex(reduceOp) + graph.addEdge(flatMapOp, directEdge, reduceOp) + graph.addVertex(processorOp) + graph.addEdge(reduceOp, directEdge, processorOp) + graph.addVertex(sinkOp) + graph.addEdge(processorOp, directEdge, sinkOp) + + implicit val system = MockUtil.system + + val planner = new Planner + val plan = planner.plan(graph) + .mapVertex(_.description) + + plan.vertices.toSet should contain theSameElementsAs + Set("source", "groupBy", "processor", "sink") + plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] + plan.outgoingEdgesOf("groupBy").iterator.next()._2 shouldBe a[CoLocationPartitioner] + plan.outgoingEdgesOf("processor").iterator.next()._2 shouldBe a[CoLocationPartitioner] + } +} + +object PlannerSpec { + + private val anyParallelism = 1 + private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap") + private val anyReduceFunction = new ReduceFunction[Any]( + (left: Any, right: Any) => (left, right), "reduce") + + class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config) + + class AnySource extends DataSource { + + override def open(context: TaskContext, startTime: Instant): Unit = {} + + override def read(): Message = Message("any") + + override def close(): Unit = {} + + override def getWatermark: Instant = Instant.now() + } + + class AnySink extends DataSink { + + override def open(context: TaskContext): Unit = {} + + override def write(message: Message): Unit = {} + + override def close(): Unit = {} + } + + class AnyGroupByFn extends 
GroupByFn[Any, Any] { + + override def groupBy(message: Message): Any = message.msg + + override def getProcessor( + parallelism: Int, + description: String, + userConfig: UserConfig)(implicit system: ActorSystem): Processor[_ <: Task] = { + Processor[AnyTask](anyParallelism, description) + } + } +} diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala new file mode 100644 index 000000000..94feae4d6 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala @@ -0,0 +1,333 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.plan.functions + +import java.time.Instant + +import akka.actor.ActorSystem +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.{TestUtil, UserConfig} +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.CollectionDataSource +import org.apache.gearpump.streaming.source.DataSourceTask +import org.apache.gearpump.streaming.Constants._ +import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask} +import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow +import org.mockito.ArgumentCaptor +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalatest.{Matchers, WordSpec} +import org.scalatest.mock.MockitoSugar + +import scala.concurrent.Await +import scala.concurrent.duration.Duration + +class SingleInputFunctionSpec extends WordSpec with Matchers with MockitoSugar { + import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunctionSpec._ + + "AndThen" should { + + val first = mock[SingleInputFunction[R, S]] + val second = mock[SingleInputFunction[S, T]] + val andThen = new AndThen(first, second) + + "chain first and second functions when processing input value" in { + val input = mock[R] + val firstOutput = mock[S] + val secondOutput = mock[T] + when(first.process(input)).thenReturn(Some(firstOutput)) + when(second.process(firstOutput)).thenReturn(Some(secondOutput)) + + andThen.process(input).toList shouldBe List(secondOutput) + } + + "return chained description" in { + when(first.description).thenReturn("first") + when(second.description).thenReturn("second") + andThen.description shouldBe "first.second" + } + + "return either first result or second on finish" in { + val firstResult = mock[S] + val processedFirst = mock[T] + val secondResult = mock[T] + + when(first.finish()).thenReturn(Some(firstResult)) + 
when(second.process(firstResult)).thenReturn(Some(processedFirst)) + andThen.finish().toList shouldBe List(processedFirst) + + when(first.finish()).thenReturn(None) + when(second.finish()).thenReturn(Some(secondResult)) + andThen.finish().toList shouldBe List(secondResult) + } + + "clear both states on clearState" in { + andThen.clearState() + + verify(first).clearState() + verify(second).clearState() + } + + "return AndThen on andThen" in { + val third = mock[SingleInputFunction[T, Any]] + andThen.andThen[Any](third) shouldBe an [AndThen[_, _, _]] + } + } + + "FlatMapFunction" should { + + val flatMap = mock[R => TraversableOnce[S]] + val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap") + + "call flatMap function when processing input value" in { + val input = mock[R] + flatMapFunction.process(input) + verify(flatMap).apply(input) + } + + "return passed in description" in { + flatMapFunction.description shouldBe "flatMap" + } + + "return None on finish" in { + flatMapFunction.finish() shouldBe List.empty[S] + } + + "do nothing on clearState" in { + flatMapFunction.clearState() + verifyZeroInteractions(flatMap) + } + + "return AndThen on andThen" in { + val other = mock[SingleInputFunction[S, T]] + flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]] + } + } + + "ReduceFunction" should { + + + "call reduce function when processing input value" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val input1 = mock[T] + val input2 = mock[T] + val output = mock[T] + + when(reduce.apply(input1, input2)).thenReturn(output, output) + + reduceFunction.process(input1) shouldBe List.empty[T] + reduceFunction.process(input2) shouldBe List.empty[T] + reduceFunction.finish() shouldBe List(output) + + reduceFunction.clearState() + reduceFunction.process(input1) shouldBe List.empty[T] + reduceFunction.clearState() + reduceFunction.process(input2) shouldBe List.empty[T] + reduceFunction.finish() shouldBe List(input2) + } + + "return passed in description" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + reduceFunction.description shouldBe "reduce" + } + + "return None on finish" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + reduceFunction.finish() shouldBe List.empty[T] + } + + "do nothing on clearState" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + reduceFunction.clearState() + verifyZeroInteractions(reduce) + } + + "return AndThen on andThen" in { + val reduce = mock[(T, T) => T] + val reduceFunction = new ReduceFunction[T](reduce, "reduce") + val other = mock[SingleInputFunction[T, Any]] + reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]] + } + } + + "EmitFunction" should { + + val emit = mock[T => Unit] + val emitFunction = new EmitFunction[T](emit) + + "emit input value when processing input value" in { + val input = mock[T] + + emitFunction.process(input) shouldBe List.empty[Unit] + + verify(emit).apply(input) + } + + "return empty description" in { + emitFunction.description shouldBe "" + } + + "return None on finish" in { + emitFunction.finish() shouldBe List.empty[Unit] + } + + "do nothing on clearState" in { + emitFunction.clearState() + verifyZeroInteractions(emit) + } + + "throw exception on andThen" in { + val other = mock[SingleInputFunction[Unit, Any]] + intercept[UnsupportedOperationException] { + emitFunction.andThen(other) + } + } 
+ } + + "andThen" should { + "chain multiple single input functions" in { + val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split") + + val filter = new FlatMapFunction[String, String](word => + if (word.isEmpty) None else Some(word), "filter") + + val map = new FlatMapFunction[String, Int](word => Some(1), "map") + + val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum") + + val all = split.andThen(filter).andThen(map).andThen(sum) + + assert(all.description == "split.filter.map.sum") + + val data = + """ + five four three two one + five four three two + five four three + five four + five + """ + // force eager evaluation + all.process(data).toList + val result = all.finish().toList + assert(result.nonEmpty) + assert(result.last == 15) + } + } + + "Source" should { + "iterate over input source and apply attached operator" in { + + val taskContext = MockUtil.mockTaskContext + implicit val actorSystem = MockUtil.system + + val data = "one two three".split("\\s") + val dataSource = new CollectionDataSource[String](data) + val conf = UserConfig.empty.withValue(GEARPUMP_STREAMING_SOURCE, dataSource) + + // Source with no transformer + val source = new DataSourceTask[String, String]( + taskContext, conf) + source.onStart(Instant.EPOCH) + source.onNext(Message("next")) + data.foreach { s => + verify(taskContext, times(1)).output(MockUtil.argMatch[Message]( + message => message.msg == s)) + } + + // Source with transformer + val anotherTaskContext = MockUtil.mockTaskContext + val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val another = new DataSourceTask(anotherTaskContext, + conf.withValue(GEARPUMP_STREAMING_OPERATOR, double)) + another.onStart(Instant.EPOCH) + another.onNext(Message("next")) + data.foreach { s => + verify(anotherTaskContext, times(2)).output(MockUtil.argMatch[Message]( + message => message.msg == s)) + } + } + } + + "CountTriggerTask" should { + "group input by groupBy function and " + + "apply attached operator for each group" in { + + val data = "1 2 2 3 3 3" + + val concat = new ReduceFunction[String]({ (left, right) => + left + right + }, "concat") + + implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG) + val config = UserConfig.empty.withValue[SingleInputFunction[String, String]]( + GEARPUMP_STREAMING_OPERATOR, concat) + + val taskContext = MockUtil.mockTaskContext + + val groupBy = GroupAlsoByWindow((input: String) => input, CountWindow.apply(1).accumulating) + val task = new CountTriggerTask[String, String](groupBy, taskContext, config) + task.onStart(Instant.EPOCH) + + val messageCaptor = ArgumentCaptor.forClass(classOf[Message]) + + data.split("\\s+").foreach { word => + task.onNext(Message(word)) + } + verify(taskContext, times(6)).output(messageCaptor.capture()) + + import scala.collection.JavaConverters._ + + val values = messageCaptor.getAllValues.asScala.map(input => input.msg.asInstanceOf[String]) + assert(values.mkString(",") == "1,2,22,3,33,333") + system.terminate() + Await.result(system.whenTerminated, Duration.Inf) + } + } + + "TransformTask" should { + "accept input stream and apply the attached operator" in { + + // TransformTask with attached operator + val taskContext = MockUtil.mockTaskContext + val conf = UserConfig.empty + val double = new FlatMapFunction[String, String](word => List(word, word), "double") + val task = new TransformTask[String, String](Some(double), taskContext, conf) + task.onStart(Instant.EPOCH) + + val data = "1 2 2 3 3 3".split("\\s+") + +
data.foreach { input => + task.onNext(Message(input)) + } + + verify(taskContext, times(data.length * 2)).output(anyObject()) + } + } +} + +object SingleInputFunctionSpec { + type R = AnyRef + type S = AnyRef + type T = AnyRef +} diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala new file mode 100644 index 000000000..871d75119 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/CountTriggerTaskSpec.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.Instant + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.window.api.CountWindow +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks +import org.scalatest.{Matchers, PropSpec} + +class CountTriggerTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("CountTriggerTask should trigger output by number of messages in a window") { + + implicit val system = MockUtil.system + + val numGen = Gen.chooseNum[Int](1, 1000) + + forAll(numGen, numGen) { (windowSize: Int, msgNum: Int) => + + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val window = CountWindow.apply(windowSize) + when(groupBy.window).thenReturn(window) + val windowRunner = mock[WindowRunner] + val userConfig = UserConfig.empty + + val task = new CountTriggerTask[Any, Any](groupBy, windowRunner, + MockUtil.mockTaskContext, userConfig) + val message = mock[Message] + + for (i <- 1 to msgNum) { + task.onNext(message) + } + verify(windowRunner, times(msgNum)).process(message) + verify(windowRunner, times(msgNum / windowSize)).trigger(Instant.ofEpochMilli(windowSize)) + } + } +} diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala new file mode 100644 index 000000000..a69abe6f0 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/EventTimeTriggerTaskSpec.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, SlidingWindow} +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +class EventTimeTriggerTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("EventTimeTriggerTask should trigger on watermark") { + val longGen = Gen.chooseNum[Long](1L, 1000L) + val windowSizeGen = longGen + val windowStepGen = longGen + val watermarkGen = longGen.map(Instant.ofEpochMilli) + + forAll(windowSizeGen, windowStepGen, watermarkGen) { + (windowSize: Long, windowStep: Long, watermark: Instant) => + + val window = SlidingWindow.apply(Duration.ofMillis(windowSize), + Duration.ofMillis(windowStep)).triggering(EventTimeTrigger) + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val windowRunner = mock[WindowRunner] + val context = MockUtil.mockTaskContext + val config = UserConfig.empty + + when(groupBy.window).thenReturn(window) + + val task = new EventTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config) + + val message = mock[Message] + task.onNext(message) + verify(windowRunner).process(message) + + task.onWatermarkProgress(watermark) + verify(windowRunner).trigger(any[Instant]) + } + } + +} diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala new file mode 100644 index 000000000..39e1b4ce4 --- /dev/null +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/ProcessingTimeTriggerTaskSpec.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gearpump.streaming.dsl.task + +import java.time.{Duration, Instant} + +import org.apache.gearpump.Message +import org.apache.gearpump.cluster.UserConfig +import org.apache.gearpump.streaming.MockUtil +import org.apache.gearpump.streaming.dsl.task.ProcessingTimeTriggerTask.Triggering +import org.apache.gearpump.streaming.dsl.window.api.{ProcessingTimeTrigger, SlidingWindow} +import org.apache.gearpump.streaming.dsl.window.impl.{GroupAlsoByWindow, WindowRunner} +import org.mockito.Matchers._ +import org.mockito.Mockito._ +import org.scalacheck.Gen +import org.scalatest.{Matchers, PropSpec} +import org.scalatest.mock.MockitoSugar +import org.scalatest.prop.PropertyChecks + +class ProcessingTimeTriggerTaskSpec extends PropSpec with PropertyChecks + with Matchers with MockitoSugar { + + property("ProcessingTimeTriggerTask should trigger on system time interval") { + val longGen = Gen.chooseNum[Long](1L, 1000L) + val windowSizeGen = longGen + val windowStepGen = longGen + val startTimeGen = longGen.map(Instant.ofEpochMilli) + + forAll(windowSizeGen, windowStepGen, startTimeGen) { + (windowSize: Long, windowStep: Long, startTime: Instant) => + + val window = SlidingWindow.apply(Duration.ofMillis(windowSize), + Duration.ofMillis(windowStep)).triggering(ProcessingTimeTrigger) + val groupBy = mock[GroupAlsoByWindow[Any, Any]] + val windowRunner = mock[WindowRunner] + val context = MockUtil.mockTaskContext + val config = UserConfig.empty + + when(groupBy.window).thenReturn(window) + + val task = new ProcessingTimeTriggerTask[Any, Any](groupBy, windowRunner, context, config) + + task.onStart(startTime) + + val message = mock[Message] + task.onNext(message) + verify(windowRunner).process(message) + + task.receiveUnManagedMessage(Triggering) + verify(windowRunner).trigger(any[Instant]) + } + } + +} From 9b879456d6ff03e961aca97857903082be189084 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Mon, 10 Oct 2016 12:42:16 +0800 Subject: [PATCH 2/3] [GEARPUMP-218] add shaded library as transitive dependencies Author: manuzhang Closes #92 from manuzhang/add_shaded_pom. 
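The mechanics of the pom rewrite deserve a note: sbt's pomPostProcess hook hands the generated POM to a function as a scala.xml.Node tree, and the addShadedDeps helper introduced below walks that tree and appends extra <dependency> entries under the <dependencies> element. A self-contained sketch of the idea follows; the object name, sample POM, and version number are made up for illustration and are not part of the build.

import scala.xml._

object PomRewriteSketch extends App {
  // Walk the POM tree; when we hit <dependencies>, append the extra nodes,
  // otherwise recurse into the children. Mirrors addShadedDeps below.
  def addDeps(deps: Seq[Node], node: Node): Node = node match {
    case elem: Elem =>
      val child = if (elem.label == "dependencies") {
        elem.child ++ deps
      } else {
        elem.child.map(addDeps(deps, _))
      }
      Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, child: _*)
    case other => other
  }

  // Hypothetical inputs for demonstration only
  val samplePom =
    <project>
      <dependencies>
        <dependency><artifactId>gearpump-core</artifactId></dependency>
      </dependencies>
    </project>

  val shadedDep =
    <dependency>
      <groupId>org.apache.gearpump</groupId>
      <artifactId>gearpump-shaded-guava</artifactId>
      <version>0.8.2</version>
    </dependency>

  // Prints the POM with the shaded dependency grafted in
  println(new PrettyPrinter(120, 2).format(addDeps(Seq(shadedDep), samplePom)))
}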
--- project/Build.scala | 50 +++++++++++---- project/BuildShaded.scala | 127 ++++++++++++++++++++------------------ 2 files changed, 104 insertions(+), 73 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 17e78dfd1..fe8ec6104 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -150,12 +150,6 @@ object Build extends sbt.Build { ) ) - val streamingDependencies = Seq( - unmanagedJars in Compile ++= Seq( - getShadedJarFile("gs-collections", version.value) - ) - ) - val coreDependencies = Seq( libraryDependencies ++= Seq( "org.slf4j" % "slf4j-api" % slf4jVersion, @@ -194,9 +188,9 @@ object Build extends sbt.Build { ), unmanagedJars in Compile ++= Seq( - getShadedJarFile("metrics-graphite", version.value), - getShadedJarFile("guava", version.value), - getShadedJarFile("akka-kryo", version.value) + getShadedJarFile(shaded_metrics_graphite.id, version.value), + getShadedJarFile(shaded_guava.id, version.value), + getShadedJarFile(shaded_akka_kryo.id, version.value) ) ) @@ -245,6 +239,20 @@ object Build extends sbt.Build { .map(_.filterNot(_.getCanonicalPath.contains("akka"))) } + private def addShadedDeps(deps: Seq[xml.Node], node: xml.Node): xml.Node = { + node match { + case elem: xml.Elem => + val child = if (elem.label == "dependencies") { + elem.child ++ deps + } else { + elem.child.map(addShadedDeps(deps, _)) + } + xml.Elem(elem.prefix, elem.label, elem.attributes, elem.scope, false, child: _*) + case _ => + node + } + } + lazy val root = Project( id = "gearpump", base = file("."), @@ -257,7 +265,14 @@ object Build extends sbt.Build { lazy val core = Project( id = "gearpump-core", base = file("core"), - settings = commonSettings ++ javadocSettings ++ coreDependencies) + settings = commonSettings ++ javadocSettings ++ coreDependencies ++ Seq( + pomPostProcess := { + (node: xml.Node) => addShadedDeps(List( + getShadedDepXML(organization.value, shaded_akka_kryo.id, version.value), + getShadedDepXML(organization.value, shaded_guava.id, version.value), + getShadedDepXML(organization.value, shaded_metrics_graphite.id, version.value)), node) + } + )) .disablePlugins(sbtassembly.AssemblyPlugin) lazy val daemon = Project( @@ -277,9 +292,18 @@ object Build extends sbt.Build { lazy val streaming = Project( id = "gearpump-streaming", base = file("streaming"), - settings = commonSettings ++ javadocSettings ++ streamingDependencies) - .dependsOn(core % "test->test; compile->compile", daemon % "test->test") - .disablePlugins(sbtassembly.AssemblyPlugin) + settings = commonSettings ++ javadocSettings ++ Seq( + unmanagedJars in Compile ++= Seq( + getShadedJarFile(shaded_gs_collections.id, version.value) + ), + + pomPostProcess := { + (node: xml.Node) => addShadedDeps(List( + getShadedDepXML(organization.value, shaded_gs_collections.id, version.value)), node) + } + )) + .dependsOn(core % "test->test; compile->compile", shaded_gs_collections, daemon % "test->test") + .disablePlugins(sbtassembly.AssemblyPlugin) lazy val external_kafka = Project( id = "gearpump-external-kafka", diff --git a/project/BuildShaded.scala b/project/BuildShaded.scala index 1f59bfd2e..a43587ca8 100644 --- a/project/BuildShaded.scala +++ b/project/BuildShaded.scala @@ -35,7 +35,7 @@ object BuildShaded extends sbt.Build { _.copy(includeScala = false) }, assemblyJarName in assembly := { - s"${name.value}-$scalaVersionMajor-${version.value}-assembly.jar" + s"${name.value}_$scalaVersionMajor-${version.value}.jar" }, target in assembly := baseDirectory.value.getParentFile / "target" / scalaVersionMajor ) @@ 
-44,92 +44,99 @@ object BuildShaded extends sbt.Build { id = "gearpump-shaded", base = file("shaded") ).aggregate(shaded_akka_kryo, shaded_gs_collections, shaded_guava, shaded_metrics_graphite) - .disablePlugins(sbtassembly.AssemblyPlugin) - + .disablePlugins(sbtassembly.AssemblyPlugin) lazy val shaded_akka_kryo = Project( id = "gearpump-shaded-akka-kryo", base = file("shaded/akka-kryo"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.zap("com.google.protobuf.**").inAll, - ShadeRule.zap("com.typesafe.config.**").inAll, - ShadeRule.zap("akka.**").inAll, - ShadeRule.zap("org.jboss.netty.**").inAll, - ShadeRule.zap("net.jpountz.lz4.**").inAll, - ShadeRule.zap("org.uncommons.maths.**").inAll, - ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll, - ShadeRule.rename("com.esotericsoftware.**" -> - "org.apache.gearpump.esotericsoftware.@1").inAll, - ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-akka-kryo"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.zap("com.google.protobuf.**").inAll, + ShadeRule.zap("com.typesafe.config.**").inAll, + ShadeRule.zap("akka.**").inAll, + ShadeRule.zap("org.jboss.netty.**").inAll, + ShadeRule.zap("net.jpountz.lz4.**").inAll, + ShadeRule.zap("org.uncommons.maths.**").inAll, + ShadeRule.rename("com.romix.**" -> "org.apache.gearpump.romix.@1").inAll, + ShadeRule.rename("com.esotericsoftware.**" -> + "org.apache.gearpump.esotericsoftware.@1").inAll, + ShadeRule.rename("org.objenesis.**" -> "org.apache.gearpump.objenesis.@1").inAll + ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.github.romix.akka" %% "akka-kryo-serialization" % kryoVersion ) + ) ) lazy val shaded_gs_collections = Project( id = "gearpump-shaded-gs-collections", base = file("shaded/gs-collections"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.gs.collections.**" -> - "org.apache.gearpump.gs.collections.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.goldmansachs" % "gs-collections" % gsCollectionsVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-gs-collections"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.gs.collections.**" -> + "org.apache.gearpump.gs.collections.@1").inAll ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.goldmansachs" % "gs-collections" % gsCollectionsVersion + ) + ) ) lazy val shaded_guava = Project( id = "gearpump-shaded-guava", base = file("shaded/guava"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.google.guava" % "guava" % guavaVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-guava"), + 
sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.google.**" -> "org.apache.gearpump.google.@1").inAll + ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.google.guava" % "guava" % guavaVersion ) + ) ) lazy val shaded_metrics_graphite = Project( id = "gearpump-shaded-metrics-graphite", base = file("shaded/metrics-graphite"), - settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite", - "assembly"), sbtassembly.AssemblyKeys.assembly) ++ - Seq( - assemblyShadeRules in assembly := Seq( - ShadeRule.rename("com.codahale.metrics.**" -> - "org.apache.gearpump.codahale.metrics.@1").inAll - ) - ) ++ - Seq( - libraryDependencies ++= Seq( - "com.codahale.metrics" % "metrics-graphite" % codahaleVersion, - "com.codahale.metrics" % "metrics-jvm" % codahaleVersion - ) + settings = shadeAssemblySettings ++ addArtifact(Artifact("gearpump-shaded-metrics-graphite"), + sbtassembly.AssemblyKeys.assembly) ++ + Seq( + assemblyShadeRules in assembly := Seq( + ShadeRule.rename("com.codahale.metrics.**" -> + "org.apache.gearpump.codahale.metrics.@1").inAll ) + ) ++ + Seq( + libraryDependencies ++= Seq( + "com.codahale.metrics" % "metrics-graphite" % codahaleVersion, + "com.codahale.metrics" % "metrics-jvm" % codahaleVersion + ) + ) ) def getShadedJarFile(name: String, gearpumpVersion: String): File = { shaded.base / "target" / scalaVersionMajor / - s"gearpump-shaded-$name-$scalaVersionMajor-$gearpumpVersion-assembly.jar" + s"${name}_$scalaVersionMajor-$gearpumpVersion.jar" + } + + def getShadedDepXML(groupId: String, artifactId: String, version: String): scala.xml.Node = { + <dependency> + <groupId>{groupId}</groupId> + <artifactId>{artifactId}</artifactId> + <version>{version}</version> + </dependency> + } } \ No newline at end of file From 1b9889121c0d776c9423544b76e27292838d1300 Mon Sep 17 00:00:00 2001 From: darionyaphet Date: Tue, 11 Oct 2016 06:10:33 +0800 Subject: [PATCH 3/3] GEARPUMP-215 Gearpump Redis Integration - RedisSink [Gearpump Redis Integration - RedisStorage](https://issues.apache.org/jira/browse/GEARPUMP-215) Author: darionyaphet Closes #93 from darionyaphet/GEARPUMP-215.
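To make the intended use concrete, here is a minimal sketch of driving the sink added below. The host, port, key names, and the wrapper object are illustrative placeholders; in a real job the TaskContext is supplied by the Gearpump runtime (the tests below stand one in via mocks).

import org.apache.gearpump.Message
import org.apache.gearpump.redis.RedisSink
import org.apache.gearpump.redis.RedisMessage.Lists.LPUSH
import org.apache.gearpump.redis.RedisMessage.String.SET
import org.apache.gearpump.streaming.task.TaskContext

object RedisSinkSketch {
  def writeSamples(context: TaskContext): Unit = {
    // Placeholder connection details; defaults come from jedis Protocol
    val sink = new RedisSink(host = "127.0.0.1", port = 6379)
    sink.open(context)                                      // selects database, auths if configured
    sink.write(Message(new LPUSH("queue:words", "hello")))  // LPUSH queue:words hello
    sink.write(Message(new SET("greeting", "world")))       // SET greeting world
    sink.close()
  }
}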
--- .../apache/gearpump/redis/RedisMessage.scala | 456 ++++++++++++++++++ .../org/apache/gearpump/redis/RedisSink.scala | 120 +++++ project/Build.scala | 12 + 3 files changed, 588 insertions(+) create mode 100644 experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala create mode 100644 experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala new file mode 100644 index 000000000..84dec70be --- /dev/null +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisMessage.scala @@ -0,0 +1,456 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.redis + +import java.nio.charset.Charset + +object RedisMessage { + + private def toBytes(strings: List[String]): List[Array[Byte]] = + strings.map(string => string.getBytes(Charset.forName("UTF8"))) + + private def toBytes(string: String): Array[Byte] = + string.getBytes(Charset.forName("UTF8")) + + object Connection { + + /** + * Change the selected database for the current connection + * + * @param index + */ + case class SELECT(index: Int) + + } + + object Geo { + + /** + * Add one geospatial item in the geospatial index represented using a sorted set + * + * @param key + * @param longitude + * @param latitude + * @param member + */ + case class GEOADD(key: Array[Byte], longitude: Double, + latitude: Double, member: Array[Byte]) { + def this(key: String, longitude: Double, + latitude: Double, member: String) = + this(toBytes(key), longitude, latitude, toBytes(member)) + } + + } + + object Hashes { + + /** + * Delete a hash field + * + * @param key + * @param field + */ + case class HDEL(key: Array[Byte], field: Array[Byte]) { + def this(key: String, field: String) = this(toBytes(key), toBytes(field)) + } + + /** + * Increment the integer value of a hash field by the given number + * + * @param key + * @param field + * @param increment + */ + case class HINCRBY(key: Array[Byte], field: Array[Byte], increment: Long) { + def this(key: String, field: String, increment: Long) = + this(toBytes(key), toBytes(field), increment) + } + + /** + * Increment the float value of a hash field by the given amount + * + * @param key + * @param field + * @param increment + */ + case class HINCRBYFLOAT(key: Array[Byte], field: Array[Byte], increment: Float) { + def this(key: String, field: String, increment: Float) = + this(toBytes(key), toBytes(field), increment) + } + + + /** + * Set the string value of a hash field + * + * @param key + * @param field + * @param value + */ + case class HSET(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { + def this(key: String, field: String, value: String) = + this(toBytes(key), toBytes(field), toBytes(value)) + } + + /** + * Set the value of a hash field, only if the field does not exist + * + * @param key + * @param field + * @param value + */ + case class HSETNX(key: Array[Byte], field: Array[Byte], value: Array[Byte]) { + def this(key: String, field: String, value: String) = + this(toBytes(key), toBytes(field), toBytes(value)) + } + + } + + object HyperLogLog { + + /** + * Adds the specified elements to the specified HyperLogLog + * + * @param key + * @param element + */ + case class PFADD(key: String, element: String) + + } + + object Lists { + + + /** + * Prepend one or multiple values to a list + * + * @param key + * @param value + */ + case class LPUSH(key: Array[Byte], value: Array[Byte]) { + + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Prepend a value to a list, only if the list exists + * + * @param key + * @param value + */ + case class LPUSHX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Set the value of an element in a list by its index + * + * @param key + * @param index + * @param value + */ + case class LSET(key: Array[Byte], index: Long, value: Array[Byte]) { + def this(key: String, index: Long, value: String) = this(toBytes(key), index, toBytes(value)) + } + + /** + * Append one
or multiple values to a list + * + * @param key + * @param value + */ + case class RPUSH(key: Array[Byte], value: Array[Byte]) { + + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Append a value to a list, only if the list exists + * + * @param key + * @param value + */ + case class RPUSHX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + } + + object Keys { + + /** + * Delete a key + * + * @param message + */ + case class DEL(message: Array[Byte]) { + + def this(message: String) = this(toBytes(message)) + } + + /** + * Set a key's time to live in seconds + * + * @param key + */ + case class EXPIRE(key: Array[Byte], seconds: Int) { + def this(key: String, seconds: Int) = this(toBytes(key), seconds) + } + + /** + * Set the expiration for a key as a UNIX timestamp + * + * @param key + * @param timestamp + */ + case class EXPIREAT(key: Array[Byte], timestamp: Long) { + def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + } + + /** + * Atomically transfer a key from a Redis instance to another one. + * + * @param host + * @param port + * @param key + * @param database + * @param timeout + */ + case class MIGRATE(host: Array[Byte], port: Int, key: Array[Byte], database: Int, timeout: Int) { + def this(host: String, port: Int, key: String, database: Int, timeout: Int) = + this(toBytes(host), port, toBytes(key), database, timeout) + } + + /** + * Move a key to another database + * + * @param key + * @param db + */ + case class MOVE(key: Array[Byte], db: Int) { + def this(key: String, db: Int) = this(toBytes(key), db) + } + + /** + * Remove the expiration from a key + * + * @param key + */ + case class PERSIST(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Set a key's time to live in milliseconds + * + * @param key + * @param milliseconds + */ + case class PEXPIRE(key: Array[Byte], milliseconds: Long) { + def this(key: String, milliseconds: Long) = this(toBytes(key), milliseconds) + } + + /** + * Set the expiration for a key as a UNIX timestamp specified in milliseconds + * + * @param key + * @param timestamp + */ + case class PEXPIREAT(key: Array[Byte], timestamp: Long) { + def this(key: String, timestamp: Long) = this(toBytes(key), timestamp) + } + + /** + * Rename a key + * + * @param key + * @param newKey + */ + case class RENAME(key: Array[Byte], newKey: Array[Byte]) { + def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + } + + /** + * Rename a key, only if the new key does not exist + * + * @param key + * @param newKey + */ + case class RENAMENX(key: Array[Byte], newKey: Array[Byte]) { + def this(key: String, newKey: String) = this(toBytes(key), toBytes(newKey)) + } + + } + + + object Sets { + + /** + * Add one or more members to a set + * + * @param key + * @param members + */ + case class SADD(key: Array[Byte], members: Array[Byte]) { + + def this(key: String, members: String) = this(toBytes(key), toBytes(members)) + } + + + /** + * Move a member from one set to another + * + * @param source + * @param destination + * @param member + */ + case class SMOVE(source: Array[Byte], destination: Array[Byte], member: Array[Byte]) { + def this(source: String, destination: String, member: String) = + this(toBytes(source), toBytes(destination), toBytes(member)) + } + + + /** + * Remove one or more members from a set + * + * @param key + * @param member + */ + case class SREM(key: Array[Byte], member: Array[Byte]) { + +
def this(key: String, member: String) = this(toBytes(key), toBytes(member)) + } + + } + + object String { + + /** + * Append a value to a key + * + * @param key + * @param value + */ + case class APPEND(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Decrement the integer value of a key by one + * + * @param key + */ + case class DECR(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Decrement the integer value of a key by the given number + * + * @param key + * @param decrement + */ + case class DECRBY(key: Array[Byte], decrement: Int) { + def this(key: String, decrement: Int) = this(toBytes(key), decrement) + } + + /** + * Increment the integer value of a key by one + * + * @param key + */ + case class INCR(key: Array[Byte]) { + def this(key: String) = this(toBytes(key)) + } + + /** + * Increment the integer value of a key by the given amount + * + * @param key + * @param increment + */ + case class INCRBY(key: Array[Byte], increment: Int) { + def this(key: String, increment: Int) = this(toBytes(key), increment) + } + + /** + * Increment the float value of a key by the given amount + * + * @param key + * @param increment + */ + case class INCRBYFLOAT(key: Array[Byte], increment: Double) { + def this(key: String, increment: Double) = this(toBytes(key), increment) + } + + + /** + * Set the string value of a key + * + * @param key + * @param value + */ + case class SET(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Sets or clears the bit at offset in the string value stored at key + * + * @param key + * @param offset + * @param value + */ + case class SETBIT(key: Array[Byte], offset: Long, value: Array[Byte]) { + def this(key: String, offset: Long, value: String) = this(toBytes(key), offset, toBytes(value)) + } + + /** + * Set the value and expiration of a key + * + * @param key + * @param seconds + * @param value + */ + case class SETEX(key: Array[Byte], seconds: Int, value: Array[Byte]) { + def this(key: String, seconds: Int, value: String) = this(toBytes(key), seconds, toBytes(value)) + } + + /** + * Set the value of a key, only if the key does not exist + * + * @param key + * @param value + */ + case class SETNX(key: Array[Byte], value: Array[Byte]) { + def this(key: String, value: String) = this(toBytes(key), toBytes(value)) + } + + /** + * Overwrite part of a string at key starting at the specified offset + * + * @param key + * @param offset + * @param value + */ + case class SETRANGE(key: Array[Byte], offset: Int, value: Array[Byte]) { + def this(key: String, offset: Int, value: String) = this(toBytes(key), offset, toBytes(value)) + } + + } + +}
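Note that the String convenience constructors above simply UTF-8-encode their arguments through toBytes, so the byte-array and String construction styles are interchangeable. A small sketch demonstrating the equivalence; the key, value, and EncodingSketch object are arbitrary and not part of the patch:

import java.nio.charset.StandardCharsets.UTF_8
import org.apache.gearpump.redis.RedisMessage.String.APPEND

object EncodingSketch extends App {
  val viaStrings = new APPEND("log", "entry")
  // "UTF8" in toBytes is an alias for UTF-8, so these bytes match
  val viaBytes = APPEND("log".getBytes(UTF_8), "entry".getBytes(UTF_8))
  println(java.util.Arrays.equals(viaStrings.key, viaBytes.key))     // true
  println(java.util.Arrays.equals(viaStrings.value, viaBytes.value)) // true
}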
diff --git a/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala new file mode 100644 index 000000000..3f7594907 --- /dev/null +++ b/experiments/redis/src/main/scala/org/apache/gearpump/redis/RedisSink.scala @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gearpump.redis + +import org.apache.gearpump.Message +import org.apache.gearpump.redis.RedisMessage.Geo.GEOADD +import org.apache.gearpump.redis.RedisMessage.Hashes._ +import org.apache.gearpump.redis.RedisMessage.HyperLogLog._ +import org.apache.gearpump.redis.RedisMessage.Keys._ +import org.apache.gearpump.redis.RedisMessage.Lists._ +import org.apache.gearpump.redis.RedisMessage.Sets._ +import org.apache.gearpump.redis.RedisMessage.String._ +import org.apache.gearpump.streaming.sink.DataSink +import org.apache.gearpump.streaming.task.TaskContext +import org.apache.gearpump.util.LogUtil +import redis.clients.jedis.Jedis +import redis.clients.jedis.Protocol.{DEFAULT_DATABASE, DEFAULT_HOST, DEFAULT_PORT, DEFAULT_TIMEOUT} + +/** + * Saves messages to a Redis instance + * + * @param host + * @param port + * @param timeout + * @param database + * @param password + */ +class RedisSink( + host: String = DEFAULT_HOST, + port: Int = DEFAULT_PORT, + timeout: Int = DEFAULT_TIMEOUT, + database: Int = DEFAULT_DATABASE, + password: String = "") extends DataSink { + + private val LOG = LogUtil.getLogger(getClass) + @transient private lazy val client = new Jedis(host, port, timeout) + + override def open(context: TaskContext): Unit = { + client.select(database) + + if (password != null && password.length != 0) { + client.auth(password) + } + } + + override def write(message: Message): Unit = { + + message.msg match { + // GEO + case msg: GEOADD => client.geoadd(msg.key, msg.longitude, msg.latitude, msg.member) + + // Hashes + case msg: HDEL => client.hdel(msg.key, msg.field) + case msg: HINCRBY => client.hincrBy(msg.key, msg.field, msg.increment) + case msg: HINCRBYFLOAT => client.hincrByFloat(msg.key, msg.field, msg.increment) + case msg: HSET => client.hset(msg.key, msg.field, msg.value) + case msg: HSETNX => client.hsetnx(msg.key, msg.field, msg.value) + + // HyperLogLog + case msg: PFADD => client.pfadd(msg.key, msg.element) + + // Lists + case msg: LPUSH => client.lpush(msg.key, msg.value) + case msg: LPUSHX => client.lpushx(msg.key, msg.value) + case msg: LSET => client.lset(msg.key, msg.index, msg.value) + case msg: RPUSH => client.rpush(msg.key, msg.value) + case msg: RPUSHX => client.rpushx(msg.key, msg.value) + + // Keys + case msg: DEL => client.del(msg.message) + case msg: EXPIRE => client.expire(msg.key, msg.seconds) + case msg: EXPIREAT => client.expireAt(msg.key, msg.timestamp) + case msg: MIGRATE => client.migrate(msg.host, msg.port, msg.key, msg.database, msg.timeout) + case msg: MOVE => client.move(msg.key, msg.db) + case msg: PERSIST => client.persist(msg.key) + case msg: PEXPIRE => client.pexpire(msg.key, msg.milliseconds) + case msg: PEXPIREAT => client.pexpireAt(msg.key, msg.timestamp) + case msg: RENAME => client.rename(msg.key, msg.newKey) + case msg: RENAMENX => client.renamenx(msg.key, msg.newKey) + + // Sets + case msg: SADD => client.sadd(msg.key, msg.members) + case msg: SMOVE => client.smove(msg.source, msg.destination, msg.member) + case msg: SREM => client.srem(msg.key, msg.member) + + // String + case msg: APPEND => client.append(msg.key,
msg.value) + case msg: DECR => client.decr(msg.key) + case msg: DECRBY => client.decrBy(msg.key, msg.decrement) + case msg: INCR => client.incr(msg.key) + case msg: INCRBY => client.incrBy(msg.key, msg.increment) + case msg: INCRBYFLOAT => client.incrByFloat(msg.key, msg.increment) + case msg: SET => client.set(msg.key, msg.value) + case msg: SETBIT => client.setbit(msg.key, msg.offset, msg.value) + case msg: SETEX => client.setex(msg.key, msg.seconds, msg.value) + case msg: SETNX => client.setnx(msg.key, msg.value) + case msg: SETRANGE => client.setrange(msg.key, msg.offset, msg.value) + case other => LOG.error(s"Unsupported Redis message: ${other.getClass.getSimpleName}") + } + } + + override def close(): Unit = { + client.close() + } +} diff --git a/project/Build.scala b/project/Build.scala index fe8ec6104..34f0ae281 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -415,6 +415,18 @@ object Build extends sbt.Build { )) .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + lazy val redis = Project( + id = "gearpump-experiments-redis", + base = file("experiments/redis"), + settings = commonSettings ++ noPublish ++ myAssemblySettings ++ + Seq( + libraryDependencies ++= Seq( + "redis.clients" % "jedis" % "2.9.0" + ), + mainClass in(Compile, packageBin) := Some("org.apache.gearpump.example.Test") + )) + .dependsOn(streaming % "test->test; provided", daemon % "test->test; provided") + lazy val storm = Project( id = "gearpump-experiments-storm", base = file("experiments/storm"),