From 0ba4de81c1ccbff195d9df63cf4dd582e01ee24f Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 19 Apr 2016 00:34:14 +0800
Subject: [PATCH 1/4] init commit

---
 .../org/apache/spark/deploy/yarn/Client.scala | 28 +++++++------------
 1 file changed, 10 insertions(+), 18 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 04e91f8553d51..09538b4c216c2 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -117,6 +117,11 @@ private[spark] class Client(
 
   private var appId: ApplicationId = null
 
+  // The app staging dir based on the STAGING_DIR configuration if configured
+  // otherwise based on the users home directory.
+  private val appStagingBaseDir = sparkConf.get(STAGING_DIR).map { new Path(_) }
+    .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())
+
   def reportLauncherState(state: SparkAppHandle.State): Unit = {
     launcherBackend.setState(state)
   }
@@ -182,8 +187,8 @@ private[spark] class Client(
     val appStagingDir = getAppStagingDir(appId)
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val fs = FileSystem.get(hadoopConf)
-      val stagingDirPath = getAppStagingDirPath(sparkConf, fs, appStagingDir)
+      val stagingDirPath = new Path(appStagingBaseDir, appStagingDir)
+      val fs = stagingDirPath.getFileSystem(hadoopConf)
       if (!preserveFiles && fs.exists(stagingDirPath)) {
         logInfo("Deleting staging directory " + stagingDirPath)
         fs.delete(stagingDirPath, true)
@@ -356,8 +361,8 @@ private[spark] class Client(
     logInfo("Preparing resources for our AM container")
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
-    val fs = FileSystem.get(hadoopConf)
-    val dst = getAppStagingDirPath(sparkConf, fs, appStagingDir)
+    val dst = new Path(appStagingBaseDir, appStagingDir)
+    val fs = dst.getFileSystem(hadoopConf)
     val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
     YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
@@ -667,8 +672,7 @@ private[spark] class Client(
     env("SPARK_YARN_STAGING_DIR") = stagingDir
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
     if (loginFromKeytab) {
-      val remoteFs = FileSystem.get(hadoopConf)
-      val stagingDirPath = getAppStagingDirPath(sparkConf, remoteFs, stagingDir)
+      val stagingDirPath = new Path(appStagingBaseDir, stagingDir)
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
@@ -1438,16 +1442,4 @@ private object Client extends Logging {
     uri.startsWith(s"$LOCAL_SCHEME:")
   }
 
-  /**
-   * Returns the app staging dir based on the STAGING_DIR configuration if configured
-   * otherwise based on the users home directory.
-   */
-  private def getAppStagingDirPath(
-      conf: SparkConf,
-      fs: FileSystem,
-      appStagingDir: String): Path = {
-    val baseDir = conf.get(STAGING_DIR).map { new Path(_) }.getOrElse(fs.getHomeDirectory())
-    new Path(baseDir, appStagingDir)
-  }
-
 }
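
[Note on PATCH 1/4] The patch resolves the staging root once, at Client
construction time: the STAGING_DIR entry (the spark.yarn.stagingDir key) if
set, otherwise the submitting user's home directory on the default
filesystem. It also flips the old order of operations: the Path is built
first and the FileSystem is derived from it via Path.getFileSystem, so a
staging dir on a non-default filesystem (for example a second HDFS
namespace) resolves correctly, whereas FileSystem.get(hadoopConf) always
returns the default filesystem. A minimal standalone sketch of that
resolution logic, with illustrative names rather than the actual Client
internals:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    object StagingBase {
      // Configured base wins; otherwise the user's home dir on the default FS.
      def resolve(configured: Option[String], hadoopConf: Configuration): Path =
        configured.map(new Path(_))  // e.g. hdfs://nn2/tmp/spark-staging
          .getOrElse(FileSystem.get(hadoopConf).getHomeDirectory())

      // The filesystem is then derived from the resolved path, not the default:
      def fsFor(base: Path, hadoopConf: Configuration): FileSystem =
        base.getFileSystem(hadoopConf)
    }
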
From c0374afab5d858e948d69f03dcbf6ca95dfc1dd0 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 19 Apr 2016 10:41:31 +0800
Subject: [PATCH 2/4] address comments

---
 .../org/apache/spark/deploy/yarn/Client.scala | 29 +++++++++----------
 .../spark/deploy/yarn/ClientSuite.scala       |  2 +-
 2 files changed, 14 insertions(+), 17 deletions(-)

diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 09538b4c216c2..a537753f806a9 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -184,10 +184,9 @@ private[spark] class Client(
    * Cleanup application staging directory.
    */
   private def cleanupStagingDir(appId: ApplicationId): Unit = {
-    val appStagingDir = getAppStagingDir(appId)
+    val stagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
     try {
       val preserveFiles = sparkConf.get(PRESERVE_STAGING_FILES)
-      val stagingDirPath = new Path(appStagingBaseDir, appStagingDir)
       val fs = stagingDirPath.getFileSystem(hadoopConf)
       if (!preserveFiles && fs.exists(stagingDirPath)) {
         logInfo("Deleting staging directory " + stagingDirPath)
@@ -195,7 +194,7 @@ private[spark] class Client(
       }
     } catch {
       case ioe: IOException =>
-        logWarning("Failed to cleanup staging dir " + appStagingDir, ioe)
+        logWarning("Failed to cleanup staging dir " + stagingDirPath, ioe)
     }
   }
 
@@ -356,14 +355,13 @@ private[spark] class Client(
    * Exposed for testing.
    */
   def prepareLocalResources(
-      appStagingDir: String,
+      destDir: Path,
       pySparkArchives: Seq[String]): HashMap[String, LocalResource] = {
     logInfo("Preparing resources for our AM container")
     // Upload Spark and the application JAR to the remote file system if necessary,
     // and add them as local resources to the application master.
-    val dst = new Path(appStagingBaseDir, appStagingDir)
-    val fs = dst.getFileSystem(hadoopConf)
-    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + dst
+    val fs = destDir.getFileSystem(hadoopConf)
+    val nns = YarnSparkHadoopUtil.get.getNameNodesToAccess(sparkConf) + destDir
     YarnSparkHadoopUtil.get.obtainTokensForNamenodes(nns, hadoopConf, credentials)
     // Used to keep track of URIs added to the distributed cache. If the same URI is added
     // multiple times, YARN will fail to launch containers for the app with an internal
@@ -373,9 +371,9 @@
     YarnSparkHadoopUtil.get.obtainTokenForHBase(sparkConf, hadoopConf, credentials)
 
     val replication = sparkConf.get(STAGING_FILE_REPLICATION).map(_.toShort)
-      .getOrElse(fs.getDefaultReplication(dst))
+      .getOrElse(fs.getDefaultReplication(destDir))
     val localResources = HashMap[String, LocalResource]()
-    FileSystem.mkdirs(fs, dst, new FsPermission(STAGING_DIR_PERMISSION))
+    FileSystem.mkdirs(fs, destDir, new FsPermission(STAGING_DIR_PERMISSION))
 
     val statCache: Map[URI, FileStatus] = HashMap[URI, FileStatus]()
 
@@ -418,7 +416,7 @@
         val localPath = getQualifiedLocalPath(localURI, hadoopConf)
         val linkname = targetDir.map(_ + "/").getOrElse("") +
           destName.orElse(Option(localURI.getFragment())).getOrElse(localPath.getName())
-        val destPath = copyFileToRemote(dst, localPath, replication)
+        val destPath = copyFileToRemote(destDir, localPath, replication)
         val destFs = FileSystem.get(destPath.toUri(), hadoopConf)
         distCacheMgr.addResource(
           destFs, hadoopConf, destPath, localResources, resType, linkname, statCache,
@@ -663,16 +661,15 @@
    * Set up the environment for launching our ApplicationMaster container.
    */
   private def setupLaunchEnv(
-      stagingDir: String,
+      stagingDirPath: Path,
       pySparkArchives: Seq[String]): HashMap[String, String] = {
     logInfo("Setting up the launch environment for our AM container")
     val env = new HashMap[String, String]()
     populateClasspath(args, yarnConf, sparkConf, env, sparkConf.get(DRIVER_CLASS_PATH))
     env("SPARK_YARN_MODE") = "true"
-    env("SPARK_YARN_STAGING_DIR") = stagingDir
+    env("SPARK_YARN_STAGING_DIR") = stagingDirPath.toString
     env("SPARK_USER") = UserGroupInformation.getCurrentUser().getShortUserName()
     if (loginFromKeytab) {
-      val stagingDirPath = new Path(appStagingBaseDir, stagingDir)
       val credentialsFile = "credentials-" + UUID.randomUUID().toString
       sparkConf.set(CREDENTIALS_FILE_PATH, new Path(stagingDirPath, credentialsFile).toString)
       logInfo(s"Credentials file set to: $credentialsFile")
@@ -772,15 +769,15 @@
     : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")
     val appId = newAppResponse.getApplicationId
-    val appStagingDir = getAppStagingDir(appId)
+    val appStagingDirPath = new Path(appStagingBaseDir, getAppStagingDir(appId))
     val pySparkArchives =
       if (sparkConf.get(IS_PYTHON_APP)) {
        findPySparkArchives()
      } else {
        Nil
      }
-    val launchEnv = setupLaunchEnv(appStagingDir, pySparkArchives)
-    val localResources = prepareLocalResources(appStagingDir, pySparkArchives)
+    val launchEnv = setupLaunchEnv(appStagingDirPath, pySparkArchives)
+    val localResources = prepareLocalResources(appStagingDirPath, pySparkArchives)
 
     // Set the environment variables to be passed on to the executors.
     distCacheMgr.setDistFilesEnv(launchEnv)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 74e268dc48473..2b43340553681 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -147,7 +147,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
     val tempDir = Utils.createTempDir()
     try {
-      client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+      client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
       sparkConf.get(APP_JAR) should be (Some(USER))
 
       // The non-local path should be propagated by name only, since it will end up in the app's
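
[Note on PATCH 2/4] Rather than passing a bare directory name and rebuilding
the staging path inside each helper, the caller now resolves the fully
qualified staging path once in createContainerLaunchContext and threads a
Hadoop Path through prepareLocalResources and setupLaunchEnv; the
SPARK_YARN_STAGING_DIR env var carries the full path string. A hedged sketch
of that threading pattern, with illustrative names, not the actual Client
internals:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    class StagingThreading(hadoopConf: Configuration, appStagingBaseDir: Path) {

      // The caller resolves the per-app staging path exactly once...
      def createContainerLaunchContext(appStagingDir: String): Map[String, String] = {
        val appStagingDirPath = new Path(appStagingBaseDir, appStagingDir)
        prepareLocalResources(appStagingDirPath)
        setupLaunchEnv(appStagingDirPath)
      }

      // ...and each callee derives its FileSystem from the path it received,
      // so a staging dir on a non-default filesystem (e.g. a second HDFS)
      // still resolves correctly.
      private def prepareLocalResources(destDir: Path): Unit = {
        val fs: FileSystem = destDir.getFileSystem(hadoopConf)
        fs.mkdirs(destDir)
      }

      // The env var now carries the full path string, not a bare dir name.
      private def setupLaunchEnv(stagingDirPath: Path): Map[String, String] =
        Map("SPARK_YARN_STAGING_DIR" -> stagingDirPath.toString)
    }
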
From 59eb03cc46ce9526fdf733fe0ded3f8131215cb4 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 19 Apr 2016 11:33:09 +0800
Subject: [PATCH 3/4] fix ut

---
 .../scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 2b43340553681..7084dacb9cd2f 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -238,7 +238,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val client = createClient(sparkConf)
 
     val tempDir = Utils.createTempDir()
-    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
 
     assert(sparkConf.get(SPARK_JARS) ===
       Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
@@ -260,14 +260,14 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
     val sparkConf = new SparkConf().set(SPARK_ARCHIVE, archive.getPath())
     val client = createClient(sparkConf)
-    client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
 
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), anyShort())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
 
     sparkConf.set(SPARK_ARCHIVE, LOCAL_SCHEME + ":" + archive.getPath())
     intercept[IllegalArgumentException] {
-      client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+      client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
     }
   }
 
@@ -280,7 +280,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
 
     val sparkConf = new SparkConfWithEnv(Map("SPARK_HOME" -> temp.getAbsolutePath()))
     val client = createClient(sparkConf)
-    client.prepareLocalResources(temp.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
     verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar.toURI())), anyShort())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
   }
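
[Note on PATCH 3/4] The suite-side follow-up is mechanical: every
prepareLocalResources call site wraps its local temp directory in a Hadoop
Path to match the new signature. A schemeless string such as
/tmp/spark-... becomes a Path that resolves against the default filesystem,
which is the local one under a default Hadoop Configuration, so the tests
still run without an HDFS cluster. A small self-contained check of that
resolution (names here are illustrative):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object LocalStagingCheck {
      def main(args: Array[String]): Unit = {
        val conf = new Configuration()                  // fs.defaultFS defaults to file:///
        val dest = new Path("/tmp/spark-staging-test")  // schemeless, like the suite's temp dirs
        val fs = dest.getFileSystem(conf)               // resolves to the local filesystem
        println(fs.getUri)                              // prints file:/// here
      }
    }
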
From 61b51f2bcff0b0e45cb237779db72c53641a93d3 Mon Sep 17 00:00:00 2001
From: Lianhui Wang
Date: Tue, 19 Apr 2016 12:00:57 +0800
Subject: [PATCH 4/4] merge master and fix ut

---
 .../test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ed19e27a83eae..06efd44b5df9c 100644
--- a/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -308,7 +308,7 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll
     val client = createClient(sparkConf)
 
     val tempDir = Utils.createTempDir()
-    client.prepareLocalResources(tempDir.getAbsolutePath(), Nil)
+    client.prepareLocalResources(new Path(tempDir.getAbsolutePath()), Nil)
 
     // Only jar2 will be added to SECONDARY_JARS, jar3 which has the same name with jar1 will be
     // ignored.
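
[Note on the series] End to end, the feature lets users point the
.sparkStaging directory at a filesystem other than the default one. A hedged
usage sketch, assuming the STAGING_DIR entry is exposed as the
spark.yarn.stagingDir key; hdfs://nn2/... is a placeholder namespace, not
taken from the patch:

    import org.apache.spark.SparkConf

    object StagingDirUsage {
      // Equivalent spark-submit form:
      //   spark-submit --master yarn \
      //     --conf spark.yarn.stagingDir=hdfs://nn2/tmp/spark-staging ...
      val conf: SparkConf = new SparkConf()
        .set("spark.yarn.stagingDir", "hdfs://nn2/tmp/spark-staging")
    }

If the key is left unset, the Client falls back to the submitting user's
home directory on the default filesystem, matching the old behavior.
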