Skip to content

Commit

Permalink
Start project with jobs paused (#294)
Browse files Browse the repository at this point in the history
* Start project with jobs paused

Starting a project with lots of executions can lead to issues, such as
CPU saturation or the process being killed for not answering a health check in time.
Pausing all jobs at startup makes it possible to analyze and fix the state
before any job is started.
  • Loading branch information
vguerci committed Jul 27, 2018
1 parent f072e81 commit cd76d36
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 13 deletions.
9 changes: 8 additions & 1 deletion core/src/main/scala/com/criteo/cuttle/CuttleProject.scala
Expand Up @@ -3,6 +3,7 @@ package com.criteo.cuttle
import lol.http._

import com.criteo.cuttle.ExecutionContexts._, Implicits.serverExecutionContext
import com.criteo.cuttle.Auth.User

/**
* A cuttle project is a workflow to execute with the appropriate scheduler.
Expand Down Expand Up @@ -34,11 +35,17 @@ class CuttleProject[S <: Scheduling] private[cuttle] (
platforms: Seq[ExecutionPlatform] = CuttleProject.defaultPlatforms,
httpPort: Int = 8888,
databaseConfig: DatabaseConfig = DatabaseConfig.fromEnv,
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy,
paused: Boolean = false
): Unit = {
val xa = Database.connect(databaseConfig)
val executor = new Executor[S](platforms, xa, logger, name)(retryStrategy)

if (paused) {
logger.info("Pausing workflow")
executor.pauseJobs(workflow.vertices)(User("Startup"))
}

logger.info("Start workflow")
scheduler.start(workflow, executor, xa, logger)

Expand Down
34 changes: 22 additions & 12 deletions core/src/main/scala/com/criteo/cuttle/Executor.scala
Expand Up @@ -612,20 +612,24 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
ExecutionStreams.getStreams(executionId, queries, xa)

private[cuttle] def pauseJobs(jobs: Set[Job[S]])(implicit user: User): Unit = {
val pauseDate = Instant.now()
val pausedJobs = jobs.map(job => PausedJob(job.id, user, pauseDate))
val pauseQuery = pausedJobs.map(queries.pauseJob).reduceLeft(_ *> _)

val executionsToCancel = atomic { implicit tx =>
val pauseDate = Instant.now()
val pausedJobIds = pausedJobs.map(_.id)
val jobsToPause = jobs
.filter(job => !pausedJobIds.contains(job.id))
.map(job => PausedJob(job.id, user, pauseDate))
if (jobsToPause.isEmpty) return

val pauseQuery = jobsToPause.map(queries.pauseJob).reduceLeft(_ *> _)
Txn.setExternalDecider(new ExternalDecider {
def shouldCommit(implicit txn: InTxnEnd): Boolean = {
pauseQuery.transact(xa).unsafeRunSync
true
}
})

pausedJobs.flatMap { pausedJob =>
pausedState.getOrElseUpdate(pausedJob.id, pausedJob.toPausedJobWithExecutions())
jobsToPause.flatMap { pausedJob =>
pausedState.update(pausedJob.id, pausedJob.toPausedJobWithExecutions())
runningState.filterKeys(_.job.id == pausedJob.id).keys ++ throttledState
.filterKeys(_.job.id == pausedJob.id)
.keys
Expand Down Expand Up @@ -1019,12 +1023,18 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
statMetrics ++
Seq(getMetricsByTag(running, waiting, paused, failing)) ++
Seq(getMetricsByJob(running, waiting, paused, failing)) ++
Seq(executionsCounters.single().withDefaultsFor({
for {
job <- jobs.toSeq
outcome <- Seq("success", "failure")
} yield Set(("job_id", job.id), ("type", outcome)) ++ (if (job.tags.nonEmpty) Set("tags" -> job.tags.map(_.name).mkString(",")) else Nil)
}))
Seq(
executionsCounters
.single()
.withDefaultsFor({
for {
job <- jobs.toSeq
outcome <- Seq("success", "failure")
} yield
Set(("job_id", job.id), ("type", outcome)) ++ (if (job.tags.nonEmpty)
Set("tags" -> job.tags.map(_.name).mkString(","))
else Nil)
}))
}

/**
Expand Down

0 comments on commit cd76d36

Please sign in to comment.