[SC-54155][DELTA][WARMFIX] Fail a Delta table's stream if logs are skipped

## What changes were proposed in this pull request?

Introduces a `failOnDataLoss` flag for the Delta streaming source. The `getChanges` method, which provides the changes for the streaming source, uses `listFrom`, which returns the earliest delta file still available starting from a given version. That version may be later than the one the stream should actually be processing. In such cases we now fail the stream unless the `failOnDataLoss` flag is set to false.

Imagine the following case:
  1. You start a stream
  2. Process all your data
  3. You stop your stream
  4. The table has delta.logRetentionDuration set to 1 day, writes happen to your table, and Delta performs log cleanup.
  5. You restart your stream 2 days later. The delta files providing the changes that you need to process have already been deleted.

We should fail in the case above.
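
For context, the new check is controlled entirely by a read option. A minimal usage sketch, mirroring the option name added below (the table path is hypothetical):

```scala
// Default (failOnDataLoss = true): the restarted stream throws an
// IllegalStateException when the delta files for the next expected
// version have already been cleaned up.
val strictStream = spark.readStream
  .format("delta")
  .load("/data/events") // hypothetical table path

// Opting out: skip the missing versions and resume from the earliest
// delta file still present in the _delta_log directory.
val lenientStream = spark.readStream
  .format("delta")
  .option("failOnDataLoss", "false")
  .load("/data/events")
```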

## How was this patch tested?

Unit tests

Author: Burak Yavuz <brkyvz@gmail.com>
Author: Burak Yavuz <burak@databricks.com>

#13644 is resolved by brkyvz/deltaFailOn.

GitOrigin-RevId: 355d168763d1909abd3c4a0335dd716829142ed4
brkyvz authored and zsxwing committed Nov 5, 2020
1 parent 0fec639 commit 3a726e6
Showing 5 changed files with 164 additions and 2 deletions.
15 changes: 15 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaErrors.scala
@@ -216,6 +216,21 @@ object DeltaErrors
""".stripMargin, cause)
}

def failOnDataLossException(expectedVersion: Long, seenVersion: Long): Throwable = {
new IllegalStateException(
s"""The stream from your Delta table was expecting process data from version $expectedVersion,
|but the earliest available version in the _delta_log directory is $seenVersion. The files
|in the transaction log may have been deleted due to log cleanup. In order to avoid losing
|data, we recommend that you restart your stream with a new checkpoint location and
|increase your delta.logRetentionDuration setting, if you have explicitly set it below 30
|days.
|If you would like to ignore the missed data and continue your stream from where it left
|off, you can set the .option("${DeltaOptions.FAIL_ON_DATA_LOSS_OPTION}", "false") as part
|of your readStream statement.
""".stripMargin
)
}

def staticPartitionsNotSupportedException: Throwable = {
new AnalysisException("Specifying static partitions in the partition spec is" +
" currently not supported during inserts")
10 changes: 9 additions & 1 deletion src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -219,12 +219,20 @@ class DeltaLog private(
* Get all actions starting from "startVersion" (inclusive). If `startVersion` doesn't exist,
* return an empty Iterator.
*/
def getChanges(startVersion: Long): Iterator[(Long, Seq[Action])] = {
def getChanges(
startVersion: Long,
failOnDataLoss: Boolean = false): Iterator[(Long, Seq[Action])] = {
val deltas = store.listFrom(deltaFile(logPath, startVersion))
.filter(f => isDeltaFile(f.getPath))
// Subtract 1 to ensure that we have the same check for the inclusive startVersion
var lastSeenVersion = startVersion - 1
deltas.map { status =>
val p = status.getPath
val version = deltaVersion(p)
if (failOnDataLoss && version > lastSeenVersion + 1) {
throw DeltaErrors.failOnDataLossException(lastSeenVersion + 1, version)
}
lastSeenVersion = version
(version, store.read(p).map(Action.fromJson))
}
}
5 changes: 5 additions & 0 deletions src/main/scala/org/apache/spark/sql/delta/DeltaOptions.scala
@@ -116,6 +116,9 @@ trait DeltaReadOptions extends DeltaOptionParser {

val ignoreDeletes = options.get(IGNORE_DELETES_OPTION).exists(toBoolean(_, IGNORE_DELETES_OPTION))

val failOnDataLoss = options.get(FAIL_ON_DATA_LOSS_OPTION)
.forall(toBoolean(_, FAIL_ON_DATA_LOSS_OPTION)) // `forall` is true when the option is absent, so the default is true

val excludeRegex: Option[Regex] = try options.get(EXCLUDE_REGEX_OPTION).map(_.r) catch {
case e: PatternSyntaxException =>
throw new IllegalArgumentException(
@@ -176,6 +179,7 @@ object DeltaOptions extends DeltaLogging {
val IGNORE_FILE_DELETION_OPTION = "ignoreFileDeletion"
val IGNORE_CHANGES_OPTION = "ignoreChanges"
val IGNORE_DELETES_OPTION = "ignoreDeletes"
val FAIL_ON_DATA_LOSS_OPTION = "failOnDataLoss"
val OPTIMIZE_WRITE_OPTION = "optimizeWrite"
val DATA_CHANGE_OPTION = "dataChange"
val STARTING_VERSION_OPTION = "startingVersion"
@@ -191,6 +195,7 @@ object DeltaOptions extends DeltaLogging {
IGNORE_FILE_DELETION_OPTION,
IGNORE_CHANGES_OPTION,
IGNORE_DELETES_OPTION,
FAIL_ON_DATA_LOSS_OPTION,
OPTIMIZE_WRITE_OPTION,
DATA_CHANGE_OPTION,
STARTING_TIMESTAMP_OPTION,
2 changes: 1 addition & 1 deletion src/main/scala/org/apache/spark/sql/delta/sources/DeltaSource.scala
@@ -115,7 +115,7 @@ case class DeltaSource(

/** Returns matching files that were added on or after startVersion among delta logs. */
def filterAndIndexDeltaLogs(startVersion: Long): Iterator[IndexedFile] = {
deltaLog.getChanges(startVersion).flatMap { case (version, actions) =>
deltaLog.getChanges(startVersion, options.failOnDataLoss).flatMap { case (version, actions) =>
val addFiles = verifyStreamHygieneAndFilterAddFiles(actions, version)
Iterator.single(IndexedFile(version, -1, null)) ++ addFiles
.map(_.asInstanceOf[AddFile])
134 changes: 134 additions & 0 deletions src/test/scala/org/apache/spark/sql/delta/DeltaSourceSuite.scala
@@ -1503,6 +1503,140 @@ class DeltaSourceSuite extends DeltaSourceSuiteBase with DeltaSQLCommandTest {
assert(e.getCause.getMessage.contains("version"))
}
}

test("fail on data loss - starting from missing files") {
withTempDirs { case (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}

addData()
val df = spark.readStream.format("delta").load(srcData.getCanonicalPath)

val q = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q.processAllAvailable()
q.stop()

addData()
addData()
addData()

val srcLog = DeltaLog.forTable(spark, srcData)
// Delete the first file
assert(new File(FileNames.deltaFile(srcLog.logPath, 1).toUri).delete())

val e = intercept[StreamingQueryException] {
val q = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q.processAllAvailable()
}
assert(e.getCause.getMessage === DeltaErrors.failOnDataLossException(1L, 2L).getMessage)
}
}

test("fail on data loss - gaps of files") {
withTempDirs { case (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}

addData()
val df = spark.readStream.format("delta").load(srcData.getCanonicalPath)

val q = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q.processAllAvailable()
q.stop()

addData()
addData()
addData()

val srcLog = DeltaLog.forTable(spark, srcData)
// Delete the second file
assert(new File(FileNames.deltaFile(srcLog.logPath, 2).toUri).delete())

val e = intercept[StreamingQueryException] {
val q = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q.processAllAvailable()
}
assert(e.getCause.getMessage === DeltaErrors.failOnDataLossException(2L, 3L).getMessage)
}
}

test("fail on data loss - starting from missing files with option off") {
withTempDirs { case (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}

addData()
val df = spark.readStream.format("delta").option("failOnDataLoss", "false")
.load(srcData.getCanonicalPath)

val q = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q.processAllAvailable()
q.stop()

addData()
addData()
addData()

val srcLog = DeltaLog.forTable(spark, srcData)
// Delete the first file
assert(new File(FileNames.deltaFile(srcLog.logPath, 1).toUri).delete())

val q2 = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q2.processAllAvailable()
q2.stop()

assert(spark.read.format("delta").load(targetData.getCanonicalPath).count() === 30)
}
}

test("fail on data loss - gaps of files with option off") {
withTempDirs { case (srcData, targetData, chkLocation) =>
def addData(): Unit = {
spark.range(10).write.format("delta").mode("append").save(srcData.getCanonicalPath)
}

addData()
val df = spark.readStream.format("delta").option("failOnDataLoss", "false")
.load(srcData.getCanonicalPath)

val q = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q.processAllAvailable()
q.stop()

addData()
addData()
addData()

val srcLog = DeltaLog.forTable(spark, srcData)
// Delete the second file
assert(new File(FileNames.deltaFile(srcLog.logPath, 2).toUri).delete())

val q2 = df.writeStream.format("delta")
.option("checkpointLocation", chkLocation.getCanonicalPath)
.start(targetData.getCanonicalPath)
q2.processAllAvailable()
q2.stop()

assert(spark.read.format("delta").load(targetData.getCanonicalPath).count() === 30)
}
}
}

/**
