Permalink
Browse files

Adds back projectName to Executions (#316)

  • Loading branch information...
dufrannea authored and Masuzu committed Oct 26, 2018
1 parent 7011e69 commit 32a03763e7ef4665de1ebc2f2e45de180af9ad90
@@ -185,6 +185,7 @@ case class Execution[S <: Scheduling](
context: S#Context,
streams: ExecutionStreams,
platforms: Seq[ExecutionPlatform],
projectName: String,
projectVersion: String
)(implicit val executionContext: SideEffectThreadPool) {
@@ -391,6 +392,7 @@ private[cuttle] object Executor {
class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPlatform],
xa: XA,
logger: Logger,
val projectName: String,
val projectVersion: String)(implicit retryStrategy: RetryStrategy)
extends MetricProvider[S] {
@@ -779,6 +781,7 @@ class Executor[S <: Scheduling] private[cuttle] (val platforms: Seq[ExecutionPla
context,
streams = streams,
platforms,
projectName,
projectVersion
)(sideEffectExecutionContext)
val promise = Promise[Completed]
@@ -30,6 +30,7 @@ class ExecutorSpec extends FunSuite with TestScheduling {
Seq.empty,
xa = Transactor.fromConnection[IO](connection).copy(strategy0 = doobie.util.transactor.Strategy.void),
logger,
"project_name",
"test_version"
)(RetryStrategy.ExponentialBackoffRetryStrategy)
@@ -112,6 +113,7 @@ class ExecutorSpec extends FunSuite with TestScheduling {
override private[cuttle] def writeln(str: CharSequence): Unit = ???
},
platforms = Seq.empty,
"project_name",
"test_version"
)
@@ -127,7 +127,8 @@ object HelloCustomScheduling {
Seq(local.LocalPlatform(maxForkedProcesses = 10)),
stateDbTransactor,
logger,
"Custom scheduling example"
projectName = "Custom scheduling example",
projectVersion = "version"
)(RetryStrategy.ExponentialBackoffRetryStrategy)
loopScheduler.start(LoopJobs(hello), executor, stateDbTransactor, logger)
@@ -37,7 +37,7 @@ class CuttleProject private[cuttle] (
paused: Boolean = false
): Unit = {
val xa = CuttleDatabase.connect(databaseConfig)
val executor = new Executor[TimeSeries](platforms, xa, logger, version)(retryStrategy)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)
if (paused) {
logger.info("Pausing workflow")
@@ -71,7 +71,7 @@ class CuttleProject private[cuttle] (
retryStrategy: RetryStrategy = RetryStrategy.ExponentialBackoffRetryStrategy
): (Service, () => Unit) = {
val xa = CuttleDatabase.connect(databaseConfig)
val executor = new Executor[TimeSeries](platforms, xa, logger, version)(retryStrategy)
val executor = new Executor[TimeSeries](platforms, xa, logger, name, version)(retryStrategy)
val startScheduler = () => {
logger.info("Start workflow")
@@ -440,7 +440,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
}
.map { case (e, status) => e.toExecutionLog(status) }
val ((jobStates, backfills), _) = watchedState
val ((jobStates, _), _) = watchedState
val remainingExecutions =
for {
(interval, maybeBackfill) <- jobStates(job)
@@ -751,7 +751,7 @@ private[timeseries] case class TimeSeriesApp(project: CuttleProject, executor: E
}
case GET at url"/api/timeseries/lastruns?job=$jobId" =>
val (jobStates, backfills) = scheduler.state
val (jobStates, _) = scheduler.state
val successfulIntervalMaps = jobStates
.filter(s => s._1.id == jobId)
.values
@@ -42,7 +42,7 @@ object TimeSeriesSchedulerSpec {
}
val xa = CuttleDatabase.connect(DatabaseConfig(Seq(DBLocation("127.0.0.1", 3388)), "cuttle_dev", "root", ""))
val executor = new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.version)(retryImmediatelyStrategy)
val executor = new Executor[TimeSeries](Seq(LocalPlatform(maxForkedProcesses = 10)), xa, logger, project.name, project.version)(retryImmediatelyStrategy)
val scheduler = project.scheduler
scheduler.initialize(project.jobs, xa, logger)

0 comments on commit 32a0376

Please sign in to comment.