Skip to content

Commit

Permalink
[SPARK-26425][SS] Add more constraint checks to avoid checkpoint corr…
Browse files Browse the repository at this point in the history
…uption

### What changes were proposed in this pull request?

Credits to tdas who reported and described the fix to [SPARK-26425](https://issues.apache.org/jira/browse/SPARK-26425). I just followed the description of the issue.

This patch adds more checks on commit log as well as file streaming source so that multiple concurrent runs of streaming query don't mess up the status of query/checkpoint. This patch addresses two different spots which are having a bit different issues:

1. FileStreamSource.fetchMaxOffset()

In structured streaming, we don't allow multiple streaming queries to run with same checkpoint (including concurrent runs of same query), so query should fail if it fails to write the metadata of specific batch ID due to same batch ID being written by others.

2. commit log

As described in JIRA issue, assertion is already applied to the `offsetLog` for the same reason.

https://github.com/apache/spark/blob/8167714cab93a5c06c23f92c9077fe8b9677ab28/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala#L394-L402

This patch applied the same for commit log.

### Why are the changes needed?

This prevents the inconsistent behavior on streaming query and lets query fail instead.

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

N/A, as the change is simple and obvious, and it's really hard to artificially reproduce the issue.

Closes #25965 from HeartSaVioR/SPARK-26425.

Lead-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Co-authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
  • Loading branch information
HeartSaVioR and HeartSaVioR committed Sep 17, 2020
1 parent 7fdb571 commit d936cb3
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,10 +178,16 @@ class FileStreamSource(

if (batchFiles.nonEmpty) {
metadataLogCurrentOffset += 1
metadataLog.add(metadataLogCurrentOffset, batchFiles.map { case (p, timestamp) =>

val fileEntries = batchFiles.map { case (p, timestamp) =>
FileEntry(path = p, timestamp = timestamp, batchId = metadataLogCurrentOffset)
}.toArray)
logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
}.toArray
if (metadataLog.add(metadataLogCurrentOffset, fileEntries)) {
logInfo(s"Log offset set to $metadataLogCurrentOffset with ${batchFiles.size} new files")
} else {
throw new IllegalStateException("Concurrent update to the log. Multiple streaming jobs " +
s"detected for $metadataLogCurrentOffset")
}
}

FileStreamSourceOffset(metadataLogCurrentOffset)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,7 +598,9 @@ class MicroBatchExecution(
withProgressLocked {
sinkCommitProgress = batchSinkProgress
watermarkTracker.updateWatermark(lastExecution.executedPlan)
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
assert(commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark)),
"Concurrent update to the commit log. Multiple streaming jobs detected for " +
s"$currentBatchId")
committedOffsets ++= availableOffsets
}
logDebug(s"Completed batch ${currentBatchId}")
Expand Down

0 comments on commit d936cb3

Please sign in to comment.