Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-26268][CORE] Do not resubmit tasks when executors are lost #24462

Closed
wants to merge 1 commit into from

Conversation

bsidhom
Copy link

@bsidhom bsidhom commented Apr 25, 2019

The DAGScheduler assumes that shuffle data is lost if an executor is lost and the Spark external shuffle service is not enabled. However, external shuffle managers other than the default external shuffle service can store shuffle data outside of the Spark cluster itself. In this case, completed map taks are not rerun even if the corresponding executors are lost.

The new spark.shuffle.external.enabled property allows this new external shuffle behavior to be toggled (independently off the built-in Spark external shuffle service).

@bsidhom
Copy link
Author

bsidhom commented Apr 25, 2019

Note that this is a work in progress/suggestion for how this might be done. I don't know if we want to use the property in this exact way, but the "externalness" of the shuffle service should be independently togglable.

@liupc
Copy link

liupc commented Apr 26, 2019

Do you mean you customized an external shuffle manager to store data outside the cluster by your self? If so, I think maybe you should do this change in your custom branch.
Or you should also include the change of shuffle manager in this PR.

@gcz2022
Copy link

gcz2022 commented Apr 26, 2019

@liupc Since shuffle manager is pluggable in Spark, this 'resubmit switch' in scheduler should also be configurable.
@bsidhom This looks good!

@liupc
Copy link

liupc commented Apr 26, 2019

@gczsjdy Yes, it's pluggable, but I am not sure whether it's good to add this config, for it's almost unused unless adding custom shuffle manager and compile a new spark package.
Maybe we can llisten to other committer's suggestion.

@@ -1791,7 +1791,8 @@ private[spark] class DAGScheduler(
// if the cluster manager explicitly tells us that the entire worker was lost, then
// we know to unregister shuffle output. (Note that "worker" specifically refers to the process
// from a Standalone cluster, where the shuffle service lives in the Worker.)
val fileLost = workerLost || !env.blockManager.externalShuffleServiceEnabled
val fileLost = workerLost || (!env.blockManager.externalShuffleServiceEnabled &&
!sc.conf.get(config.EXTERNAL_SHUFFLE_ENABLED))
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With worker lost and EXTERNAL_SHUFFLE_ENABLED = true, fileLost should be false? Due to the shuffle files metadata doesn't reside in Worker.
I think this should be !sc.conf.get(config.EXTERNAL_SHUFFLE_ENABLED) && (workerLost || (!env.blockManager.externalShuffleServiceEnabled))

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch--I had missed this since I never tested it in standalone mode (only YARN).

The DAGScheduler assumes that shuffle data is lost if an executor is
lost and the Spark external shuffle service is not enabled. However,
external shuffle managers other than the default external shuffle
service can store shuffle data outside of the Spark cluster itself. In
this case, completed map taks are not rerun even if the corresponding
executors are lost.

The new `spark.shuffle.external.enabled` property allows this new
external shuffle behavior to be toggled (independently off the built-in
Spark external shuffle service).
@bsidhom
Copy link
Author

bsidhom commented Apr 26, 2019

@liupc The problem with the current situation is that you have to recompile and ship all of Spark core if you want to use a custom external shuffle manager. This configuration change allows you to use a custom shuffle manager as a "plugin" that you install alongside Spark. This substantially speeds compilation time and means that you don't need to make error-prone changes to the scheduler.

@liupc
Copy link

liupc commented Apr 26, 2019

IIUC, there is another problem: even if the shuffle data is stored outside the cluster, nobody would serve for the fetch of these blocks when executor is lost, then if not rerun these completed map tasks, how can the subsequent shuffle fetch the data?

@bsidhom
Copy link
Author

bsidhom commented Apr 26, 2019

In this case, the shuffle data server is external to Spark. This is the entire reason we do not want to rerun completed map tasks.

@liupc
Copy link

liupc commented Apr 26, 2019

If so, this is not the common case, It's too special and depends on so many custom codes. I still don't think it's good to add this config in master branch, you should do this in your own branch.

cc @srowen @squito

@squito
Copy link
Contributor

squito commented Apr 26, 2019

I understand the motivation here, but I think this should be handled as part of the new shuffle storage plugin mechanism under https://issues.apache.org/jira/browse/SPARK-25299. We've actually been discussing the right way to expose this kind of knob to a plugin, if its necessary. I don't think we've gotten around to writing up docs about this yet, sorry

So while I don't think this we should make the change in this PR, I'd be really interested in hearing more about the system you are using for storing shuffle data.

fyi @mccheah @yifeih

Copy link
Contributor

@attilapiros attilapiros left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand this change especially the part:
"The new spark.shuffle.external.enabled property allows this new external shuffle behavior to be toggled (independently off the built-in Spark external shuffle service)."

Independently?
But if the old spark.shuffle.service.enabled is false and the new spark.shuffle.external.enabled is true how do you got the shuffle blocks? As in the BlockManager you are not using the ExternalShuffleClient for accessing the blocks:

private[spark] val shuffleClient = if (externalShuffleServiceEnabled) {
val transConf = SparkTransportConf.fromSparkConf(conf, "shuffle", numUsableCores)
new ExternalShuffleClient(transConf, securityManager,
securityManager.isAuthenticationEnabled(), conf.get(config.SHUFFLE_REGISTRATION_TIMEOUT))
} else {
blockTransferService
}

Moreover this change very much overloads the word "external" as the spark built-in shuffle service is already external. What about independent or any other a synonym? (On the other hand I am just interested in the idea behind; currently not convinced by the change.)

@gcz2022
Copy link

gcz2022 commented Apr 28, 2019

@liupc Under remote shuffle, if certain executors are lost, we can still fetch the shuffle data from remote storage(i.e. HDFS), we don't need executors to 'serve' the shuffle data. Also I don't understand why you mentioned this is too special... 'shuffle data server external to Spark' may be the new common...

@liupc
Copy link

liupc commented Apr 28, 2019

@gczsjdy I understand this use case, it's a common need, but the current pluggable implementation is too flexible, and the whole impl of the shuffle manager are customed, but this PR is so highly relies on the kind of shuffle manager which must be external, that's why I think we should not do this in master branch, or we should put them together.
what I mean by 'special' is that without the impl of external shuffle manager in master branch, It's odd to add these config alone and it's not commonly used with default shuffle impl.

So maybe the suggestion of @squito is the good way to do it.

@gcz2022
Copy link

gcz2022 commented Apr 29, 2019

@liupc Thanks for the explanation.

@bsidhom
Copy link
Author

bsidhom commented Apr 29, 2019

To give context: this is to support a HDFS shuffle implementation, which I have not yet had a chance to upload. This shuffle implementation could live outside of Spark itself, but it does need to have this configuration param added.

I've been following along with SPARK-25299, but it looks like these scheduler changes have not yet been introduced. This is one component of SPARK-25299 that we need to figure out, and I think it makes sense to have it broken out as its own blocking issue. (In general, that jira should be decomposed into issues that can be addressed/discussed with smaller scope.)

@attilapiros I'm aware of the issues you point out here. If the in-Spark "external" shuffle service is used, you need to manually set this new property to false. Setting it to true would be incorrect since the shuffle manager you're using in this case is not really "external". Unfortunately, "external" in this case is very overloaded and I'd love to hear suggestions for clearer names.

This approach requires users to understand the implications of the new configurations, but only those who wish to use HDFS shuffle (or some other external shuffle implementation).

Adding a flag is the simplest way to allow the flexibility required to introduce an "external-to-Spark" shuffle manager without introducing core interface changes. While this could be configured incorrectly, it's unlikely and many of our configurations require understanding before changing from the default value.

A better long-term approach could be something like: add a method to the ShuffleManager interface that allows the implementation to indicate whether it can serve blocks without an executor (and also remove the hard dependency on BlockManager since it isn't needed in this case).

@jealous
Copy link

jealous commented Apr 29, 2019

We have also developed our own shuffle manager. Here is the code:
https://github.com/MemVerge/splash/
It doesn't rely on the executor. When the shuffle data is persisted to a storage cluster, you can still retrieve it when the executor dies.
Spark has defined an interface for shuffle manager and it's already a configuration option in the application. So I agree that we should also make it a configuration option on whether we need to resubmit tasks when the executors are lost.

@Ngone51
Copy link
Member

Ngone51 commented Apr 30, 2019

I think @attilapiros has a good point here. And now I'm wondering that how does your(@bsidhom) pluggable shuffle manager achives a shuffle process (map & reduce) with ExternalShuffleService disabled ?

And I have similar question for @jealous . Does your shuffle manager still works with ExternalShuffleService disabled ?

@jealous
Copy link

jealous commented Apr 30, 2019

Hi @Ngone51 , our shuffle manager is an in-place replacement of the vanilla shuffle manager. It allows the user to write the shuffle file to external storage. The files could be located with the app id, shuffle id, mapper id and reducer id so that we don't need an extra registry for the shuffle files. And yes, we don't rely on an ExternalShuffleService.

@gcz2022
Copy link

gcz2022 commented Apr 30, 2019

@bsidhom I agree that it would be ideal if there's a field in ShuffleManager indicating 'whether it can serve blocks without an executor'.
I have also implemented a Hadoop-compatible file system shuffle manager. : P What do you mean by your implementation 'could live outside of Spark itself'?

@bsidhom
Copy link
Author

bsidhom commented Apr 30, 2019

@Ngone51 As @jealous mentions, shuffle implementations that maintain their own state outside of the Spark system does not need executors or the built-in Spark shuffle service to serve shuffle data.

@gczsjdy By "outside of Spark" I mean that shuffle manager implementations do not need to live inside of (a fork of) the Spark repo but can instead be built and shipped as "plugins"*. Building separately makes it easier and faster to compile, test, and iterate. That's how we currently build the HDFS/HCFS shuffle manager. You can then distribute the jar with a Spark deployment by dropping it under the $SPARK_HOME/jars directory; files in that directory (or a configured HDFS location) are then staged and made available to all executors.

Unfortunately because the shuffle manager class is instantiated at the time of SparkEnv creation, you can't simply depend on a shuffle manager library from client code and then use it while executing in a distributed mode.

* Note that because ShuffleManager and related classes are spark-private, your shuffle implementation does need to live under a subpackage of org.apache.spark, but it can still be compiled separately.

@squito
Copy link
Contributor

squito commented May 1, 2019

A better long-term approach could be something like: add a method to the ShuffleManager interface that allows the implementation to indicate whether it can serve blocks without an executor (and also remove the hard dependency on BlockManager since it isn't needed in this case).

agreed. I think that is a better way to expose this. But I'd rather not put in a config here in the meantime, and I'd like to wait a bit on the other shuffle plugin stuff to say what that api should be exactly.

The old ShufflePlugin api was there really just to manage the Sort vs. Hash shuffle, when that was introduced long ago. I think it needs some updates for the new proliferation of distributed shuffle managers (I'm trying to say "distributed" instead of "external" for this new class of shuffle managers).

@Ngone51
Copy link
Member

Ngone51 commented May 5, 2019

Hi @jealous Can you give any link or source file name for that part ? I'd like to learn it more for implementation details.

@jealous
Copy link

jealous commented May 6, 2019

Hi @Ngone51 , check our project page here:
https://github.com/MemVerge/splash/
And the design document here:
https://github.com/MemVerge/splash/blob/master/doc/Design.md
The idea is to extract all file/network IO from the shuffle manager. We supply a shared file system implementation for the storage interface. But you could always implement your own plugin.
And we have the same issue, we don't want to re-submit the tasks when the executors are lost. Because the shuffle files are persisted somewhere else.

@squito
Copy link
Contributor

squito commented May 31, 2019

FWIW, its at least clear to me know that this will be needed even with SPARK-25299, as that work is orthogonal to this mostly (as discussed here: https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit?disco=AAAADAYd_10).

I think it makes sense to expose this as a method on ShuffleManager

@gcz2022
Copy link

gcz2022 commented Jul 8, 2019

@squito I agree with you, but still want to make sure I understand it right: The function https://docs.google.com/document/d/1d6egnL6WHOwWZe8MWv3m8n4PToNacdx7n_0iMSWwhCQ/edit?disco=AAAADN6g3wY resembles this PR's work. However, it just works when users leverage the new ShuffleIO pluggable API. But since the lower-level ShuffleIO & upper-level ShuffleManager API will both exist in the future Spark. We still need this PR's work for a custom ShuffleManager implementation to avoid resubmitting map tasks(for example when shuffle files are persisted in DFS).

So actually we have plans to push this PR forward?

@bsidhom Could we create a field in ShuffleManager like most people agreed on?

@yifeih
Copy link
Contributor

yifeih commented Jul 9, 2019

Actually, I think how it would work with the shuffle storage API is that you now return Option.empty instead of the BlockManagerId in the map status returned from ShuffleWriter.stop() to denote that the shuffle file doesn't live on an executor. Then, the removeExecutorAndUnregisterOutputs() function won't be able to find any shuffle files that exist on the executor. This branch contains the most up-to-date version of the API and how it integrates with the rest of Spark if you want to take a look: https://github.com/palantir/spark/tree/yh/attempt-id-bmAddr

@gcz2022
Copy link

gcz2022 commented Jul 9, 2019

@yifeih It's very interesting, but I didn't 100% get it. The new ShuffleIO API will not influence the existed ShuffleManager API(which means the developers who have implemented a custom ShuffleManager don't need to modify their code) right? If this is true then this PR is needed for ShuffleManager implementers.

@yifeih
Copy link
Contributor

yifeih commented Jul 9, 2019

The new ShuffleIO interface sort of does influence the existing ShuffleManager API. The MapStatus object has been modified in the ShuffleIO API. The MapStatus object is returned by the ShuffleWriter, which is part of the ShuffleManager API

@gcz2022
Copy link

gcz2022 commented Jul 12, 2019

@yifeih Thank you, I understand now. But can your way (making MapStatus able to contain an empty location in order to not resubmit map stage tasks) deals with this condition: the MapStatus returned contains a valid location, at the same time, we don't want the Driver to unregister this shuffle output when executors lost(Maybe due to the map output is also backed up in DFS)?

In other words, what Driver decides to do when invalidating an executor(what this PR works on) and how the Executors tell Driver the MapStatus(with or without a location) are two different things.

@squito
Copy link
Contributor

squito commented Jul 16, 2019

I took another look at @yifeih 's changes, and I think she's right, that will be sufficient. Now you're custom shuffle manager should just return a MapStatus with executorId == null:

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1827

https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/MapOutputTracker.scala#L126

It seems like even now this would almost work, except that having a execId == null would mess up the epoch checks: https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala#L1827

Most importantly, we should just document the semantics of returning a null executorId in the MapStatus as part of the ShuffleManager contract.

@yifeih
Copy link
Contributor

yifeih commented Jul 17, 2019

Yup, actually, as @squito pointed out, it's actually probably better to allow MapStatus.location to be a null value instead of changing it to be an Option[BlockManagerId] to preserve the API for those who have implemented the ShuffleManager interface. I'll make those change on my branch in the next few days.

@gcz2022
Copy link

gcz2022 commented Jul 23, 2019

@squito I met with a condition that cannot be satisfied without this PR:

  • On map side, all shuffle files are written to remote Hadoop filesystem, there isn't any shuffle files managed by BlockManagers. So I simply should make ShuffleWriters return MapStatus.location == null? No, because it cannot fulfil the need during shuffle write.
  • On reduce side, I want to read the shuffle index files from the cache on the executors who wrote them, so I need the BlockManagerId in the MapStatus to tell each reducer which executor to find.

This is what I mentioned:

what Driver decides to do when invalidating an executor(what this PR works on) and how the Executors tell Driver the MapStatus(with or without a location) are two different things.

@squito
Copy link
Contributor

squito commented Jul 29, 2019

why do you want to store the data files on hdfs, but the index files on the executors? This seems to have the worst of both worlds -- the (bad) resiliency of local storage, and the (bad) performance of remote reads. Or is the index file backed up somewhere as well?

I definitely understand the general problem with knowing what to do about shuffle data when an executor is lost. So I understand why you want to do something like what this PR does. But it probably makes more sense to address this as part of the other shuffle api changes, if possible, not as another config.

You might not be able to do everything you want -- in particular, the new api does not support multiple locations for shuffle data. We decided that was out-of-scope, for now (but maybe a future enhancement). Is that what you're looking for -- one copy on the executors local disk, and another copy on hdfs?

@gcz2022
Copy link

gcz2022 commented Aug 8, 2019

@squito Index and data files are both stored on DFS, the difference is that: data files are directly read from DFS, however, for index files, a reducer fetches them from the executors('s cache) who wrote them, if there aren't required index files in cache, they will be loaded from DFS. This approach simulates the external shuffle service's cache, but instead of in another Java process, it's in Executor.

This approach needs a reasonable place(and it's the coordinated map executor) to cache index files. Returning a None location for mapper task will make

  • The no resubmit tasks' need satisfied
  • But the cache feature not satisfied : (

@squito
Copy link
Contributor

squito commented Aug 8, 2019

ok, I see what you're trying to do -- and yeah I don't think you can do it with the api we are proposing. That is a bummer, but we also need to try to draw a reasonable balance here, so I don't know if this case is really that compelling for extending the api. Are you sure your cache feature really saves you that much? You've still got to make a remote read for the index file

@gcz2022
Copy link

gcz2022 commented Aug 9, 2019

@squito Yeah it saves us much, from a TPC-DS 1T benchmark, 30% queries get 1.1x+ performance boost, 13% get 1.2x + performance boost. There's still remote read, but only once(if index files are not swapped out because of insufficient cache space), and this feature can take advantage of internal network bandwith inside computing cluster, releasing the compute-storage network, which may be the bottleneck of the workload.

By 'reasonable balance', did you mean not considering complex conditions? I think probably it's beneficial to make it clear through discussion. Making this work in a long term is also fine by me. I tried to make a point that the current solution to not resubmitting map tasks by modifying MapStatus is not enough, due to it only cares about what Executors tell the Driver about the map outputs'(tasks') location. However, we should also grant Driver the right(for example, by add a config like this PR did) to not resubmit the map tasks even if it knows (not empty MapStatus location) which one to resubmit.

@squito
Copy link
Contributor

squito commented Aug 12, 2019

The "reasonable balance" I was talking about was between extending the spark api to cover more use cases, while still keeping it supportable and maintainable, by making incremental improvements where we can. For example, I completely think that having multiple block locations is reasonable; but I'd rather not take it on right now.

thanks for providing your benchmarks. Those are pretty compelling numbers ... still I'm hesitant to add a config for just this one use case right now.

I'm open to hearing from others about more use cases that would benefit from this

@gcz2022
Copy link

gcz2022 commented Aug 13, 2019

Thank you @squito

@AmplabJenkins
Copy link

Can one of the admins verify this patch?

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

@github-actions github-actions bot added the Stale label Dec 30, 2019
@github-actions github-actions bot closed this Dec 31, 2019
@bsidhom bsidhom deleted the external-shuffle branch August 3, 2021 17:07
@bsidhom bsidhom restored the external-shuffle branch August 3, 2021 17:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
10 participants