Skip to content

Commit e65d06e

Browse files
authored
add retry logic for delta uniform iceberg conversion (#3856)
<!-- Thanks for sending a pull request! Here are some tips for you: 1. If this is your first time, please read our contributor guidelines: https://github.com/delta-io/delta/blob/master/CONTRIBUTING.md 2. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP] Your PR title ...'. 3. Be sure to keep the PR description updated to reflect all changes. 4. Please write your PR title to summarize what this PR proposes. 5. If possible, provide a concise example to reproduce the issue for a faster review. 6. If applicable, include the corresponding issue number in the PR title and link it in the body. --> #### Which Delta project/connector is this regarding? <!-- Please add the component selected below to the beginning of the pull request title For example: [Spark] Title of my pull request --> - [x] Spark - [ ] Standalone - [ ] Flink - [ ] Kernel - [ ] Other (fill in here) ## Description <!-- - Describe what this PR changes. - Describe why we need the change. If this PR resolves an issue be sure to include "Resolves #XXX" to correctly link and close the issue upon merge. --> add retry logic for delta uniform iceberg conversion ## How was this patch tested? <!-- If tests were added, say they were added here. Please make sure to test the changes thoroughly including negative and positive cases if possible. If the changes were tested in any way other than unit tests, please clarify how you tested step by step (ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future). If the changes were not tested, please explain why. --> ## Does this PR introduce _any_ user-facing changes? <!-- If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible. If possible, please also clarify if this is a user-facing change compared to the released Delta Lake versions or within the unreleased branches such as master. If no, write 'No'. -->
1 parent 95d493c commit e65d06e

File tree

3 files changed

+77
-6
lines changed

3 files changed

+77
-6
lines changed

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.apache.spark.sql.delta.icebergShaded.IcebergTransactionUtils._
2929
import org.apache.spark.sql.delta.logging.DeltaLogKeys
3030
import org.apache.spark.sql.delta.metering.DeltaLogging
3131
import org.apache.spark.sql.delta.schema.SchemaUtils
32+
import org.apache.spark.sql.delta.sources.DeltaSQLConf
3233
import org.apache.commons.lang3.exception.ExceptionUtils
3334
import org.apache.hadoop.conf.Configuration
3435
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
@@ -37,6 +38,7 @@ import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
3738
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
3839

3940
import org.apache.spark.internal.MDC
41+
import org.apache.spark.sql.SparkSession
4042
import org.apache.spark.sql.catalyst.catalog.CatalogTable
4143

4244
sealed trait IcebergTableOp
@@ -56,6 +58,7 @@ case object REPLACE_TABLE extends IcebergTableOp
5658
* @param lastConvertedDeltaVersion the delta version this Iceberg txn starts from.
5759
*/
5860
class IcebergConversionTransaction(
61+
protected val spark: SparkSession,
5962
protected val catalogTable: CatalogTable,
6063
protected val conf: Configuration,
6164
protected val postCommitSnapshot: Snapshot,
@@ -342,11 +345,18 @@ class IcebergConversionTransaction(
342345
// possible legitimate Delta version which is 0.
343346
val deltaVersion = if (tableOp == CREATE_TABLE) -1 else postCommitSnapshot.version
344347

345-
txn.updateProperties()
346-
.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
348+
var updateTxn = txn.updateProperties()
349+
updateTxn = updateTxn.set(IcebergConverter.DELTA_VERSION_PROPERTY, deltaVersion.toString)
347350
.set(IcebergConverter.DELTA_TIMESTAMP_PROPERTY, postCommitSnapshot.timestamp.toString)
348351
.set(IcebergConstants.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping)
349-
.commit()
352+
353+
if (spark.sessionState.conf.getConf(
354+
DeltaSQLConf.DELTA_UNIFORM_ICEBERG_INCLUDE_BASE_CONVERTED_VERSION)) {
355+
lastConvertedDeltaVersion.foreach { v =>
356+
updateTxn = updateTxn.set(IcebergConverter.BASE_DELTA_VERSION_PROPERTY, v.toString)
357+
}
358+
}
359+
updateTxn.commit()
350360

351361
// We ensure the iceberg txns are serializable by only allowing them to commit against
352362
// lastConvertedIcebergSnapshotId.

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala

Lines changed: 48 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
3333
import org.apache.commons.lang3.exception.ExceptionUtils
3434
import org.apache.hadoop.fs.Path
3535
import shadedForDelta.org.apache.iceberg.{Table => IcebergTable}
36+
import shadedForDelta.org.apache.iceberg.exceptions.CommitFailedException
3637
import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}
3738

3839
import org.apache.spark.internal.MDC
@@ -54,8 +55,17 @@ object IcebergConverter {
5455
*/
5556
val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp"
5657

58+
/**
59+
* Property to be set in translated Iceberg metadata files.
60+
* Indicates the base delta commit version # that the conversion started from
61+
*/
62+
val BASE_DELTA_VERSION_PROPERTY = "base-delta-version"
63+
5764
def getLastConvertedDeltaVersion(table: Option[IcebergTable]): Option[Long] =
5865
table.flatMap(_.properties().asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong)
66+
67+
def getLastConvertedDeltaTimestamp(table: Option[IcebergTable]): Option[Long] =
68+
table.flatMap(_.properties().asScala.get(DELTA_TIMESTAMP_PROPERTY)).map(_.toLong)
5969
}
6070

6171
/**
@@ -177,7 +187,7 @@ class IcebergConverter(spark: SparkSession)
177187
override def convertSnapshot(
178188
snapshotToConvert: Snapshot, catalogTable: CatalogTable): Option[(Long, Long)] = {
179189
try {
180-
convertSnapshot(snapshotToConvert, None, catalogTable)
190+
convertSnapshotWithRetry(snapshotToConvert, None, catalogTable)
181191
} catch {
182192
case NonFatal(e) =>
183193
logError(log"Error when converting to Iceberg metadata", e)
@@ -205,7 +215,7 @@ class IcebergConverter(spark: SparkSession)
205215
snapshotToConvert: Snapshot, txn: OptimisticTransactionImpl): Option[(Long, Long)] = {
206216
try {
207217
txn.catalogTable match {
208-
case Some(table) => convertSnapshot(snapshotToConvert, Some(txn), table)
218+
case Some(table) => convertSnapshotWithRetry(snapshotToConvert, Some(txn), table)
209219
case _ =>
210220
val msg = s"CatalogTable for table ${snapshotToConvert.deltaLog.tableId} " +
211221
s"is empty in txn. Skip iceberg conversion."
@@ -227,6 +237,41 @@ class IcebergConverter(spark: SparkSession)
227237
}
228238
}
229239

240+
/**
241+
* Convert the specified snapshot into Iceberg with retry
242+
*/
243+
private def convertSnapshotWithRetry(
244+
snapshotToConvert: Snapshot,
245+
txnOpt: Option[OptimisticTransactionImpl],
246+
catalogTable: CatalogTable,
247+
maxRetry: Int =
248+
spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_UNIFORM_ICEBERG_RETRY_TIMES)
249+
): Option[(Long, Long)] = {
250+
var retryAttempt = 0
251+
while (retryAttempt < maxRetry) {
252+
try {
253+
return convertSnapshot(snapshotToConvert, txnOpt, catalogTable)
254+
} catch {
255+
case e: CommitFailedException if retryAttempt < maxRetry =>
256+
retryAttempt += 1
257+
val lastConvertedIcebergTable = loadIcebergTable(snapshotToConvert, catalogTable)
258+
val lastDeltaVersionConverted = IcebergConverter
259+
.getLastConvertedDeltaVersion(lastConvertedIcebergTable)
260+
val lastConvertedDeltaTimestamp = IcebergConverter
261+
.getLastConvertedDeltaTimestamp(lastConvertedIcebergTable)
262+
// Do not retry if the current or higher Delta version is already converted
263+
(lastDeltaVersionConverted, lastConvertedDeltaTimestamp) match {
264+
case (Some(version), Some(timestamp)) if version >= snapshotToConvert.version =>
265+
return Some(version, timestamp)
266+
case _ =>
267+
logWarning(s"CommitFailedException when converting to Iceberg metadata;" +
268+
s" retry count $retryAttempt", e)
269+
}
270+
}
271+
}
272+
throw new IllegalStateException("should not happen")
273+
}
274+
230275
/**
231276
* Convert the specified snapshot into Iceberg. NOTE: This operation is blocking. Call
232277
* enqueueSnapshotForConversion to run the operation asynchronously.
@@ -288,7 +333,7 @@ class IcebergConverter(spark: SparkSession)
288333
}
289334

290335
val icebergTxn = new IcebergConversionTransaction(
291-
cleanedCatalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
336+
spark, cleanedCatalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
292337
lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)
293338

294339
// Write out the actions taken since the last conversion (or since table creation).

spark/src/main/scala/org/apache/spark/sql/delta/sources/DeltaSQLConf.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,6 +1412,22 @@ trait DeltaSQLConfBase {
14121412
.booleanConf
14131413
.createWithDefault(false)
14141414

1415+
val DELTA_UNIFORM_ICEBERG_RETRY_TIMES =
1416+
buildConf("uniform.iceberg.retry.times")
1417+
.doc("The number of retries iceberg conversions should have in case " +
1418+
"of failures")
1419+
.internal()
1420+
.intConf
1421+
.createWithDefault(3)
1422+
1423+
val DELTA_UNIFORM_ICEBERG_INCLUDE_BASE_CONVERTED_VERSION =
1424+
buildConf("uniform.iceberg.include.base.converted.version")
1425+
.doc("If true, include the base converted delta version as a tbl property in Iceberg " +
1426+
"metadata to indicate the delta version that the conversion started from")
1427+
.internal()
1428+
.booleanConf
1429+
.createWithDefault(true)
1430+
14151431
val DELTA_OPTIMIZE_MIN_FILE_SIZE =
14161432
buildConf("optimize.minFileSize")
14171433
.internal()

0 commit comments

Comments
 (0)