
Commit be9718d

[Spark] Fix race condition in Uniform conversion (#3189)
## Description

This PR fixes a race condition in the UniForm Iceberg converter. Before this change, the converter executed as follows:

1. Read `lastConvertedDeltaVersion` from the latest Iceberg snapshot.
2. Convert the Delta commits starting from `lastConvertedDeltaVersion` into Iceberg snapshots.
3. Commit the Iceberg snapshots.

When there are multiple Iceberg conversion threads, a race condition may cause one Delta commit to be written into multiple Iceberg snapshots, corrupting data. As an example, consider a UniForm table whose latest Delta version and Iceberg version are both 1, and two threads A and B that start writing to the Delta table:

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1, and converts Delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1, and converts Delta versions 2 and 3.
3. Thread A commits Iceberg version 2, including converted Delta version 2.
4. Thread B commits Iceberg version 3, including converted Delta versions 2 and 3.

Once both threads commit, Delta version 2 appears in the Iceberg history twice, as two different snapshots. If version 2 is an AddFile, the same data is inserted into Iceberg twice.

The fix works as follows:

1. Read `lastConvertedDeltaVersion` and **a new field** `lastConvertedIcebergSnapshotId` from the latest Iceberg snapshot.
2. Convert the Delta commits starting from `lastConvertedDeltaVersion` into Iceberg snapshots.
3. Before the Iceberg commit, check that the base snapshot ID of this transaction equals `lastConvertedIcebergSnapshotId` (**this check is the core of this change**).
4. Commit the Iceberg snapshots.

This change makes sure we only commit against a specific Iceberg snapshot, and the conversion aborts if the snapshot we want to commit against is no longer the latest one. The fix successfully blocks the example above (a runnable sketch follows this list):

1. Thread A writes Delta version 2, reads `lastConvertedDeltaVersion` = 1 and `lastConvertedIcebergSnapshotId` = S0, and converts Delta version 2.
2. Thread B writes Delta version 3, reads `lastConvertedDeltaVersion` = 1 and `lastConvertedIcebergSnapshotId` = S0, and converts Delta versions 2 and 3.
3. Thread A creates an Iceberg transaction with parent snapshot S0. Because `lastConvertedIcebergSnapshotId` is also S0, it commits and updates the latest Iceberg snapshot to S1.
4. Thread B creates an Iceberg transaction with parent snapshot S1. Because `lastConvertedIcebergSnapshotId` is S0 != S1, it aborts the conversion.
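The guard is an optimistic-concurrency (compare-and-set) check on the Iceberg snapshot id. Below is a minimal, self-contained sketch of the idea; `IcebergSnapshotState`, `ConverterSketch`, and `RaceDemo` are illustrative names, not the actual Delta or Iceberg APIs:

```scala
import java.util.ConcurrentModificationException

// Illustrative stand-in for the converter's view of the Iceberg table:
// the latest snapshot id plus the last converted Delta version.
final case class IcebergSnapshotState(id: Long, lastConvertedDeltaVersion: Long)

// Minimal model of the commit-time guard (not the actual Delta code).
class ConverterSketch(@volatile private var current: IcebergSnapshotState) {

  // Step 1 of the fix: read the latest snapshot state once, up front.
  def readState(): IcebergSnapshotState = current

  // Steps 3-4 of the fix: commit only if the table still points at the
  // snapshot we read; otherwise another converter won the race and we abort.
  def commit(base: IcebergSnapshotState, newDeltaVersion: Long): Unit = synchronized {
    if (current.id != base.id) {
      throw new ConcurrentModificationException(
        s"converted metadata is based on a stale iceberg snapshot ${base.id}")
    }
    current = IcebergSnapshotState(current.id + 1, newDeltaVersion)
  }
}

object RaceDemo extends App {
  // Replay of the two-thread example: both threads read S0 (id = 0).
  val table = new ConverterSketch(IcebergSnapshotState(id = 0, lastConvertedDeltaVersion = 1))
  val baseA = table.readState() // thread A reads S0
  val baseB = table.readState() // thread B reads S0
  table.commit(baseA, newDeltaVersion = 2) // A commits; latest snapshot is now S1
  table.commit(baseB, newDeltaVersion = 3) // B aborts with ConcurrentModificationException
}
```

In the real code the "set" side is Iceberg's own transaction commit, so this check plus Iceberg's commit protocol together make the conversion transactions serializable.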
1 parent c0b3c97 commit be9718d

2 files changed: +63 −7 lines


iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConversionTransaction.scala

Lines changed: 35 additions & 2 deletions
@@ -16,6 +16,8 @@
 
 package org.apache.spark.sql.delta.icebergShaded
 
+import java.util.ConcurrentModificationException
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.NonFatal
@@ -29,7 +31,6 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.conf.Configuration
 import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
-import shadedForDelta.org.apache.iceberg.hadoop.HadoopTables
 import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
 import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
 
@@ -48,12 +49,15 @@ case object REPLACE_TABLE extends IcebergTableOp
  * @param conf Configuration for Iceberg Hadoop interactions.
  * @param postCommitSnapshot Latest Delta snapshot associated with this Iceberg commit.
  * @param tableOp How to instantiate the underlying Iceberg table. Defaults to WRITE_TABLE.
+ * @param lastConvertedIcebergSnapshotId the iceberg snapshot this Iceberg txn should write to.
+ * @param lastConvertedDeltaVersion the delta version this Iceberg txn starts from.
  */
 class IcebergConversionTransaction(
     protected val catalogTable: CatalogTable,
     protected val conf: Configuration,
     protected val postCommitSnapshot: Snapshot,
     protected val tableOp: IcebergTableOp = WRITE_TABLE,
+    protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
     protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging {
 
   ///////////////////////////
@@ -197,7 +201,7 @@ class IcebergConversionTransaction(
     DeltaFileProviderUtils.createJsonStatsParser(postCommitSnapshot.statsSchema)
 
   /** Visible for testing. */
-  private[icebergShaded]val txn = createIcebergTxn()
+  private[icebergShaded]val (txn, startFromSnapshotId) = withStartSnapshotId(createIcebergTxn())
 
   /** Tracks if this transaction has already committed. You can only commit once. */
   private var committed = false
@@ -320,6 +324,25 @@ class IcebergConversionTransaction(
       .set(IcebergConverter.ICEBERG_NAME_MAPPING_PROPERTY, nameMapping)
       .commit()
 
+    // We ensure the iceberg txns are serializable by only allowing them to commit against
+    // lastConvertedIcebergSnapshotId.
+    //
+    // If the startFromSnapshotId is non-empty and not the same as lastConvertedIcebergSnapshotId,
+    // there is a new iceberg transaction committed after we read lastConvertedIcebergSnapshotId,
+    // and before this check. We explicitly abort by throwing exceptions.
+    //
+    // If startFromSnapshotId is empty, the txn must be one of the following:
+    // 1. CREATE_TABLE
+    // 2. Writing to an empty table
+    // 3. REPLACE_TABLE
+    // In either case this txn is safe to commit.
+    //
+    // Iceberg will further guarantee that txns passed this check are serializable.
+    if (startFromSnapshotId.isDefined && lastConvertedIcebergSnapshotId != startFromSnapshotId) {
+      throw new ConcurrentModificationException("Cannot commit because the converted " +
+        s"metadata is based on a stale iceberg snapshot $lastConvertedIcebergSnapshotId"
+      )
+    }
     try {
       txn.commitTransaction()
       if (tableOp == CREATE_TABLE) {
@@ -399,6 +422,16 @@ class IcebergConversionTransaction(
   // Helper Methods //
   ////////////////////
 
+  /**
+   * We fetch the txn table's current snapshot id before any writing is made on the transaction.
+   * This id should equal [[lastConvertedIcebergSnapshotId]] for the transaction to commit.
+   *
+   * @param txn the iceberg transaction
+   * @return txn and the snapshot id just before this txn
+   */
+  private def withStartSnapshotId(txn: IcebergTransaction): (IcebergTransaction, Option[Long]) =
+    (txn, Option(txn.table().currentSnapshot()).map(_.snapshotId()))
+
   private def recordIcebergCommit(errorOpt: Option[Throwable] = None): Unit = {
     val icebergTxnTypes =
       if (fileUpdates.nonEmpty) Map("icebergTxnTypes" -> fileUpdates.map(_.opType)) else Map.empty
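One detail of the guard above: both sides of the inequality are `Option[Long]`, so the cases spelled out in the inline comment fall out of ordinary `Option` equality. A quick sketch with illustrative values:

```scala
// Both sides of the guard are Option[Long]; the values here are illustrative.
val lastConvertedIcebergSnapshotId: Option[Long] = Some(100L)

Some(100L) != lastConvertedIcebergSnapshotId // false: base snapshot matches, commit proceeds
Some(101L) != lastConvertedIcebergSnapshotId // true: stale base, ConcurrentModificationException
// startFromSnapshotId == None (CREATE_TABLE, an empty table, or REPLACE_TABLE):
// isDefined is false, the guard is skipped, and the commit is allowed.
```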

iceberg/src/main/scala/org/apache/spark/sql/delta/icebergShaded/IcebergConverter.scala

Lines changed: 28 additions & 5 deletions
@@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.metering.DeltaLogging
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.commons.lang3.exception.ExceptionUtils
 import org.apache.hadoop.fs.Path
+import shadedForDelta.org.apache.iceberg.{Table => IcebergTable}
 import shadedForDelta.org.apache.iceberg.hive.{HiveCatalog, HiveTableOperations}
 
 import org.apache.spark.sql.SparkSession
@@ -51,6 +52,9 @@ object IcebergConverter {
   val DELTA_TIMESTAMP_PROPERTY = "delta-timestamp"
 
   val ICEBERG_NAME_MAPPING_PROPERTY = "schema.name-mapping.default"
+
+  def getLastConvertedDeltaVersion(table: Option[IcebergTable]): Option[Long] =
+    table.flatMap(_.properties().asScala.get(DELTA_VERSION_PROPERTY)).map(_.toLong)
 }
 
 /**
@@ -235,13 +239,16 @@ class IcebergConverter(spark: SparkSession)
       catalogTable: CatalogTable): Option[(Long, Long)] =
     recordFrameProfile("Delta", "IcebergConverter.convertSnapshot") {
       val log = snapshotToConvert.deltaLog
-      val lastDeltaVersionConverted: Option[Long] =
-        loadLastDeltaVersionConverted(snapshotToConvert, catalogTable)
+      val lastConvertedIcebergTable = loadIcebergTable(snapshotToConvert, catalogTable)
+      val lastConvertedIcebergSnapshotId =
+        lastConvertedIcebergTable.flatMap(it => Option(it.currentSnapshot())).map(_.snapshotId())
+      val lastDeltaVersionConverted = IcebergConverter
+        .getLastConvertedDeltaVersion(lastConvertedIcebergTable)
       val maxCommitsToConvert =
         spark.sessionState.conf.getConf(DeltaSQLConf.ICEBERG_MAX_COMMITS_TO_CONVERT)
 
       // Nth to convert
-      if (lastDeltaVersionConverted.exists(_ == snapshotToConvert.version)) {
+      if (lastDeltaVersionConverted.contains(snapshotToConvert.version)) {
         return None
       }
 
@@ -276,7 +283,8 @@ class IcebergConverter(spark: SparkSession)
       }
 
       val icebergTxn = new IcebergConversionTransaction(
-        catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp, lastDeltaVersionConverted)
+        catalogTable, log.newDeltaHadoopConf(), snapshotToConvert, tableOp,
+        lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)
 
       // Write out the actions taken since the last conversion (or since table creation).
       // This is done in batches, with each batch corresponding either to one delta file,
@@ -349,9 +357,24 @@ class IcebergConverter(spark: SparkSession)
   override def loadLastDeltaVersionConverted(
       snapshot: Snapshot, catalogTable: CatalogTable): Option[Long] =
     recordFrameProfile("Delta", "IcebergConverter.loadLastDeltaVersionConverted") {
-      catalogTable.properties.get(IcebergConverter.DELTA_VERSION_PROPERTY).map(_.toLong)
+      IcebergConverter.getLastConvertedDeltaVersion(loadIcebergTable(snapshot, catalogTable))
     }
 
+  protected def loadIcebergTable(
+      snapshot: Snapshot, catalogTable: CatalogTable): Option[IcebergTable] = {
+    recordFrameProfile("Delta", "IcebergConverter.loadLastConvertedIcebergTable") {
+      val hiveCatalog = IcebergTransactionUtils
+        .createHiveCatalog(snapshot.deltaLog.newDeltaHadoopConf())
+      val icebergTableId = IcebergTransactionUtils
+        .convertSparkTableIdentifierToIcebergHive(catalogTable.identifier)
+      if (hiveCatalog.tableExists(icebergTableId)) {
+        Some(hiveCatalog.loadTable(icebergTableId))
+      } else {
+        None
+      }
+    }
+  }
+
 /**
  * Build an iceberg TransactionHelper from the provided txn, and commit the set of changes
  * specified by the actionsToCommit.
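Putting the converter-side reads together: a sketch of how the two pieces of state are derived from the Iceberg table returned by `loadIcebergTable`, consolidating the `convertSnapshot` changes above (assumes the shaded Iceberg `Table` API as imported in the diff):

```scala
import scala.collection.JavaConverters._
import shadedForDelta.org.apache.iceberg.{Table => IcebergTable}

// tableOpt comes from loadIcebergTable; None means the Iceberg table
// does not exist (yet) in the Hive catalog.
def readConversionState(tableOpt: Option[IcebergTable]): (Option[Long], Option[Long]) = {
  // Latest Iceberg snapshot id; None for a table with no snapshots yet.
  val lastConvertedIcebergSnapshotId =
    tableOpt.flatMap(t => Option(t.currentSnapshot())).map(_.snapshotId())
  // Delta version recorded as a table property by the previous conversion.
  val lastDeltaVersionConverted =
    tableOpt.flatMap(_.properties().asScala.get(IcebergConverter.DELTA_VERSION_PROPERTY))
      .map(_.toLong)
  (lastConvertedIcebergSnapshotId, lastDeltaVersionConverted)
}
```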
