[SPARK-45873][CORE][YARN][K8S] Make ExecutorFailureTracker more tolerant when app remains sufficient resources #43746
Conversation
cc @dongjoon-hyun @cloud-fan @HyukjinKwon @LuciferYang @tgravescs, thank you
dongjoon-hyun left a comment:
+1, LGTM. Thank you, @yaooqinn.
cc @mridulm for the YARN part, because this might introduce a behavior change in which the job never fails again due to executor failures. BTW, for the record, I believe this is mainly useful for the K8s environment with the Horizontal Pod Autoscaler case, and I guess the failure situation is rare in the YARN environment. That's the reason why I didn't ask for a migration guide for this configuration and the behavior change.
Thank you for the check, @dongjoon-hyun.
Suppose the Spark app runs in a resource-prioritization environment: because of its low priority on a given machine, it may be preempted at any time (killed by the per-machine resource monitor), but the queue has enough resources to quickly acquire new ones. In this scenario, would the Spark app never fail, unlike before?
@LuciferYang. It depends, but I would never use the
core/src/main/scala/org/apache/spark/scheduler/cluster/SchedulerBackendUtils.scala
...netes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
Thank you @LuciferYang for the suggestions. BTW,
It is ratio>=1, which makes
Got
tgravescs left a comment:
So I'm a bit concerned about this being confusing to users and not really being configurable except in special cases.
Stage cancelled because SparkContext was shut down and Max number of executor failures (20) reached
20 is a lot of failures. What is the real issue causing this? i.e., why are these executors failing? How long was the app running? Is it some cloud environment where they are going away, or is it really an issue with the application or its configuration?
Meanwhile, it still had 90% of maxNumExecutors and was about to finish. In its final moments (less than 10 seconds),
How does Spark know it would have finished and those wouldn't have also failed? The point of the feature and the existing settings is that if you have had that many failures, something is likely wrong and you need to fix it. It may have been that by letting this go on longer it would have just wasted more time and resources, if those other executors were also going to fail.
It kind of feels like you shouldn't even have this feature enabled if you have that many failures, but I would like to understand why these failures were happening, to see whether this makes sense or something else is needed for certain circumstances.
core/src/main/scala/org/apache/spark/internal/config/package.scala
// insufficient for keeping the app running, it will fail the application directly; otherwise,
// it survives this check round.
def insufficientResourcesRetained(): Boolean = {
  totalRegisteredExecutors.get() < maxExecutors * minSurviveRatio
With dynamic allocation, maxExecutors is Int.MaxValue, so how does that really work with it? I would basically say it doesn't.
When maxExecutors is Int.MaxValue, the max failures value is 2 * Int.MaxValue, or an explicit value is given via spark.executor.maxFailures. I guess we don't change the behavior here.
Exactly, so this feature doesn't apply to the way many users normally run, and could be confusing.
I guess Int.MaxValue for maxExecutors is rare. It's problematic in itself to use in prod.
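To make the dynamic-allocation concern above concrete, here is a minimal, self-contained Scala sketch (not the PR's actual code; `registeredExecutors`, `maxExecutors`, and `minSurviveRatio` are illustrative parameters) showing how a ratio-based check degenerates when dynamic allocation leaves maxExecutors at Int.MaxValue:

```scala
// Hedged sketch of the check discussed above, not the PR's implementation.
object SurviveCheckSketch {
  // Returns true when the retained executors fall below the survival threshold,
  // i.e. the app would be failed rather than tolerated.
  def insufficientResourcesRetained(
      registeredExecutors: Int,
      maxExecutors: Int,
      minSurviveRatio: Double): Boolean = {
    // Int * Double is promoted to Double, so there is no overflow even for Int.MaxValue.
    registeredExecutors < maxExecutors * minSurviveRatio
  }

  def main(args: Array[String]): Unit = {
    // Static allocation: 10 executors max, require 80% to survive.
    println(insufficientResourcesRetained(9, 10, 0.8))  // false -> tolerated, app keeps running
    println(insufficientResourcesRetained(7, 10, 0.8))  // true  -> app fails as before

    // Dynamic allocation default: maxExecutors stays at Int.MaxValue, so any realistic
    // executor count looks "insufficient" and the extra tolerance never applies.
    println(insufficientResourcesRetained(500, Int.MaxValue, 0.8)) // true
  }
}
```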
It is not clear to me what the issue is, why we are taking this proposed approach, and what the underlying cause is. I can speculate as to why this is happening, but can you update the JIRA (or PR description) with more details on what the issue is? And why this proposal should work?
Hi @tgravescs, thank you for the detailed review.
I do not fully agree with that. It should be handier to set than spark.executor.maxFailures. In most cases, users may not be familiar with the spark.executor.maxFailures setting and may simply use the default values. If an application has a low executor limit, the default value of 3 for max failures can be very unreliable.
The failures can be divided into two kinds. The first affects both existing and new executors, e.g. exit code 143 (killed by resource managers), OOM, etc.; it is OK to fail the app in this case with or without this PR. The second affects only new executors, e.g. an external dependency file changed by expected or unexpected maintenance, or rejections from resource managers; this is what the PR mainly focuses on, to reduce the risk of an app being killed all of a sudden. In the second case, 20 is a relatively small number, as the allocation requests and responses go very quickly.
The app I described above in the PR description ran for 1.5 hours. It failed because it hit the max executor failures, while the root cause was that one of the shared UDF jars was changed by a developer who turned out not to be the app owner. YARN failed to bring up new executors, so the 20 failures were collected within 10 seconds. We have a monitor for all our Spark apps on Kubernetes and YARN. The probability of apps failing with max executor failures is low relative to the total number of apps, but it turns out to be a daily issue. See
As I answered in the first question, Spark knows (possibly with a delay) whether it will finish or fail. Both the failed executors and the live ones are still being counted. Considering the delay and reliability, TBH, I haven't got a silver bullet for both of them. So,
Hi @mridulm. Thanks for the questions, but could you make them more specific? As far as I can tell, your questions are already covered in the PR description, based on my understanding and findings. It would be great if I could get your point more precisely.
Oh, I just realized you added this config, i.e. ported the YARN feature to K8s, and I think you mean spark.executor.maxNumFailures. I had missed this go by.
If users change a jar mid-application, that is really bad IMHO. How do you know your application doesn't get different results on different executors? Say that had actually worked but the logic in the UDF had changed. To me this is a process issue, and Spark did the right thing in failing; it should have failed. Would you have known as quickly that someone pushed a bad jar if it hadn't failed? I assume maybe on the next application run sometime later, but it still would have caused some app to fail.
I'm not sure I follow this statement: you see this kind of issue daily, and is it because users push bad jars that often, or why do you see it daily? I'm trying to understand how much this is really a problem that Spark should be solving. Do you see failures where having the feature on actually helps you? I kind of assume so, since you ported it to K8s, but if not, just turn it off. I can see a reliability aspect here: if you have a sufficient number of executors already allocated and running, then just keep running instead of killing the entire application. How you achieve that, though, versus this proposal, I'm not sure I agree with. If the user sets a minimum number of executors, why isn't this just that number? As one of the other comments stated, this approach is useless for normal users with dynamic allocation, so why doesn't it apply to that case?
The PR description describes what the change is trying to do, along with details I am not sure are related to why executor failures were observed or why the application eventually failed; it would be great to understand what is going on that we are trying to mitigate. Having said that, your response to @tgravescs's query here gives a lot more context (thanks!). It points to an issue with how users are running the Spark application: Spark is not tolerant of out-of-band changes to artifacts (jars, etc.) while it is running. This needs to be fixed at the application end, not in Spark, and we should not try to work around it; I agree with @tgravescs on this.
Hi @tgravescs
I (or our customer, you mean) didn't add it; the 20-failure threshold is calculated as 10 max executors * 2.
Let's take Spark Thrift Server and Spark Connect as examples. Other cases, excluding the jar issue above:
These are not errors that belong to the app or the resource manager, but rather ways to optimize resource usage. They are not corner cases.
Preemption on YARN shouldn't count against the number of failed executors. If it does, then something has changed and we should fix that.
Can you be more specific here: why is this going to cause failures that aren't similar to YARN dynamic allocation getting more executors? Is it scaling down and the containers being marked as failed, versus YARN marking them as preempted and not counting them against failures? Is there any way to know on K8s that this happened so we could avoid counting them? It seems like if this is really an issue, the feature should be off by default on K8s.
This is a consequence of using a shared environment. Ideally Spark would isolate it from others so that other users wouldn't be affected, but that unfortunately isn't the case. I'm not sure about your environment, but ideally users test things before running in a production environment and breaking things. If this feature doesn't really work or has issues on K8s, then there should be a way to disable it, which seems more like what you want here, right? You are essentially saying you don't want it to fail the application, so turn the feature off and do your own monitoring to catch issues. Note, the documentation on this feature is missing; I made some comments in 40872e9, can you please fix those?
Yes, you are right
Yeah, the testing step is necessary before prod. But, as you said, "ideally". System robustness takes precedence over that.
What does 'this feature' refer to? Why do you keep mentioning K8s when I give evidence on YARN? At least the detailed examples are about YARN, and the 39/40 items shown in the snapshots above are also YARN. Well, for K8s, ExecutorFailureTracker works well during app initialization to fail fast on continuous pod failures. It does not work well for apps that already have sufficient pods and then hit some failures on new pod allocation.
What do you mean by this? Are you saying the Spark on YARN handling of preempted containers is not working properly? Meaning, if a container is preempted, it should not show up as an executor failure. Are you seeing those preempted containers show up as failed?
Sorry, I misunderstood your environment here; I thought you were running on K8s, but it looks like you are running on YARN. By "feature" I mean the spark.yarn.max.executor.failures / spark.executor.maxNumFailures config and its functionality. So unless YARN preemption handling is broken (please answer the question above), you gave one very specific use case where a user added a bad JAR, and in that use case it seems like you just don't want spark.executor.maxNumFailures enabled at all. You said you don't want the app to fail, so admins can come fix things up without it affecting other users. If that is the case, then Spark should allow users to turn spark.executor.maxNumFailures off, or I assume you could do the same thing by setting it to Int.MaxValue. As implemented, this seems very arbitrary, and I would think it hard for a normal user to set and use this feature. You have it as a ratio, which I'm normally in favor of, but that really only works if you have max executors set, so it is really just a hardcoded number. That number seems arbitrary, as it just depends on whether you get lucky and happen to have that many executors before some user pushes a bad jar. I don't understand why this isn't the same as the minimum number of executors, as that seems more in line: saying you need some minimum number for this application to run, and by the way it's OK to keep running while launching new executors is failing. If there are other issues with Spark Connect and adding jars, maybe that is a different conversation about isolation (https://issues.apache.org/jira/browse/SPARK-44146). Or maybe it needs to better prevent users from adding jars with the same name.
PREEMPTED is OK, and those cases are not counted by the executor failure tracker. I was wrong about this before, sorry for the noise.
There are pros and cons to this suggestion, I guess. Disabling the executor failure tracker certainly keeps the app alive, but at the same time it gives up fail-fast.
Aren't most numeric configurations and their defaults in Spark arbitrary?
I can try to use the minimum number of executors to remove the uncertainty of the ratio.
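For reference, a speculative Scala sketch of what the minimum-executors alternative discussed above could look like; this is not part of the PR, and `liveExecutors` / `minExecutors` are assumed names rather than Spark APIs:

```scala
// Speculative sketch: keep the app alive while it still holds at least the
// user-configured minimum number of executors, instead of using a ratio.
object MinExecutorsKeepAliveSketch {
  def shouldKeepAlive(liveExecutors: Int, minExecutors: Int): Boolean =
    liveExecutors >= minExecutors

  def main(args: Array[String]): Unit = {
    // App configured to need at least 5 executors to make progress.
    println(shouldKeepAlive(liveExecutors = 9, minExecutors = 5)) // true  -> tolerate new-executor failures
    println(shouldKeepAlive(liveExecutors = 3, minExecutors = 5)) // false -> fail fast as today
  }
}
```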
We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.


What changes were proposed in this pull request?
This PR introduces a new configuration, spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled, which defaults to false.
When false, it fully respects the executor failures counted by ExecutorFailureTracker, and if the count exceeds the maximum number of executor failures, the app exits forcefully and immediately. Note that this is the current behavior.
When true, if the count exceeds the maximum, we do an extra check to decide whether to exit the app immediately. The check verifies whether the app still has sufficient resources, based on initial executors * spark.scheduler.minRegisteredResourcesRatio. The app continues if resources are sufficient, and fails if live executors keep dying and resources become insufficient. If no more executors die, the app gets the job finished.
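As a reading aid, here is a hedged Scala sketch of the decision flow described above; it is not the actual implementation, and the helper name and parameters (`keepaliveEnabled`, `initialExecutors`, etc.) are illustrative stand-ins for the configurations mentioned:

```scala
// Hedged sketch of the decision flow described above, not the PR's code.
// `keepaliveEnabled` mirrors spark.executor.failureTracker.keepaliveOnMinLiveExecutors.enabled,
// `minRegisteredRatio` mirrors spark.scheduler.minRegisteredResourcesRatio.
object FailureDecisionSketch {
  def shouldExitApp(
      numExecutorFailures: Int,
      maxNumExecutorFailures: Int,
      liveExecutors: Int,
      initialExecutors: Int,
      minRegisteredRatio: Double,
      keepaliveEnabled: Boolean): Boolean = {
    if (numExecutorFailures < maxNumExecutorFailures) {
      false // below the threshold, nothing to do
    } else if (!keepaliveEnabled) {
      true  // current behavior: exit immediately once the threshold is hit
    } else {
      // Extra check: only exit if the retained resources are insufficient.
      liveExecutors < initialExecutors * minRegisteredRatio
    }
  }

  def main(args: Array[String]): Unit = {
    // Threshold hit, but 9 of 10 initial executors are still alive -> keep running.
    println(shouldExitApp(20, 20, 9, 10, 0.8, keepaliveEnabled = true))  // false
    // Threshold hit and live executors keep dying -> fail the app.
    println(shouldExitApp(20, 20, 5, 10, 0.8, keepaliveEnabled = true))  // true
  }
}
```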
Why are the changes needed?
The ExecutorFailureTracker is overly sensitive to executor failures and tends to overreact by terminating the application immediately upon reaching the threshold, regardless of whether the resources currently obtained are sufficient. Since executor allocation depends on various factors such as resource managers, host environments, and external dependencies, it may become unavailable for some time. During this period, ExecutorFailureTracker may accumulate enough failures to mistakenly kill the application.
Here is also an example from our production environment:
The application had been running for hours before it suddenly crashed with Stage cancelled because SparkContext was shut down and Max number of executor failures (20) reached. Meanwhile, it still had 90% of maxNumExecutors and was about to finish. In its final moments (less than 10 seconds), it requested only one additional executor. Speaking of the outcome, killing it was definitely the wrong call.
The threshold of ExecutorFailureTracker is inflexible. It is pre-configured by spark.executor.maxNumFailures or calculated as 2 * the max number of executors. It does not consider the actual number of live executors.
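For context, here is a rough Scala sketch of how the default threshold quoted above (e.g. 20 for 10 max executors, with a floor of 3 mentioned in the discussion) can be derived; it approximates that logic and is not a verbatim copy of ExecutorFailureTracker:

```scala
// Approximation of the default max-executor-failures derivation discussed above
// (2 * max executors, with a floor of 3); not a verbatim copy of Spark's code.
object MaxFailuresSketch {
  def defaultMaxExecutorFailures(maxExecutors: Int): Int = {
    // Guard against overflow when dynamic allocation leaves maxExecutors at Int.MaxValue.
    val doubled = if (maxExecutors > Int.MaxValue / 2) Int.MaxValue else 2 * maxExecutors
    math.max(3, doubled)
  }

  def main(args: Array[String]): Unit = {
    println(defaultMaxExecutorFailures(10))           // 20, the threshold from the prod example
    println(defaultMaxExecutorFailures(1))            // 3, the floor mentioned in the thread
    println(defaultMaxExecutorFailures(Int.MaxValue)) // Int.MaxValue under dynamic allocation
  }
}
```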
Does this PR introduce any user-facing change?
Yes, a new configuration.
How was this patch tested?
New unit tests.
Was this patch authored or co-authored using generative AI tooling?