Skip to content
Permalink
Browse files

Saves state on backfill creation (#391)

  • Loading branch information...
dufrannea committed Apr 5, 2019
1 parent 7a5b97f commit a93b76d3c31794e853242cfbcef131b9fe32d807
Showing with 21 additions and 20 deletions.
  1. +21 −20 timeseries/src/main/scala/com/criteo/cuttle/timeseries/TimeSeriesScheduler.scala
@@ -464,13 +464,14 @@ case class TimeSeriesScheduler(logger: Logger,
})
}

private def updateBackfillState(backfill: Backfill): Unit = atomic { implicit txn =>
private def updateBackfillState(backfill: Backfill): Map[TimeSeriesJob, IntervalMap[Instant, JobState]] = atomic { implicit txn =>
_backfills() = _backfills() + backfill
_state() = _state() ++ backfill.jobs.map(job => {
val newStart = job.scheduling.calendar.truncate(backfill.start)
val newEnd = job.scheduling.calendar.ceil(backfill.end)
job -> _state().apply(job).update(Interval(newStart, newEnd), Todo(Some(backfill)))
})
_state()
}

/**
@@ -529,8 +530,10 @@ case class TimeSeriesScheduler(logger: Logger,
case Right(backfills) =>
val dbUpdate: IO[Either[String, Unit]] = backfills
.map { newBackfill =>
updateBackfillState(newBackfill)
Database.createBackfill(newBackfill).transact(xa)
val updatedState = updateBackfillState(newBackfill)
runOrLogAndDie((Database.createBackfill(newBackfill) >>
Database.serializeState(updatedState, stateRetention)).transact(xa),
"TimeseriesScheduler, cannot serialize state, shutting down")
}
.sequence
.map(_ => Right(Unit))
@@ -738,18 +741,17 @@ case class TimeSeriesScheduler(logger: Logger,
_state() = compressState(_state(), maxVersions)
}
}
runOrLogAndDie(Database.serializeState(stateSnapshot, stateRetention).transact(xa).unsafeRunSync,
"TimeseriesScheduler, cannot serialize state, shutting down")
runOrLogAndDie(Database.serializeState(stateSnapshot, stateRetention).transact(xa),
"TimeseriesScheduler, cannot serialize state, shutting down").unsafeRunSync()
}

if (completedBackfills.nonEmpty)
runOrLogAndDie(
Database
.setBackfillStatus(completedBackfills.map(_.id), "COMPLETE")
.transact(xa)
.unsafeRunSync,
.transact(xa),
"TimeseriesScheduler, cannot serialize state, shutting down"
)
).unsafeRunSync()

val newRunning = stillRunning ++ newExecutions.map {
case (execution, result) =>
@@ -834,20 +836,19 @@ case class TimeSeriesScheduler(logger: Logger,
mainLoop(Set.empty)
}

private def runOrLogAndDie(thunk: => Unit, message: => String): Unit = {
private def runOrLogAndDie[K](thunk: IO[K], message: => String): IO[K] = {
import java.io._

try {
thunk
} catch {
case (e: Throwable) => {
logger.error(message)
val sw = new StringWriter
e.printStackTrace(new PrintWriter(sw))
logger.error(sw.toString)
System.exit(-1)
}
}
IO.suspend(thunk).onError({
case e =>
IO {
logger.error(message)
val sw = new StringWriter
e.printStackTrace(new PrintWriter(sw))
logger.error(sw.toString)
System.exit(-1)
}
})
}

private[timeseries] def collectCompletedJobs(state: State,

0 comments on commit a93b76d

Please sign in to comment.
You can’t perform that action at this time.