From 6a4cd0882deb98fc98cc56b71911d103a5754d6b Mon Sep 17 00:00:00 2001
From: huafengw
Date: Fri, 29 Sep 2017 16:02:59 +0800
Subject: [PATCH] [GEARPUMP-350] Fix the not started app clock

---
Review notes (this section is ignored by `git am`):
 * TaskWrapper: the WatermarkProducer check called Class.isAssignableFrom
   with receiver and argument swapped, so it was false for every task class
   that merely mixes in the trait and the automatic watermark update in
   onNext was never skipped. It now reads
   classOf[WatermarkProducer].isAssignableFrom(taskClass).
 * Added short docs for TaskUtil.min(t1, t2, t3), the new TaskWrapper flag
   and the WatermarkProducer trait; hunk sizes and the diffstat below were
   adjusted accordingly. The "index" post-image hashes are those of the
   original patch.
 * Line breaks and indentation were restored from a whitespace-collapsed
   copy; use `git apply --ignore-whitespace` if context indentation differs.

 .../streaming/dsl/task/GroupByTask.scala      |  4 ++--
 .../streaming/task/Subscription.scala         |  2 +-
 .../gearpump/streaming/task/TaskActor.scala   |  8 +++----
 .../gearpump/streaming/task/TaskUtil.scala    |  7 +++++++
 .../gearpump/streaming/task/TaskWrapper.scala | 12 +++++++++++-
 .../streaming/task/WatermarkProducer.scala    | 25 +++++++++++++++++++++++++
 6 files changed, 49 insertions(+), 9 deletions(-)
 create mode 100644 streaming/src/main/scala/org/apache/gearpump/streaming/task/WatermarkProducer.scala

diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
index b3f3ad238..783a9a7a5 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/GroupByTask.scala
@@ -27,7 +27,7 @@ import org.apache.gearpump.cluster.UserConfig
 import org.apache.gearpump.streaming.Constants.{GEARPUMP_STREAMING_GROUPBY_FUNCTION, GEARPUMP_STREAMING_OPERATOR}
 import org.apache.gearpump.streaming.dsl.window.impl.{TimestampedValue, WindowRunner}
 import org.apache.gearpump.streaming.source.Watermark
-import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
+import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil, WatermarkProducer}
 
 /**
  * Processes messages in groups as defined by groupBy function.
@@ -35,7 +35,7 @@ import org.apache.gearpump.streaming.task.{Task, TaskContext, TaskUtil}
 class GroupByTask[IN, GROUP, OUT](
     groupBy: IN => GROUP,
     taskContext: TaskContext,
-    userConfig: UserConfig) extends Task(taskContext, userConfig) {
+    userConfig: UserConfig) extends Task(taskContext, userConfig) with WatermarkProducer {
 
   def this(context: TaskContext, conf: UserConfig) = {
     this(
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
index 24f17637a..1b41e742c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/Subscription.scala
@@ -172,7 +172,7 @@ class Subscription(
           processingWatermarkSince(index) = messageCount(index)
         }
 
-        pendingMessageCount(ack.taskId.index) = (messageCount(ack.taskId.index) - ack.seq).toShort
+        pendingMessageCount(index) = (messageCount(index) - ack.seq).toShort
         updateMaxPendingCount()
       } else {
         LOG.error(s"Failed! received ack: $ack, received: ${ack.actualReceivedNum}, " +
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
index b43457e19..93aeb628c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskActor.scala
@@ -41,7 +41,6 @@ import org.apache.gearpump.util.{LogUtil, TimeOutScheduler}
 import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 
-
 /**
  * All tasks of Gearpump run inside an Actor. TaskActor is the Actor container for a task.
  */
@@ -246,7 +245,6 @@ class TaskActor(
 
   private def doHandleMessage(): Int = {
     var done = false
-
     var count = 0
 
     while (allowSendingMoreMessages() && !done) {
@@ -339,8 +337,7 @@ class TaskActor(
     val subWatermark = getSubscriptionWatermark(subscriptions, watermark)
 
     watermark = TaskUtil.max(Instant.ofEpochMilli(life.birth),
-      TaskUtil.min(upstreamWatermark,
-        TaskUtil.min(processingWatermark, subWatermark)))
+      TaskUtil.min(upstreamWatermark, processingWatermark, subWatermark))
 
     // Checks whether current task is dead.
     if (watermark.toEpochMilli > life.death) {
@@ -361,10 +358,11 @@ class TaskActor(
   }
 
   private def getSubscriptionWatermark(subs: List[(Int, Subscription)], wmk: Instant): Instant = {
+    val wmkInMilli = wmk.toEpochMilli
     Instant.ofEpochMilli(subs.foldLeft(Watermark.MAX.toEpochMilli) {
       case (min, (_, sub)) =>
         val subWmk = sub.watermark
-        if (subWmk == wmk.toEpochMilli) {
+        if (subWmk == wmkInMilli) {
           sub.onStallingTime(subWmk)
         }
         Math.min(min, subWmk)
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
index bd889c4ca..075ba50ff 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskUtil.scala
@@ -53,6 +53,13 @@ object TaskUtil {
     else t1
   }
 
+  /**
+   * @return the smallest of t1, t2 and t3
+   */
+  def min(t1: Instant, t2: Instant, t3: Instant): Instant = {
+    min(t1, min(t2, t3))
+  }
+
   /**
    * @return t1 if t1 is not smaller than t2 and t2 otherwise
    */
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
index 1e4430bb6..54b614da9 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/TaskWrapper.scala
@@ -46,6 +46,11 @@ class TaskWrapper(
 
   private var task: Option[Task] = None
 
+  // Tasks mixing in WatermarkProducer are excluded from the automatic
+  // per-message watermark update performed in onNext.
+  private val shouldUpdateProcessingWatermark =
+    !classOf[WatermarkProducer].isAssignableFrom(taskClass)
+
   def setTaskActor(actor: TaskActor): Unit = this.actor = actor
 
   override def appId: Int = context.appId
@@ -97,7 +102,12 @@ class TaskWrapper(
     task.foreach(_.onStart(startTime))
   }
 
-  override def onNext(msg: Message): Unit = task.foreach(_.onNext(msg))
+  override def onNext(msg: Message): Unit = {
+    task.foreach(_.onNext(msg))
+    if (shouldUpdateProcessingWatermark) {
+      updateWatermark(msg.timestamp)
+    }
+  }
 
   override def onStop(): Unit = {
     task.foreach(_.onStop())
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/task/WatermarkProducer.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/task/WatermarkProducer.scala
new file mode 100644
index 000000000..c9de156ce
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/task/WatermarkProducer.scala
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gearpump.streaming.task
+
+/**
+ * Marker trait for tasks that should not have their watermark updated
+ * automatically by [[TaskWrapper]] after each `onNext` call.
+ */
+trait WatermarkProducer