From d4375bddd41542bdf94bd08a40b073f7046720e3 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Sun, 4 May 2014 15:49:12 -0700 Subject: [PATCH 1/7] Clean up description of spark-submit args a bit and add Python ones --- .../spark/deploy/SparkSubmitArguments.scala | 70 +++++++++++++------ 1 file changed, 50 insertions(+), 20 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 45defb9a3f92d..f7f2c5b4da692 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy import java.io.{File, FileInputStream, IOException} import java.util.Properties +import java.util.jar.JarFile import scala.collection.JavaConversions._ import scala.collection.mutable.{ArrayBuffer, HashMap} @@ -52,6 +53,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() var jars: String = null var verbose: Boolean = false + var isPython: Boolean = false + var pyFiles: String = null parseOpts(args.toList) loadDefaults() @@ -107,15 +110,34 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { master = Option(master).getOrElse(System.getenv("MASTER")) deployMode = Option(deployMode).getOrElse(System.getenv("DEPLOY_MODE")) + // Try to set main class from JAR if no --class argument is given + if (mainClass == null && !isPython && primaryResource != null) { + val jar = new JarFile(primaryResource) + // Note that this might still return null if no main-class is set; we catch that later + mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class") + } + // Global defaults. These should be keep to minimum to avoid confusing behavior. master = Option(master).getOrElse("local[*]") + + // Set name from main class if not given + name = Option(name).orElse(Option(mainClass)).getOrElse(new File(primaryResource).getName) } /** Ensure that required fields exists. Call this only once all defaults are loaded. */ private def checkRequiredArguments() = { - if (args.length == 0) printUsageAndExit(-1) - if (primaryResource == null) SparkSubmit.printErrorAndExit("Must specify a primary resource") - if (mainClass == null) SparkSubmit.printErrorAndExit("Must specify a main class with --class") + if (args.length == 0) { + printUsageAndExit(-1) + } + if (primaryResource == null) { + SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)") + } + if (mainClass == null && !isPython) { + SparkSubmit.printErrorAndExit("Must specify a main class with --class") + } + if (pyFiles != null && !isPython) { + SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script") + } if (master.startsWith("yarn")) { val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR") @@ -234,6 +256,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { files = value parse(tail) + case ("--py-files") :: value :: tail => + pyFiles = value + parse(tail) + case ("--archives") :: value :: tail => archives = value parse(tail) @@ -260,9 +286,10 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { val errMessage = s"Unrecognized option '$value'." 
SparkSubmit.printErrorAndExit(errMessage) case v => - primaryResource = v - inSparkOpts = false - parse(tail) + primaryResource = v + inSparkOpts = false + isPython = v.endsWith(".py") + parse(tail) } } else { childArgs += value @@ -270,7 +297,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { } case Nil => - } + } } private def printUsageAndExit(exitCode: Int, unknownParam: Any = null) { @@ -279,23 +306,26 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { outStream.println("Unknown/unsupported param " + unknownParam) } outStream.println( - """Usage: spark-submit [options] [app options] + """Usage: spark-submit [options] [app options] |Options: | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. - | --deploy-mode DEPLOY_MODE Mode to deploy the app in, either 'client' or 'cluster'. - | --class CLASS_NAME Name of your app's main class (required for Java apps). - | --name NAME The name of your application (Default: 'Spark'). - | --jars JARS A comma-separated list of local jars to include on the - | driver classpath and that SparkContext.addJar will work - | with. Doesn't work on standalone with 'cluster' deploy mode. - | --files FILES Comma separated list of files to be placed in the working dir - | of each executor. + | --deploy-mode DEPLOY_MODE Where to run the driver program: either 'client' to run + | on the local machine, or 'cluster' to run inside cluster. + | --class CLASS_NAME Your application's main class (for Java apps). + | --name NAME A name of your application. + | --jars JARS Comma-separated list of local jars to include on the driver + | and executor classpaths. Doesn't work for drivers in + | standalone mode with 'cluster' deploy mode. + | --py-files PY_FILES Comma-separated list of files to place on the PYTHONPATH + | for Python apps. Can be .py, .zip, or .egg files. + | --files FILES Comma-separated list of files to be placed in the working + | directory of each executor. | --properties-file FILE Path to a file from which to load extra properties. If not | specified, this will look for conf/spark-defaults.conf. | | --driver-memory MEM Memory for driver (e.g. 1000M, 2G) (Default: 512M). - | --driver-java-options Extra Java options to pass to the driver - | --driver-library-path Extra library path entries to pass to the driver + | --driver-java-options Extra Java options to pass to the driver. + | --driver-library-path Extra library path entries to pass to the driver. | --driver-class-path Extra class path entries to pass to the driver. Note that | jars added with --jars are automatically included in the | classpath. @@ -312,9 +342,9 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { | YARN-only: | --executor-cores NUM Number of cores per executor (Default: 1). | --queue QUEUE_NAME The YARN queue to submit to (Default: 'default'). - | --num-executors NUM Number of executors to (Default: 2). + | --num-executors NUM Number of executors to launch (Default: 2). 
| --archives ARCHIVES Comma separated list of archives to be extracted into the - | working dir of each executor.""".stripMargin + | working directory of each executor.""".stripMargin ) SparkSubmit.exitFn() } From 47c0655da0ef53fc37b2c41a4cfa0030fc85d84b Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 5 May 2014 00:52:22 -0700 Subject: [PATCH 2/7] More work to make spark-submit work with Python: - Launch Py4J gateway server in-process and execute Python main class - Redirect its output to PythonRunner - Various misc fixes to messages and error reporting in SparkSubmit --- assembly/pom.xml | 13 --- core/pom.xml | 5 + .../api/python/PythonWorkerFactory.scala | 4 +- .../apache/spark/deploy/PythonRunner.scala | 73 ++++++++++++ .../org/apache/spark/deploy/SparkSubmit.scala | 106 +++++++++++------- .../spark/deploy/SparkSubmitArguments.scala | 36 +++--- .../scala/org/apache/spark/util/Utils.scala | 8 +- .../spark/deploy/SparkSubmitSuite.scala | 6 +- project/SparkBuild.scala | 4 +- python/pyspark/java_gateway.py | 67 ++++++----- 10 files changed, 218 insertions(+), 104 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala diff --git a/assembly/pom.xml b/assembly/pom.xml index bdb38806492a6..7d123fb1d7f02 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -40,14 +40,6 @@ root - - - - lib - file://${project.basedir}/lib - - - org.apache.spark @@ -84,11 +76,6 @@ spark-sql_${scala.binary.version} ${project.version} - - net.sf.py4j - py4j - 0.8.1 - diff --git a/core/pom.xml b/core/pom.xml index c24c7be204087..8fe215ab24289 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -247,6 +247,11 @@ pyrolite 2.0.1 + + net.sf.py4j + py4j + 0.8.1 + target/scala-${scala.binary.version}/classes diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 02799ce0091b0..ff2bc98698d40 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -78,7 +78,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker")) + val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker")) val workerEnv = pb.environment() workerEnv.putAll(envVars) val worker = pb.start() @@ -151,7 +151,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon")) + val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon")) val workerEnv = pb.environment() workerEnv.putAll(envVars) daemon = pb.start() diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala new file mode 100644 index 0000000000000..93fdb7e8f20ba --- /dev/null +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -0,0 +1,73 @@ +package org.apache.spark.deploy + +import java.io.{IOException, File, InputStream, OutputStream} + +import scala.collection.mutable.ArrayBuffer +import scala.collection.JavaConversions._ + +import org.apache.spark.SparkContext + +/** + * A main class used by spark-submit to launch Python applications. 
It executes python as a + * subprocess and then has it connect back to the JVM to access system properties, etc. + */ +object PythonRunner { + def main(args: Array[String]) { + val primaryResource = args(0) + val pyFiles = args(1) + val otherArgs = args.slice(2, args.length) + + val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf + + // Launch a Py4J gateway server for the process to connect to; this will let it see our + // Java system properties and such + val gatewayServer = new py4j.GatewayServer(null, 0) + gatewayServer.start() + + // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the + // python directories in SPARK_HOME (if set), and any files in the pyFiles argument + val sparkJar = SparkContext.jarOfObject(this).get + val pathSeparator: String = System.getProperty("path.separator") + val pythonPath = new ArrayBuffer[String] + pythonPath += sparkJar + pythonPath ++= pyFiles.split(",").filter(_ != "") + for (sparkHome <- sys.env.get("SPARK_HOME")) { + pythonPath += Seq(sparkHome, "python").mkString(File.separator) + pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator) + } + for (oldPythonPath <- sys.env.get("PYTHONPATH")) { + pythonPath ++= oldPythonPath.split(pathSeparator) + } + + // Launch Python process + val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs) + val env = builder.environment() + env.put("PYTHONPATH", pythonPath.mkString(pathSeparator)) + env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort) + builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize + val process = builder.start() + + new RedirectThread(process.getInputStream, System.out, "redirect output").start() + + System.exit(process.waitFor()) + } + + /** + * A utility class to redirect the child process's stdout or stderr + */ + class RedirectThread(in: InputStream, out: OutputStream, name: String) extends Thread(name) { + setDaemon(true) + override def run() { + scala.util.control.Exception.ignoring(classOf[IOException]) { + // FIXME: We copy the stream on the level of bytes to avoid encoding problems. 
+ val buf = new Array[Byte](1024) + var len = in.read(buf) + while (len != -1) { + out.write(buf, 0, len) + out.flush() + len = in.read(buf) + } + } + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index fb30e8a70f682..101025f08407b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -60,11 +60,11 @@ object SparkSubmit { private[spark] var exitFn: () => Unit = () => System.exit(-1) private[spark] def printErrorAndExit(str: String) = { - printStream.println("error: " + str) - printStream.println("run with --help for more information or --verbose for debugging output") + printStream.println("Error: " + str) + printStream.println("Run with --help for usage help or --verbose for debug output") exitFn() } - private[spark] def printWarning(str: String) = printStream.println("warning: " + str) + private[spark] def printWarning(str: String) = printStream.println("Warning: " + str) /** * @return @@ -83,7 +83,7 @@ object SparkSubmit { } else if (appArgs.master.startsWith("mesos")) { clusterManager = MESOS } else { - printErrorAndExit("master must start with yarn, mesos, spark, or local") + printErrorAndExit("Master must start with yarn, mesos, spark, or local") } // Because "yarn-cluster" and "yarn-client" encapsulate both the master @@ -116,9 +116,20 @@ object SparkSubmit { var childMainClass = "" if (clusterManager == MESOS && deployOnCluster) { - printErrorAndExit("Mesos does not support running the driver on the cluster") + printErrorAndExit("Cannot run driver on the cluster in Mesos") } + // If we're running a Python app, set the Java class to run to be our PythonRunner, add + // Python files to deployment list, and pass the main file and Python path to PythonRunner + if (appArgs.isPython) { + appArgs.mainClass = "org.apache.spark.deploy.PythonRunner" + appArgs.files = mergeFileLists(appArgs.files, appArgs.pyFiles, appArgs.primaryResource) + val pyFiles = Option(appArgs.pyFiles).getOrElse("") + appArgs.childArgs = ArrayBuffer(appArgs.primaryResource, pyFiles) ++ appArgs.childArgs + appArgs.primaryResource = RESERVED_JAR_NAME + } + + // If we're deploying into YARN, use yarn.Client as a wrapper around the user class if (!deployOnCluster) { childMainClass = appArgs.mainClass if (appArgs.primaryResource != RESERVED_JAR_NAME) { @@ -130,8 +141,8 @@ object SparkSubmit { childArgs += ("--class", appArgs.mainClass) } + // Make sure YARN is included in our build if we're trying to use it if (clusterManager == YARN) { - // The choice of class is arbitrary, could use any spark-yarn class if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " + "with YARN support." 
@@ -142,38 +153,39 @@ object SparkSubmit { // Special flag to avoid deprecation warnings at the client sysProps("SPARK_SUBMIT") = "true" + // A list of rules to map each argument to system properties or command-line options in + // each deploy mode; we iterate through these below val options = List[OptionAssigner]( - new OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), - new OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true, + OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), + OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraClassPath"), - new OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true, + OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true, sysProp = "spark.driver.extraJavaOptions"), - new OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true, + OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraLibraryPath"), - new OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"), - new OptionAssigner(appArgs.name, YARN, true, clOption = "--name"), - new OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"), - new OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"), - new OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"), - new OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"), - new OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"), - new OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false, + OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"), + OptionAssigner(appArgs.name, YARN, true, clOption = "--name"), + OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"), + OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"), + OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"), + OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"), + OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"), + OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false, sysProp = "spark.executor.memory"), - new OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"), - new OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"), - new OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"), - new OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"), - new OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false, + OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"), + OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"), + OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"), + OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"), + OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false, sysProp = "spark.cores.max"), - new OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"), - new OptionAssigner(appArgs.files, YARN, true, clOption = "--files"), - new OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), - new OptionAssigner(appArgs.archives, YARN, true, clOption 
= "--archives"), - new OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"), - new OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), - new OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), - new OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false, - sysProp = "spark.app.name") + OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"), + OptionAssigner(appArgs.files, YARN, true, clOption = "--files"), + OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"), + OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"), + OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), + OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), + OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name") ) // For client mode make any added jars immediately visible on the classpath @@ -183,9 +195,10 @@ object SparkSubmit { } } + // Map all arguments to command-line options or system properties for our chosen mode for (opt <- options) { if (opt.value != null && deployOnCluster == opt.deployOnCluster && - (clusterManager & opt.clusterManager) != 0) { + (clusterManager & opt.clusterManager) != 0) { if (opt.clOption != null) { childArgs += (opt.clOption, opt.value) } else if (opt.sysProp != null) { @@ -230,8 +243,8 @@ object SparkSubmit { } private def launch(childArgs: ArrayBuffer[String], childClasspath: ArrayBuffer[String], - sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) { - + sysProps: Map[String, String], childMainClass: String, verbose: Boolean = false) + { if (verbose) { printStream.println(s"Main class:\n$childMainClass") printStream.println(s"Arguments:\n${childArgs.mkString("\n")}") @@ -273,15 +286,26 @@ object SparkSubmit { val url = localJarFile.getAbsoluteFile.toURI.toURL loader.addURL(url) } + + /** + * Merge a sequence of comma-separated file lists, some of which may be null to indicate + * no files, into a single comma-separated string. + */ + private[spark] def mergeFileLists(lists: String*): String = { + val merged = lists.filter(_ != null) + .flatMap(_.split(",")) + .mkString(",") + if (merged == "") null else merged + } } /** * Provides an indirection layer for passing arguments as system properties or flags to * the user's driver program or to downstream launcher tools. */ -private[spark] class OptionAssigner(val value: String, - val clusterManager: Int, - val deployOnCluster: Boolean, - val clOption: String = null, - val sysProp: String = null -) { } +private[spark] case class OptionAssigner( + value: String, + clusterManager: Int, + deployOnCluster: Boolean, + clOption: String = null, + sysProp: String = null) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index f7f2c5b4da692..7ddf9cdaab49f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -79,7 +79,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { } /** Fill in any undefined values based on the current properties file or built-in defaults. 
*/ - private def loadDefaults() = { + private def loadDefaults(): Unit = { // Use common defaults file, if not specified by user if (propertiesFile == null) { @@ -112,16 +112,25 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { // Try to set main class from JAR if no --class argument is given if (mainClass == null && !isPython && primaryResource != null) { - val jar = new JarFile(primaryResource) - // Note that this might still return null if no main-class is set; we catch that later - mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class") + try { + val jar = new JarFile(primaryResource) + // Note that this might still return null if no main-class is set; we catch that later + mainClass = jar.getManifest.getMainAttributes.getValue("Main-Class") + } catch { + case e: Exception => + SparkSubmit.printErrorAndExit("Failed to read JAR: " + primaryResource) + return + } } // Global defaults. These should be keep to minimum to avoid confusing behavior. master = Option(master).getOrElse("local[*]") // Set name from main class if not given - name = Option(name).orElse(Option(mainClass)).getOrElse(new File(primaryResource).getName) + name = Option(name).orElse(Option(mainClass)).orNull + if (name == null && primaryResource != null) { + name = Utils.stripDirectory(primaryResource) + } } /** Ensure that required fields exists. Call this only once all defaults are loaded. */ @@ -133,7 +142,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { SparkSubmit.printErrorAndExit("Must specify a primary resource (JAR or Python file)") } if (mainClass == null && !isPython) { - SparkSubmit.printErrorAndExit("Must specify a main class with --class") + SparkSubmit.printErrorAndExit("No main class set in JAR; please specify one with --class") } if (pyFiles != null && !isPython) { SparkSubmit.printErrorAndExit("--py-files given but primary resource is not a Python script") @@ -165,6 +174,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { | queue $queue | numExecutors $numExecutors | files $files + | pyFiles $pyFiles | archives $archives | mainClass $mainClass | primaryResource $primaryResource @@ -309,15 +319,15 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { """Usage: spark-submit [options] [app options] |Options: | --master MASTER_URL spark://host:port, mesos://host:port, yarn, or local. - | --deploy-mode DEPLOY_MODE Where to run the driver program: either 'client' to run - | on the local machine, or 'cluster' to run inside cluster. - | --class CLASS_NAME Your application's main class (for Java apps). + | --deploy-mode DEPLOY_MODE Where to run the driver program: either "client" to run + | on the local machine, or "cluster" to run inside cluster. + | --class CLASS_NAME Your application's main class (for Java / Scala apps). | --name NAME A name of your application. | --jars JARS Comma-separated list of local jars to include on the driver | and executor classpaths. Doesn't work for drivers in - | standalone mode with 'cluster' deploy mode. - | --py-files PY_FILES Comma-separated list of files to place on the PYTHONPATH - | for Python apps. Can be .py, .zip, or .egg files. + | standalone mode with "cluster" deploy mode. + | --py-files PY_FILES Comma-separated list of .zip or .egg files to place on the + | PYTHONPATH for Python apps. | --files FILES Comma-separated list of files to be placed in the working | directory of each executor. | --properties-file FILE Path to a file from which to load extra properties. 
If not @@ -341,7 +351,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) { | | YARN-only: | --executor-cores NUM Number of cores per executor (Default: 1). - | --queue QUEUE_NAME The YARN queue to submit to (Default: 'default'). + | --queue QUEUE_NAME The YARN queue to submit to (Default: "default"). | --num-executors NUM Number of executors to launch (Default: 2). | --archives ARCHIVES Comma separated list of archives to be extracted into the | working directory of each executor.""".stripMargin diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index bef4dab3d7cc1..202bd46956f87 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -46,7 +46,6 @@ import org.apache.spark.serializer.{DeserializationStream, SerializationStream, * Various utility methods used by Spark. */ private[spark] object Utils extends Logging { - val random = new Random() def sparkBin(sparkHome: String, which: String): File = { @@ -1082,4 +1081,11 @@ private[spark] object Utils extends Logging { def isTesting = { sys.env.contains("SPARK_TESTING") || sys.props.contains("spark.testing") } + + /** + * Strip the directory from a path name + */ + def stripDirectory(path: String): String = { + path.split(File.separator).last + } } diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala index b3541b4a40b79..d7e3b22ed476e 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala @@ -83,7 +83,7 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handle binary specified but not class") { - testPrematureExit(Array("foo.jar"), "Must specify a main class") + testPrematureExit(Array("foo.jar"), "No main class") } test("handles arguments with --key=val") { @@ -94,9 +94,9 @@ class SparkSubmitSuite extends FunSuite with ShouldMatchers { } test("handles arguments to user program") { - val clArgs = Seq("--name", "myApp", "userjar.jar", "some", "--random", "args", "here") + val clArgs = Seq("--name", "myApp", "--class", "Foo", "userjar.jar", "some", "--weird", "args") val appArgs = new SparkSubmitArguments(clArgs) - appArgs.childArgs should be (Seq("some", "--random", "args", "here")) + appArgs.childArgs should be (Seq("some", "--weird", "args")) } test("handles YARN cluster mode") { diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index a2597e3e6ddd6..a78b062243dc1 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -356,7 +356,8 @@ object SparkBuild extends Build { "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil), - "org.spark-project" % "pyrolite" % "2.0.1" + "org.spark-project" % "pyrolite" % "2.0.1", + "net.sf.py4j" % "py4j" % "0.8.1" ), libraryDependencies ++= maybeAvro ) @@ -568,7 +569,6 @@ object SparkBuild extends Build { ) def assemblyProjSettings = sharedSettings ++ Seq( - libraryDependencies += "net.sf.py4j" % "py4j" % "0.8.1", name := "spark-assembly", assembleDeps in Compile <<= (packageProjects.map(packageBin in Compile in _) ++ Seq(packageDependency in Compile)).dependOn, jarName in assembly <<= 
version map { v => "spark-assembly-" + v + "-hadoop" + hadoopVersion + ".jar" }, diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 032d960e40998..b5743f0b7bd7c 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -29,37 +29,43 @@ def launch_gateway(): set_env_vars_for_yarn() - # Launch the Py4j gateway using Spark's run command so that we pick up the - # proper classpath and settings from spark-env.sh - on_windows = platform.system() == "Windows" - script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class" - command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", - "--die-on-broken-pipe", "0"] - if not on_windows: - # Don't send ctrl-c / SIGINT to the Java gateway: - def preexec_func(): - signal.signal(signal.SIGINT, signal.SIG_IGN) - proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + gateway_port = -1 + if "PYSPARK_GATEWAY_PORT" in os.environ: + gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) else: - # preexec_fn not supported on Windows - proc = Popen(command, stdout=PIPE, stdin=PIPE) - # Determine which ephemeral port the server started on: - port = int(proc.stdout.readline()) - # Create a thread to echo output from the GatewayServer, which is required - # for Java log output to show up: - class EchoOutputThread(Thread): - def __init__(self, stream): - Thread.__init__(self) - self.daemon = True - self.stream = stream + # Launch the Py4j gateway using Spark's run command so that we pick up the + # proper classpath and settings from spark-env.sh + on_windows = platform.system() == "Windows" + script = "./bin/spark-class.cmd" if on_windows else "./bin/spark-class" + command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer", + "--die-on-broken-pipe", "0"] + if not on_windows: + # Don't send ctrl-c / SIGINT to the Java gateway: + def preexec_func(): + signal.signal(signal.SIGINT, signal.SIG_IGN) + proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func) + else: + # preexec_fn not supported on Windows + proc = Popen(command, stdout=PIPE, stdin=PIPE) + # Determine which ephemeral port the server started on: + gateway_port = int(proc.stdout.readline()) + # Create a thread to echo output from the GatewayServer, which is required + # for Java log output to show up: + class EchoOutputThread(Thread): + def __init__(self, stream): + Thread.__init__(self) + self.daemon = True + self.stream = stream + + def run(self): + while True: + line = self.stream.readline() + sys.stderr.write(line) + EchoOutputThread(proc.stdout).start() - def run(self): - while True: - line = self.stream.readline() - sys.stderr.write(line) - EchoOutputThread(proc.stdout).start() # Connect to the gateway - gateway = JavaGateway(GatewayClient(port=port), auto_convert=False) + gateway = JavaGateway(GatewayClient(port=gateway_port), auto_convert=False) + # Import the classes used by PySpark java_import(gateway.jvm, "org.apache.spark.SparkConf") java_import(gateway.jvm, "org.apache.spark.api.java.*") @@ -70,8 +76,10 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") java_import(gateway.jvm, "scala.Tuple2") + return gateway + def set_env_vars_for_yarn(): # Add the spark jar, which includes the pyspark files, to the python path env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", "")) @@ -82,6 +90,7 @@ def set_env_vars_for_yarn(): os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + 
v for (k, v) in env_map.items()) + def parse_env(env_str): # Turns a comma-separated of env settings into a dict that maps env vars to # their values. @@ -93,5 +102,5 @@ def parse_env(env_str): elif len(var_str) > 0: print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str sys.exit(1) - + return env From 15f8e1ef7eead23922aaec5e63e439c054355911 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Mon, 5 May 2014 16:59:47 -0700 Subject: [PATCH 3/7] Set PYTHONPATH in PythonWorkerFactory in case it wasn't set from outside --- .../apache/spark/api/python/PythonUtils.scala | 27 +++++++++++++++++++ .../api/python/PythonWorkerFactory.scala | 5 ++++ .../apache/spark/deploy/PythonRunner.scala | 20 +++++--------- 3 files changed, 39 insertions(+), 13 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala new file mode 100644 index 0000000000000..6ee358e86564d --- /dev/null +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -0,0 +1,27 @@ +package org.apache.spark.api.python + +import java.io.File + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.SparkContext + +private[spark] object PythonUtils { + private val pathSeparator = System.getProperty("path.separator") + + /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */ + def sparkPythonPath: String = { + val pythonPath = new ArrayBuffer[String] + for(sparkHome <- sys.env.get("SPARK_HOME")) { + pythonPath += Seq(sparkHome, "python").mkString(File.separator) + pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator) + } + pythonPath ++= SparkContext.jarOfObject(this) + pythonPath.mkString(pathSeparator) + } + + /** Merge PYTHONPATHS with the appropriate separator. Ignores blank strings. 
*/ + def mergePythonPaths(paths: String*): String = { + paths.filter(_ != "").mkString(pathSeparator) + } +} diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index ff2bc98698d40..b0bf4e052b3e9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -37,6 +37,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val daemonHost = InetAddress.getByAddress(Array(127, 0, 0, 1)) var daemonPort: Int = 0 + val pythonPath = PythonUtils.mergePythonPaths( + PythonUtils.sparkPythonPath, envVars.getOrElse("PYTHONPATH", "")) + def create(): Socket = { if (useDaemon) { createThroughDaemon() @@ -81,6 +84,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.worker")) val workerEnv = pb.environment() workerEnv.putAll(envVars) + workerEnv.put("PYTHONPATH", pythonPath) val worker = pb.start() // Redirect the worker's stderr to ours @@ -154,6 +158,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val pb = new ProcessBuilder(Seq(pythonExec, "-u", "-m", "pyspark.daemon")) val workerEnv = pb.environment() workerEnv.putAll(envVars) + workerEnv.put("PYTHONPATH", pythonPath) daemon = pb.start() // Redirect the stderr to ours diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 93fdb7e8f20ba..e04b67adbcae5 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -6,6 +6,7 @@ import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ import org.apache.spark.SparkContext +import org.apache.spark.api.python.PythonUtils /** * A main class used by spark-submit to launch Python applications. 
It executes python as a @@ -26,23 +27,16 @@ object PythonRunner { // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the // python directories in SPARK_HOME (if set), and any files in the pyFiles argument - val sparkJar = SparkContext.jarOfObject(this).get - val pathSeparator: String = System.getProperty("path.separator") - val pythonPath = new ArrayBuffer[String] - pythonPath += sparkJar - pythonPath ++= pyFiles.split(",").filter(_ != "") - for (sparkHome <- sys.env.get("SPARK_HOME")) { - pythonPath += Seq(sparkHome, "python").mkString(File.separator) - pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator) - } - for (oldPythonPath <- sys.env.get("PYTHONPATH")) { - pythonPath ++= oldPythonPath.split(pathSeparator) - } + val pathElements = new ArrayBuffer[String] + pathElements ++= pyFiles.split(",") + pathElements += PythonUtils.sparkPythonPath + pathElements += sys.env.getOrElse("PYTHONPATH", "") + val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) // Launch Python process val builder = new ProcessBuilder(Seq(pythonExec, "-u", primaryResource) ++ otherArgs) val env = builder.environment() - env.put("PYTHONPATH", pythonPath.mkString(pathSeparator)) + env.put("PYTHONPATH", pythonPath) env.put("PYSPARK_GATEWAY_PORT", "" + gatewayServer.getListeningPort) builder.redirectErrorStream(true) // Ugly but needed for stdout and stderr to synchronize val process = builder.start() From 4650412c7794a0002a80b82831e29c8b095c206f Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 6 May 2014 01:43:02 -0700 Subject: [PATCH 4/7] Add pyFiles to PYTHONPATH in executors, remove old YARN stuff, add tests --- .../org/apache/spark/deploy/SparkSubmit.scala | 19 ++- docs/python-programming-guide.md | 28 ++-- python/pyspark/context.py | 6 + python/pyspark/java_gateway.py | 28 ---- python/pyspark/tests.py | 131 +++++++++++++++++- 5 files changed, 160 insertions(+), 52 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 101025f08407b..277e739c2a6f6 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -116,17 +116,21 @@ object SparkSubmit { var childMainClass = "" if (clusterManager == MESOS && deployOnCluster) { - printErrorAndExit("Cannot run driver on the cluster in Mesos") + printErrorAndExit("Cannot currently run driver on the cluster in Mesos") } // If we're running a Python app, set the Java class to run to be our PythonRunner, add // Python files to deployment list, and pass the main file and Python path to PythonRunner if (appArgs.isPython) { + if (deployOnCluster) { + printErrorAndExit("Cannot currently run Python driver programs on cluster") + } appArgs.mainClass = "org.apache.spark.deploy.PythonRunner" appArgs.files = mergeFileLists(appArgs.files, appArgs.pyFiles, appArgs.primaryResource) val pyFiles = Option(appArgs.pyFiles).getOrElse("") appArgs.childArgs = ArrayBuffer(appArgs.primaryResource, pyFiles) ++ appArgs.childArgs appArgs.primaryResource = RESERVED_JAR_NAME + sysProps("spark.submit.pyFiles") = pyFiles } // If we're deploying into YARN, use yarn.Client as a wrapper around the user class @@ -144,9 +148,8 @@ object SparkSubmit { // Make sure YARN is included in our build if we're trying to use it if (clusterManager == YARN) { if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) { - 
val msg = "Could not load YARN classes. This copy of Spark may not have been compiled " + - "with YARN support." - throw new Exception(msg) + printErrorAndExit("Could not load YARN classes. " + + "This copy of Spark may not have been compiled with YARN support.") } } @@ -183,6 +186,7 @@ object SparkSubmit { OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"), OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"), + OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name") @@ -210,8 +214,11 @@ object SparkSubmit { // For standalone mode, add the application jar automatically so the user doesn't have to // call sc.addJar. TODO: Standalone mode in the cluster if (clusterManager == STANDALONE) { - val existingJars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) - sysProps.put("spark.jars", (existingJars ++ Seq(appArgs.primaryResource)).mkString(",")) + var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) + if (appArgs.primaryResource != RESERVED_JAR_NAME) { + jars = jars ++ Seq(appArgs.primaryResource) + } + sysProps.put("spark.jars", jars.mkString(",")) } if (deployOnCluster && clusterManager == STANDALONE) { diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md index 98c456228af9f..8ea22e15a4b69 100644 --- a/docs/python-programming-guide.md +++ b/docs/python-programming-guide.md @@ -60,12 +60,9 @@ By default, PySpark requires `python` to be available on the system `PATH` and u All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.net/), are bundled with PySpark and automatically imported. -Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`. -The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`. +Standalone PySpark applications should be run using the `bin/spark-submit` script, which automatically +configures the Java and Python environment for running Spark. -# Running PySpark on YARN - -To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client". # Interactive Use @@ -103,7 +100,7 @@ $ MASTER=local[4] ./bin/pyspark ## IPython -It is also possible to launch PySpark in [IPython](http://ipython.org), the +It is also possible to launch the PySpark shell in [IPython](http://ipython.org), the enhanced Python interpreter. PySpark works with IPython 1.0.0 and later. To use IPython, set the `IPYTHON` variable to `1` when running `bin/pyspark`: @@ -123,18 +120,17 @@ IPython also works on a cluster or on multiple cores if you set the `MASTER` env # Standalone Programs -PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/pyspark`. +PySpark can also be used from standalone Python scripts by creating a SparkContext in your script and running the script using `bin/spark-submit`. 
The Quick Start guide includes a [complete example](quick-start.html#a-standalone-app-in-python) of a standalone Python application. -Code dependencies can be deployed by listing them in the `pyFiles` option in the SparkContext constructor: +Code dependencies can be deployed by passing .zip or .egg files in the `--py-files` option of `spark-submit`: -{% highlight python %} -from pyspark import SparkContext -sc = SparkContext("local", "App Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) +{% highlight bash %} +./bin/spark-submit --py-files lib1.zip,lib2.zip my_script.py {% endhighlight %} Files listed here will be added to the `PYTHONPATH` and shipped to remote worker machines. -Code dependencies can be added to an existing SparkContext using its `addPyFile()` method. +Code dependencies can also be added to an existing SparkContext at runtime using its `addPyFile()` method. You can set [configuration properties](configuration.html#spark-properties) by passing a [SparkConf](api/python/pyspark.conf.SparkConf-class.html) object to SparkContext: @@ -142,12 +138,16 @@ You can set [configuration properties](configuration.html#spark-properties) by p {% highlight python %} from pyspark import SparkConf, SparkContext conf = (SparkConf() - .setMaster("local") .setAppName("My app") .set("spark.executor.memory", "1g")) sc = SparkContext(conf = conf) {% endhighlight %} +`spark-submit` supports launching Python applications on standalone, Mesos or YARN clusters, through +its `--master` argument. However, it currently requires the Python driver program to run on the local +machine, not the cluster (i.e. the `--deploy-mode` parameter cannot be `cluster`). + + # API Docs [API documentation](api/python/index.html) for PySpark is available as Epydoc. @@ -164,6 +164,6 @@ some example applications. PySpark also includes several sample programs in the [`python/examples` folder](https://github.com/apache/spark/tree/master/python/examples). You can run them by passing the files to `pyspark`; e.g.: - ./bin/pyspark python/examples/wordcount.py + ./bin/spark-submit python/examples/wordcount.py Each program prints usage help when run without arguments. 
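For reference, a minimal standalone PySpark application of the sort the updated guide describes might look like the sketch below. The file name `my_script.py` and the module `mylib` are hypothetical, mirroring the names used in the new `python/pyspark/tests.py` tests; they are not files added by this patch.

{% highlight python %}
# my_script.py -- minimal sketch of a standalone PySpark app (hypothetical file name),
# submitted with, e.g.:
#   ./bin/spark-submit --master local[4] --py-files mylib.zip my_script.py
from pyspark import SparkConf, SparkContext
from mylib import myfunc  # provided by mylib.zip, shipped via --py-files

conf = (SparkConf()
        .setAppName("My app")
        .set("spark.executor.memory", "1g"))
sc = SparkContext(conf=conf)  # master URL is taken from spark-submit's --master flag
print sc.parallelize([1, 2, 3]).map(myfunc).collect()
sc.stop()
{% endhighlight %}

As with the `--py-files` example in the guide, the zip ends up on the `PYTHONPATH` of both the driver and the executors, so `mylib` can be imported on either side.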
diff --git a/python/pyspark/context.py b/python/pyspark/context.py index c74dc5fd4f854..c7dc85ea03544 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -158,6 +158,12 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None, for path in (pyFiles or []): self.addPyFile(path) + # Deploy code dependencies set by spark-submit; these will already have been added + # with SparkContext.addFile, so we just need to add them + for path in self._conf.get("spark.submit.pyFiles", "").split(","): + if path != "": + self._python_includes.append(os.path.basename(path)) + # Create a temporary directory inside spark.local.dir: local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf()) self._temp_dir = \ diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index b5743f0b7bd7c..3d0936fdca911 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -27,8 +27,6 @@ def launch_gateway(): SPARK_HOME = os.environ["SPARK_HOME"] - set_env_vars_for_yarn() - gateway_port = -1 if "PYSPARK_GATEWAY_PORT" in os.environ: gateway_port = int(os.environ["PYSPARK_GATEWAY_PORT"]) @@ -78,29 +76,3 @@ def run(self): java_import(gateway.jvm, "scala.Tuple2") return gateway - - -def set_env_vars_for_yarn(): - # Add the spark jar, which includes the pyspark files, to the python path - env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", "")) - if "PYTHONPATH" in env_map: - env_map["PYTHONPATH"] += ":spark.jar" - else: - env_map["PYTHONPATH"] = "spark.jar" - - os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items()) - - -def parse_env(env_str): - # Turns a comma-separated of env settings into a dict that maps env vars to - # their values. - env = {} - for var_str in env_str.split(","): - parts = var_str.split("=") - if len(parts) == 2: - env[parts[0]] = parts[1] - elif len(var_str) > 0: - print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str - sys.exit(1) - - return env diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 8cf9d9cf1bd66..64f2eeb12b4fc 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -22,11 +22,14 @@ from fileinput import input from glob import glob import os +import re import shutil +import subprocess import sys -from tempfile import NamedTemporaryFile +import tempfile import time import unittest +import zipfile from pyspark.context import SparkContext from pyspark.files import SparkFiles @@ -55,7 +58,7 @@ class TestCheckpoint(PySparkTestCase): def setUp(self): PySparkTestCase.setUp(self) - self.checkpointDir = NamedTemporaryFile(delete=False) + self.checkpointDir = tempfile.NamedTemporaryFile(delete=False) os.unlink(self.checkpointDir.name) self.sc.setCheckpointDir(self.checkpointDir.name) @@ -148,7 +151,7 @@ def test_save_as_textfile_with_unicode(self): # Regression test for SPARK-970 x = u"\u00A1Hola, mundo!" 
data = self.sc.parallelize([x]) - tempFile = NamedTemporaryFile(delete=True) + tempFile = tempfile.NamedTemporaryFile(delete=True) tempFile.close() data.saveAsTextFile(tempFile.name) raw_contents = ''.join(input(glob(tempFile.name + "/part-0000*"))) @@ -172,7 +175,7 @@ def test_cartesian_on_textfile(self): def test_deleting_input_files(self): # Regression test for SPARK-1025 - tempFile = NamedTemporaryFile(delete=False) + tempFile = tempfile.NamedTemporaryFile(delete=False) tempFile.write("Hello World!") tempFile.close() data = self.sc.textFile(tempFile.name) @@ -236,5 +239,125 @@ def test_termination_sigterm(self): from signal import SIGTERM self.do_termination_test(lambda daemon: os.kill(daemon.pid, SIGTERM)) + +class TestSparkSubmit(unittest.TestCase): + def setUp(self): + self.programDir = tempfile.mkdtemp() + self.sparkSubmit = os.path.join(os.environ.get("SPARK_HOME"), "bin", "spark-submit") + + def tearDown(self): + shutil.rmtree(self.programDir) + + def createTempFile(self, name, content): + """ + Create a temp file with the given name and content and return its path. + Strips leading spaces from content up to the first '|' in each line. + """ + pattern = re.compile(r'^ *\|', re.MULTILINE) + content = re.sub(pattern, '', content.strip()) + path = os.path.join(self.programDir, name) + with open(path, "w") as f: + f.write(content) + return path + + def createFileInZip(self, name, content): + """ + Create a zip archive containing a file with the given content and return its path. + Strips leading spaces from content up to the first '|' in each line. + """ + pattern = re.compile(r'^ *\|', re.MULTILINE) + content = re.sub(pattern, '', content.strip()) + path = os.path.join(self.programDir, name + ".zip") + with zipfile.ZipFile(path, 'w') as zip: + zip.writestr(name, content) + return path + + def test_single_script(self): + """Submit and test a single script file""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkContext + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(lambda x: x * 2).collect() + """) + proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[2, 4, 6]", out) + + def test_script_with_local_functions(self): + """Submit and test a single script file calling a global function""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkContext + | + |def foo(x): + | return x * 3 + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(foo).collect() + """) + proc = subprocess.Popen([self.sparkSubmit, script], stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[3, 6, 9]", out) + + def test_module_dependency(self): + """Submit and test a script with a dependency on another module""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkContext + |from mylib import myfunc + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + """) + zip = self.createFileInZip("mylib.py", """ + |def myfunc(x): + | return x + 1 + """) + proc = subprocess.Popen([self.sparkSubmit, "--py-files", zip, script], + stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[2, 3, 4]", out) + + def test_module_dependency_on_cluster(self): + """Submit and test a script with a dependency on another module on a cluster""" + script = self.createTempFile("test.py", """ + 
|from pyspark import SparkContext + |from mylib import myfunc + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(myfunc).collect() + """) + zip = self.createFileInZip("mylib.py", """ + |def myfunc(x): + | return x + 1 + """) + proc = subprocess.Popen( + [self.sparkSubmit, "--py-files", zip, "--master", "local-cluster[1,1,512]", script], + stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[2, 3, 4]", out) + + def test_single_script_on_cluster(self): + """Submit and test a single script on a cluster""" + script = self.createTempFile("test.py", """ + |from pyspark import SparkContext + | + |def foo(x): + | return x * 2 + | + |sc = SparkContext() + |print sc.parallelize([1, 2, 3]).map(foo).collect() + """) + proc = subprocess.Popen( + [self.sparkSubmit, "--master", "local-cluster[1,1,512]", script], + stdout=subprocess.PIPE) + out, err = proc.communicate() + self.assertEqual(0, proc.returncode) + self.assertIn("[2, 4, 6]", out) + + if __name__ == "__main__": unittest.main() From 0afe886f421f2c6e58f819e0a8d5813b51116095 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 6 May 2014 01:57:53 -0700 Subject: [PATCH 5/7] Add license headers --- .../apache/spark/api/python/PythonUtils.scala | 17 +++++++++++++++++ .../org/apache/spark/deploy/PythonRunner.scala | 17 +++++++++++++++++ 2 files changed, 34 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 6ee358e86564d..d8d0df186a6b7 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.api.python import java.io.File diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index e04b67adbcae5..f2e7c7a508b3f 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -1,3 +1,20 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.spark.deploy import java.io.{IOException, File, InputStream, OutputStream} From 051278cc8984fa54be775b44c1310627edb966da Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 6 May 2014 10:35:33 -0700 Subject: [PATCH 6/7] Small style fixes --- .../apache/spark/api/python/PythonUtils.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 128 +++++++++--------- 2 files changed, 65 insertions(+), 65 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index d8d0df186a6b7..0202f4629ede9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -29,7 +29,7 @@ private[spark] object PythonUtils { /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */ def sparkPythonPath: String = { val pythonPath = new ArrayBuffer[String] - for(sparkHome <- sys.env.get("SPARK_HOME")) { + for (sparkHome <- sys.env.get("SPARK_HOME")) { pythonPath += Seq(sparkHome, "python").mkString(File.separator) pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator) } diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 277e739c2a6f6..e39723f38347c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -72,15 +72,15 @@ object SparkSubmit { * entries for the child, a list of system propertes, a list of env vars * and the main class for the child */ - private[spark] def createLaunchEnv(appArgs: SparkSubmitArguments): (ArrayBuffer[String], + private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = { - if (appArgs.master.startsWith("local")) { + if (args.master.startsWith("local")) { clusterManager = LOCAL - } else if (appArgs.master.startsWith("yarn")) { + } else if (args.master.startsWith("yarn")) { clusterManager = YARN - } else if (appArgs.master.startsWith("spark")) { + } else if (args.master.startsWith("spark")) { clusterManager = STANDALONE - } else if (appArgs.master.startsWith("mesos")) { + } else if (args.master.startsWith("mesos")) { clusterManager = MESOS } else { printErrorAndExit("Master must start with yarn, mesos, spark, or local") @@ -89,26 +89,26 @@ object SparkSubmit { // Because "yarn-cluster" and "yarn-client" encapsulate both the master // and deploy mode, we have some logic to infer the master and deploy mode // from each other if only one is specified, or exit early if they are at odds. 
- if (appArgs.deployMode == null && - (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) { - appArgs.deployMode = "cluster" + if (args.deployMode == null && + (args.master == "yarn-standalone" || args.master == "yarn-cluster")) { + args.deployMode = "cluster" } - if (appArgs.deployMode == "cluster" && appArgs.master == "yarn-client") { + if (args.deployMode == "cluster" && args.master == "yarn-client") { printErrorAndExit("Deploy mode \"cluster\" and master \"yarn-client\" are not compatible") } - if (appArgs.deployMode == "client" && - (appArgs.master == "yarn-standalone" || appArgs.master == "yarn-cluster")) { - printErrorAndExit("Deploy mode \"client\" and master \"" + appArgs.master + if (args.deployMode == "client" && + (args.master == "yarn-standalone" || args.master == "yarn-cluster")) { + printErrorAndExit("Deploy mode \"client\" and master \"" + args.master + "\" are not compatible") } - if (appArgs.deployMode == "cluster" && appArgs.master.startsWith("yarn")) { - appArgs.master = "yarn-cluster" + if (args.deployMode == "cluster" && args.master.startsWith("yarn")) { + args.master = "yarn-cluster" } - if (appArgs.deployMode != "cluster" && appArgs.master.startsWith("yarn")) { - appArgs.master = "yarn-client" + if (args.deployMode != "cluster" && args.master.startsWith("yarn")) { + args.master = "yarn-client" } - val deployOnCluster = Option(appArgs.deployMode).getOrElse("client") == "cluster" + val deployOnCluster = Option(args.deployMode).getOrElse("client") == "cluster" val childClasspath = new ArrayBuffer[String]() val childArgs = new ArrayBuffer[String]() @@ -121,28 +121,28 @@ object SparkSubmit { // If we're running a Python app, set the Java class to run to be our PythonRunner, add // Python files to deployment list, and pass the main file and Python path to PythonRunner - if (appArgs.isPython) { + if (args.isPython) { if (deployOnCluster) { printErrorAndExit("Cannot currently run Python driver programs on cluster") } - appArgs.mainClass = "org.apache.spark.deploy.PythonRunner" - appArgs.files = mergeFileLists(appArgs.files, appArgs.pyFiles, appArgs.primaryResource) - val pyFiles = Option(appArgs.pyFiles).getOrElse("") - appArgs.childArgs = ArrayBuffer(appArgs.primaryResource, pyFiles) ++ appArgs.childArgs - appArgs.primaryResource = RESERVED_JAR_NAME + args.mainClass = "org.apache.spark.deploy.PythonRunner" + args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource) + val pyFiles = Option(args.pyFiles).getOrElse("") + args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs + args.primaryResource = RESERVED_JAR_NAME sysProps("spark.submit.pyFiles") = pyFiles } // If we're deploying into YARN, use yarn.Client as a wrapper around the user class if (!deployOnCluster) { - childMainClass = appArgs.mainClass - if (appArgs.primaryResource != RESERVED_JAR_NAME) { - childClasspath += appArgs.primaryResource + childMainClass = args.mainClass + if (args.primaryResource != RESERVED_JAR_NAME) { + childClasspath += args.primaryResource } } else if (clusterManager == YARN) { childMainClass = "org.apache.spark.deploy.yarn.Client" - childArgs += ("--jar", appArgs.primaryResource) - childArgs += ("--class", appArgs.mainClass) + childArgs += ("--jar", args.primaryResource) + childArgs += ("--class", args.mainClass) } // Make sure YARN is included in our build if we're trying to use it @@ -159,42 +159,42 @@ object SparkSubmit { // A list of rules to map each argument to system properties or command-line options in // each deploy 
mode; we iterate through these below val options = List[OptionAssigner]( - OptionAssigner(appArgs.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), - OptionAssigner(appArgs.driverExtraClassPath, STANDALONE | YARN, true, + OptionAssigner(args.master, ALL_CLUSTER_MGRS, false, sysProp = "spark.master"), + OptionAssigner(args.driverExtraClassPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraClassPath"), - OptionAssigner(appArgs.driverExtraJavaOptions, STANDALONE | YARN, true, + OptionAssigner(args.driverExtraJavaOptions, STANDALONE | YARN, true, sysProp = "spark.driver.extraJavaOptions"), - OptionAssigner(appArgs.driverExtraLibraryPath, STANDALONE | YARN, true, + OptionAssigner(args.driverExtraLibraryPath, STANDALONE | YARN, true, sysProp = "spark.driver.extraLibraryPath"), - OptionAssigner(appArgs.driverMemory, YARN, true, clOption = "--driver-memory"), - OptionAssigner(appArgs.name, YARN, true, clOption = "--name"), - OptionAssigner(appArgs.queue, YARN, true, clOption = "--queue"), - OptionAssigner(appArgs.queue, YARN, false, sysProp = "spark.yarn.queue"), - OptionAssigner(appArgs.numExecutors, YARN, true, clOption = "--num-executors"), - OptionAssigner(appArgs.numExecutors, YARN, false, sysProp = "spark.executor.instances"), - OptionAssigner(appArgs.executorMemory, YARN, true, clOption = "--executor-memory"), - OptionAssigner(appArgs.executorMemory, STANDALONE | MESOS | YARN, false, + OptionAssigner(args.driverMemory, YARN, true, clOption = "--driver-memory"), + OptionAssigner(args.name, YARN, true, clOption = "--name"), + OptionAssigner(args.queue, YARN, true, clOption = "--queue"), + OptionAssigner(args.queue, YARN, false, sysProp = "spark.yarn.queue"), + OptionAssigner(args.numExecutors, YARN, true, clOption = "--num-executors"), + OptionAssigner(args.numExecutors, YARN, false, sysProp = "spark.executor.instances"), + OptionAssigner(args.executorMemory, YARN, true, clOption = "--executor-memory"), + OptionAssigner(args.executorMemory, STANDALONE | MESOS | YARN, false, sysProp = "spark.executor.memory"), - OptionAssigner(appArgs.driverMemory, STANDALONE, true, clOption = "--memory"), - OptionAssigner(appArgs.driverCores, STANDALONE, true, clOption = "--cores"), - OptionAssigner(appArgs.executorCores, YARN, true, clOption = "--executor-cores"), - OptionAssigner(appArgs.executorCores, YARN, false, sysProp = "spark.executor.cores"), - OptionAssigner(appArgs.totalExecutorCores, STANDALONE | MESOS, false, + OptionAssigner(args.driverMemory, STANDALONE, true, clOption = "--memory"), + OptionAssigner(args.driverCores, STANDALONE, true, clOption = "--cores"), + OptionAssigner(args.executorCores, YARN, true, clOption = "--executor-cores"), + OptionAssigner(args.executorCores, YARN, false, sysProp = "spark.executor.cores"), + OptionAssigner(args.totalExecutorCores, STANDALONE | MESOS, false, sysProp = "spark.cores.max"), - OptionAssigner(appArgs.files, YARN, false, sysProp = "spark.yarn.dist.files"), - OptionAssigner(appArgs.files, YARN, true, clOption = "--files"), - OptionAssigner(appArgs.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), - OptionAssigner(appArgs.archives, YARN, true, clOption = "--archives"), - OptionAssigner(appArgs.jars, YARN, true, clOption = "--addJars"), - OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), - OptionAssigner(appArgs.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), - OptionAssigner(appArgs.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), - 
OptionAssigner(appArgs.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name") + OptionAssigner(args.files, YARN, false, sysProp = "spark.yarn.dist.files"), + OptionAssigner(args.files, YARN, true, clOption = "--files"), + OptionAssigner(args.archives, YARN, false, sysProp = "spark.yarn.dist.archives"), + OptionAssigner(args.archives, YARN, true, clOption = "--archives"), + OptionAssigner(args.jars, YARN, true, clOption = "--addJars"), + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.files"), + OptionAssigner(args.files, LOCAL | STANDALONE | MESOS, true, sysProp = "spark.files"), + OptionAssigner(args.jars, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.jars"), + OptionAssigner(args.name, LOCAL | STANDALONE | MESOS, false, sysProp = "spark.app.name") ) // For client mode make any added jars immediately visible on the classpath - if (appArgs.jars != null && !deployOnCluster) { - for (jar <- appArgs.jars.split(",")) { + if (args.jars != null && !deployOnCluster) { + for (jar <- args.jars.split(",")) { childClasspath += jar } } @@ -215,34 +215,34 @@ object SparkSubmit { // call sc.addJar. TODO: Standalone mode in the cluster if (clusterManager == STANDALONE) { var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq()) - if (appArgs.primaryResource != RESERVED_JAR_NAME) { - jars = jars ++ Seq(appArgs.primaryResource) + if (args.primaryResource != RESERVED_JAR_NAME) { + jars = jars ++ Seq(args.primaryResource) } sysProps.put("spark.jars", jars.mkString(",")) } if (deployOnCluster && clusterManager == STANDALONE) { - if (appArgs.supervise) { + if (args.supervise) { childArgs += "--supervise" } childMainClass = "org.apache.spark.deploy.Client" childArgs += "launch" - childArgs += (appArgs.master, appArgs.primaryResource, appArgs.mainClass) + childArgs += (args.master, args.primaryResource, args.mainClass) } // Arguments to be passed to user program - if (appArgs.childArgs != null) { + if (args.childArgs != null) { if (!deployOnCluster || clusterManager == STANDALONE) { - childArgs ++= appArgs.childArgs + childArgs ++= args.childArgs } else if (clusterManager == YARN) { - for (arg <- appArgs.childArgs) { + for (arg <- args.childArgs) { childArgs += ("--arg", arg) } } } - for ((k, v) <- appArgs.getDefaultSparkProperties) { + for ((k, v) <- args.getDefaultSparkProperties) { if (!sysProps.contains(k)) sysProps(k) = v } From 15e966906322750f403acedfb8dacaed2abd0f4e Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 6 May 2014 11:57:57 -0700 Subject: [PATCH 7/7] Fix some uses of path.separator property --- core/src/main/scala/org/apache/spark/SparkEnv.scala | 4 +++- .../scala/org/apache/spark/api/python/PythonUtils.scala | 6 ++---- repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala | 3 +-- 3 files changed, 6 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index d40ed27da5392..806e77d98fc5f 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -17,6 +17,8 @@ package org.apache.spark +import java.io.File + import scala.collection.JavaConversions._ import scala.collection.mutable import scala.concurrent.Await @@ -304,7 +306,7 @@ object SparkEnv extends Logging { k == "java.class.path" }.getOrElse(("", "")) val classPathEntries = classPathProperty._2 - .split(conf.get("path.separator", ":")) + .split(File.pathSeparator) .filterNot(e => e.isEmpty) 
.map(e => (e, "System Classpath")) val addedJarsAndFiles = (addedJars ++ addedFiles).map((_, "Added By User")) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 0202f4629ede9..cf69fa1d53fde 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -24,8 +24,6 @@ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkContext private[spark] object PythonUtils { - private val pathSeparator = System.getProperty("path.separator") - /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */ def sparkPythonPath: String = { val pythonPath = new ArrayBuffer[String] @@ -34,11 +32,11 @@ private[spark] object PythonUtils { pythonPath += Seq(sparkHome, "python", "lib", "py4j-0.8.1-src.zip").mkString(File.separator) } pythonPath ++= SparkContext.jarOfObject(this) - pythonPath.mkString(pathSeparator) + pythonPath.mkString(File.pathSeparator) } /** Merge PYTHONPATHS with the appropriate separator. Ignores blank strings. */ def mergePythonPaths(paths: String*): String = { - paths.filter(_ != "").mkString(pathSeparator) + paths.filter(_ != "").mkString(File.pathSeparator) } } diff --git a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala index e33f4f9803054..566d96e16ed83 100644 --- a/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala +++ b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala @@ -45,8 +45,7 @@ class ReplSuite extends FunSuite { } val interp = new SparkILoop(in, new PrintWriter(out), master) org.apache.spark.repl.Main.interp = interp - val separator = System.getProperty("path.separator") - interp.process(Array("-classpath", paths.mkString(separator))) + interp.process(Array("-classpath", paths.mkString(File.pathSeparator))) org.apache.spark.repl.Main.interp = null if (interp.sparkContext != null) { interp.sparkContext.stop()