[SPARK-28214][STREAMING][TESTS] CheckpointSuite: wait for batch to be fully processed before accessing DStreamCheckpointData #25731
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This patch fixes the bug regarding accessing
DStreamCheckpointData.currentCheckpointFiles
without guarding which makes the testbasic rdd checkpoints + dstream graph checkpoint recovery
being flaky.There're two possible points to make test failing:
DStreamCheckpointData.update
: it clearscurrentCheckpointFiles
and adds new checkpointFiles. Race condition can happen between main thread for test and JobGenerator's event loop thread.lastProcessedBatch
guarantees that all events for given time are processed, as commented:// last batch whose completion,checkpointing and metadata cleanup has been completed
. That means, if we wait for time for exactly same amount as advanced the time in test (multiply of checkpoint interval as well as batch duration) we can expect nothing will happen in DStreamCheckpointData afterwards unless we advance the clock.This patch applies the observation above.
Why are the changes needed?
The test is reported as flaky as SPARK-28214, and the test code seems unsafe.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Modified UT. I've added some debug messages and confirmed no method in DStreamCheckpointData is being called between "after waiting lastProcessedBatch" and "advancing clock" even I added huge amount of sleep between twos, which avoids race-condition.
I was also able to make existing test artificially failing (not 100% consistently but high likely) via adding sleep between
currentCheckpointFiles.clear()
andcurrentCheckpointFiles ++= checkpointFiles
inDStreamCheckpointData.update
, and confirmed modified test doesn't fail the test multiple times.