From c35e6cb0274c984cd0dcc346dca28c1ee8a0893d Mon Sep 17 00:00:00 2001 From: manuzhang Date: Tue, 1 Aug 2017 12:08:46 +0800 Subject: [PATCH 1/5] Don't use window runner for non-folding function --- .../gearpump/streaming/dsl/package.scala | 2 +- .../gearpump/streaming/dsl/plan/OP.scala | 59 +++++++++++++------ .../streaming/dsl/task/GroupByTask.scala | 12 ++-- .../streaming/dsl/task/TransformTask.scala | 6 +- ...Runner.scala => TimedValueProcessor.scala} | 44 +++++++++----- .../source/DataSourceProcessor.scala | 6 +- .../streaming/source/DataSourceTask.scala | 6 +- .../gearpump/streaming/task/TaskUtil.scala | 4 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 6 +- .../plan/functions/FunctionRunnerSpec.scala | 10 ++-- .../streaming/dsl/task/GroupByTaskSpec.scala | 4 +- .../dsl/task/TransformTaskSpec.scala | 4 +- .../window/impl/DefaultWindowRunnerSpec.scala | 2 +- .../streaming/source/DataSourceTaskSpec.scala | 8 +-- 14 files changed, 105 insertions(+), 68 deletions(-) rename streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/{WindowRunner.scala => TimedValueProcessor.scala} (86%) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala index 6d43f16f3..bb32eb7a9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala @@ -36,7 +36,7 @@ package org.apache.gearpump.streaming * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]] * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops. * - * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.WindowRunner]], which internally + * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.TimedValueProcessor]], which internally * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]]. diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index c37ced613..3c5ebb6bf 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -21,12 +21,11 @@ package org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FunctionRunner} -import org.apache.gearpump.streaming.dsl.window.impl.{AndThen => WindowRunnerAT} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FoldRunner, FunctionRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, DirectProcessor, TimedValueProcessor, AndThen => WindowRunnerAT} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows} -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} import org.apache.gearpump.streaming.sink.{DataSink, DataSinkProcessor} import org.apache.gearpump.streaming.source.{DataSource, DataSourceTask} import org.apache.gearpump.streaming.task.Task @@ -134,39 +133,61 @@ case class DataSourceOp( dataSource: DataSource, parallelism: Int = 1, description: String = "source", - userConfig: UserConfig = UserConfig.empty) + userConfig: UserConfig = UserConfig.empty, + windowRunner: Option[TimedValueProcessor[Any, Any]] = None) extends Op { override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { - case op: WindowTransformOp[_, _] => + case op: WindowTransformOp[Any, Any] => + val chainedRunner = + windowRunner.map(WindowRunnerAT(_, op.windowRunner)).getOrElse(op.windowRunner) DataSourceOp( dataSource, parallelism, Op.concatenate(description, op.description), Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, - op.windowRunner), - op.userConfig)) - case op: TransformOp[_, _] => - chain( - WindowOp(GlobalWindows()).chain(op)) + chainedRunner), + op.userConfig), + Some(chainedRunner)) + case op: TransformOp[Any, Any] => + val fn = op.fn + // WindowProcessor is required for FoldRunner + // AndThen could be composite FoldRunners + if (!fn.isInstanceOf[FoldRunner[Any, Any]] && !fn.isInstanceOf[AndThen[Any, Any, Any]]) { + val runner = new DirectProcessor(op.fn) + val chainedRunner = + windowRunner.map(WindowRunnerAT(_, runner)).getOrElse(runner) + DataSourceOp( + dataSource, + parallelism, + Op.concatenate(description, op.description), + Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, + chainedRunner), + op.userConfig), + Some(chainedRunner) + ) + } else { + chain( + WindowOp(GlobalWindows()).chain(op)) + } case op: WindowOp => chain( op.chain(TransformOp(new DummyRunner[Any]()))) case op: TransformWindowTransformOp[_, _, _] => - chain( - WindowOp(GlobalWindows()).chain(op.transformOp) - .chain(op.windowTransformOp)) + chain(op.transformOp).chain(op.windowTransformOp) case _ => throw new OpChainException(this, other) } } override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { - Op.withGlobalWindowsDummyRunner(this, userConfig, + if (userConfig.getValue(Constants.GEARPUMP_STREAMING_OPERATOR).isEmpty) { + chain(TransformOp(new DummyRunner[Any])).toProcessor + } else { Processor[DataSourceTask[Any, Any]](parallelism, description, userConfig.withValue(Constants.GEARPUMP_STREAMING_SOURCE, dataSource)) - ) + } } } @@ -212,7 +233,7 @@ case class TransformOp[IN, OUT]( Op.concatenate(userConfig, op.userConfig)) case op: WindowOp => TransformWindowTransformOp(this, - WindowTransformOp(new DefaultWindowRunner[OUT, OUT]( + WindowTransformOp(new WindowProcessor[OUT, OUT]( op.windows, new DummyRunner[OUT] ), op.description, op.userConfig)) case op: TransformWindowTransformOp[OUT, _, _] => @@ -244,13 +265,13 @@ case class WindowOp( override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { case op: TransformOp[_, _] => - WindowTransformOp(new DefaultWindowRunner(windows, op.fn), + WindowTransformOp(new WindowProcessor(windows, op.fn), Op.concatenate(description, op.description), Op.concatenate(userConfig, op.userConfig)) case op: WindowOp => chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any]))) case op: TransformWindowTransformOp[_, _, _] => - WindowTransformOp(new DefaultWindowRunner(windows, op.transformOp.fn), + WindowTransformOp(new WindowProcessor(windows, op.transformOp.fn), Op.concatenate(description, op.transformOp.description), Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp) case _ => @@ -352,7 +373,7 @@ case class MergeOp( * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. */ private case class WindowTransformOp[IN, OUT]( - windowRunner: WindowRunner[IN, OUT], + windowRunner: TimedValueProcessor[IN, OUT], description: String, userConfig: UserConfig) extends Op { diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala index b3f3ad238..76fa1f2b5 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala @@ -25,7 +25,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR} -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} @@ -44,8 +44,8 @@ class GroupByTask[IN, GROUP, OUT]( ) } - private val groups: UnifiedMap[GROUP, WindowRunner[IN, OUT]] = - new UnifiedMap[GROUP, WindowRunner[IN, OUT]] + private val groups: UnifiedMap[GROUP, TimedValueProcessor[IN, OUT]] = + new UnifiedMap[GROUP, TimedValueProcessor[IN, OUT]] override def onNext(message: Message): Unit = { val input = message.value.asInstanceOf[IN] @@ -53,7 +53,7 @@ class GroupByTask[IN, GROUP, OUT]( if (!groups.containsKey(group)) { groups.put(group, - userConfig.getValue[WindowRunner[IN, OUT]]( + userConfig.getValue[TimedValueProcessor[IN, OUT]]( GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get) } @@ -65,8 +65,8 @@ class GroupByTask[IN, GROUP, OUT]( if (groups.isEmpty && watermark == Watermark.MAX) { taskContext.updateWatermark(Watermark.MAX) } else { - groups.values.forEach(new Consumer[WindowRunner[IN, OUT]] { - override def accept(runner: WindowRunner[IN, OUT]): Unit = { + groups.values.forEach(new Consumer[TimedValueProcessor[IN, OUT]] { + override def accept(runner: TimedValueProcessor[IN, OUT]): Unit = { TaskUtil.trigger(watermark, runner, taskContext) } }) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index 5ad64fa70..a36dd2de9 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -22,16 +22,16 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} class TransformTask[IN, OUT]( - runner: WindowRunner[IN, OUT], + runner: TimedValueProcessor[IN, OUT], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(context: TaskContext, conf: UserConfig) = { this( - conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + conf.getValue[TimedValueProcessor[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, context, conf ) } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/TimedValueProcessor.scala similarity index 86% rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/TimedValueProcessor.scala index ee3c06736..45419ce0e 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/TimedValueProcessor.scala @@ -32,23 +32,17 @@ import org.apache.gearpump.streaming.task.TaskUtil import scala.collection.mutable.ArrayBuffer /** - * Inputs for [[WindowRunner]]. + * Inputs for [[TimedValueProcessor]]. */ case class TimestampedValue[T](value: T, timestamp: Instant) /** - * Outputs triggered by [[WindowRunner]] + * Outputs triggered by [[TimedValueProcessor]] */ case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], watermark: Instant) -/** - * This is responsible for executing window calculation. - * 1. Groups elements into windows as defined by window function - * 2. Applies window calculation to each group - * 3. Emits results on triggering - */ -trait WindowRunner[IN, OUT] extends java.io.Serializable { +trait TimedValueProcessor[IN, OUT] extends java.io.Serializable { def process(timestampedValue: TimestampedValue[IN]): Unit @@ -59,8 +53,8 @@ trait WindowRunner[IN, OUT] extends java.io.Serializable { * A composite WindowRunner that first executes its left child and feeds results * into result child. */ -case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], - right: WindowRunner[MIDDLE, OUT]) extends WindowRunner[IN, OUT] { +case class AndThen[IN, MIDDLE, OUT](left: TimedValueProcessor[IN, MIDDLE], + right: TimedValueProcessor[MIDDLE, OUT]) extends TimedValueProcessor[IN, OUT] { override def process(timestampedValue: TimestampedValue[IN]): Unit = { left.process(timestampedValue) @@ -73,13 +67,35 @@ case class AndThen[IN, MIDDLE, OUT](left: WindowRunner[IN, MIDDLE], } } +class DirectProcessor[IN, OUT](fnRunner: FunctionRunner[IN, OUT]) + extends TimedValueProcessor[IN, OUT] { + + private val buffer = ArrayBuffer.empty[TimestampedValue[OUT]] + + override def process(timestampedValue: TimestampedValue[IN]): Unit = { + fnRunner.setup() + fnRunner.process(timestampedValue.value) + .map(TimestampedValue(_, timestampedValue.timestamp)) + .foreach(tv => buffer.append(tv)) + fnRunner.finish() + fnRunner.teardown() + } + + override def trigger(time: Instant): TriggeredOutputs[OUT] = { + TriggeredOutputs(buffer, time) + } +} + /** - * Default implementation for [[WindowRunner]]. + * This is responsible for executing window calculation. + * 1. Groups elements into windows as defined by window function + * 2. Applies window calculation to each group + * 3. Emits results on triggering */ -class DefaultWindowRunner[IN, OUT]( +class WindowProcessor[IN, OUT]( windows: Windows, fnRunner: FunctionRunner[IN, OUT]) - extends WindowRunner[IN, OUT] { + extends TimedValueProcessor[IN, OUT] { private val windowFn = windows.windowFn private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]] diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala index dd4c0d393..4f45f685f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala @@ -22,7 +22,7 @@ import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows} -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, Window, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, Window, TimedValueProcessor} import org.apache.gearpump.streaming.{Constants, Processor} /** @@ -48,8 +48,8 @@ object DataSourceProcessor { Processor[DataSourceTask[Any, Any]](parallelism, description, taskConf .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource) - .withValue[WindowRunner[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, - new DefaultWindowRunner[Any, Any]( + .withValue[TimedValueProcessor[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, + new WindowProcessor[Any, Any]( Windows(PerElementWindowFunction, description = "perElementWindows"), new DummyRunner[Any]))) } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index f93c4969f..508a85d09 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} /** @@ -40,7 +40,7 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} */ class DataSourceTask[IN, OUT] private[source]( source: DataSource, - windowRunner: WindowRunner[IN, OUT], + windowRunner: TimedValueProcessor[IN, OUT], context: TaskContext, conf: UserConfig) extends Task(context, conf) { @@ -48,7 +48,7 @@ class DataSourceTask[IN, OUT] private[source]( def this(context: TaskContext, conf: UserConfig) = { this( conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, - conf.getValue[WindowRunner[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + conf.getValue[TimedValueProcessor[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, context, conf ) } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala index bd889c4ca..7edefa022 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.task import java.time.Instant import org.apache.gearpump.Message -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} object TaskUtil { @@ -36,7 +36,7 @@ object TaskUtil { loader.loadClass(className).asSubclass(classOf[Task]) } - def trigger[IN, OUT](watermark: Instant, runner: WindowRunner[IN, OUT], + def trigger[IN, OUT](watermark: Instant, runner: TimedValueProcessor[IN, OUT], context: TaskContext): Unit = { val triggeredOutputs = runner.trigger(watermark) context.updateWatermark(triggeredOutputs.watermark) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index ca0135d03..9d5b94d5b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -28,7 +28,7 @@ import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTas import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, TimedValueProcessor} import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -173,7 +173,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "chain WindowTransformOp" in { - val runner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner()) + val runner = new WindowProcessor[Any, Any](GlobalWindows(), new DummyRunner()) val windowTransformOp = mock[WindowTransformOp[Any, Any]] when(windowTransformOp.windowRunner).thenReturn(runner) @@ -199,7 +199,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val mergeOp = MergeOp() "chain WindowTransformOp" in { - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[TimedValueProcessor[Any, Any]] val windowTransformOp = mock[WindowTransformOp[Any, Any]] when(windowTransformOp.windowRunner).thenReturn(runner) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index 62442247e..620631282 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -29,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.TransformTask import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows -import org.apache.gearpump.streaming.dsl.window.impl.{DefaultWindowRunner, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, TimedValueProcessor} import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} @@ -218,11 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val data = "one two three".split("\\s+") val dataSource = new CollectionDataSource[String](data) - val runner1 = new DefaultWindowRunner[String, String]( + val runner1 = new WindowProcessor[String, String]( GlobalWindows(), new DummyRunner[String]) val conf = UserConfig.empty .withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - .withValue[WindowRunner[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) + .withValue[TimedValueProcessor[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) // Source with no transformer val source = new DataSourceTask[String, String]( @@ -239,7 +239,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val anotherTaskContext = MockUtil.mockTaskContext val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val runner2 = new DefaultWindowRunner[String, String]( + val runner2 = new WindowProcessor[String, String]( GlobalWindows(), double) val another = new DataSourceTask(anotherTaskContext, conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2)) @@ -262,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val conf = UserConfig.empty val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val transform = new DefaultWindowRunner[String, String](GlobalWindows(), double) + val transform = new WindowProcessor[String, String](GlobalWindows(), double) val task = new TransformTask[String, String](transform, taskContext, conf) task.onStart(Instant.EPOCH) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala index 9e6bf5914..770dc3bca 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows import org.apache.gearpump.streaming.{Constants, MockUtil} -import org.apache.gearpump.streaming.dsl.window.impl.DefaultWindowRunner +import org.apache.gearpump.streaming.dsl.window.impl.WindowProcessor import org.apache.gearpump.streaming.source.Watermark import org.mockito.Mockito._ import org.scalacheck.Gen @@ -40,7 +40,7 @@ class GroupByTaskSpec extends PropSpec with PropertyChecks forAll(longGen) { (time: Instant) => val groupBy = mock[Any => Int] - val windowRunner = new DefaultWindowRunner[Any, Any](GlobalWindows(), new DummyRunner[Any]) + val windowRunner = new WindowProcessor[Any, Any](GlobalWindows(), new DummyRunner[Any]) val context = MockUtil.mockTaskContext val config = UserConfig.empty .withValue( diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index e38c5a360..3834b463a 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, TimedValueProcessor} import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} @@ -36,7 +36,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val watermarkGen = longGen.map(Instant.ofEpochMilli) forAll(watermarkGen) { (watermark: Instant) => - val windowRunner = mock[WindowRunner[Any, Any]] + val windowRunner = mock[TimedValueProcessor[Any, Any]] val context = MockUtil.mockTaskContext val config = UserConfig.empty val task = new TransformTask[Any, Any](windowRunner, context, config) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala index b23d0ee30..2ef91ecbc 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala @@ -46,7 +46,7 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks implicit val system = MockUtil.system val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2)) val windows = SessionWindows.apply(Duration.ofMillis(4L)) - val windowRunner = new DefaultWindowRunner[KV, Option[KV]](windows, + val windowRunner = new WindowProcessor[KV, Option[KV]](windows, new FoldRunner[KV, Option[KV]](reduce, "reduce")) data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index d62739af9..7e459f18c 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, WindowRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, TimedValueProcessor} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -40,7 +40,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[TimedValueProcessor[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStart(startTime) @@ -57,7 +57,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[TimedValueProcessor[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) val msg = Message(str, timestamp) when(dataSource.read()).thenReturn(msg) @@ -79,7 +79,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[WindowRunner[Any, Any]] + val runner = mock[TimedValueProcessor[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStop() From bbb0c87922281e075e75baad69eeb958def459e9 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Wed, 21 Mar 2018 22:53:00 +0800 Subject: [PATCH 2/5] Fix tests --- .../scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala | 2 +- .../org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala | 4 ++-- .../gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index 9d5b94d5b..ce3e1707e 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -85,7 +85,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val processor = dataSourceOp.toProcessor processor shouldBe a[Processor[_]] processor.parallelism shouldBe dataSourceOp.parallelism - processor.description shouldBe s"${dataSourceOp.description}.globalWindows" + processor.description shouldBe s"${dataSourceOp.description}" } } diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala index be4cc635c..f2c9d0e41 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala @@ -87,8 +87,8 @@ class PlannerSpec extends FlatSpec with Matchers with BeforeAndAfterAll with Moc .mapVertex(_.description) plan.getVertices.toSet should contain theSameElementsAs - Set("source.globalWindows", "groupBy.globalWindows.flatMap.reduce", "processor", "sink") - plan.outgoingEdgesOf("source.globalWindows").iterator.next()._2 shouldBe + Set("source", "groupBy.globalWindows.flatMap.reduce", "processor", "sink") + plan.outgoingEdgesOf("source").iterator.next()._2 shouldBe a[GroupByPartitioner[_, _]] plan.outgoingEdgesOf("groupBy.globalWindows.flatMap.reduce").iterator.next()._2 shouldBe a[CoLocationPartitioner] diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala index d43bca0df..f701c5e69 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala @@ -61,9 +61,9 @@ class StreamAppSpec extends FlatSpec with Matchers with BeforeAndAfterAll with M dag.getVertices.size shouldBe 2 dag.getVertices.foreach { processor => processor.taskClass shouldBe classOf[DataSourceTask[_, _]].getName - if (processor.description == "A.globalWindows") { + if (processor.description == "A") { processor.parallelism shouldBe 2 - } else if (processor.description == "B.globalWindows") { + } else if (processor.description == "B") { processor.parallelism shouldBe 3 } else { fail(s"undefined source ${processor.description}") From fd7d06376eecf5f978809e5f71e4948ebaf1a94b Mon Sep 17 00:00:00 2001 From: manuzhang Date: Thu, 22 Mar 2018 22:54:56 +0800 Subject: [PATCH 3/5] Fix processing without window --- .../gearpump/streaming/dsl/package.scala | 2 +- .../gearpump/streaming/dsl/plan/OP.scala | 52 +++++----- .../dsl/plan/functions/FunctionRunner.scala | 8 +- .../streaming/dsl/task/GroupByTask.scala | 14 +-- .../streaming/dsl/task/TransformTask.scala | 8 +- ...rocessor.scala => StreamingOperator.scala} | 96 ++++++++++++------- .../source/DataSourceProcessor.scala | 22 ++--- .../streaming/source/DataSourceTask.scala | 17 ++-- .../gearpump/streaming/task/TaskUtil.scala | 4 +- .../gearpump/streaming/dsl/plan/OpSpec.scala | 12 +-- .../plan/functions/FunctionRunnerSpec.scala | 10 +- .../streaming/dsl/task/GroupByTaskSpec.scala | 4 +- .../dsl/task/TransformTaskSpec.scala | 6 +- .../window/impl/DefaultWindowRunnerSpec.scala | 4 +- .../streaming/source/DataSourceTaskSpec.scala | 18 ++-- 15 files changed, 155 insertions(+), 122 deletions(-) rename streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/{TimedValueProcessor.scala => StreamingOperator.scala} (72%) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala index bb32eb7a9..2d3d94b26 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/package.scala @@ -36,7 +36,7 @@ package org.apache.gearpump.streaming * * [[org.apache.gearpump.streaming.dsl.task.GroupByTask]] to execute Ops followed by [[org.apache.gearpump.streaming.dsl.plan.GroupByOp]] * * [[org.apache.gearpump.streaming.dsl.task.TransformTask]] to execute all other Ops. * - * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.TimedValueProcessor]], which internally + * All but [[org.apache.gearpump.streaming.sink.DataSinkTask]] delegates execution to [[org.apache.gearpump.streaming.dsl.window.impl.StreamingOperator]], which internally * runs a chain of [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]] grouped by windows. Window assignments are either explicitly defined with * [[org.apache.gearpump.streaming.dsl.window.api.Windows]] API or implicitly in [[org.apache.gearpump.streaming.dsl.window.api.GlobalWindows]]. UDFs are eventually * executed by [[org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner]]. diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala index 3c5ebb6bf..4a71b08ed 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala @@ -21,8 +21,8 @@ package org.apache.gearpump.streaming.dsl.plan import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Processor.DefaultProcessor -import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FoldRunner, FunctionRunner} -import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, DirectProcessor, TimedValueProcessor, AndThen => WindowRunnerAT} +import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, DummyRunner, FlatMapper, FunctionRunner} +import org.apache.gearpump.streaming.dsl.window.impl.{AndThenOperator, FlatMapOperator, StreamingOperator, WindowOperator} import org.apache.gearpump.streaming.{Constants, Processor} import org.apache.gearpump.streaming.dsl.task.{GroupByTask, TransformTask} import org.apache.gearpump.streaming.dsl.window.api.{GlobalWindows, Windows} @@ -68,6 +68,16 @@ object Op { } } + def isFlatMapper(runner: FunctionRunner[Any, Any]): Boolean = { + runner match { + case fm: FlatMapper[Any, Any] => + true + case at: AndThen[Any, Any, Any] => + isFlatMapper(at.first) && isFlatMapper(at.second) + case _ => + false + } + } } /** @@ -134,14 +144,14 @@ case class DataSourceOp( parallelism: Int = 1, description: String = "source", userConfig: UserConfig = UserConfig.empty, - windowRunner: Option[TimedValueProcessor[Any, Any]] = None) + operator: Option[StreamingOperator[Any, Any]] = None) extends Op { override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { case op: WindowTransformOp[Any, Any] => val chainedRunner = - windowRunner.map(WindowRunnerAT(_, op.windowRunner)).getOrElse(op.windowRunner) + operator.map(AndThenOperator(_, op.operator)).getOrElse(op.operator) DataSourceOp( dataSource, parallelism, @@ -151,13 +161,11 @@ case class DataSourceOp( op.userConfig), Some(chainedRunner)) case op: TransformOp[Any, Any] => - val fn = op.fn - // WindowProcessor is required for FoldRunner - // AndThen could be composite FoldRunners - if (!fn.isInstanceOf[FoldRunner[Any, Any]] && !fn.isInstanceOf[AndThen[Any, Any, Any]]) { - val runner = new DirectProcessor(op.fn) + val runner = op.runner + if (Op.isFlatMapper(runner)) { + val fm = new FlatMapOperator[Any, Any](runner) val chainedRunner = - windowRunner.map(WindowRunnerAT(_, runner)).getOrElse(runner) + operator.map(AndThenOperator(_, fm)).getOrElse(fm) DataSourceOp( dataSource, parallelism, @@ -216,10 +224,10 @@ case class DataSinkOp( * to another Op to be executed */ case class TransformOp[IN, OUT]( - fn: FunctionRunner[IN, OUT], + runner: FunctionRunner[IN, OUT], userConfig: UserConfig = UserConfig.empty) extends Op { - override def description: String = fn.description + override def description: String = runner.description override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { @@ -229,16 +237,16 @@ case class TransformOp[IN, OUT]( // => ChainableOp(f1).chain(ChainableOp(f2)).chain(ChainableOp(f3)) // => AndThen(AndThen(f1, f2), f3) TransformOp( - AndThen(fn, op.fn), + AndThen(runner, op.runner), Op.concatenate(userConfig, op.userConfig)) case op: WindowOp => TransformWindowTransformOp(this, - WindowTransformOp(new WindowProcessor[OUT, OUT]( + WindowTransformOp(new WindowOperator[OUT, OUT]( op.windows, new DummyRunner[OUT] ), op.description, op.userConfig)) case op: TransformWindowTransformOp[OUT, _, _] => TransformWindowTransformOp(TransformOp( - AndThen(fn, op.transformOp.fn), + AndThen(runner, op.transformOp.runner), Op.concatenate(userConfig, op.transformOp.userConfig) ), op.windowTransformOp) case _ => @@ -265,13 +273,13 @@ case class WindowOp( override def chain(other: Op)(implicit system: ActorSystem): Op = { other match { case op: TransformOp[_, _] => - WindowTransformOp(new WindowProcessor(windows, op.fn), + WindowTransformOp(new WindowOperator(windows, op.runner), Op.concatenate(description, op.description), Op.concatenate(userConfig, op.userConfig)) case op: WindowOp => chain(TransformOp(new DummyRunner[Any])).chain(op.chain(TransformOp(new DummyRunner[Any]))) case op: TransformWindowTransformOp[_, _, _] => - WindowTransformOp(new WindowProcessor(windows, op.transformOp.fn), + WindowTransformOp(new WindowOperator(windows, op.transformOp.runner), Op.concatenate(description, op.transformOp.description), Op.concatenate(userConfig, op.transformOp.userConfig)).chain(op.windowTransformOp) case _ => @@ -311,7 +319,7 @@ case class GroupByOp[IN, GROUP] private( Op.concatenate(description, op.description), Op.concatenate( userConfig - .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.windowRunner), + .withValue(Constants.GEARPUMP_STREAMING_OPERATOR, op.operator), userConfig)) case op: WindowOp => chain(op.chain(TransformOp(new DummyRunner[Any]()))) @@ -350,7 +358,7 @@ case class MergeOp( parallelism, description, Op.concatenate(userConfig.withValue(Constants.GEARPUMP_STREAMING_OPERATOR, - op.windowRunner), + op.operator), op.userConfig)) case op: WindowOp => chain(op.chain(TransformOp(new DummyRunner[Any]()))) @@ -373,7 +381,7 @@ case class MergeOp( * it will be translated to a [[org.apache.gearpump.streaming.dsl.task.TransformTask]]. */ private case class WindowTransformOp[IN, OUT]( - windowRunner: TimedValueProcessor[IN, OUT], + operator: StreamingOperator[IN, OUT], description: String, userConfig: UserConfig) extends Op { @@ -381,7 +389,7 @@ private case class WindowTransformOp[IN, OUT]( other match { case op: WindowTransformOp[OUT, _] => WindowTransformOp( - WindowRunnerAT(windowRunner, op.windowRunner), + AndThenOperator(operator, op.operator), Op.concatenate(description, op.description), Op.concatenate(userConfig, op.userConfig) ) @@ -393,7 +401,7 @@ private case class WindowTransformOp[IN, OUT]( override def toProcessor(implicit system: ActorSystem): Processor[_ <: Task] = { // TODO: this should be chained to DataSourceOp / GroupByOp / MergeOp Processor[TransformTask[Any, Any]](1, description, userConfig.withValue( - Constants.GEARPUMP_STREAMING_OPERATOR, windowRunner)) + Constants.GEARPUMP_STREAMING_OPERATOR, operator)) } } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala index 2c11238c7..c63825739 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunner.scala @@ -39,6 +39,7 @@ sealed trait FunctionRunner[IN, OUT] extends java.io.Serializable { def description: String } + case class AndThen[IN, MIDDLE, OUT](first: FunctionRunner[IN, MIDDLE], second: FunctionRunner[MIDDLE, OUT]) extends FunctionRunner[IN, OUT] { @@ -114,10 +115,5 @@ class FoldRunner[T, A](fn: FoldFunction[T, A], val description: String) } } -class DummyRunner[T] extends FunctionRunner[T, T] { - - override def process(value: T): TraversableOnce[T] = Option(value) - - override def description: String = "" -} +class DummyRunner[T] extends FlatMapper[T, T](FlatMapFunction(Option(_)), "") diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala index 76fa1f2b5..80a6e6761 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala @@ -25,7 +25,7 @@ import com.gs.collections.impl.map.mutable.UnifiedMap import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR} -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} import org.apache.gearpump.streaming.source.Watermark import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} @@ -44,8 +44,8 @@ class GroupByTask[IN, GROUP, OUT]( ) } - private val groups: UnifiedMap[GROUP, TimedValueProcessor[IN, OUT]] = - new UnifiedMap[GROUP, TimedValueProcessor[IN, OUT]] + private val groups: UnifiedMap[GROUP, StreamingOperator[IN, OUT]] = + new UnifiedMap[GROUP, StreamingOperator[IN, OUT]] override def onNext(message: Message): Unit = { val input = message.value.asInstanceOf[IN] @@ -53,11 +53,11 @@ class GroupByTask[IN, GROUP, OUT]( if (!groups.containsKey(group)) { groups.put(group, - userConfig.getValue[TimedValueProcessor[IN, OUT]]( + userConfig.getValue[StreamingOperator[IN, OUT]]( GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get) } - groups.get(group).process(TimestampedValue(message.value.asInstanceOf[IN], + groups.get(group).foreach(TimestampedValue(message.value.asInstanceOf[IN], message.timestamp)) } @@ -65,8 +65,8 @@ class GroupByTask[IN, GROUP, OUT]( if (groups.isEmpty && watermark == Watermark.MAX) { taskContext.updateWatermark(Watermark.MAX) } else { - groups.values.forEach(new Consumer[TimedValueProcessor[IN, OUT]] { - override def accept(runner: TimedValueProcessor[IN, OUT]): Unit = { + groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] { + override def accept(runner: StreamingOperator[IN, OUT]): Unit = { TaskUtil.trigger(watermark, runner, taskContext) } }) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index a36dd2de9..fe026841c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -22,22 +22,22 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} class TransformTask[IN, OUT]( - runner: TimedValueProcessor[IN, OUT], + runner: StreamingOperator[IN, OUT], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(context: TaskContext, conf: UserConfig) = { this( - conf.getValue[TimedValueProcessor[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, context, conf ) } override def onNext(msg: Message): Unit = { - runner.process(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) + runner.foreach(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) } override def onWatermarkProgress(watermark: Instant): Unit = { diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/TimedValueProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala similarity index 72% rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/TimedValueProcessor.scala rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala index 45419ce0e..63715dbc4 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/TimedValueProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala @@ -23,6 +23,7 @@ import com.gs.collections.api.block.predicate.Predicate import com.gs.collections.api.block.procedure.Procedure import com.gs.collections.impl.list.mutable.FastList import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap +import org.apache.gearpump.Message import org.apache.gearpump.streaming.dsl.plan.functions.FunctionRunner import org.apache.gearpump.streaming.dsl.window.api.WindowFunction.Context import org.apache.gearpump.streaming.dsl.window.api.{Discarding, Windows} @@ -32,57 +33,87 @@ import org.apache.gearpump.streaming.task.TaskUtil import scala.collection.mutable.ArrayBuffer /** - * Inputs for [[TimedValueProcessor]]. + * Inputs for [[StreamingOperator]]. */ -case class TimestampedValue[T](value: T, timestamp: Instant) +case class TimestampedValue[T](value: T, timestamp: Instant) { + + def this(msg: Message) = { + this(msg.value.asInstanceOf[T], msg.timestamp) + } + + def toMessage: Message = Message(value, timestamp) +} /** - * Outputs triggered by [[TimedValueProcessor]] + * Outputs triggered by [[StreamingOperator]] */ case class TriggeredOutputs[T](outputs: TraversableOnce[TimestampedValue[T]], watermark: Instant) -trait TimedValueProcessor[IN, OUT] extends java.io.Serializable { - def process(timestampedValue: TimestampedValue[IN]): Unit +trait StreamingOperator[IN, OUT] extends java.io.Serializable { + + def setup(): Unit = {} + + def foreach(timestampedValue: TimestampedValue[IN]): Unit + + def flatMap( + timestampedValue: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + foreach(timestampedValue) + None + } def trigger(time: Instant): TriggeredOutputs[OUT] + + def teardown(): Unit = {} } /** * A composite WindowRunner that first executes its left child and feeds results * into result child. */ -case class AndThen[IN, MIDDLE, OUT](left: TimedValueProcessor[IN, MIDDLE], - right: TimedValueProcessor[MIDDLE, OUT]) extends TimedValueProcessor[IN, OUT] { +case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE], + right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] { - override def process(timestampedValue: TimestampedValue[IN]): Unit = { - left.process(timestampedValue) + override def foreach( + timestampedValue: TimestampedValue[IN]): Unit = { + left.foreach(timestampedValue) } override def trigger(time: Instant): TriggeredOutputs[OUT] = { val lOutputs = left.trigger(time) - lOutputs.outputs.foreach(right.process) + lOutputs.outputs.foreach(right.foreach) right.trigger(lOutputs.watermark) } } -class DirectProcessor[IN, OUT](fnRunner: FunctionRunner[IN, OUT]) - extends TimedValueProcessor[IN, OUT] { +/** + * @param runner FlatMapper or chained FlatMappers + */ +class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT]) + extends StreamingOperator[IN, OUT] { + + override def setup(): Unit = { + runner.setup() + } - private val buffer = ArrayBuffer.empty[TimestampedValue[OUT]] + override def foreach(timestampedValue: TimestampedValue[IN]): Unit = { + throw new UnsupportedOperationException("foreach should not be invoked on FlatMapOperator; " + + "please use flatMap instead") + } - override def process(timestampedValue: TimestampedValue[IN]): Unit = { - fnRunner.setup() - fnRunner.process(timestampedValue.value) + override def flatMap( + timestampedValue: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + runner.process(timestampedValue.value) .map(TimestampedValue(_, timestampedValue.timestamp)) - .foreach(tv => buffer.append(tv)) - fnRunner.finish() - fnRunner.teardown() } override def trigger(time: Instant): TriggeredOutputs[OUT] = { - TriggeredOutputs(buffer, time) + TriggeredOutputs(None, time) + } + + override def teardown(): Unit = { + runner.teardown() } } @@ -92,17 +123,18 @@ class DirectProcessor[IN, OUT](fnRunner: FunctionRunner[IN, OUT]) * 2. Applies window calculation to each group * 3. Emits results on triggering */ -class WindowProcessor[IN, OUT]( +class WindowOperator[IN, OUT]( windows: Windows, - fnRunner: FunctionRunner[IN, OUT]) - extends TimedValueProcessor[IN, OUT] { + runner: FunctionRunner[IN, OUT]) + extends StreamingOperator[IN, OUT] { private val windowFn = windows.windowFn private val windowInputs = new TreeSortedMap[Window, FastList[TimestampedValue[IN]]] - private var setup = false + private var isSetup = false private var watermark = Watermark.MIN - override def process(timestampedValue: TimestampedValue[IN]): Unit = { + override def foreach( + timestampedValue: TimestampedValue[IN]): Unit = { val wins = windowFn(new Context[IN] { override def element: IN = timestampedValue.value @@ -149,26 +181,26 @@ class WindowProcessor[IN, OUT]( val firstWin = windowInputs.firstKey if (!time.isBefore(firstWin.endTime)) { val inputs = windowInputs.remove(firstWin) - if (!setup) { - fnRunner.setup() - setup = true + if (!isSetup) { + runner.setup() + isSetup = true } inputs.forEach(new Procedure[TimestampedValue[IN]] { override def value(tv: TimestampedValue[IN]): Unit = { - fnRunner.process(tv.value).foreach { + runner.process(tv.value).foreach { out: OUT => outputs += TimestampedValue(out, tv.timestamp) } } }) - fnRunner.finish().foreach { + runner.finish().foreach { out: OUT => outputs += TimestampedValue(out, firstWin.endTime.minusMillis(1)) } val newWmk = TaskUtil.max(wmk, firstWin.endTime) if (windows.accumulationMode == Discarding) { - fnRunner.teardown() + runner.teardown() // discarding, setup need to be called for each window - setup = false + isSetup = false } onTrigger(outputs, newWmk) } else { diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala index 4f45f685f..c471a00b0 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceProcessor.scala @@ -18,11 +18,11 @@ package org.apache.gearpump.streaming.source + import akka.actor.ActorSystem import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner -import org.apache.gearpump.streaming.dsl.window.api.{WindowFunction, Windows} -import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, Window, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{FlatMapOperator, StreamingOperator} import org.apache.gearpump.streaming.{Constants, Processor} /** @@ -48,19 +48,9 @@ object DataSourceProcessor { Processor[DataSourceTask[Any, Any]](parallelism, description, taskConf .withValue[DataSource](Constants.GEARPUMP_STREAMING_SOURCE, dataSource) - .withValue[TimedValueProcessor[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, - new WindowProcessor[Any, Any]( - Windows(PerElementWindowFunction, description = "perElementWindows"), - new DummyRunner[Any]))) - } - - - case object PerElementWindowFunction extends WindowFunction { - override def apply[T]( - context: WindowFunction.Context[T]): Array[Window] = { - Array(Window(context.timestamp, context.timestamp.plusMillis(1))) - } - - override def isNonMerging: Boolean = true + .withValue[StreamingOperator[Any, Any]](Constants.GEARPUMP_STREAMING_OPERATOR, + new FlatMapOperator(new DummyRunner[Any]) + ) + ) } } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 508a85d09..966a01563 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump._ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} /** @@ -40,7 +40,7 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} */ class DataSourceTask[IN, OUT] private[source]( source: DataSource, - windowRunner: TimedValueProcessor[IN, OUT], + operator: StreamingOperator[IN, OUT], context: TaskContext, conf: UserConfig) extends Task(context, conf) { @@ -48,7 +48,7 @@ class DataSourceTask[IN, OUT] private[source]( def this(context: TaskContext, conf: UserConfig) = { this( conf.getValue[DataSource](GEARPUMP_STREAMING_SOURCE)(context.system).get, - conf.getValue[TimedValueProcessor[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, + conf.getValue[StreamingOperator[IN, OUT]](GEARPUMP_STREAMING_OPERATOR)(context.system).get, context, conf ) } @@ -64,16 +64,14 @@ class DataSourceTask[IN, OUT] private[source]( override def onNext(m: Message): Unit = { 0.until(batchSize).foreach { _ => - Option(source.read()).foreach( - msg => windowRunner.process( - TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp))) + Option(source.read()).foreach(process) } self ! Watermark(source.getWatermark) } override def onWatermarkProgress(watermark: Instant): Unit = { - TaskUtil.trigger(watermark, windowRunner, context) + TaskUtil.trigger(watermark, operator, context) } override def onStop(): Unit = { @@ -81,4 +79,9 @@ class DataSourceTask[IN, OUT] private[source]( source.close() } + private def process(msg: Message): Unit = { + operator.flatMap(new TimestampedValue(msg)) + .foreach { tv => context.output(tv.toMessage) } + } + } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala index 7edefa022..ed304ce73 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala @@ -21,7 +21,7 @@ package org.apache.gearpump.streaming.task import java.time.Instant import org.apache.gearpump.Message -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} object TaskUtil { @@ -36,7 +36,7 @@ object TaskUtil { loader.loadClass(className).asSubclass(classOf[Task]) } - def trigger[IN, OUT](watermark: Instant, runner: TimedValueProcessor[IN, OUT], + def trigger[IN, OUT](watermark: Instant, runner: StreamingOperator[IN, OUT], context: TaskContext): Unit = { val triggeredOutputs = runner.trigger(watermark) context.updateWatermark(triggeredOutputs.watermark) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala index ce3e1707e..79ef135cc 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala @@ -28,7 +28,7 @@ import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTas import org.apache.gearpump.streaming.dsl.plan.functions.{DummyRunner, FlatMapper, FunctionRunner} import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows -import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator} import org.apache.gearpump.streaming.sink.DataSink import org.apache.gearpump.streaming.source.DataSource import org.apache.gearpump.streaming.task.{Task, TaskContext} @@ -66,7 +66,7 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val dataSourceOp = DataSourceOp(dataSource) val transformOp = mock[TransformOp[Any, Any]] val fn = mock[FunctionRunner[Any, Any]] - when(transformOp.fn).thenReturn(fn) + when(transformOp.runner).thenReturn(fn) val chainedOp = dataSourceOp.chain(transformOp) @@ -173,9 +173,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS "chain WindowTransformOp" in { - val runner = new WindowProcessor[Any, Any](GlobalWindows(), new DummyRunner()) + val runner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner()) val windowTransformOp = mock[WindowTransformOp[Any, Any]] - when(windowTransformOp.windowRunner).thenReturn(runner) + when(windowTransformOp.operator).thenReturn(runner) val chainedOp = groupByOp.chain(windowTransformOp) chainedOp shouldBe a[GroupByOp[_, _]] @@ -199,9 +199,9 @@ class OpSpec extends WordSpec with Matchers with BeforeAndAfterAll with MockitoS val mergeOp = MergeOp() "chain WindowTransformOp" in { - val runner = mock[TimedValueProcessor[Any, Any]] + val runner = mock[StreamingOperator[Any, Any]] val windowTransformOp = mock[WindowTransformOp[Any, Any]] - when(windowTransformOp.windowRunner).thenReturn(runner) + when(windowTransformOp.operator).thenReturn(runner) val chainedOp = mergeOp.chain(windowTransformOp) chainedOp shouldBe a [MergeOp] diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala index 620631282..c92f9c84e 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/FunctionRunnerSpec.scala @@ -29,7 +29,7 @@ import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction import org.apache.gearpump.streaming.dsl.task.TransformTask import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows -import org.apache.gearpump.streaming.dsl.window.impl.{WindowProcessor, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{WindowOperator, StreamingOperator} import org.mockito.Matchers._ import org.mockito.Mockito._ import org.scalatest.{Matchers, WordSpec} @@ -218,11 +218,11 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val data = "one two three".split("\\s+") val dataSource = new CollectionDataSource[String](data) - val runner1 = new WindowProcessor[String, String]( + val runner1 = new WindowOperator[String, String]( GlobalWindows(), new DummyRunner[String]) val conf = UserConfig.empty .withValue(GEARPUMP_STREAMING_SOURCE, dataSource) - .withValue[TimedValueProcessor[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) + .withValue[StreamingOperator[String, String]](GEARPUMP_STREAMING_OPERATOR, runner1) // Source with no transformer val source = new DataSourceTask[String, String]( @@ -239,7 +239,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val anotherTaskContext = MockUtil.mockTaskContext val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val runner2 = new WindowProcessor[String, String]( + val runner2 = new WindowOperator[String, String]( GlobalWindows(), double) val another = new DataSourceTask(anotherTaskContext, conf.withValue(GEARPUMP_STREAMING_OPERATOR, runner2)) @@ -262,7 +262,7 @@ class FunctionRunnerSpec extends WordSpec with Matchers with MockitoSugar { val conf = UserConfig.empty val double = new FlatMapper[String, String](FlatMapFunction( word => List(word, word)), "double") - val transform = new WindowProcessor[String, String](GlobalWindows(), double) + val transform = new WindowOperator[String, String](GlobalWindows(), double) val task = new TransformTask[String, String](transform, taskContext, conf) task.onStart(Instant.EPOCH) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala index 770dc3bca..62e14f421 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/GroupByTaskSpec.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.dsl.plan.functions.DummyRunner import org.apache.gearpump.streaming.dsl.window.api.GlobalWindows import org.apache.gearpump.streaming.{Constants, MockUtil} -import org.apache.gearpump.streaming.dsl.window.impl.WindowProcessor +import org.apache.gearpump.streaming.dsl.window.impl.WindowOperator import org.apache.gearpump.streaming.source.Watermark import org.mockito.Mockito._ import org.scalacheck.Gen @@ -40,7 +40,7 @@ class GroupByTaskSpec extends PropSpec with PropertyChecks forAll(longGen) { (time: Instant) => val groupBy = mock[Any => Int] - val windowRunner = new WindowProcessor[Any, Any](GlobalWindows(), new DummyRunner[Any]) + val windowRunner = new WindowOperator[Any, Any](GlobalWindows(), new DummyRunner[Any]) val context = MockUtil.mockTaskContext val config = UserConfig.empty .withValue( diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala index 3834b463a..0009ad528 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/task/TransformTaskSpec.scala @@ -22,7 +22,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator} import org.mockito.Mockito.{verify, when} import org.scalacheck.Gen import org.scalatest.{Matchers, PropSpec} @@ -36,7 +36,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val watermarkGen = longGen.map(Instant.ofEpochMilli) forAll(watermarkGen) { (watermark: Instant) => - val windowRunner = mock[TimedValueProcessor[Any, Any]] + val windowRunner = mock[StreamingOperator[Any, Any]] val context = MockUtil.mockTaskContext val config = UserConfig.empty val task = new TransformTask[Any, Any](windowRunner, context, config) @@ -45,7 +45,7 @@ class TransformTaskSpec extends PropSpec with PropertyChecks with Matchers with val message = Message(value, time) task.onNext(message) - verify(windowRunner).process(TimestampedValue(value, time)) + verify(windowRunner).foreach(TimestampedValue(value, time)) when(windowRunner.trigger(watermark)).thenReturn( TriggeredOutputs(Some(TimestampedValue(value, time)), watermark)) diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala index 2ef91ecbc..1ac72139b 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/window/impl/DefaultWindowRunnerSpec.scala @@ -46,10 +46,10 @@ class DefaultWindowRunnerSpec extends PropSpec with PropertyChecks implicit val system = MockUtil.system val reduce = ReduceFunction[KV]((kv1, kv2) => (kv1._1, kv1._2 + kv2._2)) val windows = SessionWindows.apply(Duration.ofMillis(4L)) - val windowRunner = new WindowProcessor[KV, Option[KV]](windows, + val windowRunner = new WindowOperator[KV, Option[KV]](windows, new FoldRunner[KV, Option[KV]](reduce, "reduce")) - data.foreach(m => windowRunner.process(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) + data.foreach(m => windowRunner.foreach(TimestampedValue(m.value.asInstanceOf[KV], m.timestamp))) windowRunner.trigger(Watermark.MAX).outputs.toList shouldBe List( TimestampedValue(Some(("foo", 1)), Instant.ofEpochMilli(4)), diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala index 7e459f18c..cd2cfa7a5 100644 --- a/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala +++ b/streaming/src/test/scala/org/apache/gearpump/streaming/source/DataSourceTaskSpec.scala @@ -23,7 +23,7 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.MockUtil -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, TimedValueProcessor} +import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, TriggeredOutputs, StreamingOperator} import org.mockito.Mockito._ import org.scalacheck.Gen import org.scalatest.mock.MockitoSugar @@ -40,7 +40,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[TimedValueProcessor[Any, Any]] + val runner = mock[StreamingOperator[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStart(startTime) @@ -57,13 +57,17 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[TimedValueProcessor[Any, Any]] - val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) + val processor = mock[StreamingOperator[String, String]] + val sourceTask = new DataSourceTask[String, String](dataSource, processor, + taskContext, config) val msg = Message(str, timestamp) when(dataSource.read()).thenReturn(msg) - when(runner.trigger(Watermark.MAX)).thenReturn( - TriggeredOutputs(Some(TimestampedValue(str.asInstanceOf[Any], timestamp)), Watermark.MAX)) + when(processor.flatMap(new TimestampedValue[String](msg))).thenReturn( + Some(new TimestampedValue[String](msg)) + ) + when(processor.trigger(Watermark.MAX)).thenReturn( + TriggeredOutputs[String](None, Watermark.MAX)) sourceTask.onNext(Message("next")) sourceTask.onWatermarkProgress(Watermark.MAX) @@ -79,7 +83,7 @@ class DataSourceTaskSpec extends PropSpec with PropertyChecks with Matchers with val dataSource = mock[DataSource] val config = UserConfig.empty .withInt(DataSourceConfig.SOURCE_READ_BATCH_SIZE, 1) - val runner = mock[TimedValueProcessor[Any, Any]] + val runner = mock[StreamingOperator[Any, Any]] val sourceTask = new DataSourceTask[Any, Any](dataSource, runner, taskContext, config) sourceTask.onStop() From 7774a34ebede9e109d81221ecc999b32946254ee Mon Sep 17 00:00:00 2001 From: manuzhang Date: Sun, 8 Apr 2018 08:44:44 +0800 Subject: [PATCH 4/5] Fix AndThenOperator --- .../dsl/window/impl/StreamingOperator.scala | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala index 63715dbc4..32beb8efd 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala @@ -55,11 +55,11 @@ trait StreamingOperator[IN, OUT] extends java.io.Serializable { def setup(): Unit = {} - def foreach(timestampedValue: TimestampedValue[IN]): Unit + def foreach(tv: TimestampedValue[IN]): Unit def flatMap( - timestampedValue: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { - foreach(timestampedValue) + tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + foreach(tv) None } @@ -76,8 +76,13 @@ case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE], right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] { override def foreach( - timestampedValue: TimestampedValue[IN]): Unit = { - left.foreach(timestampedValue) + tv: TimestampedValue[IN]): Unit = { + left.flatMap(tv).foreach(right.flatMap) + } + + override def flatMap( + tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + left.flatMap(tv).flatMap(right.flatMap) } override def trigger(time: Instant): TriggeredOutputs[OUT] = { From b188058684e2ef92df621d5a3aa1e7fe272dbc67 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Tue, 10 Apr 2018 00:31:54 +0800 Subject: [PATCH 5/5] Fix StreamingOperator teardown --- .../streaming/dsl/task/GroupByTask.scala | 19 ++++++++--- .../streaming/dsl/task/TransformTask.scala | 16 +++++++--- .../dsl/window/impl/StreamingOperator.scala | 32 +++++++++++++------ .../streaming/source/DataSourceTask.scala | 2 ++ 4 files changed, 51 insertions(+), 18 deletions(-) diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala index 80a6e6761..b61535454 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala @@ -52,9 +52,10 @@ class GroupByTask[IN, GROUP, OUT]( val group = groupBy(input) if (!groups.containsKey(group)) { - groups.put(group, - userConfig.getValue[StreamingOperator[IN, OUT]]( - GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get) + val operator = userConfig.getValue[StreamingOperator[IN, OUT]]( + GEARPUMP_STREAMING_OPERATOR)(taskContext.system).get + operator.setup() + groups.put(group, operator) } groups.get(group).foreach(TimestampedValue(message.value.asInstanceOf[IN], @@ -66,10 +67,18 @@ class GroupByTask[IN, GROUP, OUT]( taskContext.updateWatermark(Watermark.MAX) } else { groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] { - override def accept(runner: StreamingOperator[IN, OUT]): Unit = { - TaskUtil.trigger(watermark, runner, taskContext) + override def accept(operator: StreamingOperator[IN, OUT]): Unit = { + TaskUtil.trigger(watermark, operator, taskContext) } }) } } + + override def onStop(): Unit = { + groups.values.forEach(new Consumer[StreamingOperator[IN, OUT]] { + override def accept(operator: StreamingOperator[IN, OUT]): Unit = { + operator.teardown() + } + }) + } } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala index fe026841c..6c78e0b20 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala @@ -22,11 +22,11 @@ import java.time.Instant import org.apache.gearpump.Message import org.apache.gearpump.cluster.UserConfig import org.apache.gearpump.streaming.Constants._ -import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, StreamingOperator} +import org.apache.gearpump.streaming.dsl.window.impl.{StreamingOperator, TimestampedValue} import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil} class TransformTask[IN, OUT]( - runner: StreamingOperator[IN, OUT], + operator: StreamingOperator[IN, OUT], taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) { def this(context: TaskContext, conf: UserConfig) = { @@ -36,11 +36,19 @@ class TransformTask[IN, OUT]( ) } + override def onStart(startTime: Instant): Unit = { + operator.setup() + } + override def onNext(msg: Message): Unit = { - runner.foreach(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) + operator.foreach(TimestampedValue(msg.value.asInstanceOf[IN], msg.timestamp)) } override def onWatermarkProgress(watermark: Instant): Unit = { - TaskUtil.trigger(watermark, runner, taskContext) + TaskUtil.trigger(watermark, operator, taskContext) + } + + override def onStop(): Unit = { + operator.teardown() } } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala index 32beb8efd..4f29c9eec 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/StreamingOperator.scala @@ -75,6 +75,11 @@ trait StreamingOperator[IN, OUT] extends java.io.Serializable { case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE], right: StreamingOperator[MIDDLE, OUT]) extends StreamingOperator[IN, OUT] { + override def setup(): Unit = { + left.setup() + right.setup() + } + override def foreach( tv: TimestampedValue[IN]): Unit = { left.flatMap(tv).foreach(right.flatMap) @@ -90,6 +95,11 @@ case class AndThenOperator[IN, MIDDLE, OUT](left: StreamingOperator[IN, MIDDLE], lOutputs.outputs.foreach(right.foreach) right.trigger(lOutputs.watermark) } + + override def teardown(): Unit = { + left.teardown() + right.teardown() + } } /** @@ -102,15 +112,15 @@ class FlatMapOperator[IN, OUT](runner: FunctionRunner[IN, OUT]) runner.setup() } - override def foreach(timestampedValue: TimestampedValue[IN]): Unit = { + override def foreach(tv: TimestampedValue[IN]): Unit = { throw new UnsupportedOperationException("foreach should not be invoked on FlatMapOperator; " + "please use flatMap instead") } override def flatMap( - timestampedValue: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { - runner.process(timestampedValue.value) - .map(TimestampedValue(_, timestampedValue.timestamp)) + tv: TimestampedValue[IN]): TraversableOnce[TimestampedValue[OUT]] = { + runner.process(tv.value) + .map(TimestampedValue(_, tv.timestamp)) } override def trigger(time: Instant): TriggeredOutputs[OUT] = { @@ -139,11 +149,11 @@ class WindowOperator[IN, OUT]( private var watermark = Watermark.MIN override def foreach( - timestampedValue: TimestampedValue[IN]): Unit = { + tv: TimestampedValue[IN]): Unit = { val wins = windowFn(new Context[IN] { - override def element: IN = timestampedValue.value + override def element: IN = tv.value - override def timestamp: Instant = timestampedValue.timestamp + override def timestamp: Instant = tv.timestamp }) wins.foreach { win => if (windowFn.isNonMerging) { @@ -151,9 +161,9 @@ class WindowOperator[IN, OUT]( val inputs = new FastList[TimestampedValue[IN]] windowInputs.put(win, inputs) } - windowInputs.get(win).add(timestampedValue) + windowInputs.get(win).add(tv) } else { - merge(windowInputs, win, timestampedValue) + merge(windowInputs, win, tv) } } @@ -229,4 +239,8 @@ class WindowOperator[IN, OUT]( watermark = TaskUtil.max(watermark, triggeredOutputs.watermark) TriggeredOutputs(triggeredOutputs.outputs, watermark) } + + override def teardown(): Unit = { + runner.teardown() + } } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala index 966a01563..b09ad667f 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/source/DataSourceTask.scala @@ -58,6 +58,7 @@ class DataSourceTask[IN, OUT] private[source]( override def onStart(startTime: Instant): Unit = { LOG.info(s"opening data source at ${startTime.toEpochMilli}") source.open(context, startTime) + operator.setup() self ! Watermark(source.getWatermark) } @@ -77,6 +78,7 @@ class DataSourceTask[IN, OUT] private[source]( override def onStop(): Unit = { LOG.info("closing data source...") source.close() + operator.teardown() } private def process(msg: Message): Unit = {