Skip to content

Commit

Permalink
Explicitly throw when sync iceberg conversion fails (#2886)
Browse files Browse the repository at this point in the history
## Description
Sync Iceberg conversion happens when it is enabled via a session config, or
when Uniform Iceberg is turned on for the first time. This change explicitly
throws when sync Iceberg conversion fails, so the user knows the conversion status.

## How was this patch tested?
Unit tests.
  • Loading branch information
lzlfred committed Apr 16, 2024
1 parent 9bae749 commit 7261656
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import scala.collection.JavaConverters._
import scala.util.control.Breaks._
import scala.util.control.NonFatal

import org.apache.spark.sql.delta.{DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormat, UniversalFormatConverter}
import org.apache.spark.sql.delta.{DeltaErrors, DeltaFileNotFoundException, DeltaFileProviderUtils, OptimisticTransactionImpl, Snapshot, UniversalFormat, UniversalFormatConverter}
import org.apache.spark.sql.delta.actions.{Action, AddFile, CommitInfo, RemoveFile}
import org.apache.spark.sql.delta.hooks.IcebergConverterHook
import org.apache.spark.sql.delta.metering.DeltaLogging
Expand Down Expand Up @@ -169,10 +169,21 @@ class IcebergConverter(spark: SparkSession)
*/
override def convertSnapshot(
snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = {
if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
return None
try {
convertSnapshot(snapshotToConvert, None, catalogTable)
} catch {
case NonFatal(e) =>
logError(s"Error when converting to Iceberg metadata", e)
recordDeltaEvent(
snapshotToConvert.deltaLog,
"delta.iceberg.conversion.error",
data = Map(
"exception" -> ExceptionUtils.getMessage(e),
"stackTrace" -> ExceptionUtils.getStackTrace(e)
)
)
throw e
}
convertSnapshot(snapshotToConvert, None, catalogTable)
}

/**
Expand All @@ -185,22 +196,27 @@ class IcebergConverter(spark: SparkSession)
*/
override def convertSnapshot(
snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
if (!UniversalFormat.icebergEnabled(snapshotToConvert.metadata)) {
return None
}
txn.catalogTable match {
case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
case _ =>
logWarning(s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
s"is empty in txn. Skip iceberg conversion.")
try {
txn.catalogTable match {
case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
case _ =>
val msg = s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
s"is empty in txn. Skip iceberg conversion."
throw DeltaErrors.universalFormatConversionFailedException(
snapshotToConvert.version, "iceberg", msg)
}
} catch {
case NonFatal(e) =>
logError(s"Error when converting to Iceberg metadata", e)
recordDeltaEvent(
snapshotToConvert.deltaLog,
"delta.iceberg.conversion.skipped.emptyCatalogTable",
txn.deltaLog,
"delta.iceberg.conversion.error",
data = Map(
"version" -> snapshotToConvert.version
"exception" -> ExceptionUtils.getMessage(e),
"stackTrace" -> ExceptionUtils.getStackTrace(e)
)
)
None
throw e
}
}

Expand Down
6 changes: 6 additions & 0 deletions spark/src/main/resources/error/delta-error-classes.json
Original file line number Diff line number Diff line change
Expand Up @@ -2320,6 +2320,12 @@
],
"sqlState" : "XXKDS"
},
"DELTA_UNIVERSAL_FORMAT_CONVERSION_FAILED" : {
"message" : [
"Failed to convert the table version <version> to the universal format <format>. <message>"
],
"sqlState" : "KD00E"
},
"DELTA_UNIVERSAL_FORMAT_VIOLATION" : {
"message" : [
"The validation of Universal Format (<format>) has failed: <violation>"
Expand Down
10 changes: 10 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3213,6 +3213,16 @@ trait DeltaErrorsBase
)
}

/**
 * Builds the error thrown when converting a committed Delta table version to a
 * universal format (e.g. Iceberg) fails.
 *
 * @param failedOnCommitVersion the Delta commit version whose conversion failed
 * @param format the target universal format name (e.g. "iceberg")
 * @param errorMessage a human-readable description of the underlying failure
 * @return a [[DeltaRuntimeException]] with error class
 *         DELTA_UNIVERSAL_FORMAT_CONVERSION_FAILED
 */
def universalFormatConversionFailedException(
    failedOnCommitVersion: Long,
    format: String,
    errorMessage: String): Throwable = {
  // Message parameters must line up with the <version>, <format> and <message>
  // placeholders declared for this error class.
  val params = Array(failedOnCommitVersion.toString, format, errorMessage)
  new DeltaRuntimeException(
    errorClass = "DELTA_UNIVERSAL_FORMAT_CONVERSION_FAILED",
    messageParameters = params
  )
}

def invalidAutoCompactType(value: String): Throwable = {
new DeltaIllegalArgumentException(
errorClass = "DELTA_INVALID_AUTO_COMPACT_TYPE",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
package org.apache.spark.sql.delta.hooks

import org.apache.spark.sql.delta.{OptimisticTransactionImpl, Snapshot, UniversalFormat}
import org.apache.spark.sql.delta.DeltaErrors
import org.apache.spark.sql.delta.actions.{Action, Metadata}
import org.apache.spark.sql.delta.metering.DeltaLogging
import org.apache.spark.sql.delta.sources.DeltaSQLConf.DELTA_UNIFORM_ICEBERG_SYNC_CONVERT_ENABLED
import org.apache.commons.lang3.exception.ExceptionUtils

import org.apache.spark.sql.SparkSession

Expand Down Expand Up @@ -52,4 +54,11 @@ object IcebergConverterHook extends PostCommitHook with DeltaLogging {
converter.enqueueSnapshotForConversion(postCommitSnapshot, txn)
}
}

/**
 * Post-commit hook error handler: always rethrow so a failed synchronous
 * Iceberg conversion surfaces to the committing user. Exceptions from async
 * conversion are handled on the async thread and never reach this path.
 */
override def handleError(spark: SparkSession, error: Throwable, version: Long): Unit = {
  val cause = ExceptionUtils.getMessage(error)
  throw DeltaErrors.universalFormatConversionFailedException(version, "iceberg", cause)
}
}

0 comments on commit 7261656

Please sign in to comment.