Use --conf instead of injecting SparkConf
HeartSaVioR committed Apr 26, 2019
1 parent 3e54eae commit 50191a0
Showing 2 changed files with 11 additions and 33 deletions.
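This commit drops the test-only sparkConfOpt hook from prepareSubmitEnvironment: instead of handing a pre-built SparkConf into the method, the test now passes the same properties as ordinary --conf key=value arguments, so it exercises the exact argument-parsing path a real spark-submit invocation uses. A minimal sketch of the two styles (hypothetical property values; the injected-conf overload shown in comments is the one this commit removes):

    import org.apache.spark.SparkConf

    object ConfStyleSketch {
      def main(argv: Array[String]): Unit = {
        // Before (sketch): the suite built a SparkConf and injected it through
        // the now-removed sparkConfOpt parameter, bypassing --conf parsing:
        //   submit.prepareSubmitEnvironment(appArgs, sparkConfOpt = Some(injected))
        val injected = new SparkConf().set("spark.yarn.dist.files", "/tmp/f1,/tmp/f2")

        // After: the same property rides along as a plain --conf argument, so the
        // test goes through SparkSubmitArguments exactly like a real invocation.
        val args = Seq(
          "--master", "yarn",
          "--conf", "spark.yarn.dist.files=/tmp/f1,/tmp/f2",
          "/tmp/app.py")

        println(injected.get("spark.yarn.dist.files"))  // /tmp/f1,/tmp/f2
        println(args.containsSlice(Seq("--conf", "spark.yarn.dist.files=/tmp/f1,/tmp/f2")))  // true
      }
    }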
24 changes: 2 additions & 22 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -202,27 +202,11 @@ private[spark] class SparkSubmit extends Logging {
     }
   }
 
-  /**
-   * Prepare the environment for submitting an application.
-   *
-   * @param args the parsed SparkSubmitArguments used for environment preparation.
-   * @return a 4-tuple:
-   *        (1) the arguments for the child process,
-   *        (2) a list of classpath entries for the child,
-   *        (3) a map of system properties, and
-   *        (4) the main class for the child
-   */
-  private def prepareSubmitEnvironment(args: SparkSubmitArguments)
-      : (Seq[String], Seq[String], SparkConf, String) = {
-    prepareSubmitEnvironment(args, None, None)
-  }
-
   /**
    * Prepare the environment for submitting an application.
    *
    * @param args the parsed SparkSubmitArguments used for environment preparation.
    * @param conf the Hadoop Configuration, this argument will only be set in unit test.
-   * @param sparkConf the Spark Configuration, this argument will only be set in unit test.
    * @return a 4-tuple:
    *        (1) the arguments for the child process,
    *        (2) a list of classpath entries for the child,
@@ -233,16 +217,12 @@ private[spark] class SparkSubmit extends Logging {
    */
   private[deploy] def prepareSubmitEnvironment(
       args: SparkSubmitArguments,
-      conf: Option[HadoopConfiguration] = None,
-      sparkConfOpt: Option[SparkConf] = None)
+      conf: Option[HadoopConfiguration] = None)
       : (Seq[String], Seq[String], SparkConf, String) = {
     // Return values
     val childArgs = new ArrayBuffer[String]()
     val childClasspath = new ArrayBuffer[String]()
-    val sparkConf = sparkConfOpt match {
-      case Some(cfg) => cfg
-      case None => new SparkConf()
-    }
+    val sparkConf = new SparkConf()
     var childMainClass = ""
 
     // Set the cluster manager
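With the sparkConfOpt escape hatch gone, prepareSubmitEnvironment always starts from a fresh SparkConf (which loads any spark.* JVM system properties) and then, further down in the same method, layers on the properties that SparkSubmitArguments parsed from --conf pairs. A self-contained sketch of that merge, with a plain Map standing in for the sparkProperties map SparkSubmitArguments builds:

    import org.apache.spark.SparkConf

    object ConfMergeSketch {
      def main(argv: Array[String]): Unit = {
        // Stand-in for SparkSubmitArguments.sparkProperties, the map that
        // --conf key=value arguments are parsed into.
        val sparkProperties = Map(
          "spark.yarn.dist.files" -> "/tmp/f1,/tmp/f2",
          "spark.yarn.dist.jars" -> "file:/tmp/j1.jar")

        val sparkConf = new SparkConf()  // also picks up spark.* system properties
        for ((k, v) <- sparkProperties) {
          sparkConf.set(k, v)  // --conf values applied onto the fresh conf
        }
        sparkConf.getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
      }
    }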
20 changes: 9 additions & 11 deletions core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -943,20 +943,18 @@ class SparkSubmitSuite
       "--files", s"${tmpFileDir.getAbsolutePath}/tmpFile*",
       "--py-files", s"${tmpPyFileDir.getAbsolutePath}/tmpPy*",
       "--archives", s"${tmpArchiveDir.getAbsolutePath}/*.zip",
+      "--conf", "spark.yarn.dist.files=" +
+        s"${Seq(file1YarnOpt, file2YarnOpt).map(_.getAbsolutePath).mkString(",")}",
+      "--conf", "spark.yarn.dist.pyFiles=" +
+        s"${Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.getAbsolutePath).mkString(",")}",
+      "--conf", "spark.yarn.dist.jars=" +
+        s"${Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(",")}",
+      "--conf", "spark.yarn.dist.archives=" +
+        s"${Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(",")}",
       tempPyFile.toURI().toString())
 
-    val sparkConf = new SparkConf()
-    sparkConf.set("spark.yarn.dist.files",
-      Seq(file1YarnOpt, file2YarnOpt).map(_.getAbsolutePath).mkString(","))
-    sparkConf.set("spark.yarn.dist.pyFiles",
-      Seq(pyFile1YarnOpt, pyFile2YarnOpt).map(_.getAbsolutePath).mkString(","))
-    sparkConf.set("spark.yarn.dist.jars",
-      Seq(jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).mkString(","))
-    sparkConf.set("spark.yarn.dist.archives",
-      Seq(archive1YarnOpt, archive2YarnOpt).map(_.toURI.toString).mkString(","))
-
     val appArgs = new SparkSubmitArguments(args)
-    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs, sparkConfOpt = Some(sparkConf))
+    val (_, _, conf, _) = submit.prepareSubmitEnvironment(appArgs)
     conf.get("spark.yarn.dist.jars").split(",").toSet should be
       (Set(Seq(jar1, jar2, jar1YarnOpt, jar2YarnOpt).map(_.toURI.toString).toList))
     conf.get("spark.yarn.dist.files").split(",").toSet should be
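The updated test thus drives everything through argument parsing, as in this condensed version of the pattern (hypothetical paths; it must live in the org.apache.spark.deploy package because prepareSubmitEnvironment is private[deploy]):

    package org.apache.spark.deploy

    // Condensed sketch of the post-commit test pattern; the real suite uses
    // temp dirs, glob patterns, and asserts the merged property values.
    object SubmitEnvSketch {
      def main(argv: Array[String]): Unit = {
        val args = Seq(
          "--name", "testApp",
          "--master", "yarn",
          "--deploy-mode", "client",
          "--conf", "spark.yarn.dist.files=/tmp/f1.txt,/tmp/f2.txt",
          "/tmp/app.py")

        val appArgs = new SparkSubmitArguments(args)
        // The returned conf is built inside prepareSubmitEnvironment from a
        // fresh SparkConf plus the parsed --conf pairs; nothing is injected.
        val (_, _, conf, _) = new SparkSubmit().prepareSubmitEnvironment(appArgs)
        println(conf.get("spark.yarn.dist.files"))
      }
    }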
