
[SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure #17088

Closed
wants to merge 10 commits

Conversation

sitalkedia

What changes were proposed in this pull request?

Currently, when we detect a fetch failure, we only remove the shuffle files produced by the failing executor, while the host itself might be down, making all of its shuffle files inaccessible. When we are running multiple executors on a host, any host going down currently results in multiple fetch failures and multiple retries of the stage, which is very inefficient. If we remove all the shuffle files on that host on the first fetch failure, we can rerun all the tasks on that host in a single stage retry.
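At a high level, the proposed bookkeeping change can be sketched as follows (a self-contained toy; the registry and method names here are illustrative, not Spark's internal MapOutputTracker API):

import scala.collection.mutable

// Toy model of the proposed bookkeeping change.
case class MapOutputLoc(executorId: String, host: String)

class OutputRegistry {
  // shuffleId -> (map partition id -> location of that partition's output)
  private val outputs =
    mutable.Map.empty[Int, Map[Int, MapOutputLoc]].withDefaultValue(Map.empty)

  def register(shuffleId: Int, mapId: Int, loc: MapOutputLoc): Unit =
    outputs(shuffleId) = outputs(shuffleId) + (mapId -> loc)

  // Current behavior: forget only the failing executor's outputs.
  def removeOutputsOnExecutor(execId: String): Unit =
    for (id <- outputs.keys.toList)
      outputs(id) = outputs(id).filter { case (_, loc) => loc.executorId != execId }

  // Proposed behavior: forget every output served from the failing host, so a single
  // stage retry regenerates all of it instead of one retry per failing executor.
  def removeOutputsOnHost(host: String): Unit =
    for (id <- outputs.keys.toList)
      outputs(id) = outputs(id).filter { case (_, loc) => loc.host != host }
}

With something like this in place, the fetch-failure handler would call removeOutputsOnHost(failedHost) when the external shuffle service is enabled, instead of only removeOutputsOnExecutor(execId).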

How was this patch tested?

Unit tests; also ran a job on the cluster and confirmed that the multiple retries are gone.

@SparkQA

SparkQA commented Feb 27, 2017

Test build #73533 has started for PR 17088 at commit 74ca88b.

@mridulm
Contributor

mridulm commented Feb 27, 2017

A fetch failure does not imply a lost executor - it could be a transient issue.
Similarly, executor loss does not imply host loss.

This is quite drastic for a fetch failure: Spark already has mechanisms in place to detect executor/host failure, which take care of these failure modes.

@sitalkedia
Author

A fetch failure does not imply a lost executor - it could be a transient issue.
Similarly, executor loss does not imply host loss.

You are right, it could be transient, but we do have retries on the shuffle client to handle transient failures. When the driver receives a fetch failure, we always assume that the output is lost. The current model assumes the output is lost for a particular executor, which makes sense only if the shuffle service is disabled and the executors are serving the shuffle files themselves. However, when the external shuffle service is enabled, a fetch failure means all output on that host should be marked unavailable.

@ConeyLiu
Contributor

I agree with @mridulm; a file fetch failure does not imply that the executor is down, or that all the executors on the host are down.

@sitalkedia
Author

This is quite drastic for a fetch failure: Spark already has mechanisms in place to detect executor/host failure, which take care of these failure modes.

Unfortunately, the mechanisms already in place are not sufficient. Imagine a situation where the shuffle service becomes unresponsive or OOMs: in that case, we will not see any host failure, yet the driver will still receive a fetch failure. The current model assumes all shuffle output for an executor is lost; however, since the shuffle service serves all the shuffle files on that host, we should mark all the shuffle files on that host as unavailable.

@markhamstra
Contributor

Even if I completely agreed that removing all of the shuffle files on a host was the correct design choice, I'd still be hesitant to merge this right now. That is simply because we have recently merged other changes related to fetch failure handling, and I'd really like to see some more time pass with just those changes in the code before we introduce more fetch failure changes. I don't want to get in the situation of merging this PR then getting reports of fetch failure bugs in master a week later, and not knowing whether to place the blame on this PR or the other recent fetch failure changes.

That needn't preclude more discussion of this PR or possibly merging it after we have a little more experience and confidence with the code already in master.

/cc @kayousterhout @squito

@squito squito left a comment
Contributor

I'm also hesitant on this change, but I see the argument for it.

So it seems like things should be correct either way -- before this change, it may just take spark longer to remove all the shuffle files it needs to. With this change, spark may remove more shuffle files than it needs to, but then it would regenerate them. Even with the old behavior, a transient issue could lead to spark removing (and regenerating) more data than it needs to. We should try to recover as quickly as possible in the more common case, though I'm not certain yet which way to lean.

It's also worth keeping in mind that we don't retry a stage indefinitely; we only give it 4 chances (SPARK-5945). One thing which is helping us currently is that we leave tasks running even after a fetch failure, which is likely to detect all the bad execs on the host (which often means you need 2 retries, because you don't detect all the failures until after one retry has already started). But if we were to ever immediately kill tasks after the first fetch failure, then we'd make it more likely that you'd go over 4 retries, just because there were 4 execs on one host. And even with those running tasks, it's still relying on some amount of luck to ensure that you hit all those bad executors.

I don't know what the answer is yet; I'll keep thinking about it, and we should keep discussing.

@@ -1331,7 +1332,7 @@ class DAGScheduler(

// TODO: mark the executor as failed only if there were lots of fetch failures on it
if (bmAddress != null) {
-  handleExecutorLost(bmAddress.executorId, filesLost = true, Some(task.epoch))
+  handleExecutorLost(bmAddress.executorId, slaveLost = true, Some(task.epoch))
Contributor

you could use bmAddress.host, then you wouldn't need to store another execToHost map (though it would require a little more refactoring)

@kayousterhout kayousterhout Feb 28, 2017
Contributor

I'm not sure it's correct to assume that a FetchFailure means that all of the executors on the slave were lost. You could have a failure because one executor died, but the other executors on the host are OK, right? (UPDATED: I realized this is the same comment @mridulm made above)

Author

@kayousterhout - This change applies only when the external shuffle service is enabled; in that case, a fetch failure would mean that the external shuffle service is unavailable, so we should remove all the output on that host, right? When the shuffle service is not enabled, this change should be a no-op.

Author

@squito - Good point, will do.

@kayousterhout
Contributor

Can you update the JIRA and PR description to say "un-register the output locations" (or similar) instead of "remove the files"? The current description is misleading since nothing is actually getting removed from the machine -- instead you're just updating the scheduler's state.

Also, does the issue here only arise when the shuffle service is enabled? It seems like when the shuffle service is not being used, DAGScheduler.handleExecutorLost should be called for each lost executor (i.e., for all of the executors on a host, when a whole host is lost), which will correctly un-register all of the output on that executor.

@sitalkedia
Author

Also, does the issue here only arise when the shuffle service is enabled?

That is correct. When the shuffle service is not enabled, this change should be a no-op.

@sitalkedia sitalkedia changed the title [SPARK-19753][CORE] All shuffle files on a host should be removed in … [SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost of fetch failure Feb 28, 2017
@kayousterhout
Contributor

Why is this a no-op when the shuffle service isn't enabled? It looks like you mark the slave as lost in all cases?

@sitalkedia
Author

Why is this a no-op when the shuffle service isn't enabled? It looks like you mark the slave as lost in all cases?

@kayousterhout - You are right. It's kind of confusing that we are triggering a SlaveLost even in the case of an executor loss.
I made changes so that this only takes effect when we see a fetch failure and the external shuffle service is enabled. Let me know what you think.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73636 has finished for PR 17088 at commit 6898c2b.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73638 has finished for PR 17088 at commit 32a2315.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73640 has finished for PR 17088 at commit c7c3129.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

Jenkins retest this please.

@SparkQA

SparkQA commented Mar 1, 2017

Test build #73664 has finished for PR 17088 at commit c7c3129.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@kayousterhout
Contributor

Can you please file a JIRA for the flaky jenkins failure?

@mridulm
Contributor

mridulm commented Mar 2, 2017

+CC @tgravescs You might be interested in this given your comments on the blacklisting PR.

@tgravescs
Contributor

FYI, this is somewhat related to #17113.
I mention it because I think both depend on how we handle failures and retries. This and that together could cause bad things to occur. As I mentioned on that PR, we specifically had issues with these things on Tez. So I agree with others that we need to be careful here. Removing everything could cause a lot more work than needed. Personally, with the defaults that Spark has for shuffle retries, I think a fetch failure can easily be a transient issue (rolling upgrade, temporarily overloaded NM, etc.).

I need to refresh my memory on all the interactions and I'll get back.

@sitalkedia
Author

@tgravescs - I agree this might cause additional work in situations where the fetch failure is transient, like the ones you mentioned above. But in those cases, IMO, users should tune the shuffle retry configurations to make sure we do not see any fetch failure. The current model already assumes the files are lost on a fetch failure; when the external shuffle service is enabled, we should un-register all the files on the host, as opposed to only those of a specific executor.
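For reference, the shuffle retry tuning I have in mind looks roughly like this (the values are only illustrative, not recommendations):

import org.apache.spark.SparkConf

// Raising the shuffle client's retry budget lets short, transient outages
// (e.g. a rolling NodeManager restart) be retried on the client instead of
// surfacing as a FetchFailure to the driver. Values below are illustrative.
val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "10") // default is 3
  .set("spark.shuffle.io.retryWait", "30s") // default is 5s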

@kayousterhout - Filed a JIRA for the flaky build - SPARK-19803

@tgravescs
Contributor

In this particular case, are your map tasks fast or slow? If they are really fast, rerunning everything now makes sense; if each of those took 1 hour+ to run, failing all of them when they don't need to be just wastes time and resources. Rolling upgrades can take longer than 15 seconds to restart NMs. You can have intermittent issues that last > 1 minute. If it took 1 hour to generate that output, I want it to retry really hard before failing all of those. Users aren't going to tune each individual job unless they really have to, and it might vary per stage. Really it should use cost-based analysis on how long those tasks ran, but that gets more complicated.

It is also possible that some reduce tasks have already fetched the data from those nodes and succeeded and you wouldn't have to rerun all tasks on that host. Due to the way Spark cancels stages on fetch failure, whether the reduce tasks from the stage finish before the map can be rerun is very timing dependent. You could end up doing a lot more work than necessary, so the question is whether that is OK compared to the cost of not doing this and allowing the application to take a bit longer. Note that if you rerun all the maps and they didn't need to be, you might cause the app to take longer too.

How often do you see issues with node managers going down for reasons other than transient issues?
We very rarely see this. Most of the issues we see are transient, and we have found with other application types (Tez, MR) that rerunning everything is worse because it's normally a transient issue. Those application schedulers are a bit different though, so not 100% comparable. If the whole node goes down then YARN will inform you the node is lost. Yes, that does take like 10 minutes though.

Are you seeing jobs fail due to this or just take longer?
I realize the job could fail if the node is really down and we get enough failures across stages. It does seem like we should do better at this, but I'm not sure invalidating all the outputs on a single fetch failure makes sense either.

@tgravescs
Contributor

Note that alternatively we could change it to not fail on a fetch failure. This would seem better to me, since there is no reason to throw away all the work you have done, but I'm sure that is a much bigger change.

@sitalkedia
Author

Rolling upgrades can take longer than 15 seconds to restart NMs. You can have intermittent issues that last > 1 minute. If it took 1 hour to generate that output, I want it to retry really hard before failing all of those. Users aren't going to tune each individual job unless they really have to, and it might vary per stage.

How often are you doing rolling upgrades? I really think in those cases we should be tuning the shuffle fetch configurations to allow for rolling upgrades. Even without this change, the current model un-registers files on a fetch failure, so you might be losing a lot of work already, and in the worst case you can still lose all the files present on a host.

It is also possible that some reduce tasks have already fetched the data from those nodes and succeeded and you wouldn't have to rerun all tasks on that host.

I am not sure if I get your point here, but this will not rerun reduce tasks that have already fetched data from those nodes and succeeded.

We are seeing this issue very frequently, mainly because of node reboots. We are trying to scale Spark to clusters with thousands of machines, and the probability of seeing failures and reboots during long-running jobs is very high.

Are you seeing jobs fail due to this or just take longer?

We are seeing both. As @squito mentioned, we only give 4 chances for a stage to be retried, so even one node reboot can trigger 4 retries and cause the job to fail. Even if the job gets lucky, it takes significantly longer than expected because of multiple retries of a stage, and the way retries are handled by the scheduler is not elegant right now - it does not allow multiple attempts of a stage to run concurrently, which is a separate issue I will address in another PR.

@squito
Contributor

squito commented Mar 15, 2017

First, I think we should change the hard-coded limit of 4 stage retries. It's clear to me that there is an important reason why users would want a higher limit, so let's make it a config. That is a very simple change. (That doesn't mean we shouldn't be changing something else as well.)
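A minimal sketch of what that config could look like (the config key and default below are illustrative, not an existing setting at the time of this PR):

import org.apache.spark.SparkConf

// Sketch: turn the hard-coded stage retry limit into a configuration value.
object StageRetryConfig {
  val DefaultMaxConsecutiveStageAttempts = 4

  def maxConsecutiveStageAttempts(conf: SparkConf): Int =
    conf.getInt("spark.stage.maxConsecutiveAttempts", DefaultMaxConsecutiveStageAttempts)
}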

As with #17113, though this is a big change, it seems to actually be more consistent for spark. Of course some failures are transient, but (as has already been pointed out) (a) even the existing behavior will make you do unnecessary work for transient failures and (b) this just slightly increases the amount of work that has to be repeated for those transient failures.

I'm also wondering if there are other options, eg:

@SparkQA

SparkQA commented Mar 17, 2017

Test build #74694 has finished for PR 17088 at commit d4979e3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia sitalkedia changed the title [SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost of fetch failure [SPARK-19753][CORE] Un-register all shuffle output on a host in case of slave lost or fetch failure Mar 17, 2017
@SparkQA

SparkQA commented Mar 17, 2017

Test build #74710 has finished for PR 17088 at commit 8787db1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito squito left a comment
Contributor

I'm reviewing code for style & clarity, without making a judgement yet on whether this is the right change in behavior. I'd like to keep hearing more opinions on that, though I'm leaning more and more to thinking this is a good idea.

// TODO: This will be really slow if we keep accumulating shuffle map stages
for ((shuffleId, stage) <- shuffleIdToMapStage) {
logInfo("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
Contributor

log should be outside the for loop, like it was before

Author

Ah, my bad, thanks for noticing.
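I.e., roughly this shape (a self-contained toy mirroring the snippet above):

// Toy illustration of the requested change: emit the log line once, before the loop,
// rather than once per registered shuffle stage.
val execId = "exec-1"
val currentEpoch = 3L
val shuffleIdToMapStage = Map(0 -> "mapStage0", 1 -> "mapStage1")

println("Shuffle files lost for executor: %s (epoch %d)".format(execId, currentEpoch))
for ((shuffleId, stage) <- shuffleIdToMapStage) {
  // unregister this stage's outputs for the lost executor/host here
}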

@@ -1390,7 +1401,34 @@ class DAGScheduler(
}
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
"(epoch " + currentEpoch + ")")
Contributor

as long as you're updating this, can you change it to use interpolation instead, just for consistency?

Author

done.
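For the record, the interpolated form looks like this (toy values, just to show the style):

// Same message using string interpolation instead of concatenation.
val execId = "exec-1"
val currentEpoch = 7L
println(s"Additional executor lost message for $execId (epoch $currentEpoch)")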

val firstRDD = new MyRDD(sc, 3, Nil)
val firstShuffleDep = new ShuffleDependency(firstRDD, new HashPartitioner(2))
val firstShuffleId = firstShuffleDep.shuffleId
val shuffleMapRdd = new MyRDD(sc, 3, List(firstShuffleDep))
Contributor

the number of partitions here needs to match the number used in the partitioner from its dependencies. Same below.

I know it doesn't matter in this test, but it becomes hard to understand what is going on in these tests if they have inconsistencies like this

Author

You are right, it was confusing before. Changed accordingly.

private[scheduler] def removeExecutorAndUnregisterOutputOnHost(
execId: String,
host: String,
maybeEpoch: Option[Long] = None) {
Contributor

nit: add return type : Unit =

we didn't adopt that convention (nor did scala) till after a lot of this class was written, hence the inconsistency in this file, but new changes should follow this style.

Author

done
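I.e., the definition from the snippet above, with the explicit return type the reviewer asked for (body elided):

private[scheduler] def removeExecutorAndUnregisterOutputOnHost(
    execId: String,
    host: String,
    maybeEpoch: Option[Long] = None): Unit = {
  // ... body unchanged ...
}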

clearCacheLocs()
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
Contributor

thanks for trying this refactoring, but I don't like the amount of repetition between these two helper methods now. Sorry, this code was very confusing even before, and I haven't given constructive suggestions so far ... what do you think of this version? squito@3e33d5e

Author

Made changes as suggested, thanks!

@squito
Contributor

squito commented Mar 17, 2017

One thing which I noticed while making sense of what was going on in the code (even before) -- IIRC, Spark standalone is a bit of a special case. I think it used to be the case that to run multiple executors per node, you had to run multiple worker instances on the node. E.g., see mentions of SPARK_WORKER_INSTANCES here: http://spark.apache.org/docs/1.4.0/spark-standalone.html
which is gone in the latest docs: http://spark.apache.org/docs/latest/spark-standalone.html

but though it's not documented, I think you can in fact still use multiple worker instances per node:
https://github.com/apache/spark/blob/master/sbin/start-slave.sh#L88

which means that when we get the WorkerLost msg in Spark standalone, we aren't really sure if all shuffle files on that host have been lost or not:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1694

But I think the consistent thing to do would be to assume that there is just one worker per node, as that is the latest recommended configuration, and go ahead and remove all shuffle files on the node if the external shuffle service is enabled. Which would mean that we'd want to change the handling of the ExecutorLost as well to pass along the host.

@kayousterhout
Contributor

One meta question here: why aren't we getting a SlaveLost message in this case? I'm asking since there's already code in #14931 to un-register shuffle service files when we get a SlaveLost message, and that seems like a more bulletproof way of handling the case where an entire slave goes down.

@squito
Contributor

squito commented Mar 17, 2017

@kayousterhout I don't think #14931 is really a complete answer to this.

(a) we only get that from standalone mode, not other cluster managers (YARN does not notify applications of failures of any node in the cluster)
(b) even in standalone mode, it's only notified if the app has an active executor on the node. However, with dynamic allocation, you may be serving shuffle files from a node even though there are no active executors there anymore.
(c) it could be that the executors on the node appear to be responsive, but shuffle files can't be served anyhow -- maybe a disk has gone bad. The executor blacklisting may eventually discover this, but it might not if the tasks don't write to disk (so tasks keep executing successfully on the source of the fetch failures); and even if the tasks did write to disk, it would take a while for the blacklisting to kick in, and you would still hit the scenario originally described.

(Aside: (c) made me think more about whether we should be removing shuffle data when we blacklist, both for executors and nodes ... I think the behavior will be correct either way, but there are similar tradeoffs about which situation to optimize for.)

I think that #14931 is just a small optimization when possible, not a mechanism that can be relied upon.

@sitalkedia
Author

+1 on that. In our case, we are not seeing the SlaveLost message in most cases, and even if we do, it is delayed and we receive the fetch failure before that. So, as @squito pointed out, we cannot rely on the SlaveLost message all the time.

@kayousterhout
Contributor

Ok that makes sense. I wanted to make sure that there wasn't some bug in SlaveLost (which might lead to a simpler fix than this) but @squito's description makes it clear that there are a bunch of situations that SlaveLost can't handle correctly.

@SparkQA

SparkQA commented Mar 18, 2017

Test build #74768 has finished for PR 17088 at commit 9f64e29.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

jenkins retest this please.

@SparkQA

SparkQA commented Mar 18, 2017

Test build #74771 has finished for PR 17088 at commit 9f64e29.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// assume all shuffle data on the node is bad.
Some(bmAddress.host)
} else {
// Deregister shuffle data just for one executor (we don't have any
Contributor

nit: "Unregister" is used elsewhere (function names, etc.), not "deregister".

@@ -1389,8 +1423,7 @@ class DAGScheduler(
clearCacheLocs()
}
} else {
logDebug("Additional executor lost message for " + execId +
"(epoch " + currentEpoch + ")")
logDebug("Additional executor lost message for %s (epoch %d)".format(execId, currentEpoch))
Contributor

nit: prefer string interpolation over format.

case SlaveLost(_, true) => true
val workerLost = reason match {
case SlaveLost(_, true) =>
true
Contributor

nit: prefer it without the line break for something this simple

val filesLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
removeExecutorAndUnregisterOutputs(
execId = execId,
fileLost = filesLost,
Contributor

The fileLost vs. filesLost naming difference is a little confusing -- is the distinction even conveying a difference worth paying attention to?

@tgravescs
Contributor

tgravescs commented Mar 22, 2017

(a) even the existing behavior will make you do unnecessary work for transient failures and (b) this just slightly increases the amount of work that has to be repeated for those transient failures.

Yes, a transient fetch failure always causes some more work because you are going to re-run some map tasks, but the question comes down to how much work it is doing. For your (b), you can't really say it only "slightly increases" the work, because it's going to be highly dependent upon the timing of when things finish, how long maps take, and how long the reducers take.

Please correct me if I've missed something in the spark scheduler related to this.

  • The ResultTasks (let's call them reducers), say in Stage 1.0, are running. One of them gets a fetchFailure. This restarts the ShuffleMapTasks for that executor in Stage 0.1. If, during the time those maps are running, other reducers fetch fail, those get taken into account and will be rerun in the same stage 0.1. If a reducer happened to finish successfully while that map Stage 0.1 is running, it ends up failing with a commit denied (at least with the Hadoop writer; others might not). If, however, that reducer was longer-lived and the map stage 0.1 finished and then that reducer finished successfully, that reduce would be a success.

This timing-dependent completion seems very unpredictable (and perhaps a bug in the commit logic, but I would have to look more). If your maps take a very long time and your reducers also take a long time, then you don't really want to re-run the maps. Of course, once it starts stage 1.1 for the reducers that didn't originally fail, if the NodeManager really is down, the new reducer task could fail to fetch data that the old ones had already received, so it would need to be re-run anyway even if the original reducer was still running and would have succeeded.

Sorry if that doesn't make sense; it's much too complicated. There is also the case where one reducer fails and you are using a static # of executors. So you have 1 slot to rerun the maps. If you now fail, say, 3 maps instead of 1, it's going to potentially take 3 times as long to rerun those maps. My point here is that I think it's very dependent upon the conditions; it could be faster, could be slower. The question is which happens more often. Generally we see more intermittent issues with NodeManagers rather than them going down fully. If they are going down, is it due to them crashing or is the entire node going down? If the entire node is going down, are we handling that wrong or not as well as we could? If it's the NodeManager crashing, I would say you need to look and see why, as that shouldn't happen very often.

I'm not sure which one could be better. Due to the way Spark schedules, I'm OK with invalidating all of it as well, but I think it would be better for us to fix this right, which to me means not throwing away the work of the first stage's running reducers. If we need something more short-term, I think it would be better to wait for at least a couple of fetch failures or a % of failed reducers before invalidating all of it.

@sitalkedia
Author

The ResultTasks (let's call them reducers), say in Stage 1.0, are running. One of them gets a fetchFailure. This restarts the ShuffleMapTasks for that executor in Stage 0.1. If, during the time those maps are running, other reducers fetch fail, those get taken into account and will be rerun in the same stage 0.1.

As per my understanding of the code, that is not the case. Currently, the task scheduler does not allow running multiple concurrent attempts of a particular stage (see - https://github.com/sitalkedia/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala#L172). So we wait until stage 0.1 finishes and rerun the failed maps in another retry, stage 0.2. This adds significant latency to the job run.
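For reference, the check being referred to boils down to something like this (a simplified, self-contained paraphrase, not the actual TaskSchedulerImpl code):

// Simplified paraphrase of the "one active attempt per stage" guard: a new task set
// for a stage is rejected while an earlier, non-zombie attempt of that stage exists.
case class TaskSetRef(stageId: Int, stageAttemptId: Int, isZombie: Boolean)

def ensureSingleActiveAttempt(activeTaskSets: Seq[TaskSetRef], incoming: TaskSetRef): Unit = {
  val conflicting = activeTaskSets.exists { ts =>
    ts.stageId == incoming.stageId && ts.stageAttemptId != incoming.stageAttemptId && !ts.isZombie
  }
  if (conflicting) {
    throw new IllegalStateException(
      s"more than one active taskSet for stage ${incoming.stageId}")
  }
}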

Generally we see more intermittent issues with NodeManagers rather than them going down fully. If they are going down, is it due to them crashing or is the entire node going down? If the entire node is going down, are we handling that wrong or not as well as we could?

In our case, we see fetch failures for multiple reasons, like a node reboot, a disk going bad, or a network issue. It's very difficult for the cluster manager to detect these kinds of issues and inform the driver.

If we need something more short-term, I think it would be better to wait for at least a couple of fetch failures or a % of failed reducers before invalidating all of it.

I know it's not ideal, but how about making this behavior configurable? I.e., only unregister all outputs on a host if the configuration is enabled; otherwise keep the existing behavior.
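Something along these lines, for illustration (the config key below is hypothetical, just to show the shape of the flag):

import org.apache.spark.SparkConf

// Hypothetical opt-in flag: widen the unregistration to the whole host only when the
// flag is set and the external shuffle service is in use; otherwise keep today's
// per-executor behavior. The config name is illustrative, not an existing setting.
def hostToUnregister(
    conf: SparkConf,
    externalShuffleServiceEnabled: Boolean,
    failedHost: String): Option[String] = {
  val unregisterWholeHost =
    conf.getBoolean("spark.files.fetchFailure.unRegisterOutputOnHost", false)
  if (unregisterWholeHost && externalShuffleServiceEnabled) Some(failedHost) else None
}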

@SparkQA

SparkQA commented Mar 25, 2017

Test build #75184 has finished for PR 17088 at commit be3b3db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

sitalkedia commented Mar 25, 2017

Filed a JIRA SPARK-20091 to allow running multiple concurrent attempts of a stage. I will make this change as a part of another ongoing PR - #17297

// make sure our test setup is correct
val initialMapStatus1 = mapOutputTracker.mapStatuses.get(0).get
assert(initialMapStatus1.count(_ != null) === 3)
assert(initialMapStatus1.map{_.location.executorId}.toSet ===
Contributor

map{..} => map(..)

// reset the test context with the right shuffle service config
afterEach()
val conf = new SparkConf()
conf.set("spark.shuffle.service.enabled", "true")
Contributor

Should we add another test with spark.shuffle.service.enabled = false?


// reduce stage fails with a fetch failure from one host
complete(taskSets(2), Seq(
(FetchFailed(BlockManagerId("exec-hostA2", "hostA", 12345), firstShuffleId, 0, 0, "ignored"),
Contributor

Seems the FetchFailed message should reference shuffleDep.shuffleId instead of firstShuffleId?

@HyukjinKwon
Member

gentle ping @sitalkedia
