From 76274d389c1d2d4f9528f3d1041e8f665322e3e1 Mon Sep 17 00:00:00 2001 From: manuzhang Date: Wed, 16 Aug 2017 06:46:24 +0800 Subject: [PATCH 1/2] [GEARPUMP-341] Update processing watermark in DataSinkTask --- .../examples/wordcount/dsl/WindowedWordCount.scala | 6 +++--- .../org/apache/gearpump/streaming/sink/DataSinkTask.scala | 4 ++++ 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala index 2aa1bb4af..379c7b620 100644 --- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala +++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala @@ -24,7 +24,7 @@ import org.apache.gearpump.cluster.client.ClientContext import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption} import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp} import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindows} -import org.apache.gearpump.streaming.source.DataSource +import org.apache.gearpump.streaming.source.{DataSource, Watermark} import org.apache.gearpump.streaming.task.TaskContext import org.apache.gearpump.util.AkkaApp @@ -45,7 +45,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { groupBy(_._1). sum.sink(new LoggerSink) - context.submit(app) + context.submit(app).waitUntilFinish() context.close() } @@ -79,7 +79,7 @@ object WindowedWordCount extends AkkaApp with ArgumentsParser { override def getWatermark: Instant = { if (data.isEmpty) { - watermark = watermark.plusMillis(1) + watermark = Watermark.MAX } watermark } diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala index 0db44f236..932c7506c 100644 --- a/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala +++ b/streaming/src/main/scala/org/apache/gearpump/streaming/sink/DataSinkTask.scala @@ -52,4 +52,8 @@ class DataSinkTask private[sink](context: TaskContext, conf: UserConfig, sink: D LOG.info("closing data sink...") sink.close() } + + override def onWatermarkProgress(watermark: Instant): Unit = { + context.updateWatermark(watermark) + } } From 19de3f8a78de41ea948dbff214df905122f629fd Mon Sep 17 00:00:00 2001 From: manuzhang Date: Wed, 16 Aug 2017 07:22:48 +0800 Subject: [PATCH 2/2] Use "sudo: required" on travis --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 95c1427dc..8148c322b 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,6 @@ language: - scala -sudo: false +sudo: required jdk: - oraclejdk8 addons: