Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-28699][CORE][2.4] Fix a corner case for aborting indeterminate stage #25509

Closed

Conversation

xuanyuanking
Copy link
Member

@xuanyuanking xuanyuanking commented Aug 20, 2019

What changes were proposed in this pull request?

Change the logic of collecting the indeterminate stage, we should look at stages from mapStage, not failedStage during handle FetchFailed.

Why are the changes needed?

In the fetch failed error handle logic, the original logic of collecting indeterminate stage from the fetch failed stage. And in the scenario of the fetch failed happened in the first task of this stage, this logic will cause the indeterminate stage to resubmit partially. Eventually, we are capable of getting correctness bug.

Does this PR introduce any user-facing change?

It makes the corner case of indeterminate stage abort as expected.

How was this patch tested?

New UT in DAGSchedulerSuite.
Run below integrated test with local-cluster[5, 2, 5120], and set spark.sql.execution.sortBeforeRepartition=false, it will abort the indeterminate stage as expected:

import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()

Change the logic of collecting the indeterminate stage, we should look at stages from mapStage, not failedStage during handle FetchFailed.

In the fetch failed error handle logic, the original logic of collecting indeterminate stage from the fetch failed stage. And in the scenario of the fetch failed happened in the first task of this stage, this logic will cause the indeterminate stage to resubmit partially. Eventually, we are capable of getting correctness bug.

It makes the corner case of indeterminate stage abort as expected.

New UT in DAGSchedulerSuite.
Run below integrated test with `local-cluster[5, 2, 5120]`, and set `spark.sql.execution.sortBeforeRepartition`=false, it will abort the indeterminate stage as expected:
```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25498 from xuanyuanking/SPARK-28699-followup.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 0d3a783)
Signed-off-by: Yuanjian Li <xyliyuanjian@gmail.com>
@SparkQA
Copy link

SparkQA commented Aug 20, 2019

Test build #109395 has finished for PR 25509 at commit 547f157.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@dongjoon-hyun dongjoon-hyun changed the title [SPARK-28699][CORE][BACKPORT-2.4] Fix a corner case for aborting indeterminate stage [SPARK-28699][CORE][2.4] Fix a corner case for aborting indeterminate stage Aug 20, 2019
@SparkQA
Copy link

SparkQA commented Aug 20, 2019

Test build #109417 has finished for PR 25509 at commit 547f157.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Aug 21, 2019

Test build #109457 has finished for PR 25509 at commit 547f157.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Copy link
Member Author

retest this please.

@SparkQA
Copy link

SparkQA commented Aug 21, 2019

Test build #109475 has finished for PR 25509 at commit 547f157.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cloud-fan
Copy link
Contributor

retest this please

@dongjoon-hyun
Copy link
Member

dongjoon-hyun commented Aug 21, 2019

Hi, @xuanyuanking and @cloud-fan . This PR doesn't include the original test case. Is it okay? It would be great if we can have it, too.

If the removal of UT is intentional, please update the following PR description accordingly.

New UT in DAGSchedulerSuite.

cc @kiszk

@SparkQA
Copy link

SparkQA commented Aug 21, 2019

Test build #109494 has finished for PR 25509 at commit 547f157.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@xuanyuanking
Copy link
Member Author

Yeah, sorry for the missing, add UT in 9a6cca8.

@dongjoon-hyun
Copy link
Member

Thank you for update, @xuanyuanking .

@SparkQA
Copy link

SparkQA commented Aug 22, 2019

Test build #109544 has finished for PR 25509 at commit 9a6cca8.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Copy link
Member

@dongjoon-hyun dongjoon-hyun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM. Merged to branch-2.4.
All R tests are already passed last time and the last commit only adds a Scala UT.

Thank you, @xuanyuanking and @cloud-fan !

dongjoon-hyun pushed a commit that referenced this pull request Aug 22, 2019
… stage

### What changes were proposed in this pull request?
Change the logic of collecting the indeterminate stage, we should look at stages from mapStage, not failedStage during handle FetchFailed.

### Why are the changes needed?
In the fetch failed error handle logic, the original logic of collecting indeterminate stage from the fetch failed stage. And in the scenario of the fetch failed happened in the first task of this stage, this logic will cause the indeterminate stage to resubmit partially. Eventually, we are capable of getting correctness bug.

### Does this PR introduce any user-facing change?
It makes the corner case of indeterminate stage abort as expected.

### How was this patch tested?
New UT in DAGSchedulerSuite.
Run below integrated test with `local-cluster[5, 2, 5120]`, and set `spark.sql.execution.sortBeforeRepartition`=false, it will abort the indeterminate stage as expected:
```scala
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes #25509 from xuanyuanking/SPARK-28699-backport-2.4.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
@xuanyuanking xuanyuanking deleted the SPARK-28699-backport-2.4 branch August 23, 2019 03:44
rluta pushed a commit to rluta/spark that referenced this pull request Sep 17, 2019
… stage

### What changes were proposed in this pull request?
Change the logic of collecting the indeterminate stage, we should look at stages from mapStage, not failedStage during handle FetchFailed.

### Why are the changes needed?
In the fetch failed error handle logic, the original logic of collecting indeterminate stage from the fetch failed stage. And in the scenario of the fetch failed happened in the first task of this stage, this logic will cause the indeterminate stage to resubmit partially. Eventually, we are capable of getting correctness bug.

### Does this PR introduce any user-facing change?
It makes the corner case of indeterminate stage abort as expected.

### How was this patch tested?
New UT in DAGSchedulerSuite.
Run below integrated test with `local-cluster[5, 2, 5120]`, and set `spark.sql.execution.sortBeforeRepartition`=false, it will abort the indeterminate stage as expected:
```scala
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25509 from xuanyuanking/SPARK-28699-backport-2.4.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
kai-chi pushed a commit to kai-chi/spark that referenced this pull request Sep 26, 2019
… stage

### What changes were proposed in this pull request?
Change the logic of collecting the indeterminate stage, we should look at stages from mapStage, not failedStage during handle FetchFailed.

### Why are the changes needed?
In the fetch failed error handle logic, the original logic of collecting indeterminate stage from the fetch failed stage. And in the scenario of the fetch failed happened in the first task of this stage, this logic will cause the indeterminate stage to resubmit partially. Eventually, we are capable of getting correctness bug.

### Does this PR introduce any user-facing change?
It makes the corner case of indeterminate stage abort as expected.

### How was this patch tested?
New UT in DAGSchedulerSuite.
Run below integrated test with `local-cluster[5, 2, 5120]`, and set `spark.sql.execution.sortBeforeRepartition`=false, it will abort the indeterminate stage as expected:
```scala
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 && TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25509 from xuanyuanking/SPARK-28699-backport-2.4.

Authored-by: Yuanjian Li <xyliyuanjian@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants