
[SPARK-13669][SPARK-20898][Core] Improve the blacklist mechanism to handle external shuffle service unavailable situation #17113

Closed
wants to merge 8 commits

Conversation

jerryshao
Contributor

What changes were proposed in this pull request?

Currently we are running into an issue with YARN work preserving enabled + external shuffle service.
In the work-preserving scenario, the failure of an NM does not lead to the exit of its executors, so the executors can still accept and run tasks. The problem is that when the NM fails, the external shuffle service is actually inaccessible, so reduce tasks will keep complaining about "Fetch failure", and the failure of the reduce stage makes the parent stage (map stage) rerun. The tricky part is that the Spark scheduler is not aware of the unavailability of the external shuffle service and will reschedule map tasks on the executors whose NM has failed, so the reduce stage fails again with "Fetch failure"; after 4 retries, the job fails. This could also apply to other cluster managers with an external shuffle service.

So the main problem is that we should avoid assigning tasks to those bad executors (where the shuffle service is unavailable). Spark's current blacklist mechanism can blacklist executors/nodes based on task failures, but it doesn't handle this specific fetch failure scenario. This PR proposes to improve the current application-level blacklist mechanism to handle the fetch failure issue (especially the external-shuffle-service-unavailable case) by blacklisting the executors/nodes where shuffle fetch is unavailable.
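For illustration only, a minimal sketch of the application-level bookkeeping being proposed; the class and method names here are hypothetical, not the exact code in this patch:

```scala
import scala.collection.mutable

// Sketch only: illustrative names, not the exact code in this patch.
class FetchFailureBlacklist(blacklistTimeoutMillis: Long, externalShuffleServiceEnabled: Boolean) {
  private val executorIdToBlacklistExpiry = mutable.Map.empty[String, Long]
  private val nodeIdToBlacklistExpiry = mutable.Map.empty[String, Long]

  // Called when a FetchFailed task end event blames (host, exec) as the source.
  def updateBlacklistForFetchFailure(host: String, exec: String): Unit = {
    val expiryTime = System.currentTimeMillis() + blacklistTimeoutMillis

    // Always blacklist the executor whose shuffle output could not be fetched.
    executorIdToBlacklistExpiry(exec) = expiryTime

    // With the external shuffle service, every executor on the host serves its
    // shuffle files through the same NodeManager-hosted service, so a failure to
    // fetch from one executor makes the whole node suspect.
    if (externalShuffleServiceEnabled) {
      nodeIdToBlacklistExpiry(host) = expiryTime
    }
  }

  def isExecutorBlacklisted(exec: String): Boolean =
    executorIdToBlacklistExpiry.get(exec).exists(_ > System.currentTimeMillis())

  def isNodeBlacklisted(host: String): Boolean =
    nodeIdToBlacklistExpiry.get(host).exists(_ > System.currentTimeMillis())
}
```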

How was this patch tested?

Unit test and small cluster verification.

@jerryshao jerryshao changed the title [SPARK-13669][Core]Improve the blacklist mechanism to handle external shuffle service unavailable situation [SPARK-13669][Core] Improve the blacklist mechanism to handle external shuffle service unavailable situation Mar 1, 2017
@SparkQA

SparkQA commented Mar 1, 2017

Test build #73668 has finished for PR 17113 at commit a90b23d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

Can you clarify the situations where you are seeing issues? What happened to the NM in this case? If you have work-preserving restart, I would think this would actually cause you more problems. The NM could temporarily be down during a rolling upgrade, and if you blacklist it, it won't be used for a long time.

We have seen issues with Tez and MR where blacklisting on fetch failures caused more issues than it solved. Most of the fetch failures were transient, and it caused way more things to be rerun than was actually needed.

This is why we explicitly left it out of the blacklisting feature. See the design doc in the JIRA: https://issues.apache.org/jira/browse/SPARK-8425.

I didn't have a chance to do a full review, but you seem to be blacklisting an executor -- which executor is this blacklisting? It looks like you are immediately blacklisting on any fetch failure rather than allowing a configurable number?

@markhamstra
Contributor

"Current Spark's blacklist mechanism": please be more precise. The most recent released version of Spark, 2.1.0, does not include a lot of recent changes to blacklisting (mostly 93cdb8a). Are the problems you are describing fully explored with the master branch of Spark?

@jerryshao
Contributor Author

@tgravescs , the main scenario is the external-shuffle-service-unavailable case, which can happen with work preserving enabled + NM failure. A Mesos + standalone external shuffle service setup could also hit this issue. I agree that in scenarios like rolling upgrades, NM unavailability is short and the issue is self-recoverable. The scenario I'm simulating is NM failure: in my test, when the NM fails, the RM detects the failure only after 10 minutes by default; before that, executors on that NM can still serve tasks, and Spark doesn't blacklist those containers, so re-issued tasks can still fail.

FetchFailed immediately aborts the running stage and re-issues the parent stage, so configurations like the number of failed tasks per stage may not be very useful here. My thinking is to blacklist these executors/nodes immediately after a fetch failure.

This proposal may have problems in different scenarios; that's why I opened it here for comments. If you don't think it is necessary to fix, I can close it.

@markhamstra this patch targets the master branch, and all the investigation and changes are based on the master branch.

@tgravescs
Contributor

@jerryshao are you actually seeing issues with this on real customer/production jobs? How often? NM failure for us is very rare. I'm not familiar with how Mesos would fail differently; the shuffle service there is started as a separate service, correct?

We would definitely need to make sure that Spark's retries before really returning a fetch failure are good enough to handle cases like rolling upgrades or intermittent issues with the shuffle, but with our defaults of 3 retries at 5 seconds each I'm not sure that would cover it.

If people are seeing a need for it and actual issues with it, then I'm OK with the idea as long as we make it configurable so it can be turned off. I'm not sure about blacklisting after one fetch failure either. It would seem better to only blacklist after a couple of tasks have hit the failure; that way you would have better confidence it was really an issue with the node you are fetching from.
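For context, the retry settings referenced above are, as far as I know, the Netty shuffle client's spark.shuffle.io.maxRetries (default 3) and spark.shuffle.io.retryWait (default 5s); a hedged sketch of raising them to ride out a longer NM restart (the values are illustrative only):

```scala
import org.apache.spark.SparkConf

// Sketch: give shuffle fetches more headroom before a FetchFailed is reported,
// e.g. to survive a short NodeManager restart during a rolling upgrade.
// The keys are the shuffle client retry settings; the values are illustrative.
val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "6")   // default is 3
  .set("spark.shuffle.io.retryWait", "10s")  // default is 5s; total wait ~ maxRetries * retryWait
```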

cc @squito

@jerryshao
Contributor Author

@tgravescs , thanks a lot for your comments.

Actually the issue here is a simulated one from my test cluster; I didn't get an issue report from real customers.

Yes, in most cases a shuffle fetch failure is transient and can be recovered through retry, but for some small jobs the gap between failure and recovery is enough for the job to fail even with retries.

The difficult thing here is that on a fetch failure, Spark immediately aborts the stage without retrying the tasks, so unlike normal task failures, we may have no chance to observe several fetch failures before deciding to blacklist. That's why in this PR I immediately blacklist the executors/nodes at the application level after a fetch failure. The solution is strict enough that it will also blacklist executors with transient failures, which is what I'm mainly concerned about.

So I'm looking for any comments here; greatly appreciated.

@tgravescs
Contributor

So I looked at this a little more. I'm more OK with this since Spark doesn't actually invalidate the shuffle output. You are basically just trying to stop new tasks from running on the executors already on that host. It's either going to just blacklist those or kill them if you have that feature on.

Part of the reason we left it out to begin with was, again, that we didn't want to blacklist on the transient ones, so we wanted to wait and see if it was truly an issue in real life. If you do put this in, I would like it configurable and off by default until we have more data on whether it's really a problem users see.

Spark does immediately abort the stage but it doesn't kill the running tasks, so if other tasks hit fetch failures before it can rerun the map task, the scheduler knows about them, but that is very timing-dependent.

@markhamstra
Contributor

Spark does immediately abort the stage but it doesn't kill the running tasks

Whether running tasks are interrupted on stage abort or not depends on the state of a config boolean -- and ideally we'd like to get to the point where we can confidently set that config so that running tasks are interrupted when the associated job or stage dies.

@mridulm
Contributor

mridulm commented Mar 6, 2017

@markhamstra given the impact interruption has on lower-layer libraries that don't handle it well (IIRC HDFS?), we probably will not set it to true even if the Spark code is robust.

@markhamstra
Contributor

@mridulm Correct, turning task interruption on by default is not so much a matter of Spark itself handling it well as it is a possible (though not completely known) issue with lower layer libraries not handling interruption well. The original concern with HDFS is likely fixed now, but there are similar concerns with Cassandra and other libraries. Logically, we'd like to interrupt Tasks when associated Jobs or Stages are killed in the DAGScheduler. In practice, nobody knows right now how to do that safely in all circumstances, so the default is to not attempt to interrupt the tasks.

@tgravescs
Contributor

Whether running tasks are interrupted on stage abort or not depends on the state of a config boolean -- and ideally we'd like to get to the point where we can confidently set that config so
that running tasks are interrupted when the associated job or stage dies.

@markhamstra which config are you referring to?

@squito
Contributor

squito commented Mar 6, 2017

I think killing tasks is only applicable in different scenarios, e.g. if the job fails. Currently, Spark does not cancel running tasks when the stage fails due to a fetch failure. The taskset is marked as a zombie, but running tasks are left alone.

@markhamstra
Contributor

@tgravescs At the config level, it is spark.job.interruptOnCancel or SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, which then gets passed around as a boolean -- e.g. shouldInterruptThread.
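For reference, a minimal sketch of how that flag gets set per job group via SparkContext.setJobGroup and what cancellation then does; the job itself and the timings are illustrative:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InterruptOnCancelExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("interrupt-on-cancel").setMaster("local[2]"))

    // interruptOnCancel = true is carried as the "spark.job.interruptOnCancel"
    // local property (SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL); when the job
    // group is cancelled, executors call Thread.interrupt() on the task threads.
    sc.setJobGroup("demo-group", "long-running demo job", interruptOnCancel = true)

    // Cancel the whole group from another thread while the job below is running.
    new Thread(new Runnable {
      override def run(): Unit = {
        Thread.sleep(2000)
        sc.cancelJobGroup("demo-group")
      }
    }).start()

    try {
      sc.parallelize(1 to 1000000, 4).map { i => Thread.sleep(1); i }.count()
    } catch {
      case e: Exception => println(s"Job cancelled: ${e.getMessage}")
    } finally {
      sc.stop()
    }
  }
}
```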

@markhamstra
Contributor

markhamstra commented Mar 6, 2017

@squito Correct, currently we really only try to kill running tasks on job failure (and if the config setting allows it); but there is the long-standing "TODO: Cancel running tasks in the stage" in the FetchFailed case of DAGScheduler#handleTaskCompletion, which has languished as a TODO because resolving it would require us both to make Spark itself handle task interruption in the fetch failure case and to deal with the same issues preventing us from making task interruption the default even for job cancelation.

All I'm really saying is that we shouldn't design in a hard requirement that tasks cannot be interrupted (on job cancelation, fetch failure or some other event), because we'd often really like to be able to kill running tasks -- even though we don't know quite how to do that safely right now.

@mridulm
Contributor

mridulm commented Mar 6, 2017

@markhamstra Completely agree, I would love to see this enabled by default. For example, I really hate to see speculative tasks continuing to run when the taskset has completed -- it used to make it necessary to overprovision to ensure the subsequent taskset has all tasks running from the get-go.

@tgravescs
Contributor

I was not talking about designing this around the task-killing part, other than in reference to being able to count the # of fetch failures before triggering the blacklisting, but I think that would have to be handled across stages right now. My main concern with this PR is invalidating the shuffle outputs. I do also believe that fetch failures could be handled better, but that is being discussed more in #17088.

I do also agree with you that the task killing part should be fixed.

@jerryshao
Contributor Author

@tgravescs , I just added a configuration to turn off this feature by default.

Do you have any further comments on it?

@SparkQA

SparkQA commented Mar 9, 2017

Test build #74234 has finished for PR 17113 at commit 7ba0623.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

Sorry, I haven't had a chance to get to this to do a full review; hopefully tomorrow.

Contributor

@squito squito left a comment

I've been thinking about this for a while, and while I thought this seemed like a really drastic change, I now agree with @tgravescs that this could be good to have as long as it's off by default.

In fact this change makes a lot of sense given that Spark takes the drastic step of marking an executor as failed after one fetch failure (https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1332). When there are transient issues, that can lead to very confusing behavior -- the executor is marked as lost, the block manager updates its state, but then the executor reconnects to the driver and reports its blocks again, which the driver happily accepts. It would be more consistent for Spark to totally blacklist (and even actively kill) the executor in those scenarios (as under this patch).

This has caused me tons of confusion when looking at cases ... but I have also never encountered any problems which were caused by that. So just because I'm scared of it being such a huge change, I think we should leave it off by default. Perhaps we should think of a better way to choose the right behavior.

Another thing I thought about as I was reviewing this -- Spark currently assumes that a fetch failure is always the fault of the source, never the destination. I almost wonder if we should count it against both, with some sensible heuristic for looking at a collection of failures and deciding who is at fault.


if (fetchFailed.bmAddress != null) {
  blacklistTracker.foreach(_.updateBlacklistForFetchFailure(fetchFailed.bmAddress.host,
    fetchFailed.bmAddress.executorId, numTasks - tasksSuccessful))
Contributor

I don't think we should use numTasks - tasksSuccessful here. The SparkListenerExecutorBlacklisted.taskFailures field was meant to indicate how many failures occurred on the given executor. In this case, it's always just 1.

Maybe we should really add another field, "reason" or something like that, which indicates whether it was for task failures or fetch failures. Though I really don't think that is necessary ... it's probably too much detail for the UI, and there is enough info in the logs for someone who needs to dive deeper.

logInfo(s"Blacklisting executor $exec due to fetch failure")

executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(host, expiryTimeForNewBlacklists))
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, numFailedTasks))
Contributor

given my other comment at the call site, I'd get rid of numFailedTasks as a parameter and just hardcode this to 1.

blacklistedExecsOnNode += exec

if (SparkEnv.get.blockManager.externalShuffleServiceEnabled &&
  !nodeIdToBlacklistExpiryTime.contains(host)) {
Contributor

nit: double-indent second line of if.

also -- this looks extremely drastic, so I think it's worth including a comment here about why we do this, something like: If we blacklist on fetch failures, we are implicitly saying that we believe the failure is non-transient and can't be recovered from (even if this is the first fetch failure). If the external shuffle service is on, then every other executor on this node would be suffering from the same issue, so we should blacklist (and potentially kill) all of them immediately.

<td>
(Experimental) If set to "true", Spark will blacklist the executors immediately when the fetch failure
happened. If external shuffle service is enabled, then the whole node will be blacklisted. This configuration
is to handle some scenarios where shuffle fetch is available and cannot be recovered through retry.
Contributor

the last sentence is pretty confusing ... I am not sure what else to put, maybe just leave it out?

This change would be helpful when fetch failures come from non-transient issues, but would cause extra recomputation when the failures are transient. Unfortunately that doesn't really give much guidance to the end user ... I feel like even we don't really have clear guidelines on when it should be set; it's sort of an emergency escape hatch we're leaving in place.
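For reference, a sketch of turning this escape hatch on; the key names below match the blacklist configs as I understand them (BLACKLIST_FETCH_FAILURE_ENABLED maps to spark.blacklist.application.fetchFailure.enabled), but verify them against the Spark version you run:

```scala
import org.apache.spark.SparkConf

// Sketch: enabling the experimental fetch-failure blacklisting discussed in this PR.
// Key names are assumptions based on the config constants referenced in this thread;
// confirm they exist in your Spark build before relying on them.
val conf = new SparkConf()
  .setAppName("fetch-failure-blacklist-demo")
  .set("spark.blacklist.enabled", "true")                          // base blacklisting
  .set("spark.blacklist.application.fetchFailure.enabled", "true") // blacklist on fetch failure
  .set("spark.blacklist.killBlacklistedExecutors", "true")         // optionally kill them too
```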

Contributor

you may also want to add the same text as configs above about the timeout and dynamic allocation

Contributor Author

@jerryshao jerryshao Mar 23, 2017

you may also want to add the same text as configs above about the timeout and dynamic allocation

@tgravescs What are you specifically pointing to? Sorry, I don't quite get it.

@@ -1039,6 +1039,40 @@ class TaskSetManagerSuite extends SparkFunSuite with LocalSparkContext with Logg
.updateBlacklistForFailedTask(anyString(), anyString(), anyInt())
}

test("update application blacklist for shuffle-fetch") {
// Setup a taskset, and fail some tasks for a fetch failure, preemption, denied commit,
// and killed task.
Contributor

comment is wrong


assert(blacklistTracker.isExecutorBlacklisted(taskDescs(0).executorId))
assert(blacklistTracker.isNodeBlacklisted("host1"))
}
Contributor

I think you should also have a test with kill enabled, to see that all the executors on the host get killed.

@tgravescs
Contributor

Another thing I thought about as I was reviewing this -- spark currently assumes that a fetchfailure is always the fault of the source, never the destination. I almost wonder if we should count it against both, with some sensible heuristic for looking at a collection of failures and deciding who is at fault.

I think this makes sense to try to tell the difference and improve our logic there.

Perhaps we should think of a better way to choose the right behavior.

Does this mean you don't want to go with this approach? I'm actually not sure this is a huge change. It's a decent change in behavior, but for the cases where nodes really do go down this could help a lot. I think Spark definitely doesn't handle this case well now.

Sorry again, I haven't done a full review; I've been trying to think the entire fetch failure scenario through and have just been busy with other things.

One downside to adding the config BLACKLIST_FETCH_FAILURE_ENABLED is that it limits us in later changing this functionality to, say, only blacklist on multiple fetch failures. We could track fetch failures across stage attempts and only blacklist after X tasks have failed, possibly across stage attempts. I guess it's marked as experimental, so it's a bit more OK for us to change it. Perhaps X isn't just failed tasks but failed tasks from a % of different hosts. You could potentially use that to also determine whether the source or the destination is bad: if fetches from many hosts have failed, you'd expect the source is bad; if it's just one destination, then perhaps that destination is bad. Still thinking this through.
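To make that heuristic concrete, here is a rough, hypothetical sketch; the class name, thresholds, and bookkeeping are all made up for illustration and are not part of this PR:

```scala
import scala.collection.mutable

// Hypothetical tracker (not part of this PR): count fetch failures per source host
// across stage attempts, and only blame the source once enough *distinct*
// destination hosts have reported failures against it.
class FetchFailureHeuristic(
    minFailedTasks: Int,
    minReporterFraction: Double,
    totalHostsInCluster: Int) {

  // source host -> destination hosts that reported a fetch failure from it
  private val reportersBySource = mutable.Map.empty[String, mutable.Set[String]]
  // source host -> total number of fetch-failed tasks blaming it
  private val failuresBySource = mutable.Map.empty[String, Int]

  def recordFetchFailure(sourceHost: String, destinationHost: String): Unit = {
    failuresBySource(sourceHost) = failuresBySource.getOrElse(sourceHost, 0) + 1
    reportersBySource.getOrElseUpdate(sourceHost, mutable.Set.empty[String]) += destinationHost
  }

  // Blame the source only when many tasks, spread across a fraction of the cluster,
  // failed to fetch from it; a single complaining destination suggests the
  // destination itself may be the problem instead.
  def shouldBlacklistSource(sourceHost: String): Boolean = {
    val failures = failuresBySource.getOrElse(sourceHost, 0)
    val reporters = reportersBySource.get(sourceHost).map(_.size).getOrElse(0)
    failures >= minFailedTasks &&
      reporters.toDouble / totalHostsInCluster >= minReporterFraction
  }
}
```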

@squito
Contributor

squito commented Mar 15, 2017

Sorry, I was vague -- I'm saying I'm OK with this as long as it's (a) off by default and (b) experimental so we can change it around (which it is).

@SparkQA

SparkQA commented Mar 23, 2017

Test build #75095 has finished for PR 17113 at commit f6cc47e.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Mar 24, 2017

Test build #75152 has started for PR 17113 at commit f6cc47e.

@tgravescs
Contributor

Sorry for the delay on this; we have been having some discussions about scheduler changes and fetch failure handling in the scheduler. Since this is related, I'm holding off on it.

@jerryshao
Contributor Author

Thanks @tgravescs , no problem.

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented May 31, 2017

Test build #77582 has finished for PR 17113 at commit 9a14105.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

Please add JIRA SPARK-20898 to the description since that is being fixed here.

TaskSchedulerImpl.maybeCreateBlacklistTracker(sc),
isLocal = isLocal)
}
private[scheduler] lazy val blacklistTrackerOpt = maybeCreateBlacklistTracker(sc)
Contributor

A comment here as to why we do this would be good.

@jerryshao jerryshao changed the title [SPARK-13669][Core] Improve the blacklist mechanism to handle external shuffle service unavailable situation [SPARK-13669][SPARK-20898][Core] Improve the blacklist mechanism to handle external shuffle service unavailable situation Jun 1, 2017
@SparkQA

SparkQA commented Jun 1, 2017

Test build #77622 has finished for PR 17113 at commit 3cf9cfd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

changes LGTM.

@squito did you have any further comments?

@squito
Contributor

squito commented Jun 5, 2017

I'm fine with this, just a couple asks:

  1. Is there a test for SPARK-20898? I know at some point in the development of killing blacklisted executors we tested on YARN; I'm really disappointed (in myself) for merging it even though it didn't actually work. It would be nice to have a regression test. Maybe that would be a huge pain to do; I just wanted to see if you had thought about it.

  2. Can you update the JIRA description for SPARK-13669 now? I think a lot has changed since it was initially opened. Also, IIUC, we don't have any particular advice on when users should enable this at the moment; it's just an escape hatch you'd like to leave in place? If there is any advice, putting it in the JIRA would also make sense.

@jerryshao
Contributor Author

Hi @squito ,

For the 1st point, I tested it manually on a real cluster, but I'm not sure how to reproduce it in a UT. If you think it is necessary to add a UT for this issue, I will think about how to address it.

For the 2nd point, I will update the JIRA description, thanks for the reminder.

@tgravescs
Contributor

Jenkins, test this please

@SparkQA

SparkQA commented Jun 20, 2017

Test build #78301 has finished for PR 17113 at commit 3cf9cfd.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jiangxb1987
Contributor

retest this please

@SparkQA

SparkQA commented Jun 21, 2017

Test build #78334 has finished for PR 17113 at commit 3cf9cfd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

These failures definitely look unrelated. I'll kick it once more to try to get a clean run.

@tgravescs
Contributor

Jenkins, test this please

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78431 has started for PR 17113 at commit 3cf9cfd.

@jiangxb1987
Contributor

Emmm... the failure is irrelevant, retest this please.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78446 has finished for PR 17113 at commit 3cf9cfd.

  • This patch fails to generate documentation.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jerryshao
Contributor Author

Looks like Jenkins is not so stable.

@jerryshao
Contributor Author

Jenkins, retest this please.

@SparkQA

SparkQA commented Jun 22, 2017

Test build #78457 has finished for PR 17113 at commit 3cf9cfd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

+1 finally got a clean build, will merge to master

@asfgit asfgit closed this in 9e50a1d Jun 26, 2017
@@ -145,6 +146,74 @@ private[scheduler] class BlacklistTracker (
nextExpiryTime = math.min(execMinExpiry, nodeMinExpiry)
}

private def killBlacklistedExecutor(exec: String): Unit = {
  if (conf.get(config.BLACKLIST_KILL_ENABLED)) {
Contributor

Why not make this a private val?

Contributor Author

It might just be a personal preference; this code was not written by me, I only did some refactoring.

assert(blacklist.nodeIdToBlacklistExpiryTime("hostA") ===
2000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
assert(blacklist.nextExpiryTime === 1000 + blacklist.BLACKLIST_TIMEOUT_MILLIS)
}
Contributor

Should we also test with SHUFFLE_SERVICE_ENABLED=true and BLACKLIST_KILL_ENABLED=false ?

Contributor Author

If BLACKLIST_KILL_ENABLED=false, the scenario should be the same as the one here, so it looks like a duplicate test that isn't really necessary?

Contributor

Makes sense.

robert3005 pushed a commit to palantir/spark that referenced this pull request Jun 29, 2017
…andle external shuffle service unavailable situation

Author: jerryshao <sshao@hortonworks.com>

Closes apache#17113 from jerryshao/SPARK-13669.
cloud-fan pushed a commit that referenced this pull request Feb 9, 2022
### What changes were proposed in this pull request?
#17113 changed the behavior but left a comment that is no longer correct; this PR updates it.

### Why are the changes needed?
Make comment correct

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Not needed

Closes #35453 from AngersZhuuuu/SPARK-38150.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>