Skip to content

Commit

Permalink
Merge pull request #9 from williamho/monitor
Browse files Browse the repository at this point in the history
Add support for StatsD metrics
  • Loading branch information
kailuowang committed Oct 23, 2015
2 parents b47cb4e + 3d4e124 commit 3ed63b9
Show file tree
Hide file tree
Showing 17 changed files with 848 additions and 114 deletions.
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@ version in ThisBuild := "1.0.0-" + Versions.releaseType

scalaVersion in ThisBuild := Versions.scala

lazy val workPipeLine = project in file(".")
lazy val workPipeline = project in file(".")

resolvers ++= Dependencies.resolvers

libraryDependencies ++= Dependencies.akka ++
Dependencies.test
Dependencies.test ++
Dependencies.other

scalacOptions ++= List("-feature", "-deprecation", "-unchecked", "-Xlint")

4 changes: 4 additions & 0 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,8 @@ object Dependencies {
"org.specs2" %% "specs2-scalacheck" % Versions.specs2 % "test"
)

val other = Seq(
"org.scala-lang" % "scala-reflect" % Versions.scala)

}

62 changes: 62 additions & 0 deletions src/main/scala/com/iheart/util/ConfigWrapper.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package com.iheart.util

import scala.reflect.runtime.universe.{ typeOf, TypeTag }
import com.typesafe.config._
import scala.collection.JavaConverters._

object ConfigWrapper {
implicit class ImplicitConfigWrapper(underlying: Config) extends ConfigWrapper(underlying)
}

/** Wrapper for Typesafe Config that provides a more convenient interface for optional values
*
* @constructor wrap Typesafe Config
* @param underlying the original Config object
*/
class ConfigWrapper(underlying: Config) {
/** Get value at path, throws exception if path not found or type mismatch */
def get[T: TypeTag](path: String): T = getWithMode(path, Mode.uncaughtMode)

/** Get optional value at path, returns None if path not found */
def getOption[T: TypeTag](path: String): Option[T] = getWithMode(path, Mode.optionMode)

/** Convenience method for getting a value, with a fallback if path not found */
def getOrElse[T: TypeTag](path: String, fallback: => T): T = getOption[T](path).getOrElse(fallback)

protected def getWithMode[T: TypeTag](path: String, mode: Mode): mode.Type[T] = typeOf[T] match {
case t if t =:= typeOf[Boolean] =>
getValue(path, mode, _.getBoolean)
case t if t =:= typeOf[List[Boolean]] =>
getValue(path, mode, c => c.getBooleanList(_).asScala.map(_ == true).toList)

case t if t =:= typeOf[Double] =>
getValue(path, mode, _.getDouble)
case t if t =:= typeOf[List[Double]] =>
getValue(path, mode, c => c.getDoubleList(_).asScala.map(_.toDouble).toList)

case t if t =:= typeOf[Int] =>
getValue(path, mode, _.getInt)
case t if t =:= typeOf[List[Int]] =>
getValue(path, mode, c => c.getIntList(_).asScala.map(_.toInt).toList)

case t if t =:= typeOf[String] =>
getValue(path, mode, _.getString)
case t if t =:= typeOf[List[String]] =>
getValue(path, mode, c => c.getStringList(_).asScala.toList)

case t if t =:= typeOf[Config] =>
getValue(path, mode, _.getConfig)

case t => throw new MatchError(s"Unsupported type `${t.typeSymbol.fullName}` for ConfigWrapper.get")
}

protected def getValue[T](path: String, mode: Mode, f: Config => String => Any): mode.Type[T] =
mode.wrapException[T, ConfigException.Missing] {
if (underlying.hasPath(path)) {
f(underlying)(path).asInstanceOf[T]
} else {
throw new ConfigException.Missing(path)
}
}
}

41 changes: 41 additions & 0 deletions src/main/scala/com/iheart/util/Mode.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.iheart.util

import scala.language.higherKinds
import scala.reflect.ClassTag
import scala.util.{ Try, Success, Failure }

// based on http://blog.scalac.io/2015/05/28/scala-modes.html
trait Mode {
type Type[+_]

/** Wrap all exceptions */
def wrap[T](body: T): Type[T] = wrapException[T, Exception](body)

/** Wrap a specific type of exception -- all other exception types will throw */
def wrapException[T, E <: Exception: ClassTag](body: T): Type[T]
}

object Mode {
/** uncaughtMode doesn't handle exceptions */
val uncaughtMode = new Mode {
type Type[+T] = T

def wrapException[T, E <: Exception: ClassTag](body: T): Type[T] = body
}

/** optionMode wraps the result in an Option, if exception type matches */
val optionMode = new Mode {
type Type[+T] = Option[T]

def wrapException[T, E <: Exception: ClassTag](body: T): Type[T] =
try { Option(body) } catch { case e: E None }
}

/** tryMode wraps the result in a Try, for all exception types */
val tryMode = new Mode {
type Type[+T] = Try[T]

def wrapException[T, E <: Exception: ClassTag](body: T): Type[T] = Try(body)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import ActorDSL._
import com.iheart.workpipeline.akka.patterns.CommonProtocol.ShutdownGracefully
import com.iheart.workpipeline.akka.patterns.WorkPipeline.Settings
import com.iheart.workpipeline.akka.patterns.queue._
import com.iheart.util.ConfigWrapper.ImplicitConfigWrapper
import com.iheart.workpipeline.metrics.{MetricsCollector, NoOpMetricsCollector}
import com.typesafe.config.{Config, ConfigFactory}
import queue.CommonProtocol.WorkRejected
import queue.Queue.{EnqueueRejected, WorkEnqueued, Enqueue}
import EnqueueRejected.OverCapacity
Expand All @@ -16,20 +19,23 @@ trait WorkPipeline extends Actor {
protected def pipelineSettings: WorkPipeline.Settings
def backendProps: Props
def resultChecker: ResultChecker
def metricsCollector: MetricsCollector

protected def queueProps: Props

protected lazy val queue = context.actorOf(queueProps, name + "-backing-queue")


private val processor = context.actorOf(QueueProcessor.withCircuitBreaker(queue,
backendProps,
pipelineSettings.workerPool,
pipelineSettings.circuitBreaker)(resultChecker), name + "-queue-processor")
pipelineSettings.circuitBreaker,
metricsCollector)(resultChecker), name + "-queue-processor")

context watch processor

private val autoScaler = pipelineSettings.autoScalingSettings.foreach { s =>
context.actorOf(AutoScaling.default(queue, processor, s), name + "-auto-scaler" )
context.actorOf(AutoScaling.default(queue, processor, s, metricsCollector), name + "-auto-scaler" )
}

def receive: Receive = ({
Expand Down Expand Up @@ -74,12 +80,15 @@ object WorkPipeline {
case class PushingWorkPipeline(name: String,
settings: PushingWorkPipeline.Settings,
backendProps: Props,
metricsCollector: MetricsCollector = NoOpMetricsCollector,
resultChecker: ResultChecker)
extends WorkPipeline {

protected val pipelineSettings = settings.workPipeLineSettings
protected lazy val pipelineSettings = settings.workPipelineSettings

protected lazy val queueProps = Queue.withBackPressure(
settings.backPressureSettings, WorkSettings(), metricsCollector)

protected val queueProps = Queue.withBackPressure(settings.backPressureSettings, WorkSettings())

override def extraReceive: Receive = {
case m context.actorOf(PushingWorkPipeline.handlerProps(settings, queue)) forward m
Expand All @@ -90,7 +99,7 @@ object PushingWorkPipeline {
private class Handler(settings: Settings, queue: ActorRef) extends Actor with ActorLogging {
def receive: Receive = {
case msg =>
queue ! Enqueue(msg, Some(self), Some(WorkSettings(settings.workPipeLineSettings.workRetry, settings.workPipeLineSettings.workTimeout, Some(sender))))
queue ! Enqueue(msg, Some(self), Some(WorkSettings(settings.workPipelineSettings.workRetry, settings.workPipelineSettings.workTimeout, Some(sender))))
context become waitingForQueueConfirmation(sender)
}

Expand All @@ -110,8 +119,14 @@ object PushingWorkPipeline {
Props(new Handler(settings, queue))
}

def props(name: String, settings: Settings, backendProps: Props)
(resultChecker: ResultChecker) = Props(PushingWorkPipeline(name, settings, backendProps, resultChecker))
def props(name: String,
settings: Settings,
backendProps: Props,
metricsConfig: Config = ConfigFactory.empty)
(resultChecker: ResultChecker)(implicit system: ActorSystem) = {
val metricsCollector = MetricsCollector.fromConfig(name, metricsConfig)
Props(PushingWorkPipeline(name, settings, backendProps, metricsCollector, resultChecker))
}

val defaultBackPressureSettings = BackPressureSettings(
maxBufferSize = 60000,
Expand All @@ -120,7 +135,7 @@ object PushingWorkPipeline {



case class Settings(workPipeLineSettings: WorkPipeline.Settings, backPressureSettings: BackPressureSettings)
case class Settings(workPipelineSettings: WorkPipeline.Settings, backPressureSettings: BackPressureSettings)

val defaultSettings: Settings = Settings(WorkPipeline.defaultWorkPipelineSettings, defaultBackPressureSettings)

Expand All @@ -131,14 +146,22 @@ case class PullingWorkPipeline( name: String,
iterator: Iterator[_],
pipelineSettings: WorkPipeline.Settings,
backendProps: Props,
metricsCollector: MetricsCollector = NoOpMetricsCollector,
resultChecker: ResultChecker) extends WorkPipeline {

protected def queueProps = QueueOfIterator.props(iterator, WorkSettings())
protected def queueProps = QueueOfIterator.props(iterator, WorkSettings(), metricsCollector)

}

object PullingWorkPipeline {
def props(name: String, iterator: Iterator[_], settings: WorkPipeline.Settings, backendProps: Props)
(resultChecker: ResultChecker) = Props(PullingWorkPipeline(name, iterator, settings, backendProps, resultChecker))
def props(name: String,
iterator: Iterator[_],
settings: WorkPipeline.Settings,
backendProps: Props,
metricsConfig: Config = ConfigFactory.empty)
(resultChecker: ResultChecker)(implicit system: ActorSystem) = {
val metricsCollector = MetricsCollector.fromConfig(name, metricsConfig)
Props(PullingWorkPipeline(name, iterator, settings, backendProps, metricsCollector, resultChecker))
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import com.iheart.workpipeline.akka.helpers.MessageScheduler
import com.iheart.workpipeline.akka.patterns.queue.Queue.QueueDispatchInfo
import com.iheart.workpipeline.akka.patterns.queue.QueueProcessor.ScaleTo
import com.iheart.workpipeline.akka.patterns.queue.Worker.{Working, WorkerStatus}
import com.iheart.workpipeline.metrics.{MetricsCollector, NoOpMetricsCollector, Metric}

import scala.concurrent.duration._
import scala.util.Random
Expand All @@ -17,6 +18,8 @@ import scala.language.implicitConversions
trait AutoScaling extends Actor with ActorLogging with MessageScheduler {
val queue: QueueRef
val processor: QueueProcessorRef
val metricsCollector: MetricsCollector

//accessible only for testing purpose
private[queue] var perfLog: PerformanceLog = Map.empty
private[queue] var underUtilizationStreak: Option[UnderUtilizationStreak] = None
Expand All @@ -33,7 +36,7 @@ trait AutoScaling extends Actor with ActorLogging with MessageScheduler {
case Terminated(`queue`) | Terminated(`processor`) | Queue.Retiring | QueueProcessor.ShuttingDown => context stop self
}

private def idle: Receive = watchingQueueAndProcessor orElse {
private def idle: Receive = watchingQueueAndProcessor orElse {
case OptimizeOrExplore =>
queue ! QueryStatus()
delayedMsg(statusCollectionTimeout, StatusCollectionTimedOut)
Expand Down Expand Up @@ -93,11 +96,15 @@ trait AutoScaling extends Actor with ActorLogging with MessageScheduler {

val fullyUtilized = utilization == currentSize

// Send metrics
metricsCollector.send(Metric.PoolSize(currentSize))
metricsCollector.send(Metric.PoolUtilized(utilization))

underUtilizationStreak = if (!fullyUtilized)
underUtilizationStreak.map(s s.copy(highestUtilization = Math.max(s.highestUtilization, utilization))) orElse Some(UnderUtilizationStreak(LocalDateTime.now, utilization))
else None

if(fullyUtilized) {
if (fullyUtilized) {
val toUpdate = perfLog.get(currentSize).fold(dispatchWait) { oldSpeed
val nanos = (oldSpeed.toNanos * (1d - weightOfLatestMetric)) + (dispatchWait.toNanos * weightOfLatestMetric)
Duration.fromNanos(nanos)
Expand Down Expand Up @@ -157,7 +164,7 @@ object AutoScaling {

private case class SystemStatus(dispatchWait: Option[Duration] = None,
workerPool: Option[WorkerPool] = None,
workersStatus: List[WorkerStatus] = Nil ) {
workersStatus: List[WorkerStatus] = Nil) {
def collected: Boolean = (for {
_ <- dispatchWait
pool <- workerPool
Expand All @@ -171,9 +178,17 @@ object AutoScaling {

private[queue] case class UnderUtilizationStreak(start: LocalDateTime, highestUtilization: Int)

private[queue]type PerformanceLog = Map[PoolSize, Duration]
private[queue] type PerformanceLog = Map[PoolSize, Duration]

case class Default(queue: QueueRef, processor: QueueProcessorRef, settings: AutoScalingSettings) extends AutoScaling
case class Default(queue: QueueRef,
processor: QueueProcessorRef,
settings: AutoScalingSettings,
metricsCollector: MetricsCollector = NoOpMetricsCollector) extends AutoScaling

def default(queue: QueueRef, processor: QueueProcessorRef, settings: AutoScalingSettings) = Props(Default(queue, processor, settings))
def default(queue: QueueRef,
processor: QueueProcessorRef,
settings: AutoScalingSettings,
metricsCollector: MetricsCollector = NoOpMetricsCollector) =
Props(Default(queue, processor, settings, metricsCollector))
}

Loading

0 comments on commit 3ed63b9

Please sign in to comment.