From 8c9f1dd3d2705e22c5c338d8882fcbaa96922645 Mon Sep 17 00:00:00 2001 From: Ruslan Iushchenko Date: Tue, 4 Jun 2024 09:04:36 +0200 Subject: [PATCH] #415 Initialize pipeline notification targets after pipeline state is active. This helps to create better email notifications in case pipeline notification class is not found. --- .../scala/za/co/absa/pramen/core/runner/AppRunner.scala | 7 +++++++ .../za/co/absa/pramen/core/state/PipelineStateImpl.scala | 5 ++++- 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala index 3dcb54c5..0dcc3827 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/runner/AppRunner.scala @@ -65,6 +65,7 @@ object AppRunner { jobs <- filterJobs(state, jobsOrig, appContext.appConfig.runtimeConfig) _ <- runStartupHook(state, appContext.appConfig.hookConfig) _ <- validateShutdownHook(state, appContext.appConfig.hookConfig) + _ <- initPipelineNotificationTargets(state) _ <- validatePipeline(jobs, state, appContext, spark) _ <- runPipeline(conf, jobs, state, appContext, taskRunner, spark) _ <- shutdownTaskRunner(taskRunner, state) @@ -117,6 +118,12 @@ object AppRunner { }, state, "initialization of the task runner") } + private[core] def initPipelineNotificationTargets(implicit state: PipelineState): Try[Unit] = { + handleFailure(Try { + state.asInstanceOf[PipelineStateImpl].initNotificationTargets() + }, state, "Initialization of piepline notification targets") + } + private[core] def getSparkSession(implicit conf: Config, state: PipelineState): Try[SparkSession] = { handleFailure(Try { diff --git a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala index 71c1045c..462f8818 100644 --- a/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala +++ b/pramen/core/src/main/scala/za/co/absa/pramen/core/state/PipelineStateImpl.scala @@ -45,7 +45,7 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification private val environmentName = conf.getString(ENVIRONMENT_NAME) private val sendEmailIfNoNewData: Boolean = conf.getBoolean(EMAIL_IF_NO_CHANGES) private val hookConfig = HookConfig.fromConfig(conf) - private var pipelineNotificationTargets: Seq[PipelineNotificationTarget] = PipelineNotificationTargetFactory.fromConfig(conf) + private var pipelineNotificationTargets: Seq[PipelineNotificationTarget] = Seq.empty // State private val startedInstant = Instant.now @@ -68,6 +68,9 @@ class PipelineStateImpl(implicit conf: Config, notificationBuilder: Notification setSignalHandler(new Signal("INT"), "SIGINT (Ctrl + C)") setSignalHandler(new Signal("TERM"), "SIGTERM (kill)") setSignalHandler(new Signal("HUP"), "SIGHUP (network connection to the terminal has been lost)") + } + + private[core] def initNotificationTargets(): Unit = { pipelineNotificationTargets = PipelineNotificationTargetFactory.fromConfig(conf) }