Commit

[SPARK-27575][CORE] Yarn file-related confs should merge new value with existing value
HeartSaVioR committed Apr 26, 2019
1 parent 0745333 commit 3e54eae
Showing 2 changed files with 109 additions and 8 deletions.
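Before the diff itself, a quick sketch of the behaviour being changed: values already present in SparkConf for the spark.yarn.dist.* keys are merged with the values coming from the corresponding spark-submit options (--jars, --files, --py-files, --archives) instead of being silently overwritten. A minimal illustration of that merge semantics, assuming a comma-separated file-list format; the hypothetical mergeCommaSeparated below only shows the idea, while the patch itself reuses the existing mergeFileLists helper, as the diff below shows:

// Minimal sketch, not part of the patch: keep the existing entries and
// append the new ones, joining both comma-separated lists.
def mergeCommaSeparated(existing: String, addition: String): String =
  (existing.split(",") ++ addition.split(",")).filter(_.nonEmpty).mkString(",")

// e.g. spark.yarn.dist.jars already set to "hdfs:/libs/a.jar" and --jars file:/tmp/b.jar:
// mergeCommaSeparated("hdfs:/libs/a.jar", "file:/tmp/b.jar") == "hdfs:/libs/a.jar,file:/tmp/b.jar"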
47 changes: 39 additions & 8 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -202,11 +202,27 @@ private[spark] class SparkSubmit extends Logging {
     }
   }
 
+  /**
+   * Prepare the environment for submitting an application.
+   *
+   * @param args the parsed SparkSubmitArguments used for environment preparation.
+   * @return a 4-tuple:
+   *        (1) the arguments for the child process,
+   *        (2) a list of classpath entries for the child,
+   *        (3) a map of system properties, and
+   *        (4) the main class for the child
+   */
+  private def prepareSubmitEnvironment(args: SparkSubmitArguments)
+      : (Seq[String], Seq[String], SparkConf, String) = {
+    prepareSubmitEnvironment(args, None, None)
+  }
+
   /**
    * Prepare the environment for submitting an application.
    *
    * @param args the parsed SparkSubmitArguments used for environment preparation.
    * @param conf the Hadoop Configuration, this argument will only be set in unit test.
+   * @param sparkConf the Spark Configuration, this argument will only be set in unit test.
    * @return a 4-tuple:
    *        (1) the arguments for the child process,
    *        (2) a list of classpath entries for the child,
@@ -217,12 +233,16 @@ private[spark] class SparkSubmit extends Logging {
    */
   private[deploy] def prepareSubmitEnvironment(
       args: SparkSubmitArguments,
-      conf: Option[HadoopConfiguration] = None)
+      conf: Option[HadoopConfiguration] = None,
+      sparkConfOpt: Option[SparkConf] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
-    val sparkConf = new SparkConf()
+    val sparkConf = sparkConfOpt match {
+      case Some(cfg) => cfg
+      case None => new SparkConf()
+    }
     var childMainClass = ""
 
     // Set the cluster manager
@@ -544,10 +564,14 @@ private[spark] class SparkSubmit extends Logging {
 
       // Yarn only
       OptionAssigner(args.queue, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.queue"),
-      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles"),
-      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars"),
-      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files"),
-      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives"),
+      OptionAssigner(args.pyFiles, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.pyFiles",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.jars, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.jars",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.files, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.files",
+        mergeFn = Some(mergeFileLists(_, _))),
+      OptionAssigner(args.archives, YARN, ALL_DEPLOY_MODES, confKey = "spark.yarn.dist.archives",
+        mergeFn = Some(mergeFileLists(_, _))),
 
       // Other options
       OptionAssigner(args.numExecutors, YARN | KUBERNETES, ALL_DEPLOY_MODES,
@@ -608,7 +632,13 @@ private[spark] class SparkSubmit extends Logging {
           (deployMode & opt.deployMode) != 0 &&
           (clusterManager & opt.clusterManager) != 0) {
         if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) }
-        if (opt.confKey != null) { sparkConf.set(opt.confKey, opt.value) }
+        if (opt.confKey != null) {
+          if (opt.mergeFn.isDefined && sparkConf.contains(opt.confKey)) {
+            sparkConf.set(opt.confKey, opt.mergeFn.get.apply(sparkConf.get(opt.confKey), opt.value))
+          } else {
+            sparkConf.set(opt.confKey, opt.value)
+          }
+        }
       }
     }
 
@@ -1381,7 +1411,8 @@ private case class OptionAssigner(
     clusterManager: Int,
     deployMode: Int,
     clOption: String = null,
-    confKey: String = null)
+    confKey: String = null,
+    mergeFn: Option[(String, String) => String] = None)
 
 private[spark] trait SparkSubmitOperation {
 
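The heart of the SparkSubmit.scala change is the new mergeFn hook on OptionAssigner and the branch that applies it. A self-contained sketch of that code path, with a simplified assigner case class and a mutable Map standing in for SparkConf (both are assumptions for illustration, not Spark classes):

import scala.collection.mutable

object MergeFnSketch {
  // Simplified stand-in for OptionAssigner with only the fields this sketch needs.
  case class SimpleAssigner(
      value: String,
      confKey: String,
      mergeFn: Option[(String, String) => String] = None)

  // Mirrors the new branch above: merge with the existing value only when a mergeFn
  // is provided and the key is already set; otherwise keep the old overwrite behaviour.
  def applyOption(conf: mutable.Map[String, String], opt: SimpleAssigner): Unit = {
    if (opt.confKey != null) {
      if (opt.mergeFn.isDefined && conf.contains(opt.confKey)) {
        conf(opt.confKey) = opt.mergeFn.get.apply(conf(opt.confKey), opt.value)
      } else {
        conf(opt.confKey) = opt.value
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val conf = mutable.Map("spark.yarn.dist.jars" -> "file:/existing.jar")
    applyOption(conf, SimpleAssigner("file:/new.jar", "spark.yarn.dist.jars",
      mergeFn = Some((a, b) => s"$a,$b")))
    println(conf("spark.yarn.dist.jars")) // prints file:/existing.jar,file:/new.jar
  }
}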
70 changes: 70 additions & 0 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -897,6 +897,76 @@ class SparkSubmitSuite
     }
   }
 
+  test("SPARK-27575: yarn confs should merge new value with existing value") {
+    val tmpJarDir = Utils.createTempDir()
+    val jar1 = TestUtils.createJarWithFiles(Map("test.resource" -> "1"), tmpJarDir)
+    val jar2 = TestUtils.createJarWithFiles(Map("test.resource" -> "USER"), tmpJarDir)
+
+    val tmpJarDirYarnOpt = Utils.createTempDir()
+    val jar1YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "2"), tmpJarDirYarnOpt)
+    val jar2YarnOpt = TestUtils.createJarWithFiles(Map("test.resource" -> "USER2"),
+      tmpJarDirYarnOpt)
+
+    val tmpFileDir = Utils.createTempDir()
+    val file1 = File.createTempFile("tmpFile1", "", tmpFileDir)
+    val file2 = File.createTempFile("tmpFile2", "", tmpFileDir)
+
+    val tmpFileDirYarnOpt = Utils.createTempDir()
+    val file1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpFileDirYarnOpt)
+    val file2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpFileDirYarnOpt)
+
+    val tmpPyFileDir = Utils.createTempDir()
+    val pyFile1 = File.createTempFile("tmpPy1", ".py", tmpPyFileDir)
+    val pyFile2 = File.createTempFile("tmpPy2", ".egg", tmpPyFileDir)
+
+    val tmpPyFileDirYarnOpt = Utils.createTempDir()
+    val pyFile1YarnOpt = File.createTempFile("tmpPy1YarnOpt", ".py", tmpPyFileDirYarnOpt)
+    val pyFile2YarnOpt = File.createTempFile("tmpPy2YarnOpt", ".egg", tmpPyFileDirYarnOpt)
+
+    val tmpArchiveDir = Utils.createTempDir()
+    val archive1 = File.createTempFile("archive1", ".zip", tmpArchiveDir)
+    val archive2 = File.createTempFile("archive2", ".zip", tmpArchiveDir)
+
+    val tmpArchiveDirYarnOpt = Utils.createTempDir()
+    val archive1YarnOpt = File.createTempFile("archive1YarnOpt", ".zip", tmpArchiveDirYarnOpt)
+    val archive2YarnOpt = File.createTempFile("archive2YarnOpt", ".zip", tmpArchiveDirYarnOpt)
+
+    val tempPyFile = File.createTempFile("tmpApp", ".py")
+    tempPyFile.deleteOnExit()
+
+    val args = Seq(
+      "--class", UserClasspathFirstTest.getClass.getName.stripPrefix("$"),
+      "--name", "testApp",
+      "--master", "yarn",
+      "--deploy-mode", "client",
+      "--jars", s"${tmpJarDir.getAbsolutePath}/*.jar",
+      "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
+      "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
+      "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
+      tempPyFile.toURI().toString())
+
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.yarn.dist.files",
+      Seq(file1YarnOpt, file2YarnOpt).map(_.getAbsolutePath).mkString(","))
+    sparkConf.set("spark.yarn.dist.pyFiles",
+      Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.getAbsolutePath).mkString(","))
+    sparkConf.set("spark.yarn.dist.jars",
+      Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(","))
+    sparkConf.set("spark.yarn.dist.archives",
+      Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(","))
+
+    val appArgs = new SparkSubmitArguments(args)
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, sparkConfOpt = Some(sparkConf))
+    conf.get("spark.yarn.dist.jars").split(",").toSet should be
+      (Set(Seq(jar1, jar2, jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).toList))
+    conf.get("spark.yarn.dist.files").split(",").toSet should be
+      (Set(Seq(file1, file2, file1YarnOpt, file2YarnOpt).map(_.toURI.toString)))
+    conf.get("spark.yarn.dist.pyFiles").split(",").toSet should be
+      (Set(Seq(pyFile1, pyFile2, pyFile1YarnOpt, pyFile2YarnOpt).map(_.getAbsolutePath)))
+    conf.get("spark.yarn.dist.archives").split(",").toSet should be
+      (Set(Seq(archive1, archive2, archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString)))
+  }
+
   // scalastyle:on println
 
   private def checkDownloadedFile(sourcePath: String, outputPath: String): Unit = {
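A note on the test above: the new sparkConfOpt parameter is what lets the suite seed pre-existing spark.yarn.dist.* values and then check that the CLI options are merged into them rather than replacing them; per the javadoc added in the main diff, that argument is only meant to be set from unit tests. To run just this test, an invocation along the lines of build/sbt "core/testOnly *SparkSubmitSuite -- -z SPARK-27575" should work, though the exact command depends on the build setup at this commit.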
