[SPARK-28699][CORE][2.3] Fix a corner case for aborting indeterminate stage #25508
Conversation
What changes were proposed in this pull request?

Change the logic for collecting indeterminate stages: when handling a FetchFailed event, we should look at the stages reachable from mapStage, not from failedStage.

Why are the changes needed?

In the FetchFailed error-handling logic, the original code collected indeterminate stages starting from the stage that hit the fetch failure. When the fetch failure happens in the first task of that stage, this causes the indeterminate stage to be resubmitted only partially, which can eventually produce a correctness bug.

Does this PR introduce any user-facing change?

It makes the corner case abort the indeterminate stage as expected.

How was this patch tested?

New UT in DAGSchedulerSuite. Also ran the integration test below with `local-cluster[5, 2, 5120]` and `spark.sql.execution.sortBeforeRepartition=false`; it aborts the indeterminate stage as expected:

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 0d3a783)
Signed-off-by: Yuanjian Li <xyliyuanjian@gmail.com>
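For anyone trying the reproducer outside an existing spark-shell session, here is a minimal sketch (not part of the PR) of how the session could be configured programmatically. The master string and the sortBeforeRepartition key come from the description above; the builder usage and the app name are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Sketch only: roughly equivalent to launching spark-shell with
//   --master "local-cluster[5, 2, 5120]" --conf spark.sql.execution.sortBeforeRepartition=false
// A local-cluster master needs a full Spark build directory, since it forks real
// executor JVMs -- which is what lets the reproducer kill an executor via pkill.
val spark = SparkSession.builder()
  .master("local-cluster[5, 2, 5120]")
  .appName("SPARK-28699-indeterminate-stage-repro") // illustrative name
  // Disable the defensive sort so the repartition stages stay indeterminate on retry.
  .config("spark.sql.execution.sortBeforeRepartition", "false")
  .getOrCreate()
```

Equivalently, spark-shell can be started with `--master "local-cluster[5, 2, 5120]"` and `--conf spark.sql.execution.sortBeforeRepartition=false`, after which the snippet above can be pasted directly; the run should end with the indeterminate stage being aborted rather than partially recomputed.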
Test build #109394 has finished for PR 25508 at commit
Retest this please.
Test build #109422 has finished for PR 25508 at commit
Could you fix the UT failure?
Yeah, I'm looking into this; it seems the behavior is not the same between 2.3 and 2.4.
@@ -2521,33 +2521,19 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
      (Success, makeMapStatus("hostD", 2))))
    assert(mapOutputTracker.findMissingPartitions(shuffleId2) === Some(Seq.empty))

    // Simulate the scenario of executor lost
    runEvent(ExecutorLost("exec-hostC", ExecutorKilled))
The behavior difference between 2.3 and 2.4 is related to #21758, which moved the output status cleanup forward: https://github.com/apache/spark/pull/21758/files#diff-6a9ff7fb74fd490a50462d45db2d5e11L1390.
So I fixed the behavior by simulating an executor loss, because here we want a scenario where some partitions are missing while the shuffle map stage is rerun.
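To make the intent of the two added lines explicit, here is the same fragment with descriptive comments. It uses only the helpers already visible in the diff above (runEvent, mapOutputTracker.findMissingPartitions) and elides the rest of the test, so it is an annotated excerpt rather than a standalone test.

```scala
// All outputs of shuffle 2 are registered at this point, so nothing is missing yet.
assert(mapOutputTracker.findMissingPartitions(shuffleId2) === Some(Seq.empty))

// Simulate the scenario of executor lost: losing "exec-hostC" unregisters the map
// outputs hosted there, giving us a scenario where some partitions are missing
// while the shuffle map stage is rerun (the situation 2.4 reaches earlier via #21758).
runEvent(ExecutorLost("exec-hostC", ExecutorKilled))
```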
Test build #109480 has finished for PR 25508 at commit
cc @cloud-fan
Supplied a UT for this cherry-pick in the last commit.
Thank you for updating this, too, @xuanyuanking.
Test build #109542 has finished for PR 25508 at commit
thanks, merging to 2.3!
Closes #25508 from xuanyuanking/spark-28699-backport-2.3.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>