Skip to content
Permalink
Browse files

introduce a time window batching (#426)

- user can configure a time window batching per job
- an example is added to HelloTimeSeries
  • Loading branch information...
eryshev committed Aug 12, 2019
1 parent 5a9a6d5 commit cec8fa500c33ff920f2f7d44736c9f1bee0584e5
@@ -53,6 +53,7 @@ case class Job[S <: Scheduling](id: String,
* @return A future indicating the execution result (Failed future means failed execution).
*/
private[cuttle] def run(execution: Execution[S]): Future[Completed] = effect(execution)

}

/** Companion object for [[Job]]. */
@@ -19,6 +19,7 @@ import com.criteo.cuttle.timeseries._
// time series scheduler.
import java.time.ZoneOffset.UTC
import java.time._
import java.time.{Duration => JavaDuration}

import scala.concurrent.duration._

@@ -28,9 +29,9 @@ object HelloTimeSeries {
def main(args: Array[String]): Unit = {

// We define a common start date for all our jobs. This is required by the
// time series scheduler to define a start date for each job. Here we dynaically
// time series scheduler to define a start date for each job. Here we dynamically
// compute it as 7 days ago (_and we round it to midnight UTC_).
val start: Instant = LocalDate.now.minusDays(7).atStartOfDay.toInstant(UTC)
val start: Instant = LocalDate.now.minusDays(1).atStartOfDay.toInstant(UTC)

// Here is our first job. The second parameter is the scheduling configuration.
// __hello1__ is defined as a job computing hourly partitions starting at the start
@@ -70,11 +71,15 @@ object HelloTimeSeries {

// Here is our third job. Look how we can also define some metadata such as a human friendly
// name and a set of tags. This information is used in the UI to help retrieving your jobs.
// This job will be executed in batching mode, it means that it will always wait for some period of
// time(10 seconds here) and create a single executions for each 5 consequent partition available to compute.
val hello3 =
Job("hello3",
hourly(start),
"prepare-export-job.cuttle_example.hello3_stats_daily",
tags = Set(Tag("hello"), Tag("unsafe"))) { implicit e =>
Job(
"hello3",
hourly(start, batching = TimeSeriesBatching(5, JavaDuration.ofSeconds(10))),
"prepare-export-job.cuttle_example.hello3_stats_daily",
tags = Set(Tag("hello"), Tag("unsafe"))
) { implicit e =>
// Here we mix a Scala code execution and a sh script execution.
e.streams.info("Hello 3 from an unsafe job")
e.streams.info(s"My previous failures are ${e.previousFailures}")
@@ -100,7 +105,7 @@ object HelloTimeSeries {
}
}

// Our last job is a daily job. For the daily job we still need to annouce a start date, plus
// Our last job is a daily job. For the daily job we still need to announce a start date, plus
// we need to define the time zone for which _days_ must be considered. The partitions for
// daily jobs will usually be 24 hours, unless you are choosing a time zone with light saving.
val world = Job("world", daily(UTC, start), "export-job.cuttle.world_stats", tags = Set(Tag("world"))) {
@@ -12,7 +12,6 @@ import scala.concurrent.duration.{Duration => ScalaDuration}
import scala.concurrent.stm.Txn.ExternalDecider
import scala.concurrent.stm._
import scala.math.Ordering.Implicits._

import cats._
import cats.effect.IO
import cats.implicits._
@@ -22,7 +21,7 @@ import doobie.implicits._
import io.circe._
import io.circe.generic.semiauto._
import io.circe.syntax._

import codes.reactive.scalatime._
import com.criteo.cuttle.ThreadPools.Implicits.sideEffectThreadPool
import com.criteo.cuttle.ThreadPools._
import com.criteo.cuttle.Metrics._
@@ -42,15 +41,15 @@ sealed trait TimeSeriesCalendar {
if (truncated == t) t
else next(t)
}
private[timeseries] def inInterval(interval: Interval[Instant], maxPeriods: Int) = {
private[timeseries] def inInterval(interval: Interval[Instant], batching: TimeSeriesBatching) = {
def go(lo: Instant, hi: Instant, acc: List[(Instant, Instant)]): List[(Instant, Instant)] = {
val nextLo = next(lo)
if (nextLo.isAfter(hi)) acc
else go(nextLo, hi, (lo, nextLo) +: acc)
}
interval match {
case Interval(Finite(lo), Finite(hi)) =>
go(ceil(lo), hi, List.empty).reverse.grouped(maxPeriods).map(xs => (xs.head._1, xs.last._2))
go(ceil(lo), hi, List.empty).reverse.grouped(batching.size).map(xs => (xs.head._1, xs.last._2))
case _ =>
sys.error("panic")
}
@@ -294,23 +293,48 @@ object TimeSeriesContext {
*/
case class TimeSeriesDependency(offsetLow: Duration, offsetHigh: Duration)

/**
* The maximum number of partitions the job can handle at once and a delay the scheduler will wait for partition to arrive. If the size is defined
* to a value more than `1`, the scheduler will wait for delay trigger [[com.criteo.cuttle.Execution Execution]]
* with a scheduling context extended to by the @size.
*
* @param size the maximum number of joint intervals which are going to be run within single execution.
* @param delay the delay for which the scheduler will wait for the new executions to arrive for current batch.
*
*/
case class TimeSeriesBatching(size: Int, delay: Duration) {
require(size >= 1)

def asJson: Json =
Json.obj(
"size" -> size.asJson,
"delay" -> delay.toMillis.asJson
)
}

object TimeSeriesBatching {
val default = TimeSeriesBatching(1, 0.seconds)
}

/** Configure a [[com.criteo.cuttle.Job Job]] as a [[TimeSeries]] job,
*
* @param calendar The calendar partitions configuration for this job (for example hourly or daily).
* @param start The start instant at which this job must start being executed.
* @param maxPeriods The maximum number of partitions the job can handle at once. If this is defined
* to a value more than `1` and if possible, the scheduler can trigger [[com.criteo.cuttle.Execution Executions]]
* for more than 1 partition at once.
* @param batching The batching parameters [[com.criteo.cuttle.timeseries.TimeSeriesBatching]].
*
*/
case class TimeSeries(calendar: TimeSeriesCalendar, start: Instant, end: Option[Instant] = None, maxPeriods: Int = 1)
case class TimeSeries(calendar: TimeSeriesCalendar,
start: Instant,
end: Option[Instant] = None,
batching: TimeSeriesBatching = TimeSeriesBatching.default)
extends Scheduling {
type Context = TimeSeriesContext
override def asJson: Json =
Json.obj(
"kind" -> "timeseries".asJson,
"start" -> start.asJson,
"end" -> end.asJson,
"maxPeriods" -> maxPeriods.asJson,
"batching" -> batching.asJson,
"calendar" -> calendar.asJson
)
}
@@ -401,6 +425,8 @@ case class TimeSeriesScheduler(logger: Logger,

private val _pausedJobs = Ref(Set.empty[PausedJob])

private val debouncedJobs = collection.mutable.Map.empty[TimeSeriesJob, Instant]

def pausedJobs(): Set[PausedJob] = atomic { implicit txn =>
_pausedJobs()
}
@@ -723,16 +749,37 @@ case class TimeSeriesScheduler(logger: Logger,
case (_, _, effect) => effect.isCompleted
}

val (stateSnapshot, completedBackfills, toRun) = atomic { implicit txn =>
val (stateSnapshot, completedBackfills, toRun, debounced, commitInstant) = atomic { implicit txn =>
val now = Instant.now
val (stateSnapshot, newBackfills, completedBackfills) =
collectCompletedJobs(_state(), _backfills(), completed)

val toRun = jobsToRun(workflow, stateSnapshot, Instant.now, executor.projectVersion)
val (toRun, debounced) = jobsToRun(workflow, stateSnapshot, now, executor.projectVersion)

_state() = stateSnapshot
_backfills() = newBackfills

(stateSnapshot, completedBackfills, toRun)
(stateSnapshot, completedBackfills, toRun, debounced, now)
}

toRun.groupBy(_._1).foreach {
case (job, _) =>
if (job.scheduling.batching.size > 1) {
logger.debug(s"scheduling batches for ${job.name}")
debouncedJobs -= job
}
}

debounced.groupBy(_._1).foreach {
case (job, _) =>
debouncedJobs.get(job) match {
case Some(interval) =>
logger.debug(s"job ${job.name} is waiting for intervals to batch until $interval")
case None =>
val debouncePeriodEnd = commitInstant.plus(job.scheduling.batching.delay)
debouncedJobs(job) = debouncePeriodEnd
logger.debug(s"job ${job.name} will wait for intervals to batch until $debouncePeriodEnd")
}
}

val newExecutions = executor.runAll(toRun)
@@ -901,7 +948,7 @@ case class TimeSeriesScheduler(logger: Logger,
private[timeseries] def jobsToRun(workflow: Workflow,
state0: State,
now: Instant,
projectVersion: String): List[Executable] = {
projectVersion: String): (List[Executable], List[Executable]) = {

val timerInterval = Interval(Bottom, Finite(now))
val state = state0.mapValues(_.intersect(timerInterval))
@@ -925,7 +972,7 @@ case class TimeSeriesScheduler(logger: Logger,
.toList
.map { case (interval, _) => interval }

workflow.vertices.filter(job => !pausedJobIds.contains(job.id)).toList.flatMap { job =>
val job2Contexts = workflow.vertices.filter(job => !pausedJobIds.contains(job.id)).toList.flatMap { job =>
val full = IntervalMap[Instant, Unit](Interval[Instant](Bottom, Top) -> (()))
val dependenciesSatisfied = parentsMap
.getOrElse(job, Set.empty)
@@ -962,10 +1009,26 @@ case class TimeSeriesScheduler(logger: Logger,

for {
(interval, maybeBackfill) <- toRun.toList
(lo, hi) <- job.scheduling.calendar.inInterval(interval, job.scheduling.maxPeriods)
(lo, hi) <- job.scheduling.calendar.inInterval(interval, job.scheduling.batching)
} yield {
logger.debug(s"creating a context for ${job.name} for interval $lo, $hi")
(job, TimeSeriesContext(lo, hi, maybeBackfill, projectVersion))
}

}

job2Contexts.partition {
case (job, _) =>
if (job.scheduling.batching.size == 1) {
true
} else {
debouncedJobs.get(job) match {
case Some(interval) if now.isAfter(interval) =>
true
case _ =>
false
}
}
}
}

@@ -73,7 +73,8 @@ package object timeseries {
* @param start The instant this calendar will start.
* @param end The optional instant this calendar will end.
*/
def hourly(start: Instant, end: Option[Instant] = None) = TimeSeries(calendar = NHourly(1), start, end)
def hourly(start: Instant, end: Option[Instant] = None, batching: TimeSeriesBatching = TimeSeriesBatching.default) =
TimeSeries(calendar = NHourly(1), start, end, batching)

/** Defines a N-hourly calendar starting at the specified instant.
*
@@ -85,10 +86,13 @@ package object timeseries {
* @param start The instant this calendar will start.
* @param end The optional instant this calendar will end.
*/
def nhourly(hours: Int, start: Instant, end: Option[Instant] = None): TimeSeries =
if (hours <= 0 || hours >= 24 || 24 % hours != 0)
def nhourly(hours: Int,
start: Instant,
end: Option[Instant] = None,
batching: TimeSeriesBatching = TimeSeriesBatching.default): TimeSeries =
if (hours <= 0 || hours >= 24 || 24 % hours != 0) {
throw new IllegalArgumentException("hours should be a strictly positive divider of 24 different than 24")
else TimeSeries(calendar = NHourly(hours), start, end)
} else TimeSeries(calendar = NHourly(hours), start, end, batching)

/** Defines an daily calendar starting at the specified instant, and using the specified time zone.
* Days are defined as complete calendar days starting a midnight and during 24 hours. If the specified
@@ -104,7 +108,11 @@ package object timeseries {
* @param end The optional instant this calendar will end.
* @param tz The time zone for which these _days_ are defined.
*/
def daily(tz: ZoneId, start: Instant, end: Option[Instant] = None) = TimeSeries(calendar = Daily(tz), start, end)
def daily(tz: ZoneId,
start: Instant,
end: Option[Instant] = None,
batching: TimeSeriesBatching = TimeSeriesBatching.default) =
TimeSeries(calendar = Daily(tz), start, end, batching)

/** Defines a weekly calendar. Weeks are defined as complete calendar weeks starting on a specific
* day of the week at midnight and lasting 7 days. The specified time zone is used to define the exact
@@ -122,8 +130,11 @@ package object timeseries {
* @param end The optional instant this calendar will end.
* @param tz The time zone for which these _weeks_ are defined.
*/
def weekly(tz: ZoneId, start: Instant, end: Option[Instant] = None) =
TimeSeries(calendar = Weekly(tz, start.atZone(tz).getDayOfWeek), start, end)
def weekly(tz: ZoneId,
start: Instant,
end: Option[Instant] = None,
batching: TimeSeriesBatching = TimeSeriesBatching.default) =
TimeSeries(calendar = Weekly(tz, start.atZone(tz).getDayOfWeek), start, end, batching)

/** Defines a monthly calendar. Months are defined as complete calendar months starting on the 1st day and
* during 28,29,30 or 31 days. The specified time zone is used to define the exact month start instant.
@@ -138,7 +149,11 @@ package object timeseries {
* @param end The optional instant this calendar will end.
* @param tz The time zone for which these months are defined.
*/
def monthly(tz: ZoneId, start: Instant, end: Option[Instant] = None) = TimeSeries(calendar = Monthly(tz), start, end)
def monthly(tz: ZoneId,
start: Instant,
end: Option[Instant] = None,
batching: TimeSeriesBatching = TimeSeriesBatching.default) =
TimeSeries(calendar = Monthly(tz), start, end, batching)

def measure[A, B]: MeasureKey[Interval[A], B] = new MeasureKey[Interval[A], B]

@@ -5,6 +5,7 @@ import java.time.{Duration, Instant}
import scala.concurrent.Future

import org.scalatest.FunSuite
import codes.reactive.scalatime._

import com.criteo.cuttle.Auth
import com.criteo.cuttle.{Completed, Job, TestScheduling}
@@ -73,6 +74,8 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
}

test("identify jobs to do") {
val batchingJob = Job("batching_job", scheduling.copy(batching = TimeSeriesBatching(2, 2.seconds)))(completed)

val state: State = Map(
parentTestJob -> IntervalMap(
Interval(date"2017-03-25T01:00:00Z", date"2017-03-25T02:00:00Z") -> Done("v1"),
@@ -83,24 +86,33 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T03:00:00Z") -> Todo(None),
Interval(date"2017-03-25T03:00:00Z", date"2017-03-25T04:00:00Z") -> Done("v2"),
Interval(date"2017-03-25T04:00:00Z", date"2017-03-25T05:00:00Z") -> Todo(None)
),
batchingJob -> IntervalMap(
Interval(date"2017-03-25T02:00:00Z", date"2017-03-25T05:00:00Z") -> Todo(None)
)
)
val batchedIntervals = Set(
(batchingJob, TimeSeriesContext(date"2017-03-25T02:00:00Z", date"2017-03-25T04:00:00Z", None, "last_version"))
)

val jobsToRun = scheduler.jobsToRun(
(testJob dependsOn parentTestJob)(TimeSeriesDependency(Duration.ofHours(-1), Duration.ofHours(0))),
((testJob and batchingJob) dependsOn parentTestJob)(
TimeSeriesDependency(Duration.ofHours(-1), Duration.ofHours(0))
),
state,
date"2017-03-25T05:00:00Z",
"last_version"
)
assert(
jobsToRun.toSet.equals(
jobsToRun._1.toSet.equals(
Set(
(testJob, TimeSeriesContext(date"2017-03-25T02:00:00Z", date"2017-03-25T03:00:00Z", None, "last_version")),
(parentTestJob,
TimeSeriesContext(date"2017-03-25T04:00:00Z", date"2017-03-25T05:00:00Z", None, "last_version"))
)
)
)
assert(jobsToRun._2.toSet.equals(batchedIntervals))
}

val oneDayInterval = Interval(Instant.parse("2019-01-01T00:00:00Z"), Instant.parse("2019-01-02T00:00:00Z"))
@@ -109,7 +121,7 @@ class TimeSeriesSchedulerSpec extends FunSuite with TestScheduling {
test("Generate right n-hours periods") {

def checkPeriod(period: Int) = {
val allIntervals = nhourly(period, start).calendar.inInterval(oneDayInterval, 1).toList
val allIntervals = nhourly(period, start).calendar.inInterval(oneDayInterval, TimeSeriesBatching.default).toList

assert(allIntervals.size.equals(24 / period))

@@ -21,7 +21,7 @@ class TimeSeriesSpec extends FunSuite with TestScheduling {
val state = Map(
job(1) -> IntervalMap[Instant, JobState](Interval(Finite(ts(2)), Top) -> JobState.Todo(None))
)
val result = scheduler.jobsToRun(job(1), state, ts(5), "test_version")
val result = scheduler.jobsToRun(job(1), state, ts(5), "test_version")._1
assert(
result ==
(2 to 4).map { i =>

0 comments on commit cec8fa5

Please sign in to comment.
You can’t perform that action at this time.