Skip to content

Commit

Permalink
#415 Initialize pipeline notification targets after pipeline state is…
Browse files Browse the repository at this point in the history
… active.

This helps to create better email notifications in case pipeline notification
class is not found.
  • Loading branch information
yruslan committed Jun 4, 2024
1 parent 74ca3bb commit 8c9f1dd
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ object AppRunner {
jobs <- filterJobs(state, jobsOrig, appContext.appConfig.runtimeConfig)
_ <- runStartupHook(state, appContext.appConfig.hookConfig)
_ <- validateShutdownHook(state, appContext.appConfig.hookConfig)
_ <- initPipelineNotificationTargets(state)
_ <- validatePipeline(jobs, state, appContext, spark)
_ <- runPipeline(conf, jobs, state, appContext, taskRunner, spark)
_ <- shutdownTaskRunner(taskRunner, state)
Expand Down Expand Up @@ -117,6 +118,12 @@ object AppRunner {
}, state, "initialization of the task runner")
}

private[core] def initPipelineNotificationTargets(implicit state: PipelineState): Try[Unit] = {
handleFailure(Try {
state.asInstanceOf[PipelineStateImpl].initNotificationTargets()
}, state, "Initialization of piepline notification targets")
}

private[core] def getSparkSession(implicit conf: Config,
state: PipelineState): Try[SparkSession] = {
handleFailure(Try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
private val environmentName = conf.getString(ENVIRONMENT_NAME)
private val sendEmailIfNoNewData: Boolean = conf.getBoolean(EMAIL_IF_NO_CHANGES)
private val hookConfig = HookConfig.fromConfig(conf)
private var pipelineNotificationTargets: Seq[PipelineNotificationTarget] = PipelineNotificationTargetFactory.fromConfig(conf)
private var pipelineNotificationTargets: Seq[PipelineNotificationTarget] = Seq.empty

// State
private val startedInstant = Instant.now
Expand All @@ -68,6 +68,9 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification
setSignalHandler(new Signal("INT"), "SIGINT (Ctrl + C)")
setSignalHandler(new Signal("TERM"), "SIGTERM (kill)")
setSignalHandler(new Signal("HUP"), "SIGHUP (network connection to the terminal has been lost)")
}

private[core] def initNotificationTargets(): Unit = {
pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf)
}

Expand Down

0 comments on commit 8c9f1dd

Please sign in to comment.