From acf4bc1caa0818493f59da9991239d3caff806bd Mon Sep 17 00:00:00 2001 From: Lianhui Wang Date: Fri, 8 May 2015 08:44:46 -0500 Subject: [PATCH] [SPARK-6869] [PYSPARK] Add pyspark archives path to PYTHONPATH Based on https://github.com/apache/spark/pull/5478 that provide a PYSPARK_ARCHIVES_PATH env. within this PR, we just should export PYSPARK_ARCHIVES_PATH=/user/spark/pyspark.zip,/user/spark/python/lib/py4j-0.8.2.1-src.zip in conf/spark-env.sh when we don't install PySpark on each node of Yarn. i run python application successfully on yarn-client and yarn-cluster with this PR. andrewor14 sryza Sephiroth-Lin Can you take a look at this?thanks. Author: Lianhui Wang Closes #5580 from lianhuiwang/SPARK-6869 and squashes the following commits: 66ffa43 [Lianhui Wang] Update Client.scala c2ad0f9 [Lianhui Wang] Update Client.scala 1c8f664 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 008850a [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 f0b4ed8 [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 150907b [Lianhui Wang] Merge remote-tracking branch 'remotes/apache/master' into SPARK-6869 20402cd [Lianhui Wang] use ZipEntry 9d87c3f [Lianhui Wang] update scala style e7bd971 [Lianhui Wang] address vanzin's comments 4b8a3ed [Lianhui Wang] use pyArchivesEnvOpt e6b573b [Lianhui Wang] address vanzin's comments f11f84a [Lianhui Wang] zip pyspark archives 5192cca [Lianhui Wang] update import path 3b1e4c8 [Lianhui Wang] address tgravescs's comments 9396346 [Lianhui Wang] put zip to make-distribution.sh 0d2baf7 [Lianhui Wang] update import paths e0179be [Lianhui Wang] add zip pyspark archives in build or sparksubmit 31e8e06 [Lianhui Wang] update code style 9f31dac [Lianhui Wang] update code and add comments f72987c [Lianhui Wang] add archives path to PYTHONPATH (cherry picked from commit ebff7327af5efa9f57c605284de4fae6b050ae0f) Signed-off-by: Thomas Graves --- assembly/pom.xml | 21 ++++++++++ .../org/apache/spark/deploy/SparkSubmit.scala | 41 +++++++++++++++++++ project/SparkBuild.scala | 37 ++++++++++++++++- .../org/apache/spark/deploy/yarn/Client.scala | 23 ++++++++--- 4 files changed, 114 insertions(+), 8 deletions(-) diff --git a/assembly/pom.xml b/assembly/pom.xml index 2b4d0a990bf22..626c8577e31fe 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -92,6 +92,27 @@ true + + + org.apache.maven.plugins + maven-antrun-plugin + + + package + + run + + + + + + + + + + + + org.apache.maven.plugins diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 8a0327984e195..329fa06ba8ba5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -332,6 +332,47 @@ object SparkSubmit { } } + // In yarn mode for a python app, add pyspark archives to files + // that can be distributed with the job + if (args.isPython && clusterManager == YARN) { + var pyArchives: String = null + val pyArchivesEnvOpt = sys.env.get("PYSPARK_ARCHIVES_PATH") + if (pyArchivesEnvOpt.isDefined) { + pyArchives = pyArchivesEnvOpt.get + } else { + if (!sys.env.contains("SPARK_HOME")) { + printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.") + } + val pythonPath = new ArrayBuffer[String] + for (sparkHome <- sys.env.get("SPARK_HOME")) { + val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator) + val pyArchivesFile = new File(pyLibPath, "pyspark.zip") + if (!pyArchivesFile.exists()) { + printErrorAndExit("pyspark.zip does not exist for python application in yarn mode.") + } + val py4jFile = new File(pyLibPath, "py4j-0.8.2.1-src.zip") + if (!py4jFile.exists()) { + printErrorAndExit("py4j-0.8.2.1-src.zip does not exist for python application " + + "in yarn mode.") + } + pythonPath += pyArchivesFile.getAbsolutePath() + pythonPath += py4jFile.getAbsolutePath() + } + pyArchives = pythonPath.mkString(",") + } + + pyArchives = pyArchives.split(",").map { localPath=> + val localURI = Utils.resolveURI(localPath) + if (localURI.getScheme != "local") { + args.files = mergeFileLists(args.files, localURI.toString) + new Path(localPath).getName + } else { + localURI.getPath + } + }.mkString(File.pathSeparator) + sysProps("spark.submit.pyArchives") = pyArchives + } + // If we're running a R app, set the main class to our specific R runner if (args.isR && deployMode == CLIENT) { if (args.primaryResource == SPARKR_SHELL) { diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 026855f8f6a5a..186345af0e60e 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -370,6 +370,7 @@ object Assembly { object PySparkAssembly { import sbtassembly.Plugin._ import AssemblyKeys._ + import java.util.zip.{ZipOutputStream, ZipEntry} lazy val settings = Seq( unmanagedJars in Compile += { BuildCommons.sparkHome / "python/lib/py4j-0.8.2.1-src.zip" }, @@ -377,16 +378,48 @@ object PySparkAssembly { // to be included in the assembly. We can't just add "python/" to the assembly's resource dir // list since that will copy unneeded / unwanted files. resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File => + val src = new File(BuildCommons.sparkHome, "python/pyspark") + + val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip") + zipFile.delete() + zipRecursive(src, zipFile) + val dst = new File(outDir, "pyspark") if (!dst.isDirectory()) { require(dst.mkdirs()) } - - val src = new File(BuildCommons.sparkHome, "python/pyspark") copy(src, dst) } ) + private def zipRecursive(source: File, destZipFile: File) = { + val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile)) + addFilesToZipStream("", source, destOutput) + destOutput.flush() + destOutput.close() + } + + private def addFilesToZipStream(parent: String, source: File, output: ZipOutputStream): Unit = { + if (source.isDirectory()) { + output.putNextEntry(new ZipEntry(parent + source.getName())) + for (file <- source.listFiles()) { + addFilesToZipStream(parent + source.getName() + File.separator, file, output) + } + } else { + val in = new FileInputStream(source) + output.putNextEntry(new ZipEntry(parent + source.getName())) + val buf = new Array[Byte](8192) + var n = 0 + while (n != -1) { + n = in.read(buf) + if (n != -1) { + output.write(buf, 0, n) + } + } + in.close() + } + } + private def copy(src: File, dst: File): Seq[File] = { src.listFiles().flatMap { f => val child = new File(dst, f.getName()) diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 20ecaf092e3f8..d21a7393478ce 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -468,6 +468,17 @@ private[spark] class Client( env("SPARK_YARN_USER_ENV") = userEnvs } + // if spark.submit.pyArchives is in sparkConf, append pyArchives to PYTHONPATH + // that can be passed on to the ApplicationMaster and the executors. + if (sparkConf.contains("spark.submit.pyArchives")) { + var pythonPath = sparkConf.get("spark.submit.pyArchives") + if (env.contains("PYTHONPATH")) { + pythonPath = Seq(env.get("PYTHONPATH"), pythonPath).mkString(File.pathSeparator) + } + env("PYTHONPATH") = pythonPath + sparkConf.setExecutorEnv("PYTHONPATH", pythonPath) + } + // In cluster mode, if the deprecated SPARK_JAVA_OPTS is set, we need to propagate it to // executors. But we can't just set spark.executor.extraJavaOptions, because the driver's // SparkContext will not let that set spark* system properties, which is expected behavior for @@ -1074,7 +1085,7 @@ object Client extends Logging { val hiveConf = hiveClass.getMethod("getConf").invoke(hive) val hiveConfClass = mirror.classLoader.loadClass("org.apache.hadoop.hive.conf.HiveConf") - val hiveConfGet = (param:String) => Option(hiveConfClass + val hiveConfGet = (param: String) => Option(hiveConfClass .getMethod("get", classOf[java.lang.String]) .invoke(hiveConf, param)) @@ -1096,7 +1107,7 @@ object Client extends Logging { val hive2Token = new Token[DelegationTokenIdentifier]() hive2Token.decodeFromUrlString(tokenStr) - credentials.addToken(new Text("hive.server2.delegation.token"),hive2Token) + credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token) logDebug("Added hive.Server2.delegation.token to conf.") hiveClass.getMethod("closeCurrent").invoke(null) } else { @@ -1141,13 +1152,13 @@ object Client extends Logging { logInfo("Added HBase security token to credentials.") } catch { - case e:java.lang.NoSuchMethodException => + case e: java.lang.NoSuchMethodException => logInfo("HBase Method not found: " + e) - case e:java.lang.ClassNotFoundException => + case e: java.lang.ClassNotFoundException => logDebug("HBase Class not found: " + e) - case e:java.lang.NoClassDefFoundError => + case e: java.lang.NoClassDefFoundError => logDebug("HBase Class not found: " + e) - case e:Exception => + case e: Exception => logError("Exception when obtaining HBase security token: " + e) } }