
[SPARK-39955][CORE] Improve LaunchTask process to avoid Stage failures caused by fail-to-send LaunchTask messages #37384

Closed
wants to merge 3 commits

Conversation

kevin85421
Member

@kevin85421 kevin85421 commented Aug 3, 2022

What changes were proposed in this pull request?

Milestone 2: classify each RPC failure by the state of the tasks on the executor at the time of the failure.

A. Non-Task Failure: The driver has not sent any LaunchTask message to the executor, so there is no running task on the executor and the failure cannot be a Task Failure.

B. Non-Task Failure: If an RPC failure happens before the driver receives the StatusUpdate message from the executor, it is a Non-Task Failure because the task has not started running on the executor.

C. Task Failure: If any task is running on the executor, the RPC failure is recognized as a Task Failure.

D. Non-Task Failure: When all tasks on the executor are in finished states (FINISHED, FAILED, KILLED, LOST), there is no running task on the executor, so RPC failures in this phase are Non-Task Failures.
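
For illustration, here is a minimal sketch of this classification. The names (RpcFailureKind, AssumedTaskState, RpcFailureClassifier) are hypothetical and simplified; they are not the identifiers used in Spark's scheduler.

// Hypothetical, simplified sketch of the A-D classification above; not Spark's
// actual scheduler code. A task is "launching" between LaunchTask being sent and
// the first StatusUpdate, "running" afterwards, and "finished" once it reaches
// FINISHED / FAILED / KILLED / LOST.
object RpcFailureKind extends Enumeration {
  val TaskFailure, NonTaskFailure = Value
}

sealed trait AssumedTaskState
case object Launching extends AssumedTaskState // cases A and B
case object Running   extends AssumedTaskState // case C
case object Finished  extends AssumedTaskState // case D

object RpcFailureClassifier {
  def classify(tasksOnExecutor: Seq[AssumedTaskState]): RpcFailureKind.Value =
    if (tasksOnExecutor.contains(Running)) RpcFailureKind.TaskFailure // case C
    else RpcFailureKind.NonTaskFailure                                // cases A, B, D
}

Case A falls out naturally: if no LaunchTask message has been sent, tasksOnExecutor is empty and the result is NonTaskFailure.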

Why are the changes needed?

RPC failures can happen for two kinds of reasons: Task Failure and Non-Task Failure.

(1) Task Failure: The network is fine, but the task crashes the executor's JVM, so the RPC fails.
(2) Non-Task Failure: The executor works well, but the network between the driver and the executor is broken, so the RPC fails.

We should handle these two kinds of failure differently. First, if the failure is a Task Failure, we should increment the variable numFailures; if numFailures exceeds a threshold, Spark marks the job as failed. Second, if the failure is a Non-Task Failure, we should not increment numFailures and should simply reassign the task to another executor, so the job is not marked as failed because of a Non-Task Failure.
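
As a rough illustration of this handling (hypothetical names and structure, not Spark's actual TaskSetManager code; only the failure-counting idea is shown):

import scala.collection.mutable

// Hypothetical sketch: only Task Failures count toward the per-task failure
// threshold; Non-Task Failures just cause the task to be resubmitted elsewhere.
class FailureTracker(maxTaskFailures: Int) {
  private val numFailures = mutable.Map.empty[Long, Int].withDefaultValue(0)

  /** Returns true if the job should be failed because of this task loss. */
  def onTaskLost(taskId: Long, isTaskFailure: Boolean): Boolean = {
    if (isTaskFailure) {
      numFailures(taskId) += 1                 // Task Failure: count it
      numFailures(taskId) >= maxTaskFailures   // fail the job past the threshold
    } else {
      false                                    // Non-Task Failure: just resubmit
    }
  }
}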

Currently, however, Spark treats every RPC failure as a Task Failure. This PR therefore categorizes RPC failures into the two categories, Task Failure and Non-Task Failure.

Does this PR introduce any user-facing change?

No

How was this patch tested?

build/sbt "core/testOnly *TaskSchedulerImplSuite -- -z SPARK-39955"
build/sbt "core/testOnly *TaskSetManagerSuite"

@github-actions github-actions bot added the CORE label Aug 3, 2022
@kevin85421
Member Author

cc. @Ngone51 @jiangxb1987

@AmplabJenkins

Can one of the admins verify this patch?

@mridulm
Contributor

mridulm commented Aug 4, 2022

I am trying to understand what actual use case we are targeting here.
There is a behavior change here with nontrivial implications, and I want to understand why we are doing this.

@kevin85421
Member Author

Hi @mridulm,

Task Failure vs Non-Task Failure

Executor loss reasons fall into two categories.

(1) Task Failure: The network is fine, but the task crashes the executor's JVM, so the executor is lost. For example, the task below kills the executor's JVM with System.exit(0). If the executor loss is caused by a Task Failure, we should increment the variable numFailures; if numFailures exceeds a threshold, Spark marks the job as failed.

sc.parallelize(1 to 10, 1).map{ x => System.exit(0) }.count()

(2) Non-Task Failure: If the executor loss is not caused by a problematic task like the one above, it is a Non-Task Failure. For example:

Example 1: The executor's JVM shuts down because of a failure to create a directory.
Example 2: The executor works well, but the network between the driver and the executor is broken.

In these cases, we should not fail the Spark job. We just need to assign the tasks to other nodes.

Why do we need this PR?

Currently, the driver treats every executor loss as a Task Failure. However, some executor losses are caused by Non-Task Failures (e.g., network disconnection), which can lead to unnecessary Spark job failures. With this PR, the reason for an executor loss is categorized as either Task Failure or Non-Task Failure, so those unnecessary job failures can be avoided.

@mridulm
Contributor

mridulm commented Aug 5, 2022

My query was slightly different: are you actually seeing this issue? Can you elaborate on what was observed?
There are corner cases that we currently do not handle in Spark, given the complexity of handling them and how infrequent they are. A stage/job is not immediately failed when a single task fails; it is retried, and there are various exclude lists that prevent repeated scheduling on the same executor/node, which would otherwise quickly fail the stage/job.

@kevin85421
Member Author

Hi @mridulm,

Take a case at Databricks as an example: we observed that all the sockets for executor connections on the driver were closed for unknown reasons. The driver therefore triggered onDisconnected again and again until the Spark job failed. During that period the driver could not connect to any existing executor, but it recovered (could connect to new executors) after a few minutes. We need to avoid failing Spark jobs during such a period.

@mridulm
Contributor

mridulm commented Aug 9, 2022

I am not sure if this is some nuance of the Standalone scheduler; I will let @Ngone51 or @jiangxb1987 comment better. I have not observed such behavior on YARN, or heard of anything similar on k8s.

In general, it would be good to troubleshoot and fix the actual issue (why the mass-disconnect behavior was observed and how to mitigate it), or at least understand/explain it, since there might be implications elsewhere as well.
Any more details on this would be great!

To add: after looking at the PR a bit more closely, my initial concerns about the behavior change are mitigated, but I would like to understand the cause better.

@Ngone51
Member

Ngone51 commented Aug 9, 2022

@mridulm The massive disconnection issue is an intermittent issue that can't be reproduced. I tend to believe it's not a Spark issue but is due to bad nodes.

The current fix doesn't target a specific issue but is a general improvement to Spark. Consider a case where the driver tries to send LaunchTask to an executor and the executor is lost at the same time. Previously, the failed-to-launch task (which hasn't even been launched on the executor) would increase the task's failure count because of ExecutorProcessLost(_, _, causedByApp=true). After this fix, the failed-to-launch task won't increase the failure count, since it is still in the launching state.
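
For illustration only, the "still launching" check roughly corresponds to the following sketch (hypothetical names, not the actual code change):

import scala.collection.mutable

// Hypothetical sketch of the launching state: a task stays in the launching set
// from the moment LaunchTask is sent until the driver receives a StatusUpdate
// for it. If the executor is lost while the task is still launching, the loss
// is not charged against the task's failure limit.
class LaunchingTaskTracker {
  private val launching = mutable.Set.empty[Long]

  def onLaunchTaskSent(taskId: Long): Unit = launching += taskId
  def onStatusUpdate(taskId: Long): Unit = launching -= taskId

  /** True if an executor loss should increment numFailures for this task. */
  def countsAsTaskFailure(taskId: Long): Boolean = !launching.contains(taskId)
}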

Does this make sense to you?

@mridulm
Contributor

mridulm commented Aug 9, 2022

My main concern is whether we are working around an issue that should be investigated/fixed.
Having said that, as I mentioned above, I don't see an issue as such with the proposed change, and it should be a positive enhancement.

Contributor

@mridulm mridulm left a comment


Couple of minor comments, rest looks good to me.

Member

@Ngone51 Ngone51 left a comment


LGTM

@mridulm mridulm closed this in 126870e Aug 11, 2022
@mridulm
Contributor

mridulm commented Aug 11, 2022

Merged to master.
Thanks for fixing this @kevin85421!
Thanks for the review @Ngone51 :-)

@kevin85421
Member Author

Thanks @mridulm and @Ngone51 for the review!
