Skip to content
Permalink
Browse files

Upgrade kamon (#127)

* Upgrade Kamon libs and make it possible to trace IO actions

* Update Kamon in the master node

* Upgrade Docker related plugins and setup Prometheus

* Setup prometheus as monitoring tool
Loading branch information…
A. Alonso Dominguez committed Feb 10, 2018
1 parent fdb0157 commit bc5b2e0fab50c47b8065d4c58bf239e5caa2b2d0
@@ -274,7 +274,7 @@ lazy val master = (project in file("master"))
.settings(
moduleName := "quckoo-master",
scalaJSProjects := Seq(console),
dockerExposedPorts := Seq(2551, 8095),
dockerExposedPorts := Seq(2551, 8095, 9095),
parallelExecution in Test := false,
parallelExecution in MultiJvm := false
)
@@ -289,7 +289,7 @@ lazy val worker = (project in file("worker"))
.settings(Dependencies.clusterWorker)
.settings(
moduleName := "quckoo-worker",
dockerExposedPorts := Seq(5001, 9010),
dockerExposedPorts := Seq(5001, 9010, 9095),
parallelExecution in Test := false
)
.dependsOn(shared % "compile->compile;test->test", testSupportJVM % Test)
@@ -177,17 +177,21 @@ cassandra-snapshot-store {
keyspace = "quckoo_snapshot"
}

kamon.metric.filters {
akka-actor {
kamon.util.filters {
"akka.tracked-actor" {
includes = [ "QuckooClusterSystem/user/quckoo/**" ]
}

akka-dispatcher {
"akka.tracked-dispatcher" {
includes = [
"QuckooClusterSystem/akka.actor.default-dispatcher",
"QuckooClusterSystem/quckoo.resolver.dispatcher",
"QuckooClusterSystem/quckoo.api-dispatcher",
"QuckooClusterSystem/quckoo.trigger-dispatcher"
]
}

"akka.traced-actor" {
includes = [ "**" ]
}
}
@@ -25,7 +25,6 @@ import akka.pattern._
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, OverflowStrategy}
import akka.stream.scaladsl.Source
import akka.util.Timeout

import cats.data.{EitherT, ValidatedNel}
import cats.implicits._

@@ -41,6 +40,9 @@ import io.quckoo.protocol.scheduler._
import io.quckoo.protocol.cluster._
import io.quckoo.protocol.worker.WorkerEvent

import _root_.kamon.Kamon
import _root_.kamon.prometheus.PrometheusReporter

import slogging._

import scala.concurrent.{ExecutionContext, Future, Promise}
@@ -72,14 +74,17 @@ object QuckooFacade extends LazyLogging {
}
}

def startMonitoring(implicit ec: ExecutionContext): Future[Unit] =
Future(Kamon.addReporter(new PrometheusReporter))

val promise = Promise[Unit]()
val journal = new QuckooProductionJournal
val guardian =
system.actorOf(QuckooGuardian.props(settings, journal, promise), "quckoo")

import system.dispatcher
(promise.future, startHttpListener(new QuckooFacade(guardian)))
.mapN((_, _) => ())
(promise.future, startHttpListener(new QuckooFacade(guardian)), startMonitoring)
.mapN((_, _, _) => ())
}

}
@@ -25,8 +25,6 @@ import io.quckoo.cluster.config.ClusterSettings
import io.quckoo.cluster.{QuckooFacade, SystemName}
import io.quckoo.time.implicits.systemClock

import kamon.Kamon

import slogging._

import scopt.OptionParser
@@ -75,14 +73,12 @@ object Boot extends LazyLogging {

parser.parse(args, CliOptions()).foreach { opts =>
logger.info(s"Starting Quckoo Server ${Info.version}...\n" + Logo)
Kamon.start()

val config = opts.toConfig.withFallback(ConfigFactory.load())

implicit val system = ActorSystem(SystemName, config)
sys.addShutdownHook {
logger.info("Received kill signal, terminating...")
Kamon.shutdown()
}

ClusterSettings(config)
@@ -25,9 +25,6 @@ import io.quckoo.{JobId, JobSpec}
import io.quckoo.api.TopicTag
import io.quckoo.protocol.registry._

import kamon.Kamon
import kamon.metric.instrument.MinMaxCounter

/**
* Created by alonsodomin on 15/04/2016.
*/
@@ -64,11 +61,6 @@ class PersistentJob extends PersistentActor with ActorLogging with Stash {
private[this] val mediator = DistributedPubSub(context.system).mediator
private[this] var stateDuringRecovery: Option[JobSpec] = None

private[this] val enabledJobCounter: MinMaxCounter =
Kamon.metrics.minMaxCounter("enabled-jobs")
private[this] val disabledJobCounter: MinMaxCounter =
Kamon.metrics.minMaxCounter("disabled-jobs")

override def persistenceId: String = s"$PersistenceIdPrefix-${self.path.name}"

override def receiveRecover: Receive = {
@@ -101,7 +93,6 @@ class PersistentJob extends PersistentActor with ActorLogging with Stash {
case CreateJob(jobId, jobSpec) =>
persist(JobAccepted(jobId, jobSpec)) { event =>
log.info("Job '{}' has been successfully registered.", jobId)
enabledJobCounter.increment()
mediator ! DistributedPubSubMediator.Publish(TopicTag.Registry.name, event)
unstashAll()
context.become(enabled(jobId, jobSpec))
@@ -117,9 +108,6 @@ class PersistentJob extends PersistentActor with ActorLogging with Stash {

case DisableJob(`jobId`) =>
persist(JobDisabled(jobId)) { event =>
enabledJobCounter.decrement()
disabledJobCounter.increment()

mediator ! DistributedPubSubMediator.Publish(TopicTag.Registry.name, event)
sender() ! event
context.become(disabled(jobId, spec.copy(disabled = true)))
@@ -136,9 +124,6 @@ class PersistentJob extends PersistentActor with ActorLogging with Stash {

case EnableJob(`jobId`) =>
persist(JobEnabled(jobId)) { event =>
enabledJobCounter.increment()
disabledJobCounter.decrement()

mediator ! DistributedPubSubMediator.Publish(TopicTag.Registry.name, event)
sender() ! event
context.become(enabled(jobId, spec.copy(disabled = false)))
@@ -40,7 +40,7 @@ import io.quckoo.protocol.registry._
import io.quckoo.resolver.Resolver
import io.quckoo.resolver.ivy.IvyResolver

import kamon.trace.Tracer
import kamon.Kamon

import scala.concurrent._
import scala.concurrent.duration._
@@ -129,11 +129,11 @@ class Registry private[registry] (resolver: Resolver[IO], journal: QuckooJournal

case RegisterJob(spec) =>
val registrationTrackId = s"registration-${UUID.randomUUID()}"
Tracer.withNewContext(registrationTrackId) {
implicit val r = resolver
val registrationProps = Registration.props(spec, shardRegion, sender())
context.actorOf(registrationProps, registrationTrackId)
}
Kamon.currentSpan.tag("registration-track-id", registrationTrackId)

implicit val r = resolver
val registrationProps = Registration.props(spec, shardRegion, sender())
context.actorOf(registrationProps, registrationTrackId)

case GetJobs =>
val origSender = sender()
@@ -151,9 +151,7 @@ class Registry private[registry] (resolver: Resolver[IO], journal: QuckooJournal

case get @ GetJob(jobId) =>
if (jobIds.contains(jobId)) {
Tracer.withNewContext(s"get-job-$jobId") {
shardRegion forward get
}
shardRegion forward get
} else {
sender() ! JobNotFound(jobId)
}
@@ -31,8 +31,6 @@ import io.quckoo.api.TopicTag
import io.quckoo.protocol.registry._
import io.quckoo.protocol.scheduler._

import kamon.trace.Tracer

import scala.concurrent.duration._

/**
@@ -313,10 +311,8 @@ class ExecutionDriver(
}

// Instantiate a new execution lifecycle
val lifecycle = Tracer.withNewContext(s"execution-${task.id}") {
val lifecycleProps = lifecycleFactory.executionLifecycleProps(state)
context.watch(context.actorOf(lifecycleProps, task.id.toString))
}
val lifecycleProps = lifecycleFactory.executionLifecycleProps(state)
val lifecycle = context.watch(context.actorOf(lifecycleProps, task.id.toString))

// Create a trigger to fire the task
val trigger = createTrigger(task, state.planId, lifecycle, time)
@@ -23,8 +23,6 @@ import akka.persistence.fsm.{LoggingPersistentFSM, PersistentFSM}
import io.quckoo.{QuckooError, PlanId, Task, TaskExecution}
import io.quckoo.cluster.scheduler.TaskQueue.EnqueueAck

import kamon.trace.Tracer

import scala.concurrent.duration._
import scala.reflect.ClassTag

@@ -202,13 +200,11 @@ class ExecutionLifecycle(
override def applyEvent(event: ExecutionEvent, previous: ExecutionState): ExecutionState =
event match {
case Awaken(task, `planId`, queue) =>
Tracer.withNewContext(s"task-${task.id}") {
log.debug(
"Execution lifecycle for task '{}' is allocating a slot at the local queue.",
task.id
)
queue ! TaskQueue.Enqueue(task)
}
log.debug(
"Execution lifecycle for task '{}' is allocating a slot at the local queue.",
task.id
)
queue ! TaskQueue.Enqueue(task)
previous.copy(task = Some(task), queue = Some(queue))

case event @ Triggered(task) =>
@@ -37,8 +37,6 @@ import io.quckoo.cluster.protocol._
import io.quckoo.protocol.registry._
import io.quckoo.protocol.scheduler._

import kamon.trace.Tracer

import scala.concurrent._
import scala.concurrent.duration._

@@ -130,10 +128,8 @@ class Scheduler(journal: QuckooJournal, registry: ActorRef, queueProps: Props)(
context become warmingUp

case cmd: ScheduleJob =>
Tracer.withNewContext(s"schedule-${cmd.jobId}") {
val handler = context.actorOf(jobFetcherProps(cmd.jobId, sender(), cmd))
registry.tell(GetJob(cmd.jobId), handler)
}
val handler = context.actorOf(jobFetcherProps(cmd.jobId, sender(), cmd))
registry.tell(GetJob(cmd.jobId), handler)

case cmd @ CreateExecutionDriver(_, config, _) =>
val planId = PlanId(UUID.randomUUID())
@@ -143,16 +139,12 @@ class Scheduler(journal: QuckooJournal, registry: ActorRef, queueProps: Props)(

case cancel: CancelExecutionPlan =>
log.debug("Starting execution driver termination process for plan '{}'.", cancel.planId)
Tracer.withNewContext(s"cancel-${cancel.planId}") {
val props = terminatorProps(cancel, sender())
context.actorOf(props, s"execution-driver-terminator-${cancel.planId}")
}
val props = terminatorProps(cancel, sender())
context.actorOf(props, s"execution-driver-terminator-${cancel.planId}")

case get @ GetExecutionPlan(planId) =>
if (planIds.contains(planId)) {
Tracer.withNewContext(s"get-plan-$planId") {
shardRegion forward get
}
shardRegion forward get
} else {
sender() ! ExecutionPlanNotFound(planId)
}
@@ -183,9 +175,7 @@ class Scheduler(journal: QuckooJournal, registry: ActorRef, queueProps: Props)(
Source(executions).runWith(Sink.actorRef(sender(), Status.Success(GetTaskExecutions)))

case msg: WorkerMessage =>
Tracer.withNewContext(s"worker-${msg.workerId}") {
taskQueue forward msg
}
taskQueue forward msg

case event: SchedulerEvent =>
handleEvent(event)
@@ -26,7 +26,7 @@ import io.quckoo.cluster.net._
import io.quckoo.protocol.worker._

import kamon.Kamon
import kamon.metric.instrument.MinMaxCounter
import kamon.metric.Gauge

import scala.collection.immutable.Queue
import scala.concurrent.duration._
@@ -80,10 +80,10 @@ class TaskQueue private[scheduler] (maxWorkTimeout: FiniteDuration)
private[this] var inProgressTasks = Map.empty[TaskId, ActorRef]
private[this] var workerRemoveTasks = Map.empty[NodeId, Cancellable]

private[this] val pendingCounter: MinMaxCounter =
Kamon.metrics.minMaxCounter("pending-tasks")
private[this] val inProgressCounter: MinMaxCounter =
Kamon.metrics.minMaxCounter("inprogress-tasks")
private[this] val pendingCounter: Gauge =
Kamon.gauge("pending-tasks")
private[this] val inProgressCounter: Gauge =
Kamon.gauge("inprogress-tasks")

private[this] val cleanupTask = createCleanUpTask()

@@ -9,17 +9,17 @@ object Dependencies {

// Logging -------

val slogging = "0.6.0"
val slogging = "0.6.1"
val log4j = "2.10.0"
val slf4j = "1.7.25"

// Testing --------

val scalaTest = "3.0.4"
val scalaTest = "3.0.5"
val scalaCheck = "1.13.5"
val scalaMock = "3.6.0"
val discipline = "0.8"
val wiremock = "2.13.0"
val wiremock = "2.14.0"

// Akka ----------

@@ -45,8 +45,12 @@ object Dependencies {
// Monitoring ----

object kamon {
val core = "0.6.7"
val http = "0.6.8"
val core = "1.0.1"
val akka = "1.0.0"
val scala = "1.0.0"
val sysmetrics = "1.0.0"
val prometheus = "1.0.0"
val statsd = "0.6.7"
}

// ScalaJS -------
@@ -138,11 +142,12 @@ object Dependencies {

object Kamon {
val core = "io.kamon" %% "kamon-core" % version.kamon.core
val akka = "io.kamon" %% "kamon-akka-remote-2.4" % version.kamon.core
val http = "io.kamon" %% "kamon-akka-http" % version.kamon.http
val scala = "io.kamon" %% "kamon-scala" % version.kamon.core
val sysmetrics = "io.kamon" %% "kamon-system-metrics" % version.kamon.core
val statsd = "io.kamon" %% "kamon-statsd" % version.kamon.core
val akka = "io.kamon" %% "kamon-akka-remote-2.5" % version.kamon.akka
val http = "io.kamon" %% "kamon-akka-http-2.5" % version.kamon.core
val scala = "io.kamon" %% "kamon-scala-future" % version.kamon.scala
val sysmetrics = "io.kamon" %% "kamon-system-metrics" % version.kamon.sysmetrics
val prometheus = "io.kamon" %% "kamon-prometheus" % version.kamon.prometheus
val statsd = "io.kamon" %% "kamon-statsd" % version.kamon.statsd
}

val slf4j = "org.slf4j" % "slf4j-api" % version.slf4j
@@ -325,7 +330,7 @@ object Dependencies {
libraryDependencies ++= compiler.plugins ++ Log4j.All ++ Seq(
Akka.actor, Akka.slf4j, Akka.clusterTools, Akka.clusterMetrics,
Akka.kryo, ivy, scalaXml, pureconfig, slogging_slf4j,
Kamon.core, Kamon.akka, Kamon.scala, Kamon.sysmetrics, Kamon.statsd,
Kamon.core, Kamon.akka, Kamon.scala, Kamon.sysmetrics, Kamon.prometheus,
betterfiles,

"eu.timepit" %% "refined-pureconfig" % version.refined
@@ -20,10 +20,10 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-twirl" % "1.3.13")

// Server side plugins
addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.2")
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.3.3")
addSbtPlugin("com.typesafe.sbt" % "sbt-multi-jvm" % "0.4.0")
addSbtPlugin("com.lightbend.sbt" % "sbt-aspectj" % "0.11.0")
addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.31")
addSbtPlugin("com.tapad" % "sbt-docker-compose" % "1.0.34")

// Code generators
addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.7.0")

0 comments on commit bc5b2e0

Please sign in to comment.
You can’t perform that action at this time.