Skip to content

Commit

Permalink
#415 Extent notification target interface to include table definition…
Browse files Browse the repository at this point in the history
… of the table.
  • Loading branch information
yruslan committed May 22, 2024
1 parent ca85c3e commit 0d416f7
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import java.time.{Instant, LocalDate}

case class TaskNotification(
tableName: String,
tableDef: MetaTableDef,
infoDate: Option[LocalDate],
started: Option[Instant],
finished: Option[Instant],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import za.co.absa.pramen.core.app.config.InfoDateConfig
import za.co.absa.pramen.core.app.config.InfoDateConfig.DEFAULT_DATE_FORMAT
import za.co.absa.pramen.core.app.config.RuntimeConfig.UNDERCOVER
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.metastore.MetastoreImpl.getMetaTableDef
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.peristence.{MetastorePersistence, TransientJobManager}
import za.co.absa.pramen.core.utils.ConfigUtils
Expand Down Expand Up @@ -224,21 +225,6 @@ class MetastoreImpl(appConfig: Config,
}
}

private[core] def getMetaTableDef(table: MetaTable): MetaTableDef = {
MetaTableDef(
table.name,
table.description,
table.format,
table.infoDateColumn,
table.infoDateFormat,
table.hiveTable,
table.hivePath,
table.infoDateStart,
table.readOptions,
table.writeOptions
)
}

private[core] def prepareHiveSchema(schema: StructType, mt: MetaTable): StructType = {
val fieldType = if (mt.infoDateFormat == DEFAULT_DATE_FORMAT) DateType else StringType

Expand All @@ -264,5 +250,20 @@ object MetastoreImpl {

new MetastoreImpl(conf, tableDefs, bookkeeper, metadataManager, isUndercover)
}

private[core] def getMetaTableDef(table: MetaTable): MetaTableDef = {
MetaTableDef(
table.name,
table.description,
table.format,
table.infoDateColumn,
table.infoDateFormat,
table.hiveTable,
table.hivePath,
table.infoDateStart,
table.readOptions,
table.writeOptions
)
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import za.co.absa.pramen.core.exceptions.{FatalErrorWrapper, ReasonException}
import za.co.absa.pramen.core.journal.Journal
import za.co.absa.pramen.core.journal.model.TaskCompleted
import za.co.absa.pramen.core.lock.TokenLockFactory
import za.co.absa.pramen.core.metastore.MetaTableStats
import za.co.absa.pramen.core.metastore.model.MetaTable
import za.co.absa.pramen.core.metastore.{MetaTableStats, MetastoreImpl}
import za.co.absa.pramen.core.notify.NotificationTargetManager
import za.co.absa.pramen.core.pipeline.JobPreRunStatus._
import za.co.absa.pramen.core.pipeline._
Expand Down Expand Up @@ -393,6 +393,7 @@ abstract class TaskRunnerBase(conf: Config,
NotificationTargetManager.runStatusToTaskStatus(result.runStatus).foreach { taskStatus =>
val notification = TaskNotification(
task.job.outputTable.name,
MetastoreImpl.getMetaTableDef(task.job.outputTable),
Option(task.infoDate),
result.runInfo.map(_.started),
result.runInfo.map(_.finished),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import sun.misc.Signal
import za.co.absa.pramen.api.{NotificationBuilder, PipelineNotificationTarget, TaskNotification}
import za.co.absa.pramen.core.app.config.HookConfig
import za.co.absa.pramen.core.app.config.RuntimeConfig.EMAIL_IF_NO_CHANGES
import za.co.absa.pramen.core.metastore.MetastoreImpl
import za.co.absa.pramen.core.metastore.peristence.{TransientJobManager, TransientTableManager}
import za.co.absa.pramen.core.notify.pipeline.{PipelineNotification, PipelineNotificationEmail}
import za.co.absa.pramen.core.notify.{NotificationTargetManager, PipelineNotificationTargetFactory}
Expand Down Expand Up @@ -279,6 +280,7 @@ object PipelineStateImpl {
NotificationTargetManager.runStatusToTaskStatus(taskResult.runStatus).map(taskStatus =>
TaskNotification(
taskResult.job.outputTable.name,
MetastoreImpl.getMetaTableDef(taskResult.job.outputTable),
taskResult.runInfo.map(_.infoDate),
taskResult.runInfo.map(_.started),
taskResult.runInfo.map(_.finished),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@

package za.co.absa.pramen.core

import za.co.absa.pramen.api.{SchemaDifference, TaskNotification, TaskStatus}
import za.co.absa.pramen.api.{MetaTableDef, SchemaDifference, TaskNotification, TaskStatus}

import java.time.{Instant, LocalDate}

object TaskNotificationFactory {
def getDummyTaskNotification(tableName: String = "dummy_table",
tableDef: MetaTableDef = MetaTableDefFactory.getDummyMetaTableDef(name = "dummy_table"),
infoDate: Option[LocalDate] = Some(LocalDate.of(2022, 2, 18)),
started: Option[Instant] = Some(Instant.ofEpochMilli(1613600000000L)),
finished: Option[Instant] = Some(Instant.ofEpochMilli(1672759508000L)),
Expand All @@ -32,7 +33,7 @@ object TaskNotificationFactory {
schemaChanges: Seq[SchemaDifference] = Seq.empty,
dependencyWarningTables: Seq[String] = Seq.empty,
options: Map[String, String] = Map.empty[String, String]): TaskNotification = {
TaskNotification(tableName, infoDate, started, finished, status, applicationId, isTransient, isRawFilesJob, schemaChanges, dependencyWarningTables, options)
TaskNotification(tableName, tableDef, infoDate, started, finished, status, applicationId, isTransient, isRawFilesJob, schemaChanges, dependencyWarningTables, options)
}

}

0 comments on commit 0d416f7

Please sign in to comment.