[Spark] Skip collecting commit stats to prevent computing Snapshot State (#2718)

## Description

Before this PR, Delta computes a
[SnapshotState](https://github.com/delta-io/delta/blob/v3.1.0/spark/src/main/scala/org/apache/spark/sql/delta/SnapshotState.scala#L46-L58)
during every commit. Computing a SnapshotState is slow and expensive, because
it involves reading the checkpoint, any sidecar files, and the log segment in
their entirety.

For many types of commit, it should be unnecessary to compute the
SnapshotState.
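
To make the cost concrete, here is a minimal sketch of what triggers the
computation (it assumes an existing Delta table at `/tmp/delta/events`;
`numOfFiles` and `sizeInBytes` are the accessors used by `CommitStats`):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.delta.DeltaLog

val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Reading aggregate snapshot properties forces Delta to replay the
// checkpoint and log segment into a full SnapshotState.
val deltaLog = DeltaLog.forTable(spark, "/tmp/delta/events")
val snapshot = deltaLog.update()
val fileCount  = snapshot.numOfFiles   // triggers state reconstruction
val totalBytes = snapshot.sizeInBytes  // served from the same computed state
```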

After this PR, a transaction skips computing the SnapshotState of the newly
created snapshot unless the commit also requires a checkpoint. Collecting the
expensive stats can still be forced via the internal Spark configuration option
`spark.databricks.delta.commitStats.force=true` (default: `false`).

This change can have a big performance impact when writing into a Delta table,
especially when the table comprises a large number of underlying data files.
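
For example, to restore the old always-collect behaviour in a running session
(a sketch; the flag name comes from the `commitStats.force` conf added in this
diff):

```scala
// Force numFilesTotal / sizeInBytesTotal to be collected on every commit,
// at the cost of computing the SnapshotState each time (default: false).
spark.conf.set("spark.databricks.delta.commitStats.force", "true")
```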

## How was this patch tested?

- Built delta-spark locally
- Ran a small Spark job to insert rows into a Delta table
- Inspected the log4j output to see whether the snapshot state was computed
- Repeated the run, this time forcing stats collection with
`spark.databricks.delta.commitStats.force=true` to confirm the SnapshotState
is computed only when requested

A simple demo job that, before this PR, triggered computing the SnapshotState:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder
  .appName("myapp")
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "./warehouse")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate

spark.sql("""CREATE TABLE test_delta(id string) USING DELTA """)

spark.sql("""
  INSERT INTO test_delta (id) VALUES (42)
  """)

spark.close()
```
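
Instead of eyeballing the log4j output, the check can also be done
programmatically via the `stateReconstructionTriggered` flag used by the new
test in this diff (a sketch; the flag may only be accessible from code in the
`org.apache.spark.sql.delta` package, and the table path under the configured
warehouse dir is an assumption):

```scala
// After the INSERT above, confirm nothing forced the expensive state
// reconstruction (mirrors the new OptimisticTransactionSuite test).
import org.apache.spark.sql.delta.DeltaLog

val deltaLog = DeltaLog.forTable(spark, "./warehouse/test_delta")
val snapshot = deltaLog.update()
assert(!snapshot.stateReconstructionTriggered)
```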

## Does this PR introduce _any_ user-facing changes?

Yes. After this PR, the expensive snapshot-level stats (`numFilesTotal` and
`sizeInBytesTotal`) are skipped for commits that do not also require a
checkpoint, and the corresponding `CommitStats` fields are logged as `-1`.
Setting the internal option `spark.databricks.delta.commitStats.force=true`
restores the previous behaviour of collecting them on every commit.
istreeter committed Apr 17, 2024
1 parent 1b210c2 commit bba0e94
Showing 6 changed files with 51 additions and 3 deletions.
```diff
@@ -1874,6 +1874,13 @@ trait OptimisticTransactionImpl extends TransactionalWrite
     val info = currentTransactionInfo.commitInfo
       .map(_.copy(readVersion = None, isolationLevel = None)).orNull
     setNeedsCheckpoint(attemptVersion, postCommitSnapshot)
+    val doCollectCommitStats =
+      needsCheckpoint || spark.sessionState.conf.getConf(DeltaSQLConf.DELTA_FORCE_ALL_COMMIT_STATS)
+
+    // Stats that force an expensive snapshot state reconstruction:
+    val numFilesTotal = if (doCollectCommitStats) postCommitSnapshot.numOfFiles else -1L
+    val sizeInBytesTotal = if (doCollectCommitStats) postCommitSnapshot.sizeInBytes else -1L
+
     val stats = CommitStats(
       startVersion = snapshot.version,
       commitVersion = attemptVersion,
@@ -1887,8 +1894,8 @@ trait OptimisticTransactionImpl extends TransactionalWrite
       numRemove = numRemove,
       numSetTransaction = numSetTransaction,
       bytesNew = bytesNew,
-      numFilesTotal = postCommitSnapshot.numOfFiles,
-      sizeInBytesTotal = postCommitSnapshot.sizeInBytes,
+      numFilesTotal = numFilesTotal,
+      sizeInBytesTotal = sizeInBytesTotal,
       numCdcFiles = numCdcFiles,
       cdcBytesNew = cdcBytesNew,
       protocol = postCommitSnapshot.protocol,
```
6 changes: 5 additions & 1 deletion spark/src/main/scala/org/apache/spark/sql/delta/RowId.scala

```diff
@@ -120,7 +120,11 @@ object RowId {
    * Extracts the high watermark of row IDs from a snapshot.
    */
   private[delta] def extractHighWatermark(snapshot: Snapshot): Option[Long] =
-    RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
+    if (isSupported(snapshot.protocol)) {
+      RowTrackingMetadataDomain.fromSnapshot(snapshot).map(_.rowIdHighWaterMark)
+    } else {
+      None
+    }
 
   /** Base Row ID column name */
   val BASE_ROW_ID = "base_row_id"
```
```diff
@@ -83,6 +83,17 @@ trait DeltaSQLConfBase {
       .stringConf
       .createOptional
 
+  val DELTA_FORCE_ALL_COMMIT_STATS =
+    buildConf("commitStats.force")
+      .internal()
+      .doc(
+        """When true, forces commit statistics to be collected for logging purposes.
+          | Enabling this feature requires the Snapshot State to be computed, which is
+          | potentially expensive.
+        """.stripMargin)
+      .booleanConf
+      .createWithDefault(false)
+
   val DELTA_CONVERT_USE_METADATA_LOG =
     buildConf("convert.useMetadataLog")
       .doc(
```
```diff
@@ -326,6 +326,8 @@ abstract class DeleteSuiteBase extends QueryTest
   test("schema pruning on data condition") {
     val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
     append(input, Nil)
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF
 
     val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
       checkDelete(Some("key = 2"),
@@ -347,6 +349,8 @@ abstract class DeleteSuiteBase extends QueryTest
     val input = Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
       .select(struct("key", "value").alias("nested"))
     append(input, Nil)
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF
 
     val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
       checkDelete(Some("nested.key = 2"),
```
```diff
@@ -890,4 +890,22 @@ class OptimisticTransactionSuite
       }
     }
   }
+
+  test("Append does not trigger snapshot state computation") {
+    withTempDir { tableDir =>
+      val df = Seq((1, 0), (2, 1)).toDF("key", "value")
+      df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
+
+      val deltaLog = DeltaLog.forTable(spark, tableDir)
+      val preCommitSnapshot = deltaLog.update()
+      assert(!preCommitSnapshot.stateReconstructionTriggered)
+
+      df.write.format("delta").mode("append").save(tableDir.getCanonicalPath)
+
+      val postCommitSnapshot = deltaLog.update()
+      assert(!preCommitSnapshot.stateReconstructionTriggered)
+      assert(!postCommitSnapshot.stateReconstructionTriggered)
+    }
+  }
+
 }
```
```diff
@@ -696,6 +696,8 @@ abstract class UpdateSuiteBase
 
   test("schema pruning on finding files to update") {
     append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value"))
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF
 
     val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
       checkUpdate(condition = Some("key = 2"), setClauses = "key = 1, value = 3",
@@ -717,6 +719,8 @@ abstract class UpdateSuiteBase
   test("nested schema pruning on finding files to update") {
     append(Seq((2, 2), (1, 4), (1, 1), (0, 3)).toDF("key", "value")
       .select(struct("key", "value").alias("nested")))
+    // Start from a cached snapshot state
+    deltaLog.update().stateDF
 
     val executedPlans = DeltaTestUtils.withPhysicalPlansCaptured(spark) {
       checkUpdate(condition = Some("nested.key = 2"),
```
