Skip to content

Commit

Permalink
Don't commit empty transactions on Write
Browse files Browse the repository at this point in the history
Closes #1934
  • Loading branch information
watfordkcf authored and vkorukanti committed Sep 7, 2023
1 parent 27f2ce6 commit d9ba620
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 2 deletions.
Expand Up @@ -100,8 +100,7 @@ case class WriteIntoDelta(
mode, Option(partitionColumns),
options.replaceWhere, options.userMetadata
)
txn.commit(actions, operation)

txn.commitIfNeeded(actions, operation)
}
Seq.empty
}
Expand Down
Expand Up @@ -24,6 +24,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.util.FileNames
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.Row
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.expressions.{EqualTo, Literal}
Expand All @@ -35,6 +36,8 @@ class OptimisticTransactionSuite
extends OptimisticTransactionLegacyTests
with OptimisticTransactionSuiteBase {

import testImplicits._

// scalastyle:off: removeFile
private val addA = createTestAddFile(path = "a")
private val addB = createTestAddFile(path = "b")
Expand Down Expand Up @@ -642,4 +645,59 @@ class OptimisticTransactionSuite
checkLastCommitTags(expectedTags = Some(tags2))
}
}


// Verifies that appending an empty DataFrame leaves the table's version unchanged
// (no session conf is overridden here, hence "by default" in the test name) and
// that the previously written rows are still the only data in the table.
test("empty commits are elided on write by default") {
  withTempDir { tableDir =>
    val tablePath = tableDir.getCanonicalPath

    // Seed the table with two rows so that it has a real commit to compare against.
    Seq((1, 0), (2, 1))
      .toDF("key", "value")
      .write
      .format("delta")
      .mode("append")
      .save(tablePath)

    val log = DeltaLog.forTable(spark, tableDir)
    val versionBeforeEmptyWrite = log.update().version

    // Append a DataFrame with the same column names but zero rows.
    Seq.empty[(Integer, Integer)]
      .toDF("key", "value")
      .write
      .format("delta")
      .mode("append")
      .save(tablePath)

    val versionAfterEmptyWrite = log.update().version

    // The table contents must be exactly the seeded rows.
    checkAnswer(
      spark.read.format("delta").load(tablePath),
      Seq(Row(1, 0), Row(2, 1)))

    // The empty append must not have advanced the table version.
    assert(versionBeforeEmptyWrite === versionAfterEmptyWrite)
  }
}

// Registers one test per value of the skip flag. Each test sets
// DELTA_SKIP_RECORDING_EMPTY_COMMITS to that value, appends an empty DataFrame
// to a seeded table, and checks whether the table version advanced:
//   - skip = true  -> version is expected to stay the same
//   - skip = false -> version is expected to grow by exactly one
for (skip <- Seq(true, false)) {
  test(s"Elide empty commits when requested - skipRecordingEmptyCommits=$skip") {
    val skipConf = DeltaSQLConf.DELTA_SKIP_RECORDING_EMPTY_COMMITS.key -> skip.toString
    withSQLConf(skipConf) {
      withTempDir { tableDir =>
        val tablePath = tableDir.getCanonicalPath

        // Seed the table with two rows so that it has a real commit to compare against.
        Seq((1, 0), (2, 1))
          .toDF("key", "value")
          .write
          .format("delta")
          .mode("append")
          .save(tablePath)

        val log = DeltaLog.forTable(spark, tableDir)

        // Compute the expected post-write version up front, before the empty append runs.
        val seededVersion = log.update().version
        val expectedVersion =
          if (skip) seededVersion
          else seededVersion + 1

        // Append a DataFrame with the same column names but zero rows.
        Seq.empty[(Integer, Integer)]
          .toDF("key", "value")
          .write
          .format("delta")
          .mode("append")
          .save(tablePath)

        val observedVersion = log.update().version

        // Regardless of the flag, the table contents must be exactly the seeded rows.
        checkAnswer(
          spark.read.format("delta").load(tablePath),
          Seq(Row(1, 0), Row(2, 1)))

        assert(expectedVersion === observedVersion)
      }
    }
  }
}
}

0 comments on commit d9ba620

Please sign in to comment.