From 8885ffff03efe1bbc2afa86b24f15c0413d2c05d Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 9 Jul 2018 14:24:50 +0800 Subject: [PATCH 1/6] Add spark.yarn.access.all.hadoopFileSystems --- .../apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 4 +++- .../main/scala/org/apache/spark/deploy/yarn/config.scala | 8 ++++++++ 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 7250e58b6c49a..01868fa3b338c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -200,10 +200,12 @@ object YarnSparkHadoopUtil { .map(new Path(_).getFileSystem(hadoopConf)) .getOrElse(FileSystem.get(hadoopConf)) + val accessAllFileSystem = sparkConf.get(FILESYSTEMS_TO_ACCESS_ALL) + // Add the list of available namenodes for all namespaces in HDFS federation. // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its // namespaces. - val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") { + val hadoopFilesystems = if (accessAllFileSystem || stagingFS.getScheme == "viewfs") { Set.empty } else { val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index 129084a86597a..b8a6f6f881ccc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -243,6 +243,14 @@ package object config { "that hosts fs.defaultFS does not need to be listed here.") .fallbackConf(NAMENODES_TO_ACCESS) + private[spark] val FILESYSTEMS_TO_ACCESS_ALL = + ConfigBuilder("spark.yarn.access.all.hadoopFileSystems") + .doc("Whether to get tokens of all filesystem configured in hdfs-site.xml. " + + "The default value is false. " + + "If true, a filesystem failed to get token may cause the entire job failed.") + .booleanConf + .createWithDefault(false) + /* Rolled log aggregation configuration. */ private[spark] val ROLLED_LOG_INCLUDE_PATTERN = From 50ef3c987f96853b3281214435c83b788f4f2f3b Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 9 Jul 2018 15:05:24 +0800 Subject: [PATCH 2/6] accessAllFileSystem -> !accessAllFileSystem --- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 01868fa3b338c..13e663fa075d6 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -205,7 +205,7 @@ object YarnSparkHadoopUtil { // Add the list of available namenodes for all namespaces in HDFS federation. // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its // namespaces. - val hadoopFilesystems = if (accessAllFileSystem || stagingFS.getScheme == "viewfs") { + val hadoopFilesystems = if (!accessAllFileSystem || stagingFS.getScheme == "viewfs") { Set.empty } else { val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") From da1e389a75980234d6e11a2ea7f6b6ad4cab7062 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Mon, 9 Jul 2018 15:40:54 +0800 Subject: [PATCH 3/6] FILESYSTEMS_TO_ACCESS_ALL -> true --- .../apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 61c0c43f7c04f..95e708c26ec8d 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -29,6 +29,7 @@ import org.scalatest.Matchers import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.deploy.yarn.config.FILESYSTEMS_TO_ACCESS_ALL import org.apache.spark.internal.Logging import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -143,7 +144,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } test("SPARK-24149: retrieve all namenodes from HDFS") { - val sparkConf = new SparkConf() + val sparkConf = new SparkConf().set(FILESYSTEMS_TO_ACCESS_ALL, true) val basicFederationConf = new Configuration() basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020") basicFederationConf.set("dfs.nameservices", "ns1,ns2") From 43ffeaa5f349d9b0e67af0f540461853c62862ee Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 10 Jul 2018 10:28:09 +0800 Subject: [PATCH 4/6] NAMENODES_TO_ACCESS default to * --- .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 11 ++++------- .../scala/org/apache/spark/deploy/yarn/config.scala | 10 +--------- .../spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala | 3 +-- 3 files changed, 6 insertions(+), 18 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 13e663fa075d6..7b35758bdff8a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -193,20 +193,17 @@ object YarnSparkHadoopUtil { sparkConf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = { val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) - .map(new Path(_).getFileSystem(hadoopConf)) - .toSet + val isRequestAllDelegationTokens = filesystemsToAccess.contains("*") val stagingFS = sparkConf.get(STAGING_DIR) .map(new Path(_).getFileSystem(hadoopConf)) .getOrElse(FileSystem.get(hadoopConf)) - val accessAllFileSystem = sparkConf.get(FILESYSTEMS_TO_ACCESS_ALL) - // Add the list of available namenodes for all namespaces in HDFS federation. // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its // namespaces. - val hadoopFilesystems = if (!accessAllFileSystem || stagingFS.getScheme == "viewfs") { - Set.empty + val hadoopFilesystems = if (!isRequestAllDelegationTokens || stagingFS.getScheme == "viewfs") { + filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet } else { val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices") // Retrieving the filesystem for the nameservices where HA is not enabled @@ -224,7 +221,7 @@ object YarnSparkHadoopUtil { (filesystemsWithoutHA ++ filesystemsWithHA).toSet } - filesystemsToAccess ++ hadoopFilesystems + stagingFS + hadoopFilesystems + stagingFS } } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index b8a6f6f881ccc..b7478de8faa0b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -236,21 +236,13 @@ package object config { "fs.defaultFS does not need to be listed here.") .stringConf .toSequence - .createWithDefault(Nil) + .createWithDefault("*" :: Nil) private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + "that hosts fs.defaultFS does not need to be listed here.") .fallbackConf(NAMENODES_TO_ACCESS) - private[spark] val FILESYSTEMS_TO_ACCESS_ALL = - ConfigBuilder("spark.yarn.access.all.hadoopFileSystems") - .doc("Whether to get tokens of all filesystem configured in hdfs-site.xml. " + - "The default value is false. " + - "If true, a filesystem failed to get token may cause the entire job failed.") - .booleanConf - .createWithDefault(false) - /* Rolled log aggregation configuration. */ private[spark] val ROLLED_LOG_INCLUDE_PATTERN = diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala index 95e708c26ec8d..61c0c43f7c04f 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala @@ -29,7 +29,6 @@ import org.scalatest.Matchers import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.deploy.yarn.config.FILESYSTEMS_TO_ACCESS_ALL import org.apache.spark.internal.Logging import org.apache.spark.util.{ResetSystemProperties, Utils} @@ -144,7 +143,7 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging } test("SPARK-24149: retrieve all namenodes from HDFS") { - val sparkConf = new SparkConf().set(FILESYSTEMS_TO_ACCESS_ALL, true) + val sparkConf = new SparkConf() val basicFederationConf = new Configuration() basicFederationConf.set("fs.defaultFS", "hdfs://localhost:8020") basicFederationConf.set("dfs.nameservices", "ns1,ns2") From 61b13395724acbc3fbdc1d2a9b9213f6e42c18de Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Tue, 10 Jul 2018 11:04:15 +0800 Subject: [PATCH 5/6] isRequestAllDelegationTokens = filesystemsToAccess.isEmpty --- .../org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 2 +- .../src/main/scala/org/apache/spark/deploy/yarn/config.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 7b35758bdff8a..eccb47d48483c 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -193,7 +193,7 @@ object YarnSparkHadoopUtil { sparkConf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = { val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) - val isRequestAllDelegationTokens = filesystemsToAccess.contains("*") + val isRequestAllDelegationTokens = filesystemsToAccess.isEmpty val stagingFS = sparkConf.get(STAGING_DIR) .map(new Path(_).getFileSystem(hadoopConf)) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala index b7478de8faa0b..129084a86597a 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala @@ -236,7 +236,7 @@ package object config { "fs.defaultFS does not need to be listed here.") .stringConf .toSequence - .createWithDefault("*" :: Nil) + .createWithDefault(Nil) private[spark] val FILESYSTEMS_TO_ACCESS = ConfigBuilder("spark.yarn.access.hadoopFileSystems") .doc("Extra Hadoop filesystem URLs for which to request delegation tokens. The filesystem " + From f3aa2a4e7ed180e54157394dc9721c53de444866 Mon Sep 17 00:00:00 2001 From: Yuming Wang Date: Sat, 25 Aug 2018 10:12:55 +0800 Subject: [PATCH 6/6] isRequestAllDelegationTokens -> requestAllDelegationTokens --- .../apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index eccb47d48483c..3a3272216294f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -27,11 +27,8 @@ import org.apache.hadoop.yarn.api.ApplicationConstants import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} import org.apache.hadoop.yarn.util.ConverterUtils -import org.apache.spark.{SecurityManager, SparkConf, SparkException} -import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.yarn.config._ -import org.apache.spark.deploy.yarn.security.YARNHadoopDelegationTokenManager -import org.apache.spark.internal.config._ import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.util.Utils @@ -193,7 +190,7 @@ object YarnSparkHadoopUtil { sparkConf: SparkConf, hadoopConf: Configuration): Set[FileSystem] = { val filesystemsToAccess = sparkConf.get(FILESYSTEMS_TO_ACCESS) - val isRequestAllDelegationTokens = filesystemsToAccess.isEmpty + val requestAllDelegationTokens = filesystemsToAccess.isEmpty val stagingFS = sparkConf.get(STAGING_DIR) .map(new Path(_).getFileSystem(hadoopConf)) @@ -202,7 +199,7 @@ object YarnSparkHadoopUtil { // Add the list of available namenodes for all namespaces in HDFS federation. // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its // namespaces. - val hadoopFilesystems = if (!isRequestAllDelegationTokens || stagingFS.getScheme == "viewfs") { + val hadoopFilesystems = if (!requestAllDelegationTokens || stagingFS.getScheme == "viewfs") { filesystemsToAccess.map(new Path(_).getFileSystem(hadoopConf)).toSet } else { val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")