
Commit f577290

[Spark] Fix auto-conflict handling logic in Optimize to handle DVs (#3981)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

Bug: There was a long-standing bug where the custom conflict-detection logic in Optimize did not catch concurrent transactions that add deletion vectors (DVs) to the files being compacted, e.g. AddFile(path='a') -> AddFile(path='a', dv='dv1'). Keyed on path alone, the two actions look identical, so the conflict went undetected.

Fix: Updated the conflict resolution to key the candidate file set on the composite (path, dvId) instead of path alone.

## How was this patch tested?

- unit tests

## Does this PR introduce _any_ user-facing changes?

No.
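To make the failure mode concrete, here is a minimal, self-contained Scala sketch of the set comparison at the heart of the bug. `FileEntry`, `dvId`, and the literal values are illustrative stand-ins, not Delta's `AddFile` API:

```scala
// Hypothetical stand-in for an AddFile action: dvId models the deletion
// vector unique id (None when the file carries no DV).
case class FileEntry(path: String, dvId: Option[String])

object ConflictCheckSketch extends App {
  // Files Optimize saw when it planned the compaction.
  val before = Seq(FileEntry("a", None), FileEntry("b", None))
  // Snapshot after a concurrent DELETE attached a DV to file 'a'.
  val after  = Seq(FileEntry("a", Some("dv1")), FileEntry("b", None))

  // Old check: keyed on path alone, the sets match and the conflict is missed.
  val samePathsOnly = before.map(_.path).toSet == after.map(_.path).toSet

  // Fixed check: keyed on (path, dvId), the DV change is detected.
  val sameCompositeKeys =
    before.map(f => (f.path, f.dvId)).toSet == after.map(f => (f.path, f.dvId)).toSet

  println(s"path-only keys see no change (the bug): $samePathsOnly")          // true
  println(s"(path, dvId) keys detect the DV (the fix): ${!sameCompositeKeys}") // true
}
```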
1 parent fc81d12 commit f577290

File tree

2 files changed: +106 −2 lines


spark/src/main/scala/org/apache/spark/sql/delta/commands/OptimizeTableCommand.scala

Lines changed: 8 additions & 2 deletions
```diff
@@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.metric.SQLMetrics.createMetric
 import org.apache.spark.sql.types._
 import org.apache.spark.util.{SystemClock, ThreadUtils}
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.delta.actions.InMemoryLogReplay.UniqueFileActionTuple

 /** Base class defining abstract optimize command */
 abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand {
@@ -453,12 +454,17 @@ class OptimizeExecutor(
     val metrics = createMetrics(sparkSession.sparkContext, addedFiles, removedFiles, removedDVs)
     commitAndRetry(txn, getOperation(), updates, metrics) { newTxn =>
       val newPartitionSchema = newTxn.metadata.partitionSchema
-      val candidateSetOld = filesToProcess.map(_.path).toSet
+      // Note: When checking if the candidate set is the same, we need to consider (Path, DV)
+      // as the key.
+      val candidateSetOld = filesToProcess.
+        map(f => UniqueFileActionTuple(f.pathAsUri, f.getDeletionVectorUniqueId)).toSet

       // We specifically don't list the files through the transaction since we are potentially
       // only processing a subset of them below. If the transaction is still valid, we will
       // register the files and predicate below
       val candidateSetNew =
-        newTxn.snapshot.filesForScan(partitionPredicate).files.map(_.path).toSet
+        newTxn.snapshot.filesForScan(partitionPredicate).files
+          .map(f => UniqueFileActionTuple(f.pathAsUri, f.getDeletionVectorUniqueId)).toSet

       // As long as all of the files that we compacted are still part of the table,
       // and the partitioning has not changed it is valid to continue to try
```
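The comments in the diff spell out the retry rule: a blind retry is only safe if every file Optimize planned to compact is still present, unchanged, in the latest snapshot. A sketch of that predicate follows, keyed on (path, dvId) instead of path alone; the names and the exact subset test are assumptions for illustration, not copied from the Delta sources:

```scala
object RetryValiditySketch {
  /** A file keyed by (path, deletion-vector unique id); None means no DV. */
  type FileKey = (String, Option[String])

  // Illustrative only: models the retry-validity test described by the diff
  // comments above. Any concurrently added or removed DV changes the dvId
  // component of a key, so the planned files are no longer a subset of the
  // latest snapshot and the cheap retry is rejected.
  def retryIsValid(
      candidateSetOld: Set[FileKey], // files Optimize planned to rewrite
      candidateSetNew: Set[FileKey], // files in the latest snapshot
      partitionSchemaUnchanged: Boolean): Boolean =
    candidateSetOld.subsetOf(candidateSetNew) && partitionSchemaUnchanged
}
```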
spark/src/test/scala/org/apache/spark/sql/delta/optimize/OptimizeConflictSuite.scala

Lines changed: 98 additions & 0 deletions (new file)
```scala
/*
 * Copyright (2021) The Delta Lake Project Authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.delta.optimize

import java.io.File

import scala.concurrent.duration.Duration

import org.apache.spark.SparkException
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.concurrency.PhaseLockingTestMixin
import org.apache.spark.sql.delta.concurrency.TransactionExecutionTestMixin
import org.apache.spark.sql.delta.fuzzer.{OptimisticTransactionPhases, PhaseLockingTransactionExecutionObserver}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.apache.spark.sql.{QueryTest, Row}
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.util.ThreadUtils

class OptimizeConflictSuite extends QueryTest
  with SharedSparkSession
  with PhaseLockingTestMixin
  with TransactionExecutionTestMixin
  with DeltaSQLCommandTest {

  protected def appendRows(dir: File, numRows: Int, numFiles: Int): Unit = {
    spark.range(start = 0, end = numRows, step = 1, numPartitions = numFiles)
      .write.format("delta").mode("append").save(dir.getAbsolutePath)
  }

  test("conflict handling between Optimize and Business Txn") {
    withTempDir { tempDir =>

      // Create table with 100 rows.
      appendRows(tempDir, numRows = 100, numFiles = 10)

      // Enable DVs.
      sql(s"ALTER TABLE delta.`${tempDir.toString}` " +
        "SET TBLPROPERTIES ('delta.enableDeletionVectors' = true);")
      val deltaTable = io.delta.tables.DeltaTable.forPath(spark, tempDir.getAbsolutePath)

      def optimizeTxn(): Array[Row] = {
        deltaTable.optimize().executeCompaction()
        Array.empty
      }

      def deleteTxn(): Array[Row] = {
        // Delete 50% of the rows.
        sql(s"DELETE FROM delta.`${tempDir}` WHERE id%2 = 0").collect()
      }

      val Seq(future) = runFunctionsWithOrderingFromObserver(Seq(optimizeTxn)) {
        case (optimizeObserver :: Nil) =>
          // Create a replacement observer for the retry thread of Optimize.
          val retryObserver = new PhaseLockingTransactionExecutionObserver(
            OptimisticTransactionPhases.forName("test-replacement-txn"))

          // Block Optimize during the first commit attempt.
          optimizeObserver.setNextObserver(retryObserver, autoAdvance = true)
          unblockUntilPreCommit(optimizeObserver)
          busyWaitFor(optimizeObserver.phases.preparePhase.hasEntered, timeout)

          // Delete starts and finishes.
          deleteTxn()

          // Allow Optimize to resume.
          unblockCommit(optimizeObserver)
          busyWaitFor(optimizeObserver.phases.commitPhase.hasLeft, timeout)
          optimizeObserver.phases.postCommitPhase.exitBarrier.unblock()

          // The first txn will not commit as there was a conflicting commit
          // (deleteTxn). Optimize will attempt to auto-resolve and retry.
          // Resume the retry txn and wait for it to finish.
          unblockAllPhases(retryObserver)
      }
      val e = intercept[SparkException] {
        ThreadUtils.awaitResult(future, timeout)
      }
      // The retry txn should fail as the same files are modified (DVs added) by
      // the delete txn.
      assert(e.getCause.getMessage.contains("DELTA_CONCURRENT_DELETE_READ"))
      assert(sql(s"SELECT * FROM delta.`${tempDir}`").count() == 50)
    }
  }
}
```
