18 changes: 10 additions & 8 deletions bin/pyspark
@@ -48,17 +48,19 @@ export PYSPARK_PYTHON
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
export PYTHONSTARTUP=$FWDIR/python/pyspark/shell.py

if [ -n "$IPYTHON_OPTS" ]; then
IPYTHON=1
fi

# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" && $# = 0 ]] ; then
exec ipython $IPYTHON_OPTS
# If a python file is provided, directly run spark-submit
if [[ "$1" =~ \.py$ ]]; then
exec $FWDIR/bin/spark-submit "$@"
else
exec "$PYSPARK_PYTHON" "$@"
# Only use ipython if no command line arguments were provided [SPARK-1134]
if [[ "$IPYTHON" = "1" ]]; then
subcommand="%run $FWDIR/python/pyspark/repl.py $@"
exec ipython $IPYTHON_OPTS -c "$subcommand"
else
exec "$PYSPARK_PYTHON" $FWDIR/python/pyspark/repl.py "$@"
fi
fi
2 changes: 1 addition & 1 deletion bin/spark-shell
@@ -50,7 +50,7 @@ function main(){
stty icanon echo > /dev/null 2>&1
else
export SPARK_SUBMIT_OPTS
$FWDIR/bin/spark-submit spark-internal "$@" --class org.apache.spark.repl.Main
$FWDIR/bin/spark-submit spark-shell "$@" --class org.apache.spark.repl.Main
fi
}

2 changes: 1 addition & 1 deletion bin/spark-submit
@@ -41,5 +41,5 @@ if [ -n "$DRIVER_MEMORY" ] && [ $DEPLOY_MODE == "client" ]; then
export SPARK_DRIVER_MEMORY=$DRIVER_MEMORY
fi

$SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"
exec $SPARK_HOME/bin/spark-class org.apache.spark.deploy.SparkSubmit "${ORIG_ARGS[@]}"

@@ -23,15 +23,14 @@ import scala.collection.JavaConversions._
import org.apache.spark.api.python.{PythonUtils, RedirectThread}

/**
* A main class used by spark-submit to launch Python applications. It executes python as a
* subprocess and then has it connect back to the JVM to access system properties, etc.
* Main class used by spark-submit to launch Python applications. It executes python as a
* sub-process and then has it connect back to the JVM to access system properties, etc.
*/
object PythonRunner {
object PythonAppRunner {
def main(args: Array[String]) {
val primaryResource = args(0)
val pyFiles = args(1)
val otherArgs = args.slice(2, args.length)

val pythonExec = sys.env.get("PYSPARK_PYTHON").getOrElse("python") // TODO: get this from conf

// Launch a Py4J gateway server for the process to connect to; this will let it see our
@@ -42,7 +41,7 @@ object PythonRunner {
// Build up a PYTHONPATH that includes the Spark assembly JAR (where this class is), the
// python directories in SPARK_HOME (if set), and any files in the pyFiles argument
val pathElements = new ArrayBuffer[String]
pathElements ++= pyFiles.split(",")
pathElements ++= Option(pyFiles).getOrElse("").split(",")
pathElements += PythonUtils.sparkPythonPath
pathElements += sys.env.getOrElse("PYTHONPATH", "")
val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)
@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.deploy

import java.io.PrintWriter
import java.net.ServerSocket

import org.apache.spark.Logging

/**
 * Main class used by spark-submit to launch the Python shell.
 */
object PythonShellRunner extends Logging {
  private val LISTENER_PORT = 7744

  def main(args: Array[String]) {

    // Start the gateway server for python to access Spark objects
    val gatewayServer = new py4j.GatewayServer(null, 0)
    gatewayServer.start()

    // Start the server that tells python what port the gateway server is bound to
    val pythonListener = new ServerSocket(LISTENER_PORT)

    logInfo("Python shell server listening for connections on port " + LISTENER_PORT)

    try {
      val socket = pythonListener.accept()
      val writer = new PrintWriter(socket.getOutputStream)
      writer.print(gatewayServer.getListeningPort)
      writer.close()
    } finally {
      pythonListener.close()
    }
  }
}
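For reference, a minimal sketch (not part of the patch) of the handshake a client performs against this server: connect to the fixed listener port, read the Py4J gateway port as plain text, then talk to Py4J on that port. The actual client is python/pyspark/repl.py, added below; this Scala version is purely illustrative.

import java.net.Socket
import scala.io.Source

val listener = new Socket("127.0.0.1", 7744)  // LISTENER_PORT above
val gatewayPort = Source.fromInputStream(listener.getInputStream).mkString.trim.toInt
listener.close()
// gatewayPort is the ephemeral port py4j.GatewayServer bound to (it was started with port 0)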
64 changes: 40 additions & 24 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -41,10 +41,10 @@ object SparkSubmit {
private var clusterManager: Int = LOCAL

/**
* A special jar name that indicates the class being run is inside of Spark itself,
* and therefore no user jar is needed.
* Special primary resource names that represent shells rather than application jars.
*/
private val RESERVED_JAR_NAME = "spark-internal"
private val SPARK_SHELL = "spark-shell"
private val PYSPARK_SHELL = "pyspark-shell"

def main(args: Array[String]) {
val appArgs = new SparkSubmitArguments(args)
@@ -71,8 +71,8 @@ object SparkSubmit {
* entries for the child, a list of system properties, a list of env vars
* and the main class for the child
*/
private[spark] def createLaunchEnv(args: SparkSubmitArguments): (ArrayBuffer[String],
ArrayBuffer[String], Map[String, String], String) = {
private[spark] def createLaunchEnv(args: SparkSubmitArguments)
: (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
if (args.master.startsWith("local")) {
clusterManager = LOCAL
} else if (args.master.startsWith("yarn")) {
@@ -121,24 +121,30 @@ object SparkSubmit {
printErrorAndExit("Cannot currently run driver on the cluster in Mesos")
}

// If we're running a Python app, set the Java class to run to be our PythonRunner, add
// Python files to deployment list, and pass the main file and Python path to PythonRunner
// If we're running a python app, set the main class to our specific python runner
if (isPython) {
if (deployOnCluster) {
printErrorAndExit("Cannot currently run Python driver programs on cluster")
}
args.mainClass = "org.apache.spark.deploy.PythonRunner"
args.files = mergeFileLists(args.files, args.pyFiles, args.primaryResource)
val pyFiles = Option(args.pyFiles).getOrElse("")
args.childArgs = ArrayBuffer(args.primaryResource, pyFiles) ++ args.childArgs
args.primaryResource = RESERVED_JAR_NAME
sysProps("spark.submit.pyFiles") = pyFiles
if (args.primaryResource == PYSPARK_SHELL) {
args.mainClass = "org.apache.spark.deploy.PythonShellRunner"
} else {
// If a python file is provided, add it to the child arguments and list of files to deploy.
// Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
args.mainClass = "org.apache.spark.deploy.PythonAppRunner"
args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
args.files = Utils.mergeFileLists(args.primaryResource, args.files)
}
Option(args.pyFiles).foreach { pyFiles =>
args.files = Utils.mergeFileLists(args.files, args.pyFiles)
sysProps("spark.submit.pyFiles") = pyFiles
}
}

// If we're deploying into YARN, use yarn.Client as a wrapper around the user class
if (!deployOnCluster) {
childMainClass = args.mainClass
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
childClasspath += args.primaryResource
}
} else if (clusterManager == YARN) {
@@ -219,7 +225,7 @@ object SparkSubmit {
// For python files, the primary resource is already distributed as a regular file
if (!isYarnCluster && !isPython) {
var jars = sysProps.get("spark.jars").map(x => x.split(",").toSeq).getOrElse(Seq())
if (args.primaryResource != RESERVED_JAR_NAME) {
if (isUserJar(args.primaryResource)) {
jars = jars ++ Seq(args.primaryResource)
}
sysProps.put("spark.jars", jars.mkString(","))
@@ -293,8 +299,8 @@ object SparkSubmit {
}

private def addJarToClasspath(localJar: String, loader: ExecutorURLClassLoader) {
val localJarFile = new File(new URI(localJar).getPath())
if (!localJarFile.exists()) {
val localJarFile = new File(new URI(localJar).getPath)
if (!localJarFile.exists) {
printWarning(s"Jar $localJar does not exist, skipping.")
}

@@ -303,14 +309,24 @@ object SparkSubmit {
}

/**
* Merge a sequence of comma-separated file lists, some of which may be null to indicate
* no files, into a single comma-separated string.
* Return whether the given primary resource represents a user jar.
*/
private def isUserJar(primaryResource: String): Boolean = {
!isShell(primaryResource) && !isPython(primaryResource)
}

/**
* Return whether the given primary resource represents a shell.
*/
private def isShell(primaryResource: String): Boolean = {
primaryResource == SPARK_SHELL || primaryResource == PYSPARK_SHELL
}

/**
* Return whether the given primary resource requires running python.
*/
private[spark] def mergeFileLists(lists: String*): String = {
val merged = lists.filter(_ != null)
.flatMap(_.split(","))
.mkString(",")
if (merged == "") null else merged
private[spark] def isPython(primaryResource: String): Boolean = {
primaryResource.endsWith(".py") || primaryResource == PYSPARK_SHELL
}
}
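As a quick, illustrative sketch (not part of the patch) of how the new helpers classify primary resources:

// Assumed example resource names, for illustration only
SparkSubmit.isPython("my_app.py")      // true  -> handled by PythonAppRunner
SparkSubmit.isPython("pyspark-shell")  // true  -> handled by PythonShellRunner
SparkSubmit.isPython("my_app.jar")     // false
// Inside the object: isShell is true only for "spark-shell" and "pyspark-shell",
// and isUserJar is true only when the resource is neither a shell nor a python resource.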

@@ -298,7 +298,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
case v =>
primaryResource = v
inSparkOpts = false
isPython = v.endsWith(".py")
isPython = SparkSubmit.isPython(v)
parse(tail)
}
} else {
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1166,4 +1166,17 @@ private[spark] object Utils extends Logging {
true
}
}


  /**
   * Merge a sequence of comma-separated file lists into a single comma-separated string.
   * The provided strings may be null or empty to indicate no files.
   */
  def mergeFileLists(lists: String*): String = {
    lists
      .filter(_ != null)
      .filter(_ != "")
      .flatMap(_.split(","))
      .mkString(",")
  }
}
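A minimal usage sketch (illustrative file names) of the merge behavior implied by the implementation above; note that, unlike the old SparkSubmit.mergeFileLists it replaces, this version returns an empty string rather than null when nothing survives the filters:

Utils.mergeFileLists("a.py,b.py", null, "", "c.py")  // "a.py,b.py,c.py"
Utils.mergeFileLists(null, "")                       // "" (the old version returned null here)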
9 changes: 5 additions & 4 deletions python/pyspark/java_gateway.py
@@ -24,6 +24,10 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


# Handler to avoid sending ctrl-c / SIGINT to the Java gateway
def ignoreInterrupt():
signal.signal(signal.SIGINT, signal.SIG_IGN)

def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

@@ -38,10 +42,7 @@ def launch_gateway():
command = [os.path.join(SPARK_HOME, script), "py4j.GatewayServer",
"--die-on-broken-pipe", "0"]
if not on_windows:
# Don't send ctrl-c / SIGINT to the Java gateway:
def preexec_func():
signal.signal(signal.SIGINT, signal.SIG_IGN)
proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=preexec_func)
proc = Popen(command, stdout=PIPE, stdin=PIPE, preexec_fn=ignoreInterrupt)
else:
# preexec_fn not supported on Windows
proc = Popen(command, stdout=PIPE, stdin=PIPE)
61 changes: 61 additions & 0 deletions python/pyspark/repl.py
@@ -0,0 +1,61 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import code
import os
import socket
import subprocess
import sys
import time
from pyspark.java_gateway import ignoreInterrupt

# Launch spark submit process
sparkHome = os.getcwd()
sparkSubmit = sparkHome + "/bin/spark-submit"
submitArgs = sys.argv[1:]
command = [sparkSubmit, "pyspark-shell"] + submitArgs
process = subprocess.Popen(command, stdout=sys.stdout, preexec_fn=ignoreInterrupt)

try:
    # Read py4j port from the PythonShellRunner server
    serverPort = 7744
    retrySeconds = 0.1
    maxAttempts = 20
    numAttempts = 0
    py4jPort = -1
    while py4jPort < 0:
        try:
            s = socket.socket()
            s.connect(("127.0.0.1", serverPort))
            py4jPort = s.recv(1024)
            s.close()
        except socket.error as se:
            if numAttempts < maxAttempts:
                numAttempts += 1
                time.sleep(retrySeconds)
            else:
                raise Exception("Failed to retrieve Py4j gateway server port from server!")

    # Set up Spark environment for python
    os.environ["PYSPARK_GATEWAY_PORT"] = py4jPort
    execfile(sparkHome + "/python/pyspark/shell.py")

    # Start the REPL
    code.interact(local=locals())

finally:
    process.terminate()
11 changes: 1 addition & 10 deletions python/pyspark/shell.py
@@ -17,8 +17,6 @@

"""
An interactive shell.

This file is designed to be launched as a PYTHONSTARTUP script.
"""

import sys
@@ -27,7 +25,6 @@
print("\tSet env variable PYSPARK_PYTHON to Python2 binary and re-run it.")
sys.exit(1)


import os
import platform
import pyspark
@@ -40,7 +37,7 @@
if os.environ.get("SPARK_EXECUTOR_URI"):
SparkContext.setSystemProperty("spark.executor.uri", os.environ["SPARK_EXECUTOR_URI"])

sc = SparkContext(os.environ.get("MASTER", "local[*]"), "PySparkShell", pyFiles=add_files)
sc = SparkContext(appName="PySparkShell", pyFiles=add_files)

print("""Welcome to
____ __
@@ -57,9 +54,3 @@

if add_files != None:
print("Adding files: [%s]" % ", ".join(add_files))

# The ./bin/pyspark script stores the old PYTHONSTARTUP value in OLD_PYTHONSTARTUP,
# which allows us to execute the user's PYTHONSTARTUP file:
_pythonstartup = os.environ.get('OLD_PYTHONSTARTUP')
if _pythonstartup and os.path.isfile(_pythonstartup):
execfile(_pythonstartup)
2 changes: 0 additions & 2 deletions repl/src/main/scala/org/apache/spark/repl/Main.scala
@@ -17,8 +17,6 @@

package org.apache.spark.repl

import scala.collection.mutable.Set

object Main {
private var _interp: SparkILoop = _
