From 79221c712e9d8794a3e36f08fcd29f247bb9138f Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 21 Jan 2015 11:40:49 -0800
Subject: [PATCH 1/7] [SPARK-2669] [yarn] Distribute client configuration to AM.

Currently, when Spark launches the Yarn AM, the process will use the local
Hadoop configuration on the node where the AM launches, if one is present. A
more correct approach is to use the same configuration used to launch the
Spark job, since the user may have made modifications (such as adding
app-specific configs).

The approach taken here is to use the distributed cache to make all files in
the Hadoop configuration directory available to the AM. This is a little
overkill since only the AM needs them (the executors use the broadcast Hadoop
configuration from the driver), but it is the simplest approach.

Even though only a few files in that directory may end up being used, all of
them are uploaded. This allows supporting use cases such as when auxiliary
configuration files are used for SSL configuration, or when uploading a Hive
configuration directory. Not all of these may be reflected in an
o.a.h.conf.Configuration object, but they may be needed when a driver in
cluster mode instantiates, for example, a HiveConf object instead.
---
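For reference, a minimal sketch (not part of the diff below; the object name is
illustrative) of why a classpath entry is all the AM needs to pick up the
distributed config: a default-constructed Hadoop Configuration loads
core-site.xml from the classpath, so with the localized conf subdirectory on
the container classpath the AM sees the client's settings instead of the AM
node's local ones.

import org.apache.hadoop.conf.Configuration

// Sketch: with $PWD/__hadoop_conf_dir__ on the container classpath, a plain
// Configuration (loadDefaults = true) resolves core-site.xml from the
// classpath, so this prints the submitting client's fs.defaultFS rather than
// the value from the AM node's local Hadoop install.
object ConfPickupSketch {
  def main(args: Array[String]): Unit = {
    val conf = new Configuration()
    println(s"fs.defaultFS = ${conf.get("fs.defaultFS")}")
  }
}
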
 .../org/apache/spark/deploy/yarn/Client.scala      | 50 ++++++++++++++++---
 .../spark/deploy/yarn/ClientSuite.scala            | 13 +++--
 2 files changed, 50 insertions(+), 13 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index d4eeccf64275f..a2464d2a283eb 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.io.File
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
@@ -70,12 +71,6 @@ private[spark] class Client(
 
   def stop(): Unit = yarnClient.stop()
 
-  /* ------------------------------------------------------------------------------------- *
-   | The following methods have much in common in the stable and alpha versions of Client, |
-   | but cannot be implemented in the parent trait due to subtle API differences across    |
-   | hadoop versions.                                                                      |
-   * ------------------------------------------------------------------------------------- */
-
  /**
   * Submit an application running our ApplicationMaster to the ResourceManager.
   *
@@ -267,6 +262,45 @@ private[spark] class Client(
       }
     }
 
+    // Distribute the Hadoop config files. These are only really used by the AM, since executors
+    // will use the configuration object broadcast by the driver. But this is the easiest way to
+    // make sure the files are available for the AM. The files are placed in a subdirectory so
+    // that they do not clash with other user files. This directory is then added to the classpath
+    // of all processes (both the AM and all the executors), just to make sure that everybody is
+    // using the same default config.
+    //
+    // This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+    // shows up in the classpath before YARN_CONF_DIR.
+    //
+    // Currently this makes a shallow copy of the conf directory. If there are cases where a
+    // Hadoop config directory contains subdirectories, this code will have to be fixed.
+    val hadoopConfFiles = new HashMap[String, File]()
+    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+      sys.env.get(envKey).foreach { path =>
+        val dir = new File(path)
+        if (dir.isDirectory()) {
+          dir.listFiles().foreach { file =>
+            if (!hadoopConfFiles.contains(file.getName())) {
+              hadoopConfFiles(file.getName()) = file
+            }
+          }
+        }
+      }
+    }
+
+    val hadoopConfPath = new Path(dst, HADOOP_CONF_DIR)
+    fs.mkdirs(hadoopConfPath)
+
+    hadoopConfFiles.foreach { case (name, file) =>
+      val destPath = copyFileToRemote(hadoopConfPath, new Path(file.toURI()), replication, true)
+      distCacheMgr.addResource(fs, hadoopConf, destPath,
+        localResources,
+        LocalResourceType.FILE,
+        HADOOP_CONF_DIR + Path.SEPARATOR + name,
+        statCache)
+    }
+
     /**
      * Do the same for any additional resources passed in through ClientArguments.
      * Each resource category is represented by a 3-tuple of:
@@ -660,6 +694,9 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
+  // Subdirectory where the user's hadoop config files are written in the job staging dir.
+  val HADOOP_CONF_DIR = "__hadoop_conf_dir__"
+
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
    * class if not.
@@ -770,6 +807,7 @@
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
     addClasspathEntry(Environment.PWD.$(), env)
+    addClasspathEntry(Environment.PWD.$() + Path.SEPARATOR + HADOOP_CONF_DIR, env)
 
     // Normally the users app.jar is last in case conflicts with spark jars
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index aad50015b717f..5a12bce7ab828 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -20,6 +20,11 @@ package org.apache.spark.deploy.yarn
 import java.io.File
 import java.net.URI
 
+import scala.collection.JavaConversions._
+import scala.collection.mutable.{ HashMap => MutableHashMap }
+import scala.reflect.ClassTag
+import scala.util.Try
+
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.MRJobConfig
@@ -28,16 +33,9 @@ import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.mockito.Matchers._
 import org.mockito.Mockito._
-
-
 import org.scalatest.FunSuite
 import org.scalatest.Matchers
 
-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ HashMap => MutableHashMap }
-import scala.reflect.ClassTag
-import scala.util.Try
-
 import org.apache.spark.{SparkException, SparkConf}
 import org.apache.spark.util.Utils
 
@@ -99,6 +97,7 @@ class ClientSuite extends FunSuite with Matchers {
       }
     })
     cp should contain (Environment.PWD.$())
+    cp should contain (Environment.PWD.$() + Path.SEPARATOR + Client.HADOOP_CONF_DIR)
     cp should contain (s"${Environment.PWD.$()}${File.separator}*")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
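A toy rendering (not Spark code; names illustrative) of what the new
addClasspathEntry line amounts to: YARN expands Environment.PWD.$() when it
builds the container launch context, so the entry resolves to the localized
conf subdirectory inside each container's working directory.

import org.apache.hadoop.fs.Path
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment

import scala.collection.mutable.HashMap

object ClasspathEntrySketch {
  def main(args: Array[String]): Unit = {
    val env = new HashMap[String, String]()
    // Environment.PWD.$() renders as "$PWD" on Unix; YARN substitutes the
    // container's working directory at launch time.
    val entry = Environment.PWD.$() + Path.SEPARATOR + "__hadoop_conf_dir__"
    env("CLASSPATH") = env.get("CLASSPATH").map(_ + ":" + entry).getOrElse(entry)
    println(env("CLASSPATH")) // $PWD/__hadoop_conf_dir__
  }
}
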
From 34bdbd81ca95fc4b844686f852a06b2c937d522c Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Tue, 17 Feb 2015 12:27:26 -0800
Subject: [PATCH 2/7] Fix test.

The test was setting HADOOP_CONF_DIR to the temp directory, causing the same
file to be distributed via Yarn's cache multiple times. Yarn doesn't seem to
like that, so fix the test and add a check in Client.scala to detect that
situation and fail early when it happens.
---
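The guard boils down to remembering every URI handed to the distributed cache
and failing fast on a repeat, instead of letting YARN fail container
localization later with an opaque internal error. A condensed sketch (it uses
HashSet#add's return value rather than the contains-then-insert in the diff;
the behavior is the same):

import java.net.URI

import scala.collection.mutable.HashSet

object DistCacheDedupSketch {
  private val distributedUris = new HashSet[String]

  // HashSet#add returns false when the element is already present.
  def addDistributedUri(uri: URI): Unit = {
    if (!distributedUris.add(uri.toString())) {
      throw new IllegalArgumentException(
        s"Resource $uri added multiple times to distributed cache.")
    }
  }

  def main(args: Array[String]): Unit = {
    addDistributedUri(new URI("file:/tmp/same.jar"))
    addDistributedUri(new URI("file:/tmp/same.jar")) // throws, as the new test expects
  }
}
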
 .../org/apache/spark/deploy/yarn/Client.scala      | 15 ++++++++++++++-
 .../apache/spark/deploy/yarn/ClientSuite.scala     | 16 ++++++++++++++++
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  5 ++++-
 3 files changed, 34 insertions(+), 2 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 209f1e628b22c..5af3d9f1f554e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -22,7 +22,7 @@ import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap, ListBuffer, Map}
+import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.{Try, Success, Failure}
 
 import com.google.common.base.Objects
@@ -211,6 +211,7 @@ private[spark] class Client(
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val nns = getNameNodesToAccess(sparkConf) + dst
+    val distributedUris = new HashSet[String]
     obtainTokensForNamenodes(nns, hadoopConf, credentials)
 
     val replication = sparkConf.getInt("spark.yarn.submit.file.replication",
@@ -228,6 +229,15 @@ private[spark] class Client(
         "for alternatives.")
     }
 
+    def addDistributedUri(uri: URI): Unit = {
+      val uriStr = uri.toString()
+      if (distributedUris.contains(uriStr)) {
+        throw new IllegalArgumentException(
+          s"Resource $uri added multiple times to distributed cache.")
+      }
+      distributedUris += uriStr
+    }
+
     /**
      * Copy the given main resource to the distributed cache if the scheme is not "local".
      * Otherwise, set the corresponding key in our SparkConf to handle it downstream.
@@ -245,6 +255,7 @@ private[spark] class Client(
       if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
         if (localURI.getScheme != LOCAL_SCHEME) {
+          addDistributedUri(localURI)
           val src = getQualifiedLocalPath(localURI, hadoopConf)
           val destPath = copyFileToRemote(dst, src, replication)
           val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
@@ -288,6 +299,7 @@ private[spark] class Client(
     fs.mkdirs(hadoopConfPath)
 
     hadoopConfFiles.foreach { case (name, file) =>
+      addDistributedUri(file.toURI())
       val destPath = copyFileToRemote(hadoopConfPath, new Path(file.toURI()), replication)
       distCacheMgr.addResource(fs, hadoopConf, destPath,
         localResources,
@@ -313,6 +325,7 @@ private[spark] class Client(
       flist.split(',').foreach { file =>
         val localURI = new URI(file.trim())
         if (localURI.getScheme != LOCAL_SCHEME) {
+          addDistributedUri(localURI)
           val localPath = new Path(localURI)
           val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
           val destPath = copyFileToRemote(dst, localPath, replication)
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 88ecfd6b5854d..bb56581aada57 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -195,6 +195,22 @@ class ClientSuite extends FunSuite with Matchers {
     assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
   }
 
+  test("conflicting distributed cache resources") {
+    val conf = new Configuration()
+    val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
+    val jar = "/same.jar"
+    val args = new ClientArguments(Array("--jar", jar, "--addJars", jar), sparkConf)
+
+    val client = spy(new Client(args, conf, sparkConf))
+    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
+      any(classOf[Path]), anyShort())
+
+    val tempDir = Utils.createTempDir()
+    intercept[IllegalArgumentException] {
+      client.prepareLocalResources(tempDir.getAbsolutePath())
+    }
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 0e37276ba724b..e11757a2c999b 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -77,6 +77,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
   private var yarnCluster: MiniYARNCluster = _
   private var tempDir: File = _
   private var fakeSparkJar: File = _
+  private var hadoopConfDir: File = _
   private var logConfDir: File = _
 
   override def beforeAll() {
@@ -120,6 +121,8 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
+    hadoopConfDir = new File(tempDir, Client.HADOOP_CONF_DIR)
+    assert(hadoopConfDir.mkdir())
   }
 
   override def afterAll() {
@@ -257,7 +260,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
       appArgs
 
     Utils.executeAndGetOutput(argv,
-      extraEnvironment = Map("YARN_CONF_DIR" -> tempDir.getAbsolutePath()))
+      extraEnvironment = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()))
   }
 
   /**
From e3d0613cec8c439310b042f5b21c3d75859eece8 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 11 Mar 2015 09:20:02 -0700
Subject: [PATCH 3/7] Review feedback.
---
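A sketch of the new guidance added to running-on-yarn.md below: if the
distributed Hadoop config references an environment variable that YARN does not
manage, set it explicitly for every role. HADOOP_EXTRA_OPTS is a made-up
variable for illustration; the spark.executorEnv. and spark.yarn.appMasterEnv.
prefixes are the standard configuration keys.

import org.apache.spark.SparkConf

object ConfigPropagationSketch {
  val conf = new SparkConf()
    .set("spark.executorEnv.HADOOP_EXTRA_OPTS", "-Dcustom.prop=value")       // executors
    .set("spark.yarn.appMasterEnv.HADOOP_EXTRA_OPTS", "-Dcustom.prop=value") // AM (and cluster-mode driver)
  // In client mode the driver runs in the submitting JVM, so the variable has
  // to be present in the environment that launches spark-submit.
}
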
 docs/running-on-yarn.md                            |  6 +-
 .../org/apache/spark/deploy/yarn/Client.scala      | 57 ++++++++++---------
 2 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index 2b93eef6c26ed..b755068c8e7b2 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -201,7 +201,11 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
 # Launching Spark on YARN
 
 Ensure that `HADOOP_CONF_DIR` or `YARN_CONF_DIR` points to the directory which contains the (client side) configuration files for the Hadoop cluster.
-These configs are used to write to the dfs and connect to the YARN ResourceManager.
+These configs are used to write to the dfs and connect to the YARN ResourceManager. The
+configuration contained in this directory will be distributed to the YARN cluster so that all
+containers used by the application use the same configuration. If the configuration references
+Java system properties or environment variables not managed by YARN, they should also be set in the
+Spark application's configuration (driver, executors, and the AM when running in client mode).
 
 There are two deploy modes that can be used to launch Spark applications on YARN. In yarn-cluster mode, the Spark driver runs inside an application master process which is managed by YARN on the cluster, and the client can go away after initiating the application. In yarn-client mode, the driver runs in the client process, and the application master is only used for requesting resources from YARN.
 
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5af3d9f1f554e..69d650ac9a670 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -211,6 +211,9 @@ private[spark] class Client(
     val fs = FileSystem.get(hadoopConf)
     val dst = new Path(fs.getHomeDirectory(), appStagingDir)
     val nns = getNameNodesToAccess(sparkConf) + dst
+    // Used to keep track of URIs added to the distributed cache. If the same URI is added
+    // multiple times, YARN will fail to launch containers for the app with an internal
+    // error.
     val distributedUris = new HashSet[String]
     obtainTokensForNamenodes(nns, hadoopConf, credentials)
 
@@ -229,13 +232,15 @@ private[spark] class Client(
         "for alternatives.")
     }
 
-    def addDistributedUri(uri: URI): Unit = {
+    def addDistributedUri(uri: URI): Boolean = {
       val uriStr = uri.toString()
       if (distributedUris.contains(uriStr)) {
-        throw new IllegalArgumentException(
-          s"Resource $uri added multiple times to distributed cache.")
+        logWarning(s"Resource $uri added multiple times to distributed cache.")
+        false
+      } else {
+        distributedUris += uriStr
+        true
       }
-      distributedUris += uriStr
     }
 
     /**
@@ -255,12 +260,13 @@ private[spark] class Client(
       if (!localPath.isEmpty()) {
         val localURI = new URI(localPath)
         if (localURI.getScheme != LOCAL_SCHEME) {
-          addDistributedUri(localURI)
-          val src = getQualifiedLocalPath(localURI, hadoopConf)
-          val destPath = copyFileToRemote(dst, src, replication)
-          val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
-          distCacheMgr.addResource(destFs, hadoopConf, destPath,
-            localResources, LocalResourceType.FILE, destName, statCache)
+          if (addDistributedUri(localURI)) {
+            val src = getQualifiedLocalPath(localURI, hadoopConf)
+            val destPath = copyFileToRemote(dst, src, replication)
+            val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
+            distCacheMgr.addResource(destFs, hadoopConf, destPath,
+              localResources, LocalResourceType.FILE, destName, statCache)
+          }
         } else if (confKey != null) {
           // If the resource is intended for local use only, handle this downstream
           // by setting the appropriate property
@@ -299,13 +305,11 @@ private[spark] class Client(
     fs.mkdirs(hadoopConfPath)
 
     hadoopConfFiles.foreach { case (name, file) =>
-      addDistributedUri(file.toURI())
-      val destPath = copyFileToRemote(hadoopConfPath, new Path(file.toURI()), replication)
-      distCacheMgr.addResource(fs, hadoopConf, destPath,
-        localResources,
-        LocalResourceType.FILE,
-        HADOOP_CONF_DIR + Path.SEPARATOR + name,
-        statCache)
+      if (addDistributedUri(file.toURI())) {
+        val destPath = copyFileToRemote(hadoopConfPath, new Path(file.toURI()), replication)
+        distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.FILE,
+          HADOOP_CONF_DIR + Path.SEPARATOR + name, statCache)
+      }
     }
 
     /**
@@ -325,14 +329,15 @@ private[spark] class Client(
       flist.split(',').foreach { file =>
         val localURI = new URI(file.trim())
         if (localURI.getScheme != LOCAL_SCHEME) {
-          addDistributedUri(localURI)
-          val localPath = new Path(localURI)
-          val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
-          val destPath = copyFileToRemote(dst, localPath, replication)
-          distCacheMgr.addResource(
-            fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
-          if (addToClasspath) {
-            cachedSecondaryJarLinks += linkname
+          if (addDistributedUri(localURI)) {
+            val localPath = new Path(localURI)
+            val linkname = Option(localURI.getFragment()).getOrElse(localPath.getName())
+            val destPath = copyFileToRemote(dst, localPath, replication)
+            distCacheMgr.addResource(
+              fs, hadoopConf, destPath, localResources, resType, linkname, statCache)
+            if (addToClasspath) {
+              cachedSecondaryJarLinks += linkname
+            }
           }
         } else if (addToClasspath) {
           // Resource is intended for local use only and should be added to the class path
@@ -725,7 +730,7 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
-  // Subdirectory where the user's hadoop config files are written in the job staging dir.
+  // Subdirectory where the user's hadoop config files are written in the app staging dir.
   val HADOOP_CONF_DIR = "__hadoop_conf_dir__"
 
   /**
From cbb9fb3b16f3711cae8d3eae8c7573a88d294798 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Wed, 18 Mar 2015 06:50:51 -0700
Subject: [PATCH 4/7] Remove stale test.
---
 .../apache/spark/deploy/yarn/ClientSuite.scala     | 16 ----------------
 1 file changed, 16 deletions(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index c81356f986c61..af067dbf68159 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -202,22 +202,6 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     assert(caught.getMessage === "Can't get Master Kerberos principal for use as renewer")
   }
 
-  test("conflicting distributed cache resources") {
-    val conf = new Configuration()
-    val sparkConf = new SparkConf().set(Client.CONF_SPARK_JAR, SPARK)
-    val jar = "/same.jar"
-    val args = new ClientArguments(Array("--jar", jar, "--addJars", jar), sparkConf)
-
-    val client = spy(new Client(args, conf, sparkConf))
-    doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), anyShort())
-
-    val tempDir = Utils.createTempDir()
-    intercept[IllegalArgumentException] {
-      client.prepareLocalResources(tempDir.getAbsolutePath())
-    }
-  }
-
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =
From ed45b7df3e636794fae139ebecef81c3da949aeb Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Thu, 2 Apr 2015 15:08:10 -0700
Subject: [PATCH 5/7] Zip all config files and upload them as an archive.

This avoids creating lots of small files in the NameNode when starting up
the job. The archive uses the lowest compression level so that it's faster.
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 39 ++++++++++++-------
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  1 +
 2 files changed, 27 insertions(+), 13 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5e8270d872e5a..4a47facf9142e 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.deploy.yarn
 
-import java.io.File
+import java.io.{File, FileOutputStream}
 import java.net.{InetAddress, UnknownHostException, URI, URISyntaxException}
 import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, ListBuffer, Map}
 import scala.util.{Try, Success, Failure}
 
 import com.google.common.base.Objects
+import com.google.common.io.Files
 
 import org.apache.hadoop.io.DataOutputBuffer
 import org.apache.hadoop.conf.Configuration
@@ -277,10 +279,10 @@
 
     // Distribute the Hadoop config files. These are only really used by the AM, since executors
     // will use the configuration object broadcast by the driver. But this is the easiest way to
-    // make sure the files are available for the AM. The files are placed in a subdirectory so
-    // that they do not clash with other user files. This directory is then added to the classpath
-    // of all processes (both the AM and all the executors), just to make sure that everybody is
-    // using the same default config.
+    // make sure the files are available for the AM. The files are zipped and added to the job as
+    // an archive, so that YARN will explode it when distributing to the AM. This directory is then
+    // added to the classpath of all processes (both the AM and all the executors), just to make
+    // sure that everybody is using the same default config.
     //
     // This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
     // shows up in the classpath before YARN_CONF_DIR.
@@ -301,15 +303,26 @@ private[spark] class Client(
       }
     }
 
-    val hadoopConfPath = new Path(dst, HADOOP_CONF_DIR)
-    fs.mkdirs(hadoopConfPath)
+    if (!hadoopConfFiles.isEmpty) {
+      val hadoopConfArchive = File.createTempFile(HADOOP_CONF_DIR, ".jar",
+        new File(Utils.getLocalDir(sparkConf)))
+      require(addDistributedUri(hadoopConfArchive.toURI()))
 
-    hadoopConfFiles.foreach { case (name, file) =>
-      if (addDistributedUri(file.toURI())) {
-        val destPath = copyFileToRemote(hadoopConfPath, new Path(file.toURI()), replication)
-        distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.FILE,
-          HADOOP_CONF_DIR + Path.SEPARATOR + name, statCache)
+      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
+      try {
+        hadoopConfStream.setLevel(0)
+        hadoopConfFiles.foreach { case (name, file) =>
+          hadoopConfStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, hadoopConfStream)
+          hadoopConfStream.closeEntry()
+        }
+      } finally {
+        hadoopConfStream.close()
       }
+
+      val destPath = copyFileToRemote(dst, new Path(hadoopConfArchive.toURI()), replication)
+      distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
+        HADOOP_CONF_DIR, statCache)
     }
 
     /**
@@ -731,7 +744,7 @@ object Client extends Logging {
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
   // Subdirectory where the user's hadoop config files are written in the app staging dir.
-  val HADOOP_CONF_DIR = "__hadoop_conf_dir__"
+  val HADOOP_CONF_DIR = "__hadoop_conf__"
 
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index e11757a2c999b..4b0938af84688 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -123,6 +123,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
     hadoopConfDir = new File(tempDir, Client.HADOOP_CONF_DIR)
     assert(hadoopConfDir.mkdir())
+    File.createTempFile("token", ".txt", hadoopConfDir)
   }
 
   override def afterAll() {
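For reference, the archive step above in isolation (a sketch, usable
standalone; it mirrors the inline zip logic in the diff rather than being code
from it). Level 0 stores entries uncompressed, trading a few kilobytes of
staging space for faster submission, which is the right call for small XML
config files:

import java.io.{File, FileOutputStream}
import java.util.zip.{ZipEntry, ZipOutputStream}

import com.google.common.io.Files

object ZipConfDirSketch {
  // Shallow-zip the regular files of confDir into out; subdirectories are
  // skipped, matching the shallow-copy caveat noted in the code comments.
  def zipShallow(confDir: File, out: File): Unit = {
    val zipStream = new ZipOutputStream(new FileOutputStream(out))
    try {
      zipStream.setLevel(0) // store only: config files are tiny, speed wins
      confDir.listFiles().filter(_.isFile()).foreach { file =>
        zipStream.putNextEntry(new ZipEntry(file.getName()))
        Files.copy(file, zipStream) // Guava: stream the file into the open entry
        zipStream.closeEntry()
      }
    } finally {
      zipStream.close()
    }
  }
}
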
From f6931521b239ba89cb961ddd1f3c529681568f42 Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 6 Apr 2015 19:09:25 -0700
Subject: [PATCH 6/7] Le sigh.
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 100 ++++++++++--------
 1 file changed, 56 insertions(+), 44 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 4a47facf9142e..15d0794f2fea6 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -277,50 +277,9 @@ private[spark] class Client(
       }
     }
 
-    // Distribute the Hadoop config files. These are only really used by the AM, since executors
-    // will use the configuration object broadcast by the driver. But this is the easiest way to
-    // make sure the files are available for the AM. The files are zipped and added to the job as
-    // an archive, so that YARN will explode it when distributing to the AM. This directory is then
-    // added to the classpath of all processes (both the AM and all the executors), just to make
-    // sure that everybody is using the same default config.
-    //
-    // This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
-    // shows up in the classpath before YARN_CONF_DIR.
-    //
-    // Currently this makes a shallow copy of the conf directory. If there are cases where a
-    // Hadoop config directory contains subdirectories, this code will have to be fixed.
-    val hadoopConfFiles = new HashMap[String, File]()
-    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
-      sys.env.get(envKey).foreach { path =>
-        val dir = new File(path)
-        if (dir.isDirectory()) {
-          dir.listFiles().foreach { file =>
-            if (!hadoopConfFiles.contains(file.getName())) {
-              hadoopConfFiles(file.getName()) = file
-            }
-          }
-        }
-      }
-    }
-
-    if (!hadoopConfFiles.isEmpty) {
-      val hadoopConfArchive = File.createTempFile(HADOOP_CONF_DIR, ".jar",
-        new File(Utils.getLocalDir(sparkConf)))
-      require(addDistributedUri(hadoopConfArchive.toURI()))
-
-      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
-      try {
-        hadoopConfStream.setLevel(0)
-        hadoopConfFiles.foreach { case (name, file) =>
-          hadoopConfStream.putNextEntry(new ZipEntry(name))
-          Files.copy(file, hadoopConfStream)
-          hadoopConfStream.closeEntry()
-        }
-      } finally {
-        hadoopConfStream.close()
-      }
-
-      val destPath = copyFileToRemote(dst, new Path(hadoopConfArchive.toURI()), replication)
+    createConfArchive().foreach { file =>
+      require(addDistributedUri(file.toURI()))
+      val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
       distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
         HADOOP_CONF_DIR, statCache)
     }
@@ -366,6 +325,59 @@ private[spark] class Client(
     localResources
   }
 
+  /**
+   * Create an archive with the Hadoop config files for distribution.
+   *
+   * These are only really used by the AM, since executors will use the configuration object
+   * broadcast by the driver. But this is the easiest way to make sure the files are available for
+   * the AM. The files are zipped and added to the job as an archive, so that YARN will explode it
+   * when distributing to the AM. This directory is then added to the classpath of all processes
+   * (both the AM and all the executors), just to make sure that everybody is using the same default
+   * config.
+   *
+   * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
+   * shows up in the classpath before YARN_CONF_DIR.
+   *
+   * Currently this makes a shallow copy of the conf directory. If there are cases where a
+   * Hadoop config directory contains subdirectories, this code will have to be fixed.
+   */
+  private def createConfArchive(): Option[File] = {
+    val hadoopConfFiles = new HashMap[String, File]()
+    Seq("HADOOP_CONF_DIR", "YARN_CONF_DIR").foreach { envKey =>
+      sys.env.get(envKey).foreach { path =>
+        val dir = new File(path)
+        if (dir.isDirectory()) {
+          dir.listFiles().foreach { file =>
+            if (!hadoopConfFiles.contains(file.getName())) {
+              hadoopConfFiles(file.getName()) = file
+            }
+          }
+        }
+      }
+    }
+
+    if (!hadoopConfFiles.isEmpty) {
+      val hadoopConfArchive = File.createTempFile(HADOOP_CONF_DIR, ".jar",
+        new File(Utils.getLocalDir(sparkConf)))
+
+      val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
+      try {
+        hadoopConfStream.setLevel(0)
+        hadoopConfFiles.foreach { case (name, file) =>
+          hadoopConfStream.putNextEntry(new ZipEntry(name))
+          Files.copy(file, hadoopConfStream)
+          hadoopConfStream.closeEntry()
+        }
+      } finally {
+        hadoopConfStream.close()
+      }
+
+      Some(hadoopConfArchive)
+    } else {
+      None
+    }
+  }
+
   /**
    * Set up the environment for launching our ApplicationMaster container.
    */
From 013f0fb0162a83bbb2ea7b94da7b754fa7dc9f6a Mon Sep 17 00:00:00 2001
From: Marcelo Vanzin
Date: Mon, 13 Apr 2015 13:38:00 -0700
Subject: [PATCH 7/7] Review feedback.
---
 .../org/apache/spark/deploy/yarn/Client.scala      | 30 ++++++++++---------
 .../spark/deploy/yarn/ExecutorRunnable.scala       |  2 +-
 .../spark/deploy/yarn/ClientSuite.scala            |  4 +--
 .../spark/deploy/yarn/YarnClusterSuite.scala       |  2 +-
 4 files changed, 20 insertions(+), 18 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15d0794f2fea6..0e68a74b6815d 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -281,7 +281,7 @@ private[spark] class Client(
       require(addDistributedUri(file.toURI()))
       val destPath = copyFileToRemote(dst, new Path(file.toURI()), replication)
       distCacheMgr.addResource(fs, hadoopConf, destPath, localResources, LocalResourceType.ARCHIVE,
-        HADOOP_CONF_DIR, statCache)
+        LOCALIZED_HADOOP_CONF_DIR, statCache, appMasterOnly = true)
     }
 
     /**
@@ -328,12 +328,10 @@ private[spark] class Client(
   /**
    * Create an archive with the Hadoop config files for distribution.
    *
-   * These are only really used by the AM, since executors will use the configuration object
-   * broadcast by the driver. But this is the easiest way to make sure the files are available for
-   * the AM. The files are zipped and added to the job as an archive, so that YARN will explode it
-   * when distributing to the AM. This directory is then added to the classpath of all processes
-   * (both the AM and all the executors), just to make sure that everybody is using the same default
-   * config.
+   * These are only used by the AM, since executors will use the configuration object broadcast by
+   * the driver. The files are zipped and added to the job as an archive, so that YARN will explode
+   * it when distributing to the AM. This directory is then added to the classpath of the AM
+   * process, just to make sure that everybody is using the same default config.
    *
    * This follows the order of precedence set by the startup scripts, in which HADOOP_CONF_DIR
    * shows up in the classpath before YARN_CONF_DIR.
@@ -357,7 +355,7 @@ private[spark] class Client(
     }
 
     if (!hadoopConfFiles.isEmpty) {
-      val hadoopConfArchive = File.createTempFile(HADOOP_CONF_DIR, ".jar",
+      val hadoopConfArchive = File.createTempFile(LOCALIZED_HADOOP_CONF_DIR, ".zip",
         new File(Utils.getLocalDir(sparkConf)))
 
       val hadoopConfStream = new ZipOutputStream(new FileOutputStream(hadoopConfArchive))
@@ -385,7 +383,7 @@ private[spark] class Client(
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.driver.extraClassPath")
-    populateClasspath(args, yarnConf, sparkConf, env, extraCp)
+    populateClasspath(args, yarnConf, sparkConf, env, true, extraCp)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
@@ -755,8 +753,8 @@ object Client extends Logging {
   // Distribution-defined classpath to add to processes
   val ENV_DIST_CLASSPATH = "SPARK_DIST_CLASSPATH"
 
-  // Subdirectory where the user's hadoop config files are written in the app staging dir.
-  val HADOOP_CONF_DIR = "__hadoop_conf__"
+  // Subdirectory where the user's hadoop config files will be placed.
+  val LOCALIZED_HADOOP_CONF_DIR = "__hadoop_conf__"
 
   /**
    * Find the user-defined Spark jar if configured, or return the jar containing this
@@ -871,14 +869,18 @@ object Client extends Logging {
       conf: Configuration,
       sparkConf: SparkConf,
       env: HashMap[String, String],
+      isAM: Boolean,
       extraClassPath: Option[String] = None): Unit = {
     extraClassPath.foreach(addClasspathEntry(_, env))
     addClasspathEntry(
       YarnSparkHadoopUtil.expandEnvironment(Environment.PWD), env
     )
-    addClasspathEntry(
-      YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR + HADOOP_CONF_DIR,
-      env)
+
+    if (isAM) {
+      addClasspathEntry(
+        YarnSparkHadoopUtil.expandEnvironment(Environment.PWD) + Path.SEPARATOR +
+          LOCALIZED_HADOOP_CONF_DIR, env)
+    }
 
     if (sparkConf.getBoolean("spark.yarn.user.classpath.first", false)) {
       val userClassPath =
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index c1d3f7320f53c..7e9870f838a06 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -277,7 +277,7 @@ class ExecutorRunnable(
   private def prepareEnvironment(container: Container): HashMap[String, String] = {
     val env = new HashMap[String, String]()
     val extraCp = sparkConf.getOption("spark.executor.extraClassPath")
-    Client.populateClasspath(null, yarnConf, sparkConf, env, extraCp)
+    Client.populateClasspath(null, yarnConf, sparkConf, env, false, extraCp)
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
       // This assumes each executor environment variable set here is a path
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index af067dbf68159..634621b47daeb 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -93,7 +93,7 @@ class ClientSuite extends FunSuite with Matchers with BeforeAndAfterAll {
     val env = new MutableHashMap[String, String]()
     val args = new ClientArguments(Array("--jar", USER, "--addJars", ADDED), sparkConf)
 
-    Client.populateClasspath(args, conf, sparkConf, env)
+    Client.populateClasspath(args, conf, sparkConf, env, true)
 
     val cp = env("CLASSPATH").split(":|;|<CPS>")
     s"$SPARK,$USER,$ADDED".split(",").foreach({ entry =>
@@ -113,7 +113,7 @@
       Environment.PWD.$()
     }
     cp should contain(pwdVar)
-    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.HADOOP_CONF_DIR}")
+    cp should contain (s"$pwdVar${Path.SEPARATOR}${Client.LOCALIZED_HADOOP_CONF_DIR}")
     cp should not contain (Client.SPARK_JAR)
     cp should not contain (Client.APP_JAR)
   }
diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
index 4b0938af84688..d436be25969af 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala
@@ -121,7 +121,7 @@ class YarnClusterSuite extends FunSuite with BeforeAndAfterAll with Matchers wit
     logInfo(s"RM address in configuration is ${config.get(YarnConfiguration.RM_ADDRESS)}")
 
     fakeSparkJar = File.createTempFile("sparkJar", null, tempDir)
-    hadoopConfDir = new File(tempDir, Client.HADOOP_CONF_DIR)
+    hadoopConfDir = new File(tempDir, Client.LOCALIZED_HADOOP_CONF_DIR)
     assert(hadoopConfDir.mkdir())
     File.createTempFile("token", ".txt", hadoopConfDir)
   }
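
A closing note on the mechanism the final version relies on (a sketch; the
object name and paths are illustrative, not code from the series): registering
the uploaded zip as a LocalResource of type ARCHIVE is what makes the
NodeManager unpack it into $PWD/__hadoop_conf__ inside the AM container, which
is exactly the classpath entry populateClasspath() now adds for the AM only.
Roughly what distCacheMgr.addResource() records for the archive:

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.yarn.api.records.{LocalResource, LocalResourceType, LocalResourceVisibility}
import org.apache.hadoop.yarn.util.ConverterUtils

object ArchiveResourceSketch {
  // Build the LocalResource record for an uploaded conf archive; the
  // NodeManager fetches dest and, because the type is ARCHIVE, unpacks it
  // under the link name the caller maps it to (e.g. __hadoop_conf__).
  def confArchiveResource(fs: FileSystem, dest: Path): LocalResource = {
    val status = fs.getFileStatus(dest)
    LocalResource.newInstance(
      ConverterUtils.getYarnUrlFromPath(dest),
      LocalResourceType.ARCHIVE,
      LocalResourceVisibility.PRIVATE,
      status.getLen(),
      status.getModificationTime())
  }
}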