[SPARK-45182][CORE] Ignore task completion from old stage after retrying indeterminate stages #42950
Conversation
@cloud-fan @caican00 can you take a look?
This is an important fix IMO, thanks for working on it @mayurdb
```scala
shuffleStage.pendingPartitions -= task.partitionId
// Ignore task completion for old attempt of indeterminate stage
val ignoreIndeterminate = stage.isIndeterminate &&
  task.stageAttemptId < stage.latestInfo.attemptNumber()
```
Why not wrap the entire case in `if (!ignoreIndeterminate) { ... }`, and add a log message about it?
+CC @Ngone51
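For illustration, the guard under discussion can be sketched as a small self-contained simulation (an assumption-laden toy model, not the actual `DAGScheduler` code — `StageState`, `TaskCompletion`, and `handleShuffleTaskCompletion` are hypothetical names introduced here):

```scala
import scala.collection.mutable

// Toy model of the relevant stage bookkeeping (not Spark's real classes).
case class TaskCompletion(partitionId: Int, stageAttemptId: Int)

class StageState(val isIndeterminate: Boolean, var latestAttemptNumber: Int) {
  val pendingPartitions: mutable.Set[Int] = mutable.Set.empty[Int]
}

// Returns true if the completion was accepted, false if it was ignored as a
// laggard from an old attempt of an indeterminate stage.
def handleShuffleTaskCompletion(stage: StageState, task: TaskCompletion): Boolean = {
  val ignoreIndeterminate = stage.isIndeterminate &&
    task.stageAttemptId < stage.latestAttemptNumber
  if (!ignoreIndeterminate) {
    stage.pendingPartitions -= task.partitionId
    true
  } else {
    false
  }
}
```

Wrapping the whole handling in `if (!ignoreIndeterminate)` as suggested keeps stale completions from touching any of the stage's state, not just `pendingPartitions`.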
```diff
@@ -1903,13 +1903,20 @@ private[spark] class DAGScheduler(
       case smt: ShuffleMapTask =>
         val shuffleStage = stage.asInstanceOf[ShuffleMapStage]
         shuffleStage.pendingPartitions -= task.partitionId
```
For determinate stages, we do accept laggard tasks?
Yes, if determinate, this is fine.
```scala
  task.stageAttemptId < stage.latestInfo.attemptNumber()
if (!ignoreIndeterminate) {
  shuffleStage.pendingPartitions -= task.partitionId
}
val status = event.result.asInstanceOf[MapStatus]
val execId = status.location.executorId
logDebug("ShuffleMapTask finished on " + execId)
if (executorFailureEpoch.contains(execId) &&
    smt.epoch <= executorFailureEpoch(execId)) {
  logInfo(s"Ignoring possibly bogus $smt completion from executor $execId")
```
Not related to this PR, but I just noticed that we may mark the partition as completed without registering the map output.
Agree, I am unsure about this.
+CC @Ngone51 and @attilapiros, who reviewed the original change - thoughts?
I think this explains: https://github.com/apache/spark/pull/21976/files#diff-85de35b2e85646ed499c545a3be1cd3ffd525a88aae835a9c621f877eebadcb6L1385
We should probably add back that comment.
So this is kind of funny - take a look at what the above replaced, @cloud-fan: https://github.com/apache/spark/pull/16620/files#diff-85de35b2e85646ed499c545a3be1cd3ffd525a88aae835a9c621f877eebadcb6R1183 :-)
Both of these actually do not account for DETERMINATE/INDETERMINATE changes we made subsequently.
IMO, for INDETERMINATE stages, we should ignore task completion events from previous attempts - since we have already cancelled the stage attempt.
Having said that, I have not thought through the nuances here.
+CC @jiangxb1987 as well
I also noticed this, but it seems to be fine since the missing partitions are defined by `mapOutputTrackerMaster.findMissingPartitions` rather than `pendingPartitions`.
If a task is finished but the map output is not registered, the missing partitions would be recomputed at the end of the stage, so this should be fine.
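To see why this is safe, here is a minimal sketch of the point above (an assumed, simplified model — `MapOutputTrackerModel` is a hypothetical stand-in, not Spark's `MapOutputTrackerMaster`): missing partitions are derived from the registered map outputs, so a "completed" partition whose output was never registered still shows up as missing and gets recomputed at the end of the stage.

```scala
// Simplified model (not Spark code): stage completeness is decided by which
// map outputs are actually registered, not by pendingPartitions bookkeeping.
class MapOutputTrackerModel(numPartitions: Int) {
  private val registered = Array.fill(numPartitions)(false)

  def registerMapOutput(partition: Int): Unit = registered(partition) = true

  // Any partition without a registered output is resubmitted.
  def findMissingPartitions(): Seq[Int] =
    (0 until numPartitions).filterNot(p => registered(p))
}
```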
LGTM if all tests pass
Force-pushed from b37ee53 to 5880cf1
LGTM
thanks, merging to master/3.5!
…ing indeterminate stages

### What changes were proposed in this pull request?

[SPARK-25342](https://issues.apache.org/jira/browse/SPARK-25342) added support for rolling back a shuffle map stage so that all tasks of the stage can be retried when the stage output is indeterminate. This is done by clearing all map outputs at the time of stage submission. This approach works well except for the following case. Assume both Shuffle 1 and Shuffle 2 are indeterminate:

ShuffleMapStage1 --> Shuffle 1 --> ShuffleMapStage2 --> Shuffle 2 --> ResultStage

- ShuffleMapStage1 is complete
- A task from ShuffleMapStage2 fails with FetchFailed. Other tasks are still running
- Both ShuffleMapStage1 and ShuffleMapStage2 are retried
- ShuffleMapStage1 is retried and completes
- The ShuffleMapStage2 reattempt is scheduled for execution
- Before all tasks of the ShuffleMapStage2 reattempt can finish, one or more laggard tasks from the original attempt of ShuffleMapStage2 finish, and ShuffleMapStage2 also gets marked as complete
- The ResultStage gets scheduled and finishes

After this change, such laggard tasks from the old attempt of an indeterminate stage are ignored.

### Why are the changes needed?

This can give wrong results when indeterminate stages need to be retried under the circumstances mentioned above.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

A new test case

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #42950 from mayurdb/rollbackFix.

Authored-by: mayurb <mayurb@uber.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 7ffc0b7)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
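The race in the scenario above can be sketched with a small self-contained simulation (an assumed toy model — `ToyStage`, `retry`, and `onTaskSuccess` are hypothetical names, not Spark's scheduler API). Without the attempt check, a laggard success from the cancelled attempt 0 drains `pendingPartitions` and the indeterminate stage is wrongly considered complete, even though its reattempt has not recomputed all partitions:

```scala
import scala.collection.mutable

// Toy model of the race (not Spark's DAGScheduler): a two-partition
// indeterminate stage is rolled back and retried, then a laggard task from
// the cancelled attempt reports success.
class ToyStage(val isIndeterminate: Boolean) {
  var latestAttemptNumber = 0
  val pendingPartitions: mutable.Set[Int] = mutable.Set(0, 1)

  def retry(): Unit = {
    latestAttemptNumber += 1
    pendingPartitions ++= Set(0, 1) // rollback: all partitions must be recomputed
  }

  // checkAttempt = false models the behavior before this fix.
  def onTaskSuccess(partitionId: Int, attempt: Int, checkAttempt: Boolean): Unit = {
    val ignore = checkAttempt && isIndeterminate && attempt < latestAttemptNumber
    if (!ignore) {
      pendingPartitions -= partitionId
    }
  }

  def isComplete: Boolean = pendingPartitions.isEmpty
}
```

With the check enabled, the stale completion is dropped and the stage correctly waits for the reattempt to recompute the remaining partition.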