diff --git a/bin/pyspark b/bin/pyspark
index 10e35e0f1734e..2b9cdbf4883d3 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -48,17 +48,19 @@ export PYSPARK_PYTHON
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
 export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
 
-# Load the PySpark shell.py script when ./pyspark is used interactively:
-export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
-export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py
-
 if [ -n "$IPYTHON_OPTS" ]; then
   IPYTHON=1
 fi
 
-# Only use ipython if no command line arguments were provided [SPARK-1134]
-if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
-  exec ipython $IPYTHON_OPTS
+# If a python file is provided, directly run spark-submit
+if [[ "$1" =~ \.py$ ]]; then
+  exec $FWDIR/bin/spark-submit "$@"
 else
-  exec "$PYSPARK_PYTHON" "$@"
+  # Only use ipython if no command line arguments were provided [SPARK-1134]
+  if [[ "$IPYTHON" = "1" ]]; then
+    subcommand="%run $FWDIR/python/pyspark/repl.py $@"
+    exec ipython $IPYTHON_OPTS -c "$subcommand"
+  else
+    exec "$PYSPARK_PYTHON" $FWDIR/python/pyspark/repl.py "$@"
+  fi
 fi
diff --git a/bin/spark-shell b/bin/spark-shell
index 7f03349c5e910..4dd24b6bc4c9d 100755
--- a/bin/spark-shell
+++ b/bin/spark-shell
@@ -50,7 +50,7 @@ function main(){
     stty icanon echo > /dev/null 2>&1
   else
     export SPARK_SUBMIT_OPTS
-    $FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
+    $FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
   fi
 }
diff --git a/bin/spark-submit b/bin/spark-submit
index 63903b17a2902..9e7cecedd0325 100755
--- a/bin/spark-submit
+++ b/bin/spark-submit
@@ -41,5 +41,5 @@ if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
   export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
 fi
 
-$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
+exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonAppRunner.scala
similarity index 89%
rename from core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
rename to core/src/main/scala/org/apache/spark/deploy/PythonAppRunner.scala
index e20d4486c8f0c..65e823fe5da53 100644
--- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonAppRunner.scala
@@ -23,15 +23,14 @@ import scala.collection.JavaConversions._
 import org.apache.spark.api.python.{PythonUtils, RedirectThread}
 
 /**
- * A main class used by spark-submit to launch Python applications. It executes python as a
- * subprocess and then has it connect back to the JVM to access system properties, etc.
+ * Main class used by spark-submit to launch Python applications. It executes python as a
+ * sub-process and then has it connect back to the JVM to access system properties, etc.
 */
-object PythonRunner {
+object PythonAppRunner {
   def main(args: Array[String]) {
     val primaryResource = args(0)
     val pyFiles = args(1)
     val otherArgs = args.slice(2, args.length)
-
     val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf
 
     // Launch a Py4J gateway server for the process to connect to; this will let it see our
@@ -42,7 +41,7 @@ object PythonRunner {
     // Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
     // python directories in SPARK_HOME (if set), and any files in the pyFiles argument
     val pathElements = new ArrayBuffer[String]
-    pathElements ++= pyFiles.split(",")
+    pathElements ++= Option(pyFiles).getOrElse("").split(",")
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonShellRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonShellRunner.scala
new file mode 100644
index 0000000000000..24dcb84cf274e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/PythonShellRunner.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy
+
+import java.io.PrintWriter
+import java.net.ServerSocket
+
+import org.apache.spark.Logging
+
+/**
+ * Main class used by spark-submit to launch the Python shell.
+ */
+object PythonShellRunner extends Logging {
+  private val LISTENER_PORT = 7744
+
+  def main(args: Array[String]) {
+
+    // Start the gateway server for python to access Spark objects
+    val gatewayServer = new py4j.GatewayServer(null, 0)
+    gatewayServer.start()
+
+    // Start the server that tells python what port the gateway server is bound to
+    val pythonListener = new ServerSocket(LISTENER_PORT)
+
+    logInfo("Python shell server listening for connections on port " + LISTENER_PORT)
+
+    try {
+      val socket = pythonListener.accept()
+      val writer = new PrintWriter(socket.getOutputStream)
+      writer.print(gatewayServer.getListeningPort)
+      writer.close()
+    } finally {
+      pythonListener.close()
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index e86182e4c56ce..6b36782fee7ad 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
   private var clusterManager: Int = LOCAL
 
   /**
-   * A special jar name that indicates the class being run is inside of Spark itself,
-   * and therefore no user jar is needed.
+   * Special primary resource names that represent shells rather than application jars.
   */
-  private val RESERVED_JAR_NAME = "spark-internal"
+  private val SPARK_SHELL = "spark-shell"
+  private val PYSPARK_SHELL = "pyspark-shell"
 
   def main(args: Array[String]) {
     val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@ object SparkSubmit {
    * entries for the child, a list of system properties, a list of env vars
    * and the main class for the child
    */
-  private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
-      ArrayBuffer[String], Map[String, String], String) = {
+  private[spark] def createLaunchEnv(args: SparkSubmitArguments)
+    : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
     if (args.master.startsWith("local")) {
       clusterManager = LOCAL
     } else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
       printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
     }
 
-    // If we're running a Python app, set the Java class to run to be our PythonRunner, add
-    // Python files to deployment list, and pass the main file and Python path to PythonRunner
+    // If we're running a python app, set the main class to our specific python runner
    if (isPython) {
       if (deployOnCluster) {
         printErrorAndExit("Cannot currently run Python driver programs on cluster")
       }
-      args.mainClass = "org.apache.spark.deploy.PythonRunner"
-      args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
-      val pyFiles = Option(args.pyFiles).getOrElse("")
-      args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
-      args.primaryResource = RESERVED_JAR_NAME
-      sysProps("spark.submit.pyFiles") = pyFiles
+      if (args.primaryResource == PYSPARK_SHELL) {
+        args.mainClass = "org.apache.spark.deploy.PythonShellRunner"
+      } else {
+        // If a python file is provided, add it to the child arguments and list of files to deploy.
+        // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
+        args.mainClass = "org.apache.spark.deploy.PythonAppRunner"
+        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
+        args.files = Utils.mergeFileLists(args.primaryResource, args.files)
+      }
+      Option(args.pyFiles).foreach { pyFiles =>
+        args.files = Utils.mergeFileLists(args.files, args.pyFiles)
+        sysProps("spark.submit.pyFiles") = pyFiles
+      }
     }
 
     // If we're deploying into YARN, use yarn.Client as a wrapper around the user class
     if (!deployOnCluster) {
       childMainClass = args.mainClass
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         childClasspath += args.primaryResource
       }
     } else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
     // For python files, the primary resource is already distributed as a regular file
     if (!isYarnCluster && !isPython) {
       var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
-      if (args.primaryResource != RESERVED_JAR_NAME) {
+      if (isUserJar(args.primaryResource)) {
         jars = jars ++ Seq(args.primaryResource)
       }
       sysProps.put("spark.jars", jars.mkString(","))
@@ -293,8 +299,8 @@ object SparkSubmit {
   }
 
   private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
-    val localJarFile = new File(new URI(localJar).getPath())
-    if (!localJarFile.exists()) {
+    val localJarFile = new File(new URI(localJar).getPath)
+    if (!localJarFile.exists) {
       printWarning(s"Jar $localJar does not exist, skipping.")
     }
@@ -303,14 +309,24 @@ object SparkSubmit {
   }
 
   /**
-   * Merge a sequence of comma-separated file lists, some of which may be null to indicate
-   * no files, into a single comma-separated string.
+   * Return whether the given primary resource represents a user jar.
+   */
+  private def isUserJar(primaryResource: String): Boolean = {
+    !isShell(primaryResource) && !isPython(primaryResource)
+  }
+
+  /**
+   * Return whether the given primary resource represents a shell.
+   */
+  private def isShell(primaryResource: String): Boolean = {
+    primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
+  }
+
+  /**
+   * Return whether the given primary resource requires running python.
    */
-  private[spark] def mergeFileLists(lists: String*): String = {
-    val merged = lists.filter(_ != null)
-      .flatMap(_.split(","))
-      .mkString(",")
-    if (merged == "") null else merged
+  private[spark] def isPython(primaryResource: String): Boolean = {
+    primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 2d327aa3fb27f..256626e9ed6af 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -298,7 +298,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
         case v =>
           primaryResource = v
           inSparkOpts = false
-          isPython = v.endsWith(".py")
+          isPython = SparkSubmit.isPython(v)
           parse(tail)
       }
     } else {
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 388f7222428db..a9b184f284e09 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1166,4 +1166,17 @@ private[spark] object Utils extends Logging {
       true
     }
   }
+
+
+  /**
+   * Merge a sequence of comma-separated file lists into a single comma-separated string.
+   * The provided strings may be null or empty to indicate no files.
+   */
+  def mergeFileLists(lists: String*): String = {
+    lists
+      .filter(_ != null)
+      .filter(_ != "")
+      .flatMap(_.split(","))
+      .mkString(",")
+  }
 }
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 3d0936fdca911..6ff686fdad27e 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -24,6 +24,10 @@
 
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient
 
+# Handler to avoid sending ctrl-c / SIGINT to the Java gateway
+def ignoreInterrupt():
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+
 def launch_gateway():
     SPARK_HOME = os.environ["SPARK_HOME"]
@@ -38,10 +42,7 @@ def launch_gateway():
     command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
                "--die-on-broken-pipe", "0"]
     if not on_windows:
-        # Don't send ctrl-c / SIGINT to the Java gateway:
-        def preexec_func():
-            signal.signal(signal.SIGINT, signal.SIG_IGN)
-        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
+        proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=ignoreInterrupt)
     else:
         # preexec_fn not supported on Windows
         proc = Popen(command, stdout=PIPE, stdin=PIPE)
diff --git a/python/pyspark/repl.py b/python/pyspark/repl.py
new file mode 100755
index 0000000000000..82d0326719e82
--- /dev/null
+++ b/python/pyspark/repl.py
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import code
+import os
+import socket
+import subprocess
+import sys
+import time
+from pyspark.java_gateway import ignoreInterrupt
+
+# Launch spark submit process
+sparkHome = os.getcwd()
+sparkSubmit = sparkHome + "/bin/spark-submit"
+submitArgs = sys.argv[1:]
+command = [sparkSubmit, "pyspark-shell"] + submitArgs
+process = subprocess.Popen(command, stdout=sys.stdout, preexec_fn=ignoreInterrupt)
+
+try:
+    # Read py4j port from the PythonShellRunner server
+    serverPort = 7744
+    retrySeconds = 0.1
+    maxAttempts = 20
+    numAttempts = 0
+    py4jPort = -1
+    while py4jPort < 0:
+        try:
+            s = socket.socket()
+            s.connect(("127.0.0.1", serverPort))
+            py4jPort = s.recv(1024)
+            s.close()
+        except socket.error as se:
+            if numAttempts < maxAttempts:
+                numAttempts += 1
+                time.sleep(retrySeconds)
+            else:
+                raise Exception("Failed to retrieve Py4j gateway server port from server!")
+
+    # Set up Spark environment for python
+    os.environ["PYSPARK_GATEWAY_PORT"] = py4jPort
+    execfile(sparkHome + "/python/pyspark/shell.py")
+
+    # Start the REPL
+    code.interact(local=locals())
+
+finally:
+    process.terminate()
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index d172d588bfbd8..c560b62306edd 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -17,8 +17,6 @@
 
 """
 An interactive shell.
-
-This file is designed to be launched as a PYTHONSTARTUP script.
 """
 
 import sys
@@ -27,7 +25,6 @@
     print("\tSet env variable PYSPARK_PYTHON to Python2 binary and re-run it.")
     sys.exit(1)
 
-
 import os
 import platform
 import pyspark
@@ -40,7 +37,7 @@
 if os.environ.get("SPARK_EXECUTOR_URI"):
     SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])
 
-sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
+sc = SparkContext(appName="PySparkShell", pyFiles=add_files)
 
 print("""Welcome to
       ____              __
@@ -57,9 +54,3 @@
 
 if add_files != None:
     print("Adding files: [%s]" % ", ".join(add_files))
-
-# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
-# which allows us to execute the user's PYTHONSTARTUP file:
-_pythonstartup = os.environ.get('OLD_PYTHONSTARTUP')
-if _pythonstartup and os.path.isfile(_pythonstartup):
-    execfile(_pythonstartup)
diff --git a/repl/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala
index 14b448d076d84..de6eaa2cdc172 100644
--- a/repl/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.repl
 
-import scala.collection.mutable.Set
-
 object Main {
   private var _interp: SparkILoop = _
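
Note on the new primary-resource dispatch: SparkSubmit now classifies the primary resource with three predicates instead of comparing against the single RESERVED_JAR_NAME. A minimal, self-contained Scala sketch of how the predicates interact (the demo object and its main method are invented for illustration; the predicate bodies mirror isShell, isPython, and isUserJar from the patch):

object ResourceKindDemo {
  val SPARK_SHELL = "spark-shell"
  val PYSPARK_SHELL = "pyspark-shell"

  // Mirrors SparkSubmit.isShell / isPython / isUserJar in the patch
  def isShell(r: String): Boolean = r == SPARK_SHELL || r == PYSPARK_SHELL
  def isPython(r: String): Boolean = r.endsWith(".py") || r == PYSPARK_SHELL
  def isUserJar(r: String): Boolean = !isShell(r) && !isPython(r)

  def main(args: Array[String]): Unit = {
    Seq("spark-shell", "pyspark-shell", "app.py", "app.jar").foreach { r =>
      println(s"$r -> shell=${isShell(r)}, python=${isPython(r)}, userJar=${isUserJar(r)}")
    }
  }
}

Note that "pyspark-shell" satisfies both isShell and isPython, which is why isUserJar must exclude both before a primary resource is added to the classpath or to spark.jars.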
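Note on Utils.mergeFileLists: the helper moved from SparkSubmit to Utils and its edge-case behavior changed. It now drops empty strings as well as nulls, and it returns an empty string rather than null when nothing remains. A sketch of the expected behavior (the demo object is invented; the method body is copied from the patch):

object MergeFileListsDemo {
  // Same body as the new Utils.mergeFileLists above
  def mergeFileLists(lists: String*): String = {
    lists
      .filter(_ != null)
      .filter(_ != "")
      .flatMap(_.split(","))
      .mkString(",")
  }

  def main(args: Array[String]): Unit = {
    assert(mergeFileLists("a.py,b.py", null, "", "c.py") == "a.py,b.py,c.py")
    assert(mergeFileLists(null, "") == "")  // the old version returned null here
  }
}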
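Note on the PythonShellRunner handshake: the JVM side binds a Py4J gateway on an ephemeral port, then publishes that port over a one-shot listener socket on the fixed port 7744; repl.py polls that listener (up to 20 attempts, 0.1s apart) and reads the gateway port as text. A hedged Scala sketch of the client side of this protocol, assuming the server from the patch is running (repl.py performs the equivalent in Python):

import java.net.Socket
import scala.io.Source

object GatewayPortClientDemo {
  // Connect to PythonShellRunner's listener and read the Py4J gateway
  // port, which the server writes as plain text before closing the socket.
  def readGatewayPort(listenerPort: Int = 7744): Int = {
    val socket = new Socket("127.0.0.1", listenerPort)
    try Source.fromInputStream(socket.getInputStream).mkString.trim.toInt
    finally socket.close()
  }

  def main(args: Array[String]): Unit = {
    println("Py4J gateway port: " + readGatewayPort())
  }
}

One fixed listener port means two concurrent PySpark shells on the same machine would collide on 7744; the Py4J gateway itself avoids this problem by binding port 0.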