
[SPARK-9439] [yarn] External shuffle service robust to NM restarts using leveldb #7943

Closed

Conversation

@squito
Contributor

commented Aug 4, 2015

https://issues.apache.org/jira/browse/SPARK-9439

In general, YARN apps should be robust to NodeManager restarts. However, if you run Spark with the external shuffle service on, after a NM restart all shuffles fail, because the shuffle service has lost some state with info on each executor. (Note the shuffle data is perfectly fine on disk across a NM restart; the problem is we've lost the small bit of state that lets us *find* those files.)

The solution proposed here is that the external shuffle service can write out its state to leveldb (backed by a local file) every time an executor is added. When running with yarn, that file is in the NM's local dir. Whenever the service is started, it looks for that file, and if it exists, it reads the file and re-registers all executors there.

Nothing is changed in non-YARN modes with this patch. The service is not given a place to save the state to, so it operates the same as before. This should make it easy to update other cluster managers as well, by just supplying the right file and the equivalent of YARN's initializeApplication; I'm not familiar enough with those modes to know how to do that.
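The recovery flow described above can be sketched as follows. This is a minimal stand-in, not Spark's actual code: a properties file plays the role of the leveldb store, and the class and method names are hypothetical. The key idea is the same, though: persist each executor registration immediately, and re-load the store on startup.

```java
import java.io.*;
import java.util.*;

public class ShuffleStateStore {
    private final File dbFile;
    private final Properties executors = new Properties();

    ShuffleStateStore(File dbFile) throws IOException {
        this.dbFile = dbFile;
        // On startup, re-register every executor found in the saved state.
        if (dbFile.exists()) {
            try (InputStream in = new FileInputStream(dbFile)) {
                executors.load(in);
            }
        }
    }

    // Called each time an executor registers: write through to the store
    // immediately, so a NodeManager restart at any point loses nothing.
    void registerExecutor(String appExecId, String localDirs) throws IOException {
        executors.setProperty(appExecId, localDirs);
        try (OutputStream out = new FileOutputStream(dbFile)) {
            executors.store(out, null);
        }
    }

    String lookup(String appExecId) {
        return executors.getProperty(appExecId);
    }
}
```

Constructing a second store over the same file after a simulated restart finds the registration again, which is exactly the property the patch needs for locating shuffle files after a NM restart.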

// one, so we can keep processing new apps
logger.error("error opening leveldb file {}. Creating new file, will not be able to " +
    "recover state for existing applications", registeredExecutorFile, e);
for (File f : registeredExecutorFile.listFiles()) {

@vanzin

vanzin Aug 17, 2015

Contributor

minor, but if somehow the underlying file is a file and not a directory, this will throw an NPE.

}
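The NPE vanzin points out comes from `File.listFiles()`, which returns null when the path is not a directory (or on an I/O error). A null-guarded variant of the deletion loop might look like this; the helper name is hypothetical, not the code in the patch:

```java
import java.io.File;

public class SafeDelete {
    // Guarding the listFiles() result avoids the NPE when the path turns
    // out to be a plain file rather than a leveldb directory.
    static void deleteRecursively(File path) {
        File[] children = path.listFiles();   // null if path is not a directory
        if (children != null) {
            for (File f : children) {
                deleteRecursively(f);
            }
        }
        path.delete();
    }

    public static void main(String[] args) throws Exception {
        File tmp = File.createTempFile("safe-delete", ".tmp");
        deleteRecursively(tmp);               // plain file: no NPE
        System.out.println(tmp.exists());     // false
    }
}
```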

private static AppExecId parseDbAppExecKey(String s) throws IOException {
int p = s.indexOf(';');

@vanzin

vanzin Aug 17, 2015

Contributor

Can this be s.substring(APP_KEY_PREFIX.length() + 1)?
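The two parsing approaches being compared can be sketched as follows. The key layout (prefix, a `;` separator, then the payload) is taken from the snippet; the `APP_KEY_PREFIX` value here is an assumption for illustration:

```java
public class KeyParse {
    static final String APP_KEY_PREFIX = "AppExecShuffleInfo";

    // Approach in the diff: search the string for the separator.
    static String parseWithIndexOf(String s) {
        int p = s.indexOf(';');
        return s.substring(p + 1);
    }

    // vanzin's suggestion: the prefix length is fixed, so skip the search.
    static String parseWithPrefix(String s) {
        return s.substring(APP_KEY_PREFIX.length() + 1);
    }

    public static void main(String[] args) {
        String key = APP_KEY_PREFIX + ";{\"appId\":\"app-1\"}";
        System.out.println(parseWithIndexOf(key).equals(parseWithPrefix(key))); // true
    }
}
```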

assert(execStateFile.exists(), s"$execStateFile did not exist")

// now we pretend the shuffle service goes down, and comes back up
s1.stop()

@vanzin

vanzin Aug 17, 2015

Contributor

Do these tests cause YarnShuffleService.serviceInit to be called? If so, probably need a more guaranteed way of calling stop(), otherwise you'll end up leaking open ports when tests fail.

@squito

squito Aug 17, 2015

Author Contributor

there is already an afterEach which calls stop() on everything.
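The pattern squito is describing, ScalaTest's afterEach hook, amounts to tracking every started service and stopping them all in a per-test cleanup that runs even when the test body fails. A plain-Java sketch with hypothetical names (not the actual test suite):

```java
import java.util.ArrayList;
import java.util.List;

public class ServiceTeardown {
    interface Service { void stop(); }

    private final List<Service> running = new ArrayList<>();

    Service start() {
        Service s = () -> { /* release port, close files */ };
        running.add(s);
        return s;
    }

    // Equivalent of ScalaTest's afterEach: always runs after each test,
    // so a failing test cannot leak its open port.
    void afterEach() {
        for (Service s : running) {
            try { s.stop(); } catch (RuntimeException e) { /* keep stopping the rest */ }
        }
        running.clear();
    }

    int runningCount() { return running.size(); }
}
```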

import java.io.File
import java.util.{List => JList, Map => JMap}
import java.util.Map.Entry

@vanzin

vanzin Aug 17, 2015

Contributor

nit: too many empty lines.

@vanzin

Contributor

commented Aug 17, 2015

Left some minor comments, but overall this LGTM.

@squito

Contributor Author

commented Aug 17, 2015

jenkins, retest this please

@squito squito changed the title [SPARK-9439] [wip] [yarn] External shuffle service robust to NM restarts using leveldb [SPARK-9439] [yarn] External shuffle service robust to NM restarts using leveldb Aug 17, 2015

@SparkQA


commented Aug 18, 2015

Test build #41056 timed out for PR 7943 at commit 0d285d3 after a configured wait of 175m.

@squito

Contributor Author

commented Aug 18, 2015

jenkins, retest this please

@SparkQA


commented Aug 18, 2015

Test build #41136 timed out for PR 7943 at commit 0d285d3 after a configured wait of 175m.

@squito

Contributor Author

commented Aug 18, 2015

Jenkins, retest this please

@SparkQA


commented Aug 18, 2015

Test build #41158 timed out for PR 7943 at commit 0d285d3 after a configured wait of 175m.

@squito

Contributor Author

commented Aug 19, 2015

Jenkins, retest this please

@SparkQA


commented Aug 19, 2015

Test build #41244 timed out for PR 7943 at commit 0d285d3 after a configured wait of 175m.

@squito

Contributor Author

commented Aug 19, 2015

ok, ready to give up on jenkins; the yarn tests have passed regularly.

@tgravescs can you take another look?

@tgravescs

Contributor

commented Aug 20, 2015

lgtm.

@vanzin any further comments?

Was this going to go into 1.5? I know we had mentioned it, but I'm not sure if it's too late at this point. We can ask Reynold if nobody has confirmed.

@vanzin

Contributor

commented Aug 20, 2015

No, nothing to add. I chatted with @rxin briefly about this and he had concerns about pulling it into 1.5 because of timing, but that was like 100 patches to branch-1.5 ago.

@tgravescs

Contributor

commented Aug 20, 2015

ok, I'll talk to him.

@SparkQA


commented Aug 20, 2015

Test build #1676 has finished for PR 7943 at commit 0d285d3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • public static class AppExecId
    • public static class StoreVersion

@tgravescs

Contributor

commented Aug 21, 2015

sounds like we missed 1.5 on this so we can put in master for 1.6

@tgravescs

Contributor

commented Aug 21, 2015

committed to master

@asfgit asfgit closed this in 708036c Aug 21, 2015

@johd01


commented Jan 13, 2016

Hi

This does not seem to work properly.

After the first nodemanager restart:
2016-01-13 08:22:24,472 ERROR shuffle.ExternalShuffleBlockResolver (ExternalShuffleBlockResolver.java:(113)) - error opening leveldb file
/var/hadoop/data1/yarn/nm/registeredExecutors.ldb. Creating new file, will not be able to recover state for existing applications
org.fusesource.leveldbjni.internal.NativeDB$DBException: IO error: /usr/hdp/2.3.2.0-2950/hadoop-yarn/
/var/hadoop/data1/yarn/nm/registeredExecutors.ldb/LOCK: No such file or directory
at org.fusesource.leveldbjni.internal.NativeDB.checkStatus(NativeDB.java:200)
at org.fusesource.leveldbjni.internal.NativeDB.open(NativeDB.java:218)
at org.fusesource.leveldbjni.JniDBFactory.open(JniDBFactory.java:168)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.(ExternalShuffleBlockResolver.java:100)
at org.apache.spark.network.shuffle.ExternalShuffleBlockResolver.(ExternalShuffleBlockResolver.java:81)
at org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.(ExternalShuffleBlockHandler.java:56)
at org.apache.spark.network.yarn.YarnShuffleService.serviceInit(YarnShuffleService.java:128)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices.serviceInit(AuxServices.java:143)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceInit(ContainerManagerImpl.java:245)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:107)
at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:291)
at org.apache.hadoop.service.AbstractService.init(AbstractService.java:163)
at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartNodeManager(NodeManager.java:537)
at org.apache.hadoop.yarn.server.nodemanager.NodeManager.main(NodeManager.java:585)
2016-01-13 08:22:24,474 WARN shuffle.ExternalShuffleBlockResolver (ExternalShuffleBlockResolver.java:(123)) - error deleting
/var/hadoop/data1/yarn/nm/registeredExecutors.ldb
2016-01-13 08:22:24,474 ERROR yarn.YarnShuffleService (YarnShuffleService.java:serviceInit(130)) - Failed to initialize external shuffle service
java.io.IOException: Unable to create state store

Workaround: switch back to spark-1.5.2-yarn.shuffle.jar

@ashangit

Contributor

commented Mar 3, 2016

Hi @johd01,
I have faced the same issue and opened a pull request to fix it: #11475.

There is a workaround you can apply until that pull request is merged. Check that the path set in the yarn-site.xml property yarn.nodemanager.local-dirs does not start with spaces or carriage returns, and is not a URI (file:///data/yarn/ won't work; /data/yarn/ will).
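The failure mode in johd01's log can be reproduced in miniature: a file:// URI (or a path with stray whitespace) taken verbatim produces a relative java.io.File, which the JVM resolves against the process working directory, yielding the concatenated path seen in the error. A hypothetical normalizer (illustrative only, not the actual fix in #11475):

```java
import java.io.File;
import java.net.URI;

public class LocalDirs {
    // Hypothetical helper: accept either a plain path or a file:// URI,
    // and trim stray whitespace, so the resulting File is absolute.
    static File normalize(String dir) {
        String trimmed = dir.trim();
        if (trimmed.startsWith("file:")) {
            return new File(URI.create(trimmed).getPath());
        }
        return new File(trimmed);
    }

    public static void main(String[] args) {
        // Taken verbatim, a file:/// entry is a *relative* path, which gets
        // resolved against the working directory -- hence the error above.
        System.out.println(new File("file:///data/yarn/").isAbsolute()); // false
        System.out.println(normalize("file:///data/yarn/").isAbsolute()); // true
        System.out.println(normalize("  /data/yarn/ ").getPath());       // /data/yarn
    }
}
```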

ashangit pushed a commit to ashangit/spark that referenced this pull request Oct 19, 2016
[SPARK-9439] [YARN] External shuffle service robust to NM restarts using leveldb


Author: Imran Rashid <irashid@cloudera.com>

Closes apache#7943 from squito/leveldb_external_shuffle_service_NM_restart and squashes the following commits:

0d285d3 [Imran Rashid] review feedback
70951d6 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
5c71c8c [Imran Rashid] save executor to db before registering; style
2499c8c [Imran Rashid] explicit dependency on jackson-annotations
795d28f [Imran Rashid] review feedback
81f80e2 [Imran Rashid] Merge branch 'master' into leveldb_external_shuffle_service_NM_restart
594d520 [Imran Rashid] use json to serialize application executor info
1a7980b [Imran Rashid] version
8267d2a [Imran Rashid] style
e9f99e8 [Imran Rashid] cleanup the handling of bad dbs a little
9378ba3 [Imran Rashid] fail gracefully on corrupt leveldb files
acedb62 [Imran Rashid] switch to writing out one record per executor
79922b7 [Imran Rashid] rely on yarn to call stopApplication; assorted cleanup
12b6a35 [Imran Rashid] save registered executors when apps are removed; add tests
c878fbe [Imran Rashid] better explanation of shuffle service port handling
694934c [Imran Rashid] only open leveldb connection once per service
d596410 [Imran Rashid] store executor data in leveldb
59800b7 [Imran Rashid] Files.move in case renaming is unsupported
32fe5ae [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
d7450f0 [Imran Rashid] style
f729e2b [Imran Rashid] debugging
4492835 [Imran Rashid] lol, dont use a PrintWriter b/c of scalastyle checks
0a39b98 [Imran Rashid] Merge branch 'master' into external_shuffle_service_NM_restart
55f49fc [Imran Rashid] make sure the service doesnt die if the registered executor file is corrupt; add tests
245db19 [Imran Rashid] style
62586a6 [Imran Rashid] just serialize the whole executors map
bdbbf0d [Imran Rashid] comments, remove some unnecessary changes
857331a [Imran Rashid] better tests & comments
bb9d1e6 [Imran Rashid] formatting
bdc4b32 [Imran Rashid] rename
86e0cb9 [Imran Rashid] for tests, shuffle service finds an open port
23994ff [Imran Rashid] style
7504de8 [Imran Rashid] style
a36729c [Imran Rashid] cleanup
efb6195 [Imran Rashid] proper unit test, and no longer leak if apps stop during NM restart
dd93dc0 [Imran Rashid] test for shuffle service w/ NM restarts
d596969 [Imran Rashid] cleanup imports
0e9d69b [Imran Rashid] better names
9eae119 [Imran Rashid] cleanup lots of duplication
1136f44 [Imran Rashid] test needs to have an actual shuffle
0b588bd [Imran Rashid] more fixes ...
ad122ef [Imran Rashid] more fixes
5e5a7c3 [Imran Rashid] fix build
c69f46b [Imran Rashid] maybe working version, needs tests & cleanup ...
bb3ba49 [Imran Rashid] minor cleanup
36127d3 [Imran Rashid] wip
b9d2ced [Imran Rashid] incomplete setup for external shuffle service tests

(cherry picked from commit 708036c)
asfgit pushed a commit that referenced this pull request Mar 19, 2019
[SPARK-26288][CORE] add initRegisteredExecutorsDB
## What changes were proposed in this pull request?

Spark on YARN uses a local DB (#7943) to record RegisteredExecutors information, which can be reloaded and used again when the ExternalShuffleService is restarted.

The RegisteredExecutors information is not recorded in Spark's standalone mode or in Spark on Kubernetes, so it is lost when the ExternalShuffleService is restarted.

To solve the problem above, this patch adds an initRegisteredExecutorsDB method.

## How was this patch tested?
new  unit tests

Closes #23393 from weixiuli/SPARK-26288.

Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>
cfmcgrady added a commit to cfmcgrady/spark that referenced this pull request Aug 1, 2019
[SPARK-26288][CORE] add initRegisteredExecutorsDB
Closes apache#23393 from weixiuli/SPARK-26288.

Authored-by: weixiuli <weixiuli@jd.com>
Signed-off-by: Imran Rashid <irashid@cloudera.com>