
Commit

Merge pull request #10 from kailuowang/master
Fix #4
kailuowang committed Oct 26, 2015
2 parents 3ed63b9 + b351ded commit 45f6ad3
Showing 6 changed files with 184 additions and 168 deletions.
4 changes: 4 additions & 0 deletions .sbtopts
@@ -0,0 +1,4 @@
-J-Xmx4G
-J-XX:MaxMetaspaceSize=1G
-J-XX:MaxPermSize=1G
-J-XX:+CMSClassUnloadingEnabled
@@ -12,4 +12,12 @@ trait MessageScheduler {
import context.dispatcher
context.system.scheduler.scheduleOnce(delay, receiver, msg)
}
def maybeDelayedMsg(delayO: Option[FiniteDuration], msg: Any, receiver: ActorRef = self ): Option[Cancellable] = {
delayO.map(delayedMsg(_, msg, receiver)).orElse {
receiver ! msg
None
}
}


}
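
For reference, a minimal usage sketch of the new maybeDelayedMsg helper (the ExampleActor below is hypothetical and not part of this commit): when a delay is given the message is scheduled and a Cancellable is returned; when the delay is None the message is sent immediately and None comes back.

import scala.concurrent.duration._
import akka.actor.Actor
import com.iheart.workpipeline.akka.helpers.MessageScheduler

// Hypothetical actor, for illustration only.
class ExampleActor extends Actor with MessageScheduler {
  def receive: Receive = {
    case "later" =>
      // Some(delay): the message is scheduled; the returned Cancellable can cancel it.
      val timer = maybeDelayedMsg(Some(2.seconds), "tick")
      timer.foreach(_.cancel())
    case "now" =>
      // None: the message is sent to self right away and no Cancellable is returned.
      maybeDelayedMsg(None, "tick")
    case "tick" => // the scheduled or immediate message arrives here
  }
}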
@@ -4,9 +4,13 @@ import java.time.{ZoneOffset, LocalDateTime}

import akka.actor.SupervisorStrategy.Restart
import akka.actor._
import com.iheart.workpipeline.akka.patterns.queue.CommonProtocol.{WorkFailed, WorkTimedOut}
import com.iheart.workpipeline.akka.patterns.queue.Worker.Hold
import com.iheart.workpipeline.metrics.{Metric, MetricsCollector, NoOpMetricsCollector}
import com.iheart.workpipeline.akka.helpers.MessageScheduler
import com.iheart.workpipeline.akka.patterns.CommonProtocol.{ShutdownSuccessfully, QueryStatus}
import com.iheart.workpipeline.collection.FiniteCollection._

import QueueProcessor._
import Queue.{Retire}
import scala.concurrent.duration._
@@ -16,7 +20,11 @@ trait QueueProcessor extends Actor with ActorLogging with MessageScheduler {
val queue: QueueRef
def delegateeProps: Props
def settings: ProcessingWorkerPoolSettings
def resultChecker: ResultChecker
val metricsCollector: MetricsCollector
type ResultHistory = Vector[Boolean]
val resultHistoryLength: Int
protected def onWorkError(resultHistory: ResultHistory, pool: WorkerPool)

metricsCollector.send(Metric.PoolSize(settings.startingPoolSize))

@@ -25,57 +33,77 @@ trait QueueProcessor extends Actor with ActorLogging with MessageScheduler {
case _: Exception => Restart
}

def workerProp(queueRef: QueueRef, delegateeProps: Props): Props
def workerProp(queueRef: QueueRef, delegateeProps: Props): Props =
Worker.default(queue, delegateeProps)(resultChecker)

def receive: Receive = {
val workers = (1 to settings.startingPoolSize).map(createWorker).toSet
settings.maxProcessingTime.foreach(delayedMsg(_, QueueMaxProcessTimeReached(queue)))
context watch queue

monitoring(workers)
monitoring()(workers)
}


def monitoring(pool: WorkerPool): Receive = {
def monitoring(resultHistory: ResultHistory = Vector.empty)(pool: WorkerPool): Receive = {
def workError(): Unit = {
val newHistory = resultHistory.enqueueFinite(false, resultHistoryLength)
context become monitoring(newHistory)(pool)
onWorkError(newHistory, pool)
}

case ScaleTo(newPoolSize, reason) =>
log.info(s"Command to scale to $newPoolSize, currently at ${pool.size} due to ${reason.getOrElse("no reason given")}")
metricsCollector.send(Metric.PoolSize(newPoolSize))
{
case ScaleTo(newPoolSize, reason) =>
log.info(s"Command to scale to $newPoolSize, currently at ${pool.size} due to ${reason.getOrElse("no reason given")}")
metricsCollector.send(Metric.PoolSize(newPoolSize))

val diff = newPoolSize - pool.size
if (diff > 0)
context become monitoring(pool ++ (1 to diff).map(createWorker))
else if (diff < 0 && newPoolSize >= settings.minPoolSize)
pool.take(-diff).foreach(_ ! Worker.Retire)
val diff = newPoolSize - pool.size
if (diff > 0)
context become monitoring(resultHistory)(pool ++ (1 to diff).map(createWorker))
else if (diff < 0 && newPoolSize >= settings.minPoolSize)
pool.take(-diff).foreach(_ ! Worker.Retire)

case MissionAccomplished(worker) =>
removeWorker(pool, worker, monitoring, "successfully after all work is done")
case MissionAccomplished(worker) =>
removeWorker(pool, worker, monitoring(resultHistory), "successfully after all work is done")

case Terminated(worker) if pool.contains(worker) =>
removeWorker(pool, worker, monitoring, "unexpected termination when all workers retired")
case WorkCompleted(worker) =>
context become monitoring(resultHistory.enqueueFinite(true, resultHistoryLength))(pool)
metricsCollector.send(Metric.WorkCompleted)

case Terminated(`queue`) =>
log.info(s"Queue ${queue.path} is terminated")
self ! Shutdown(retireQueue = false)
case WorkFailed(_) =>
workError()
metricsCollector.send(Metric.WorkFailed)

case QueueMaxProcessTimeReached(queue) =>
log.warning(s"Queue ${queue.path} is still processing after max process time. Shutting Down")
self ! Shutdown(retireQueue = true)
case WorkTimedOut(_) =>
workError()
metricsCollector.send(Metric.WorkTimedOut)

case qs: QueryStatus => qs reply RunningStatus(pool)
case Terminated(worker) if pool.contains(worker) =>
removeWorker(pool, worker, monitoring(resultHistory), "unexpected termination when all workers retired")

case Shutdown(reportTo, timeout, retireQueue) =>
log.info("Commanded to shutdown. Shutting down")
if(retireQueue)
queue ! Retire(timeout)
else //retire from the workers' side
pool.foreach(_ ! Worker.Retire)
case Terminated(`queue`) =>
log.info(s"Queue ${queue.path} is terminated")
self ! Shutdown(retireQueue = false)

delayedMsg(timeout, ShutdownTimeout)
context become shuttingDown(pool, reportTo)
case QueueMaxProcessTimeReached(queue) =>
log.warning(s"Queue ${queue.path} is still processing after max process time. Shutting Down")
self ! Shutdown(retireQueue = true)

case qs: QueryStatus => qs reply RunningStatus(pool)

case Shutdown(reportTo, timeout, retireQueue) =>
log.info("Commanded to shutdown. Shutting down")
if(retireQueue)
queue ! Retire(timeout)
else //retire from the workers' side
pool.foreach(_ ! Worker.Retire)

delayedMsg(timeout, ShutdownTimeout)
context become shuttingDown(pool, reportTo)
}: Receive
}
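
The monitoring state now threads a ResultHistory of recent work outcomes, trimmed with enqueueFinite so that only the last resultHistoryLength results are kept. The actual enqueueFinite lives in com.iheart.workpipeline.collection.FiniteCollection and is not shown in this diff; the sketch below only illustrates the presumed semantics.

// Presumed semantics only; the real enqueueFinite is defined in FiniteCollection.
def enqueueFiniteSketch[A](history: Vector[A], elem: A, maxLength: Int): Vector[A] =
  (history :+ elem).takeRight(maxLength)

// e.g. with maxLength = 3:
// enqueueFiniteSketch(Vector(true, false, true), false, 3) == Vector(false, true, false)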


def shuttingDown(pool: WorkerPool, reportTo: Option[ActorRef]): Receive = {
case MissionAccomplished(worker) =>
removeWorker(pool, worker, shuttingDown(_, reportTo), "successfully after command", reportTo)
@@ -139,8 +167,10 @@ case class DefaultQueueProcessor(queue: QueueRef,
settings: ProcessingWorkerPoolSettings,
metricsCollector: MetricsCollector = NoOpMetricsCollector,
resultChecker: ResultChecker) extends QueueProcessor {
def workerProp(queue: QueueRef, delegateeProps: Props): Props =
Worker.default(queue, delegateeProps, metricsCollector)(resultChecker)

override val resultHistoryLength: Int = 0

override protected def onWorkError(resultHistory: ResultHistory, pool: WorkerPool): Unit = () //do nothing
}

case class QueueProcessorWithCircuitBreaker(queue: QueueRef,
@@ -149,8 +179,13 @@ case class QueueProcessorWithCircuitBreaker(queue: QueueRef,
circuitBreakerSettings: CircuitBreakerSettings,
metricsCollector: MetricsCollector = NoOpMetricsCollector,
resultChecker: ResultChecker) extends QueueProcessor {
def workerProp(queue: QueueRef, delegateeProps: Props): Props =
Worker.withCircuitBreaker(queue, delegateeProps, circuitBreakerSettings, metricsCollector)(resultChecker)

override val resultHistoryLength: Int = circuitBreakerSettings.historyLength

override protected def onWorkError(resultHistory: ResultHistory, pool: WorkerPool): Unit = {
if( (resultHistory.count(r => !r).toDouble / resultHistoryLength) >= circuitBreakerSettings.errorRateThreshold )
pool.foreach(_ ! Hold(circuitBreakerSettings.closeDuration))
}

}
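
A worked example of the threshold check in onWorkError above (the history and settings values are hypothetical): with resultHistoryLength = 10, errorRateThreshold = 0.5 and closeDuration = 30.seconds, five failures among the last ten results trip the breaker and every worker is told to hold.

// Hypothetical values, for illustration only.
val errorRateThreshold = 0.5
val resultHistory = Vector(true, false, false, true, false, false, true, false, true, true)
val failureRate = resultHistory.count(r => !r).toDouble / resultHistory.size // 5 / 10 = 0.5, size == resultHistoryLength once the history is full
// failureRate >= errorRateThreshold, so each worker in the pool would receive
// Hold(30.seconds) and stop taking new work until the closeDuration elapses.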

@@ -164,7 +199,10 @@ object QueueProcessor {
}



case class MissionAccomplished(worker: WorkerRef)
case class WorkCompleted(worker: WorkerRef)

case class QueueMaxProcessTimeReached(queue: QueueRef)
case class RunningStatus(pool: WorkerPool)
case object ShuttingDown
