Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions pramen/core/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,9 @@ pramen {
connection.retries.default = 3
connection.backoff.min.ms = 10000
connection.backoff.max.ms = 60000

# If true, restores the legacy behavior when sink exceptions were wrapped in "Unable to write to sink" exception.
wrap.sink.exceptions = false
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ object Keys {
val LOG_EXECUTOR_NODES = "pramen.log.executor.nodes"
val LOG_EFFECTIVE_CONFIG = "pramen.log.effective.config"

val WRAP_SINK_EXCEPTION = "pramen.internal.wrap.sink.exceptions"

final val KEYS_TO_REDACT: Set[String] = Set("password", "secret", "pwd", "access.key", "api.key", "api_key", "session.token", "access_key", "session_token", "auth.user.info")

final val CONFIG_KEYS_TO_REDACT = Set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,9 @@ class OperationSplitter(conf: Config,

if (operationDef.schedule == Schedule.Incremental) {
val latestOffsets = bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTable.name, None)
new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, latestOffsets, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery)
new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, latestOffsets, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery, conf)
} else {
new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, None, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery)
new TransferJob(operationDef, metastore, bookkeeper, notificationTargets, None, batchId, sourceName, source, transferTable, outputTable, sinkName, sink, specialCharacters, temporaryDirectory, disableCountQuery, conf)
}
})
}
Expand Down Expand Up @@ -171,7 +171,7 @@ class OperationSplitter(conf: Config,
bookkeeper.getOffsetManager.getMaxInfoDateAndOffset(outputTableName, None).map(_.maximumInfoDate)
} else None

new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDateOpt, outputTable, sinkName, sink, sinkTable)
new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDateOpt, outputTable, sinkName, sink, sinkTable, conf)
})
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import za.co.absa.pramen.api.jobdef.SinkTable
import za.co.absa.pramen.api.status.{DependencyWarning, JobType, TaskRunReason}
import za.co.absa.pramen.api.{DataFormat, MetastoreReader, Reason, Sink}
import za.co.absa.pramen.core.bookkeeper.Bookkeeper
import za.co.absa.pramen.core.config.Keys
import za.co.absa.pramen.core.exceptions.LazyJobErrorWrapper
import za.co.absa.pramen.core.metastore.model.{MetaTable, ReaderMode}
import za.co.absa.pramen.core.metastore.{MetaTableStats, Metastore, MetastoreReaderIncremental}
Expand All @@ -42,7 +43,8 @@ class SinkJob(operationDef: OperationDef,
outputTable: MetaTable,
sinkName: String,
sink: Sink,
sinkTable: SinkTable)
sinkTable: SinkTable,
workflowConf: Config)
(implicit spark: SparkSession)
extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, outputTable) {
import JobBase._
Expand Down Expand Up @@ -182,14 +184,26 @@ class SinkJob(operationDef: OperationDef,
val stats = MetaTableStats(Option(sinkResult.recordsSent))
SaveResult(stats, sinkResult.filesSent, sinkResult.hiveTables, sinkResult.warnings ++ tooLongWarnings)
} catch {
case NonFatal(ex) => throw new IllegalStateException("Unable to write to the sink.", ex)
case NonFatal(ex) =>
throw getSinkException(ex)
} finally {
Try {
sink.close()
}
}
}

/** The wrapping of the exception here is redundant since users already know that the sink has failed. But
* the behavior made configurable for backwards compatibility. */
private def getSinkException(cause: Throwable): Throwable = {
val wrapException = ConfigUtils.getOptionBoolean(workflowConf, Keys.WRAP_SINK_EXCEPTION).getOrElse(false)
if (wrapException) {
new IllegalStateException("Unable to write to the sink.", cause)
} else {
cause
}
}

private def getDataDf(infoDate: LocalDate, metastoreReader: MetastoreReader): DataFrame = {
try {
if (isIncremental) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ class TransferJob(operationDef: OperationDef,
sink: Sink,
specialCharacters: String,
tempDirectory: Option[String],
disableCountQuery: Boolean)
disableCountQuery: Boolean,
workflowConf: Config)
(implicit spark: SparkSession)
extends JobBase(operationDef, metastore, bookkeeper, notificationTargets, bookkeepingMetaTable) {

Expand All @@ -57,7 +58,7 @@ class TransferJob(operationDef: OperationDef,
}
}

val sinkJob: SinkJob = new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDate, bookkeepingMetaTable, sinkName, sink, TransferTableParser.getSinkTable(table))
val sinkJob: SinkJob = new SinkJob(operationDef, metastore, bookkeeper, notificationTargets, latestInfoDate, bookkeepingMetaTable, sinkName, sink, TransferTableParser.getSinkTable(table), workflowConf)

override val scheduleStrategy: ScheduleStrategy = ingestionJob.scheduleStrategy

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package za.co.absa.pramen.core.pipeline

import com.typesafe.config.{ConfigFactory, ConfigValueFactory}
import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{AnalysisException, DataFrame}
import org.scalatest.wordspec.AnyWordSpec
Expand Down Expand Up @@ -221,12 +221,16 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix
assert(ex.getCause.getMessage == "Dummy Exception")
}

"throw an exception when write() throws" in {
"throw an exception when write() throws with wrapper" in {
val sink = new SinkSpy(writeException = new RuntimeException("Dummy Exception"))
val conf = ConfigFactory.parseString(
"""
|pramen.internal.wrap.sink.exceptions = true
|""".stripMargin)

val (job, _) = getUseCase(tableDf = exampleDf, sink = sink)
val (job, _) = getUseCase(tableDf = exampleDf, sink = sink, conf = conf)

val ex = intercept[IllegalStateException] {
val ex = intercept[RuntimeException] {
job.save(exampleDf, infoDate, runReason, conf, Instant.now(), Some(10L))
}

Expand All @@ -237,6 +241,21 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix
assert(ex.getCause.getMessage == "Dummy Exception")
}

"throw an exception when write() throws without wrapper" in {
val sink = new SinkSpy(writeException = new RuntimeException("Dummy Exception"))

val (job, _) = getUseCase(tableDf = exampleDf, sink = sink)

val ex = intercept[RuntimeException] {
job.save(exampleDf, infoDate, runReason, conf, Instant.now(), Some(10L))
}

assert(sink.connectCalled == 1)
assert(sink.writeCalled == 1)
assert(sink.closeCalled == 1)
assert(ex.getMessage == "Dummy Exception")
}

"ignore if close() throws" in {
val sink = new SinkSpy(closeException = new RuntimeException("Dummy Exception"))

Expand All @@ -253,7 +272,8 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix
def getUseCase(sinkTable: SinkTable = SinkTableFactory.getDummySinkTable(),
tableDf: DataFrame = null,
tableException: Throwable = null,
sink: Sink = SinkSpy(conf, "", spark)): (SinkJob, SyncBookkeeperMock) = {
sink: Sink = SinkSpy(conf, "", spark),
conf: Config = ConfigFactory.empty()): (SinkJob, SyncBookkeeperMock) = {
val operation = OperationDefFactory.getDummyOperationDef(extraOptions = Map[String, String]("value" -> "7"))

val bk = new SyncBookkeeperMock
Expand All @@ -262,7 +282,7 @@ class SinkJobSuite extends AnyWordSpec with SparkTestBase with TextComparisonFix

val outputTable = MetaTableFactory.getDummyMetaTable(name = "table1->mysink")

(new SinkJob(operation, metastore, bk, Nil, None, outputTable, "sink_name", sink, sinkTable), bk)
(new SinkJob(operation, metastore, bk, Nil, None, outputTable, "sink_name", sink, sinkTable, conf), bk)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ class TransferJobSuite extends AnyWordSpec with SparkTestBase with TextCompariso

val outputTable = TransferTableParser.getMetaTable(transferTable)

(new TransferJob(operation, metastore, bk, Nil, None, 123L, "testSource", source, transferTable, outputTable, "sink_name", sink, " ", tempDirectory, disableCountQuery), bk)
(new TransferJob(operation, metastore, bk, Nil, None, 123L, "testSource", source, transferTable, outputTable, "sink_name", sink, " ", tempDirectory, disableCountQuery, sourceConfig), bk)
}

}
Loading