From ff5be9a41e52454e0f9cae83dd1fd50fbeaa684a Mon Sep 17 00:00:00 2001
From: Sandy Ryza
Date: Tue, 29 Apr 2014 23:24:34 -0700
Subject: [PATCH] SPARK-1004. PySpark on YARN

This reopens https://github.com/apache/incubator-spark/pull/640 against the new repo

Author: Sandy Ryza

Closes #30 from sryza/sandy-spark-1004 and squashes the following commits:

89889d4 [Sandy Ryza] Move unzipping py4j to the generate-resources phase so that it gets included in the jar the first time
5165a02 [Sandy Ryza] Fix docs
fd0df79 [Sandy Ryza] PySpark on YARN
---
 bin/pyspark                                    |  1 +
 bin/pyspark2.cmd                               |  1 +
 core/pom.xml                                   | 42 +++++++++++++++++++
 .../api/python/PythonWorkerFactory.scala       | 10 +----
 docs/python-programming-guide.md               |  3 ++
 python/.gitignore                              |  3 ++
 python/lib/PY4J_VERSION.txt                    |  1 -
 python/pyspark/__init__.py                     |  7 ----
 python/pyspark/java_gateway.py                 | 29 ++++++++++++-
 python/pyspark/tests.py                        |  4 +-
 sbin/spark-config.sh                           |  3 ++
 11 files changed, 85 insertions(+), 19 deletions(-)
 delete mode 100644 python/lib/PY4J_VERSION.txt

diff --git a/bin/pyspark b/bin/pyspark
index cad982bc33477..f5558853e8a4e 100755
--- a/bin/pyspark
+++ b/bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

 # Add the PySpark classes to the Python path:
 export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP=$PYTHONSTARTUP

diff --git a/bin/pyspark2.cmd b/bin/pyspark2.cmd
index 95791095ec932..d7cfd5eec501c 100644
--- a/bin/pyspark2.cmd
+++ b/bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
 if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

 set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
+set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py

diff --git a/core/pom.xml b/core/pom.xml
index 73f573a414050..822b5b1dd7cc2 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -294,6 +294,48 @@
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
     </plugins>
+
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
   </build>

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index a5f0f3d5e7eae..02799ce0091b0 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

       // Create and start the worker
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       val worker = pb.start()

       // Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

     try {
       // Create and start the daemon
-      val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
-      val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
+      val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars)
-      val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
-      workerEnv.put("PYTHONPATH", pythonPath)
       daemon = pb.start()

       // Redirect the stderr to ours

diff --git a/docs/python-programming-guide.md b/docs/python-programming-guide.md
index 98233bf556b79..98c456228af9f 100644
--- a/docs/python-programming-guide.md
+++ b/docs/python-programming-guide.md
@@ -63,6 +63,9 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
 Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
 The script automatically adds the `bin/pyspark` package to the `PYTHONPATH`.

+# Running PySpark on YARN
+
+To run PySpark against a YARN cluster, simply set the MASTER environment variable to "yarn-client".

 # Interactive Use

diff --git a/python/.gitignore b/python/.gitignore
index 5c56e638f923a..80b361ffbd51c 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -1,2 +1,5 @@
 *.pyc
 docs/
+pyspark.egg-info
+build/
+dist/

diff --git a/python/lib/PY4J_VERSION.txt b/python/lib/PY4J_VERSION.txt
deleted file mode 100644
index 04a0cd52a8d9c..0000000000000
--- a/python/lib/PY4J_VERSION.txt
+++ /dev/null
@@ -1 +0,0 @@
-b7924aabe9c5e63f0a4d8bbd17019534c7ec014e

diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 73fe7378ffa63..07df8697bd1a8 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -49,13 +49,6 @@
       Main entry point for accessing data stored in Apache Hive..
 """
-
-
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))
-
-
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
 from pyspark.sql import SQLContext

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 6bb6c877c942d..032d960e40998 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@
 from py4j.java_gateway import java_import, JavaGateway, GatewayClient


-SPARK_HOME = os.environ["SPARK_HOME"]
+def launch_gateway():
+    SPARK_HOME = os.environ["SPARK_HOME"]
+    set_env_vars_for_yarn()


-def launch_gateway():
     # Launch the Py4j gateway using Spark's run command so that we pick up the
     # proper classpath and settings from spark-env.sh
     on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def run(self):
     java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
     java_import(gateway.jvm, "scala.Tuple2")
     return gateway
+
+def set_env_vars_for_yarn():
+    # Add the spark jar, which includes the pyspark files, to the python path
+    env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
+    if "PYTHONPATH" in env_map:
+        env_map["PYTHONPATH"] += ":spark.jar"
+    else:
+        env_map["PYTHONPATH"] = "spark.jar"
+
+    os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())
+
+def parse_env(env_str):
+    # Turns a comma-separated list of env settings into a dict that maps env vars to
+    # their values.
+    env = {}
+    for var_str in env_str.split(","):
+        parts = var_str.split("=")
+        if len(parts) == 2:
+            env[parts[0]] = parts[1]
+        elif len(var_str) > 0:
+            print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
+            sys.exit(1)
+
+    return env

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 527104587fd31..8cf9d9cf1bd66 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -30,10 +30,12 @@
 from pyspark.context import SparkContext
 from pyspark.files import SparkFiles
-from pyspark.java_gateway import SPARK_HOME
 from pyspark.serializers import read_int


+SPARK_HOME = os.environ["SPARK_HOME"]
+
+
 class PySparkTestCase(unittest.TestCase):

     def setUp(self):

diff --git a/sbin/spark-config.sh b/sbin/spark-config.sh
index cd2c7b7b0d496..147b506dd5ca3 100755
--- a/sbin/spark-config.sh
+++ b/sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
 export SPARK_PREFIX=`dirname "$this"`/..
 export SPARK_HOME=${SPARK_PREFIX}
 export SPARK_CONF_DIR="$SPARK_HOME/conf"
+# Add the PySpark classes to the PYTHONPATH:
+export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
+export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
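
Usage sketch (the variable values are placeholders, not taken from the patch): with the
py4j zip now exported on PYTHONPATH by bin/pyspark and sbin/spark-config.sh, running the
PySpark shell against a YARN cluster in client mode only needs the MASTER setting from the
docs change above. Any extra executor-side environment goes through SPARK_YARN_USER_ENV,
which parse_env() reads as comma-separated KEY=VALUE pairs and to which launch_gateway()
appends spark.jar on PYTHONPATH.

    # Launch the PySpark shell in yarn-client mode.
    MASTER=yarn-client ./bin/pyspark

    # Optionally pass extra env vars to the YARN containers in the format
    # parse_env() expects (placeholder values shown).
    export SPARK_YARN_USER_ENV="PYSPARK_PYTHON=python2.7,SOME_VAR=some_value"
    MASTER=yarn-client ./bin/pyspark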