From d62405dfbbea6ce1e7604721ab1234e5fde5b651 Mon Sep 17 00:00:00 2001 From: lishuming Date: Wed, 9 Aug 2017 10:45:28 +0800 Subject: [PATCH 1/5] [SPARK-21660] Yarn ShuffleService failed to start when the chosen directory become read-only --- .../network/yarn/YarnShuffleService.java | 42 ++++++++++++++++--- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index cd67eb28573e8..d4fe51fab5842 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -332,6 +332,17 @@ protected Path getRecoveryPath(String fileName) { return _recoveryPath; } + /** + * Check the chosen DB file available or not. + */ + protected Boolean checkFileAvailable(File file) { + if (file.canWrite()){ + return true; + } + + return false; + } + /** * Figure out the recovery path and handle moving the DB if YARN NM recovery gets enabled * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise @@ -340,7 +351,7 @@ protected Path getRecoveryPath(String fileName) { protected File initRecoveryDb(String dbName) { if (_recoveryPath != null) { File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName); - if (recoveryFile.exists()) { + if (recoveryFile.exists() && checkFileAvailable(recoveryFile)) { return recoveryFile; } } @@ -348,12 +359,16 @@ protected File initRecoveryDb(String dbName) { String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); for (String dir : localDirs) { File f = new File(new Path(dir).toUri().getPath(), dbName); + // 1. `_recoveryPath` not exists, `f` should be writable; + // 2. `_recoveryPath` exists, `newLoc` should be writable; if (f.exists()) { if (_recoveryPath == null) { - // If NM recovery is not enabled, we should specify the recovery path using NM local - // dirs, which is compatible with the old code. - _recoveryPath = new Path(dir); - return f; + if (checkFileAvailable(f)) { + // If NM recovery is not enabled, we should specify the recovery path using NM local + // dirs, which is compatible with the old code. + _recoveryPath = new Path(dir); + return f; + } } else { // If the recovery path is set then either NM recovery is enabled or another recovery // DB has been initialized. If NM recovery is enabled and had set the recovery path @@ -374,10 +389,25 @@ protected File initRecoveryDb(String dbName) { dbName, _recoveryPath.toString(), e); } } - return new File(newLoc.toUri().getPath()); + File newLocFile = new File(newLoc.toUri().getPath()); + if (checkFileAvailable(newLocFile)) { + return newLocFile; + } } } } + + // Find a local_dir which is writable, to avoid creating ldb in a read-only disk. + if (_recoveryPath == null) { + for (String dir : localDirs) { + File f = new File(dir); + if (checkFileAvailable(f)) { + _recoveryPath = new Path(dir); + break; + } + } + } + if (_recoveryPath == null) { _recoveryPath = new Path(localDirs[0]); } From 2077537c52b43c6df050a7afe23a453d09e38db6 Mon Sep 17 00:00:00 2001 From: lishuming Date: Thu, 10 Aug 2017 16:45:41 +0800 Subject: [PATCH 2/5] Recovery path had already existed but unavailable, set it to null --- .../network/yarn/YarnShuffleService.java | 22 ++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index d4fe51fab5842..fa33d951f1cb3 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -349,12 +349,24 @@ protected Boolean checkFileAvailable(File file) { * it will uses a YARN local dir. */ protected File initRecoveryDb(String dbName) { + Boolean bolRecoveryPathAvailable = true; + if (_recoveryPath != null) { File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName); - if (recoveryFile.exists() && checkFileAvailable(recoveryFile)) { + + bolRecoveryPathAvailable = checkFileAvailable(recoveryFile); + logger.info("Recovery path {} ldb available: {}.", _recoveryPath, bolRecoveryPathAvailable); + if (recoveryFile.exists() && bolRecoveryPathAvailable) { return recoveryFile; } } + + // If recovery path unavailable, no use it any more. + if (!bolRecoveryPathAvailable) { + logger.warn("Recovery path {} unavailable: set it to null", _recoveryPath); + _recoveryPath = null; + } + // db doesn't exist in recovery path go check local dirs for it String[] localDirs = _conf.getTrimmedStrings("yarn.nodemanager.local-dirs"); for (String dir : localDirs) { @@ -375,6 +387,9 @@ protected File initRecoveryDb(String dbName) { // make sure to move all DBs to the recovery path from the old NM local dirs. // If another DB was initialized first just make sure all the DBs are in the same // location. + if (!bolRecoveryPathAvailable) { + continue; + } Path newLoc = new Path(_recoveryPath, dbName); Path copyFrom = new Path(f.toURI()); if (!newLoc.equals(copyFrom)) { @@ -389,10 +404,7 @@ protected File initRecoveryDb(String dbName) { dbName, _recoveryPath.toString(), e); } } - File newLocFile = new File(newLoc.toUri().getPath()); - if (checkFileAvailable(newLocFile)) { - return newLocFile; - } + return new File(newLoc.toUri().getPath()); } } } From 6841ca4e15761126e144704005d5ee2fa2184790 Mon Sep 17 00:00:00 2001 From: hzlishuming Date: Sat, 19 Aug 2017 17:25:54 +0800 Subject: [PATCH 3/5] format code and add unit test --- .../network/yarn/YarnShuffleService.java | 43 ++++++++++++------ .../yarn/YarnShuffleServiceSuite.scala | 44 +++++++++++++++++++ 2 files changed, 74 insertions(+), 13 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index fa33d951f1cb3..be563a75ee5d9 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -32,6 +32,7 @@ import com.google.common.collect.Lists; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -333,14 +334,29 @@ protected Path getRecoveryPath(String fileName) { } /** - * Check the chosen DB file available or not. + * Checks that the current running process can read, write and execute the given file + * by using methods of the File objects. + * + * @param file File to check + * @return True if process has read, write and execute access on the path, or false. */ - protected Boolean checkFileAvailable(File file) { - if (file.canWrite()){ - return true; - } + protected Boolean checkFileAccess(File file) { + if (!FileUtil.canRead(file)) { + logger.warn("File is not readable: " + file.toString()); + return false; + } + + if (!FileUtil.canWrite(file)) { + logger.warn("File is not writable: " + file.toString()); + return false; + } + if (!FileUtil.canExecute(file)) { + logger.warn("File is not executable: " + file.toString()); return false; + } + + return true; } /** @@ -348,13 +364,13 @@ protected Boolean checkFileAvailable(File file) { * when it previously was not. If YARN NM recovery is enabled it uses that path, otherwise * it will uses a YARN local dir. */ - protected File initRecoveryDb(String dbName) { + protected File initRecoveryDb(String dbName) throws IOException { Boolean bolRecoveryPathAvailable = true; if (_recoveryPath != null) { File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName); - bolRecoveryPathAvailable = checkFileAvailable(recoveryFile); + bolRecoveryPathAvailable = checkFileAccess(recoveryFile); logger.info("Recovery path {} ldb available: {}.", _recoveryPath, bolRecoveryPathAvailable); if (recoveryFile.exists() && bolRecoveryPathAvailable) { return recoveryFile; @@ -375,7 +391,7 @@ protected File initRecoveryDb(String dbName) { // 2. `_recoveryPath` exists, `newLoc` should be writable; if (f.exists()) { if (_recoveryPath == null) { - if (checkFileAvailable(f)) { + if (checkFileAccess(f)) { // If NM recovery is not enabled, we should specify the recovery path using NM local // dirs, which is compatible with the old code. _recoveryPath = new Path(dir); @@ -387,9 +403,6 @@ protected File initRecoveryDb(String dbName) { // make sure to move all DBs to the recovery path from the old NM local dirs. // If another DB was initialized first just make sure all the DBs are in the same // location. - if (!bolRecoveryPathAvailable) { - continue; - } Path newLoc = new Path(_recoveryPath, dbName); Path copyFrom = new Path(f.toURI()); if (!newLoc.equals(copyFrom)) { @@ -413,15 +426,19 @@ protected File initRecoveryDb(String dbName) { if (_recoveryPath == null) { for (String dir : localDirs) { File f = new File(dir); - if (checkFileAvailable(f)) { + if (checkFileAccess(f)) { _recoveryPath = new Path(dir); break; + } else { + logger.warn("Local dir {} is not reachable.", dir); } } } if (_recoveryPath == null) { - _recoveryPath = new Path(localDirs[0]); + throw new IOException("Failed to choose a reachable DB recovery path, " + + "please check `yarn.nodemanager.local-dirs` and `yarn.nodemanager.recovery.dir` is available " + + "in the `yarn-site.xml`."); } return new File(_recoveryPath.toUri().getPath(), dbName); diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index a58784f59676a..953a3da29ca27 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -369,4 +369,48 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd new ApplicationInitializationContext(user, appId, secret) } + test("SPARK-21660: get the correct init recovery path considering file access") { + // Test recovery path is set outside the shuffle service, but is a read-only dir, + // if `yarn.nodemanager.local-dirs` has available dir, return the available local dir. + s1 = new YarnShuffleService + val recoveryDir = Utils.createTempDir() + val recoveryPath = new Path(recoveryDir.toURI) + Files.setPosixFilePermissions(recoveryDir.toPath, EnumSet.of(OWNER_READ, OWNER_EXECUTE)) + s1.setRecoveryPath(recoveryPath) + + s1.init(yarnConfig) + s1._recoveryPath should be + (new Path(yarnConfig.getTrimmedStrings("yarn.nodemanager.local-dirs")(0))) + s1.stop() + + // Test recovery path is set inside the shuffle service, but is a read-only dir, + // and `yarn.nodemanager.local-dirs` has no available dir, return IOException. + s2 = new YarnShuffleService + s2.setRecoveryPath(recoveryPath) + + val yarnConfig2 = new YarnConfiguration() + yarnConfig2.set(YarnConfiguration.NM_AUX_SERVICES, "spark_shuffle") + yarnConfig2.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"), + classOf[YarnShuffleService].getCanonicalName) + yarnConfig2.setInt("spark.shuffle.service.port", 0) + yarnConfig2.setBoolean(YarnShuffleService.STOP_ON_FAILURE_KEY, true) + + val localDir2 = Utils.createTempDir() + Files.setPosixFilePermissions(localDir2.toPath, EnumSet.of(OWNER_READ, OWNER_EXECUTE)) + yarnConfig2.set(YarnConfiguration.NM_LOCAL_DIRS, localDir2.getAbsolutePath) + + try { + val error = intercept[ServiceStateException] { + s2.init(yarnConfig2) + } + assert(error.getCause.isInstanceOf[IOException]) + } finally { + Files.setPosixFilePermissions(recoveryDir.toPath, + EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE)) + Files.setPosixFilePermissions(localDir2.toPath, + EnumSet.of(OWNER_READ, OWNER_WRITE, OWNER_EXECUTE)) + s2.stop() + } + } + } From 2e06fdbbe0294cd8a368c877eefc4a555e5b7bb0 Mon Sep 17 00:00:00 2001 From: LiShuMing Date: Sat, 19 Aug 2017 17:52:10 +0800 Subject: [PATCH 4/5] default file access should not contain --- .../org/apache/spark/network/yarn/YarnShuffleService.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index be563a75ee5d9..cc9e39c108fc9 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -334,7 +334,7 @@ protected Path getRecoveryPath(String fileName) { } /** - * Checks that the current running process can read, write and execute the given file + * Checks that the current running process can read, write the given file * by using methods of the File objects. * * @param file File to check @@ -351,11 +351,6 @@ protected Boolean checkFileAccess(File file) { return false; } - if (!FileUtil.canExecute(file)) { - logger.warn("File is not executable: " + file.toString()); - return false; - } - return true; } From e380c6feb7c9921fa5b74154e187df1acbbc6f92 Mon Sep 17 00:00:00 2001 From: LiShuMing Date: Sat, 19 Aug 2017 20:37:29 +0800 Subject: [PATCH 5/5] should check recovery path access instead of db file --- .../spark/network/yarn/YarnShuffleService.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index cc9e39c108fc9..bbba53b69597c 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -341,6 +341,11 @@ protected Path getRecoveryPath(String fileName) { * @return True if process has read, write and execute access on the path, or false. */ protected Boolean checkFileAccess(File file) { + if (!file.exists()) { + logger.warn("File is not existed: " + file.toString()); + return false; + } + if (!FileUtil.canRead(file)) { logger.warn("File is not readable: " + file.toString()); return false; @@ -351,6 +356,11 @@ protected Boolean checkFileAccess(File file) { return false; } + if (!FileUtil.canExecute(file)) { + logger.warn("File is not executable: " + file.toString()); + return false; + } + return true; } @@ -365,8 +375,9 @@ protected File initRecoveryDb(String dbName) throws IOException { if (_recoveryPath != null) { File recoveryFile = new File(_recoveryPath.toUri().getPath(), dbName); - bolRecoveryPathAvailable = checkFileAccess(recoveryFile); + bolRecoveryPathAvailable = checkFileAccess(new File(_recoveryPath.toUri().getPath())); logger.info("Recovery path {} ldb available: {}.", _recoveryPath, bolRecoveryPathAvailable); + if (recoveryFile.exists() && bolRecoveryPathAvailable) { return recoveryFile; }