From 4c35cce59b6d52bba93f9805a0d5a7dd0be3f6ad Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Tue, 20 Jun 2017 14:33:43 -0700 Subject: [PATCH 1/3] [SPARK-9825][yarn] Do not overwrite final Hadoop config entries. When localizing the gateway config files in a YARN application, avoid overwriting final configs by distributing the gateway files to a separate directory, and explicitly loading them into the Hadoop config, instead of placing those files before the cluster's files in the classpath. The implementation is a little hacky, but mostly because there's no API that tells you which are the files that are loaded by a YarnConfiguration object; so the list of files was obtained empirically and is hardcoded in the code. Tested with existing unit tests, and by verifying the behavior in a YARN cluster (final values are not overridden, non-final values are). --- .../org/apache/spark/deploy/yarn/Client.scala | 49 +++++++++++++------ .../deploy/yarn/YarnSparkHadoopUtil.scala | 18 ++++++- 2 files changed, 49 insertions(+), 18 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index e5131e636dc0..8fdf7bf8307b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -666,21 +666,6 @@ private[spark] class Client( private def createConfArchive(): File = { val hadoopConfFiles = new HashMap[String, File]() - // Uploading $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that - // the executors will use the latest configurations instead of the default values. This is - // required when user changes log4j.properties directly to set the log configurations. 
If - // configuration file is provided through --files then executors will be taking configurations - // from --files instead of $SPARK_CONF_DIR/log4j.properties. - - // Also uploading metrics.properties to distributed cache if exists in classpath. - // If user specify this file using --files then executors will use the one - // from --files instead. - for { prop <- Seq("log4j.properties", "metrics.properties") - url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) - if url.getProtocol == "file" } { - hadoopConfFiles(prop) = new File(url.getPath) - } - Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey => sys.env.get(envKey).foreach { path => val dir = new File(path) @@ -705,9 +690,32 @@ private[spark] class Client( try { confStream.setLevel(0) + + // Upload $SPARK_CONF_DIR/log4j.properties file to the distributed cache to make sure that + // the executors will use the latest configurations instead of the default values. This is + // required when user changes log4j.properties directly to set the log configurations. If + // configuration file is provided through --files then executors will be taking configurations + // from --files instead of $SPARK_CONF_DIR/log4j.properties. + + // Also upload metrics.properties to distributed cache if exists in classpath. + // If user specify this file using --files then executors will use the one + // from --files instead. + for { prop <- Seq("log4j.properties", "metrics.properties") + url <- Option(Utils.getContextOrSparkClassLoader.getResource(prop)) + if url.getProtocol == "file" } { + val file = new File(url.getPath()) + confStream.putNextEntry(new ZipEntry(file.getName())) + Files.copy(file, confStream) + confStream.closeEntry() + } + + // Save the Hadoop config files under a separate directory in the archive. This directory + // is appended to the classpath so that the cluster-provided configuration takes precedence. 
+ confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/")) + confStream.closeEntry() hadoopConfFiles.foreach { case (name, file) => if (file.canRead()) { - confStream.putNextEntry(new ZipEntry(name)) + confStream.putNextEntry(new ZipEntry(s"$LOCALIZED_HADOOP_CONF_DIR/$name")) Files.copy(file, confStream) confStream.closeEntry() } @@ -1177,6 +1185,9 @@ private object Client extends Logging { // Subdirectory where the user's Spark and Hadoop config files will be placed. val LOCALIZED_CONF_DIR = "__spark_conf__" + // Subdirectory in the conf directory containing Hadoop config files. + val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__" + // File containing the conf archive in the AM. See prepareLocalResources(). val LOCALIZED_CONF_ARCHIVE = LOCALIZED_CONF_DIR + ".zip" @@ -1288,6 +1299,12 @@ private object Client extends Logging { sys.env.get(ENV_DIST_CLASSPATH).foreach { cp => addClasspathEntry(getClusterPath(sparkConf, cp), env) } + + // Add the localized Hadoop config at the end of the classpath, in case it contains other + // files (such as configuration files for different services) that are not part of the + // YARN cluster's config. + addClasspathEntry( + buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, LOCALIZED_HADOOP_CONF_DIR), env) } /** diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 4522071bd92e..28ad4a8a0eac 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -61,8 +61,22 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // Return an appropriate (subclass) of Configuration. Creating a config initializes some Hadoop // subsystems. Always create a new config, don't reuse yarnConf. 
- override def newConfiguration(conf: SparkConf): Configuration = - new YarnConfiguration(super.newConfiguration(conf)) + override def newConfiguration(conf: SparkConf): Configuration = { + val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) + + // These resources may be distributed by Client.scala when starting the YARN application, and + // include any config customizations done at the gateway; overlay them on top of the cluster's + // config, so that final entries are not overwritten. + // + // The list of files used by YarnConfiguration doesn't seem to be available through any API, + // but these are the listed files when you dump a fresh YarnConfiguration into XML. + val resourceDir = Client.LOCALIZED_HADOOP_CONF_DIR + Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml", "mapred-site.xml").foreach { res => + hadoopConf.addResource(s"$resourceDir/$res") + } + + hadoopConf + } // Add any user credentials to the job conf which are necessary for running on a secure Hadoop // cluster From 3d59f0bcc0d658466c9e0e67a583cd7c65729302 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Mon, 10 Jul 2017 14:26:33 -0700 Subject: [PATCH 2/3] Create a separate file with the gateway's config. --- .../scala/org/apache/spark/deploy/yarn/Client.scala | 10 ++++++++++ .../spark/deploy/yarn/YarnSparkHadoopUtil.scala | 13 +------------ 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 8fdf7bf8307b..6e261409937f 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -721,6 +721,12 @@ private[spark] class Client( } } + // Save the YARN configuration into a separate file that will be overlaid on top of the + // cluster's Hadoop conf. 
+ confStream.putNextEntry(new ZipEntry(SPARK_HADOOP_CONF_FILE)) + yarnConf.writeXml(confStream) + confStream.closeEntry() + // Save Spark configuration to a file in the archive. val props = new Properties() sparkConf.getAll.foreach { case (k, v) => props.setProperty(k, v) } @@ -1194,6 +1200,10 @@ private object Client extends Logging { // Name of the file in the conf archive containing Spark configuration. val SPARK_CONF_FILE = "__spark_conf__.properties" + // Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the + // cluster's Hadoop config. + val SPARK_HADOOP_CONF_FILE = "__spark_conf__.xml" + // Subdirectory where the user's python files (not archives) will be placed. val LOCALIZED_PYTHON_DIR = "__pyfiles__" diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 28ad4a8a0eac..a687f67c5b69 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -63,18 +63,7 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil { // subsystems. Always create a new config, don't reuse yarnConf. override def newConfiguration(conf: SparkConf): Configuration = { val hadoopConf = new YarnConfiguration(super.newConfiguration(conf)) - - // These resources may be distributed by Client.scala when starting the YARN application, and - // include any config customizations done at the gateway; overlay them on top of the cluster's - // config, so that final entries are not overwritten. - // - // The list of files used by YarnConfiguration doesn't seem to be available through any API, - // but these are the listed files when you dump a fresh YarnConfiguration into XML. 
- val resourceDir = Client.LOCALIZED_HADOOP_CONF_DIR - Array("core-site.xml", "hdfs-site.xml", "yarn-site.xml", "mapred-site.xml").foreach { res => - hadoopConf.addResource(s"$resourceDir/$res") - } - + hadoopConf.addResource(Client.SPARK_HADOOP_CONF_FILE) hadoopConf } From 302f16cb1162e395aeb17f9a813c306c6bf5374d Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Wed, 12 Jul 2017 14:58:30 -0700 Subject: [PATCH 3/3] Change generated config file name. --- .../src/main/scala/org/apache/spark/deploy/yarn/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 6e261409937f..f56794dfe4b0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -1202,7 +1202,7 @@ private object Client extends Logging { // Name of the file containing the gateway's Hadoop configuration, to be overlayed on top of the // cluster's Hadoop config. - val SPARK_HADOOP_CONF_FILE = "__spark_conf__.xml" + val SPARK_HADOOP_CONF_FILE = "__spark_hadoop_conf__.xml" // Subdirectory where the user's python files (not archives) will be placed. val LOCALIZED_PYTHON_DIR = "__pyfiles__"