Commit
Update pyspark.zip auto
Sephiroth-Lin committed Apr 17, 2015
1 parent 3a0ec77 commit 547fd95
Showing 6 changed files with 34 additions and 15 deletions.
@@ -35,10 +35,6 @@ private[spark] object PythonUtils {
pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip").mkString(File.separator)
}
pythonPath ++= SparkContext.jarOfObject(this)
sys.env.get("PYSPARK_ARCHIVES_PATH") match {
case Some(path) => pythonPath += path
case None => // do nothing
}
pythonPath.mkString(File.pathSeparator)
}

@@ -49,7 +49,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

val pythonPath = PythonUtils.mergePythonPaths(
PythonUtils.sparkPythonPath,
envVars.getOrElse("PYTHONPATH", ""),
envVars.getOrElse("PYTHONPATH", sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")),
sys.env.getOrElse("PYTHONPATH", ""))

def create(): Socket = {
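To make the new lookup order concrete, here is a minimal standalone sketch (all paths hypothetical) of how the worker's PYTHONPATH entry is now resolved, mirroring the getOrElse chain in PythonWorkerFactory above:

object WorkerPythonPathFallbackSketch {
  // An explicit PYTHONPATH in envVars wins; otherwise fall back to the
  // PYSPARK_ARCHIVES_PATH exported into the executor's environment; otherwise empty.
  def resolve(envVars: Map[String, String], sysEnv: Map[String, String]): String =
    envVars.getOrElse("PYTHONPATH", sysEnv.getOrElse("PYSPARK_ARCHIVES_PATH", ""))

  def main(args: Array[String]): Unit = {
    val shipped = Map("PYSPARK_ARCHIVES_PATH" -> "__pyspark__.zip")
    println(resolve(Map.empty, shipped))                        // __pyspark__.zip
    println(resolve(Map("PYTHONPATH" -> "/opt/libs"), shipped)) // /opt/libs
    println(resolve(Map.empty, Map.empty))                      // (empty string)
  }
}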
@@ -52,6 +52,7 @@ object PythonRunner {
pathElements ++= formattedPyFiles
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
pathElements += sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

// Launch Python process
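Similarly, a self-contained sketch of the driver-side PYTHONPATH assembled in PythonRunner above, assuming mergePythonPaths simply drops empty entries and joins the rest with the platform path separator; the example paths are hypothetical:

import java.io.File

object DriverPythonPathSketch {
  // Stand-in for PythonUtils.mergePythonPaths: drop empty entries, join the rest.
  def merge(paths: String*): String =
    paths.filter(_.nonEmpty).mkString(File.pathSeparator)

  def main(args: Array[String]): Unit = {
    val pythonPath = merge(
      "/job/deps.zip",                      // formatted --py-files entries
      "/opt/spark/python/lib/pyspark.zip",  // PythonUtils.sparkPythonPath
      sys.env.getOrElse("PYTHONPATH", ""),
      sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", ""))
    println(pythonPath)
  }
}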
3 changes: 0 additions & 3 deletions docs/submitting-applications.md
@@ -22,9 +22,6 @@ For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`
files to be distributed with your application. If you depend on multiple Python files we recommend
packaging them into a `.zip` or `.egg`.

Because Python cannot read files from an assembly jar packaged by JDK 1.7+, package PySpark into a
`.zip` (whose name contains "pyspark") and use the `--py-files` argument of `spark-submit` to distribute it.

# Launching Applications with spark-submit

Once a user application is bundled, it can be launched using the `bin/spark-submit` script.
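The paragraph removed above described shipping pyspark.zip by hand with --py-files; with this commit the YARN client ships it automatically, and a custom archive can be pointed to through the new spark.pyspark.archives key defined in Client.scala below. A minimal sketch of setting it programmatically, with a hypothetical path; passing --conf spark.pyspark.archives=... to spark-submit should be equivalent:

import org.apache.spark.SparkConf

object PysparkArchivesConfSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("pyspark-archives-demo")
      // Override the default pyspark.zip that the YARN Client would otherwise
      // pick up next to the assembly jar (path is hypothetical).
      .set("spark.pyspark.archives", "/opt/spark/python/lib/pyspark.zip")
    println(conf.get("spark.pyspark.archives"))
  }
}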
26 changes: 25 additions & 1 deletion yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -247,6 +247,7 @@ private[spark] class Client(
List(
(SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
(APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
(PYSPARK_ARCHIVES, pysparkArchives(sparkConf), CONF_PYSPARK_ARCHIVES),
("log4j.properties", oldLog4jConf.orNull, null)
).foreach { case (destName, _localPath, confKey) =>
val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -386,6 +387,12 @@
val appStagingDir = getAppStagingDir(appId)
val localResources = prepareLocalResources(appStagingDir)
val launchEnv = setupLaunchEnv(appStagingDir)
// From SPARK-1920 and SPARK-1520 we know PySpark on Yarn cannot work when the assembly jar is
// packaged by JDK 1.7+, so we ship the PySpark archives to executors the same way as the assembly
// jar, and add this path to PYTHONPATH.
for ((resPath, res) <- localResources if resPath.contains(PYSPARK_ARCHIVES)) {
launchEnv("PYSPARK_ARCHIVES_PATH") = resPath
}
val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
amContainer.setLocalResources(localResources)
amContainer.setEnvironment(launchEnv)
@@ -681,9 +688,10 @@ object Client extends Logging {
new Client(args, sparkConf).run()
}

// Alias for the Spark assembly jar and the user jar
// Alias for the Spark assembly jar, the user jar and PySpark archives
val SPARK_JAR: String = "__spark__.jar"
val APP_JAR: String = "__app__.jar"
val PYSPARK_ARCHIVES: String = "__pyspark__.zip"

// URI scheme that identifies local resources
val LOCAL_SCHEME = "local"
@@ -695,6 +703,9 @@
val CONF_SPARK_JAR = "spark.yarn.jar"
val ENV_SPARK_JAR = "SPARK_JAR"

// Location of any user-defined PySpark archives
val CONF_PYSPARK_ARCHIVES = "spark.pyspark.archives"

// Internal config to propagate the location of the user's jar to the driver/executors
val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"

@@ -733,6 +744,19 @@ object Client extends Logging {
}
}

/**
* Find the user-defined PySpark archives if configured, or return the default.
* The default pyspark.zip is in the same directory as the assembly jar.
*/
private def pysparkArchives(conf: SparkConf): String = {
if (conf.contains(CONF_PYSPARK_ARCHIVES)) {
conf.get(CONF_PYSPARK_ARCHIVES)
} else {
val sparkJarPath = SparkContext.jarOfClass(this.getClass).head
sparkJarPath.substring(0, sparkJarPath.lastIndexOf('/')) + "/pyspark.zip"
}
}

/**
* Return the path to the given application's staging directory.
*/
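A standalone sketch of the default-path derivation in pysparkArchives above, substituting a hypothetical assembly-jar location for the SparkContext.jarOfClass lookup:

object DefaultPysparkArchiveSketch {
  // Same string manipulation as pysparkArchives: keep the jar's directory, append pyspark.zip.
  def defaultArchive(sparkJarPath: String): String =
    sparkJarPath.substring(0, sparkJarPath.lastIndexOf('/')) + "/pyspark.zip"

  def main(args: Array[String]): Unit = {
    println(defaultArchive("/opt/spark/lib/spark-assembly-1.4.0-hadoop2.6.0.jar"))
    // prints /opt/spark/lib/pyspark.zip
  }
}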
@@ -75,12 +75,7 @@ class ExecutorRunnable(

val localResources = prepareLocalResources
ctx.setLocalResources(localResources)
// From SPARK-1920 and SPARK-1520 we know PySpark on Yarn cannot work when the assembly jar is
// packaged by JDK 1.7+, so we ship PySpark archives to executors via Yarn with --py-files, and
// add this path to PYTHONPATH.
for ((resPath, res) <- localResources if resPath.contains("pyspark")) {
env("PYSPARK_ARCHIVES_PATH") = resPath
}

ctx.setEnvironment(env)

val credentials = UserGroupInformation.getCurrentUser().getCredentials()
@@ -304,6 +299,12 @@
}

System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }

// Add PySpark archives path
sys.env.get("PYSPARK_ARCHIVES_PATH") match {
case Some(pythonArchivesPath) => env("PYSPARK_ARCHIVES_PATH") = pythonArchivesPath
case None =>
}
env
}
}
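Putting the two YARN-side pieces together: a standalone sketch (plain maps, hypothetical resource names) of how PYSPARK_ARCHIVES_PATH travels from the Client's local resources into the executor environment, where PythonWorkerFactory then picks it up as shown earlier:

import scala.collection.mutable

object ArchivePathPropagationSketch {
  def main(args: Array[String]): Unit = {
    // 1. Client: the shipped archive is registered as a local resource and its
    //    link name is exported to the launch environment.
    val localResources = Map("__pyspark__.zip" -> "hdfs:///staging/app/__pyspark__.zip")
    val launchEnv = mutable.Map[String, String]()
    for ((resPath, _) <- localResources if resPath.contains("__pyspark__.zip")) {
      launchEnv("PYSPARK_ARCHIVES_PATH") = resPath
    }

    // 2. ExecutorRunnable: the value is copied from the container's environment
    //    (simulated here by launchEnv) into the executor's environment.
    val executorEnv = mutable.Map[String, String]()
    launchEnv.get("PYSPARK_ARCHIVES_PATH").foreach(v => executorEnv("PYSPARK_ARCHIVES_PATH") = v)

    println(executorEnv) // contains PYSPARK_ARCHIVES_PATH -> __pyspark__.zip
  }
}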
