[SPARK-XXXXX][SS] Use latestCommittedBatchId as currentBatchId when resuming last batch #38716
Closed
viirya wants to merge 2 commits into
What changes were proposed in this pull request?
This patch changes the currentBatchId that MicroBatchExecution uses when it resumes the last batch from the offset log. Previously it took latestBatchId from the offset log. This patch changes it to latestCommittedBatchId.
Why are the changes needed?
We have a customer streaming job that cannot restart from a failed state after it failed to commit delta files. For example, if the previous run failed to commit 14.delta, the restarted job tries to read 14.delta. Because 14.delta does not exist (it was never committed), the job cannot be restarted to resume from the last batch.
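The failure described above can be illustrated with a small, self-contained model. This is only a sketch under stated assumptions: `DeltaCheckpoint` and `MissingDeltaDemo` are hypothetical names invented for illustration and are not Spark classes, and the real state store performs this lookup against files in the checkpoint directory.

```scala
import scala.util.Try

// Hypothetical stand-in for a state checkpoint: only committed delta versions exist.
final case class DeltaCheckpoint(committedVersions: Set[Long]) {
  // Loading a version whose delta file was never committed fails.
  def load(version: Long): Try[String] = Try {
    if (!committedVersions.contains(version))
      throw new java.io.FileNotFoundException(s"$version.delta does not exist")
    s"state@$version"
  }
}

object MissingDeltaDemo {
  // Versions 1 to 13 were committed; the commit of 14.delta failed.
  val checkpoint: DeltaCheckpoint = DeltaCheckpoint((1L to 13L).toSet)

  // Asking for version 14 fails, asking for version 13 succeeds.
  val loadUncommitted: Try[String] = checkpoint.load(14L)
  val loadCommitted: Try[String] = checkpoint.load(13L)
}
```

In this model every restart that asks for version 14 fails the same way, which mirrors why the job in the example can never resume.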
When MicroBatchExecution populates start offsets, it reads the last batch id (latestBatchId) from the offset log and the committed batch id (latestCommittedBatchId) from the commit log. Currently, if latestCommittedBatchId == latestBatchId - 1, it means that we are resuming the last batch, but latestBatchId is still used as currentBatchId to run the batch. In the example above, latestBatchId is 14 and latestCommittedBatchId is 13.
Because IncrementalExecution uses currentBatchId to load checkpointed state, it tries to load version 14 of the delta files. But version 14 was not committed in the last run, so the resumed run always fails because it cannot load a nonexistent delta file.
We should use latestCommittedBatchId as currentBatchId instead of latestBatchId when resuming the last batch.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests.
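For reference, the scenario from the description could be expressed as a standalone check along the following lines. This is a simplified sketch, not the code in this patch: `BatchIdSelection` and its two functions are hypothetical names, and only the resume case discussed above is modeled (other relationships between the two ids are deliberately left undefined).

```scala
object BatchIdSelection {
  // Behavior described as "previous": always run with the id read from the offset log.
  def previousCurrentBatchId(latestBatchId: Long, latestCommittedBatchId: Long): Long =
    latestBatchId

  // Behavior proposed in the description: when the commit log is exactly one
  // batch behind the offset log, run with the last committed id instead.
  // Returns None for cases this description does not cover.
  def proposedCurrentBatchId(latestBatchId: Long, latestCommittedBatchId: Long): Option[Long] =
    if (latestCommittedBatchId == latestBatchId - 1) Some(latestCommittedBatchId)
    else None
}
```

With the ids from the example (offset log at 14, commit log at 13), `previousCurrentBatchId(14L, 13L)` yields 14 while `proposedCurrentBatchId(14L, 13L)` yields `Some(13L)`.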