PySpark on YARN
sryza committed Apr 29, 2014
1 parent bf8d0aa commit fd0df79
Showing 11 changed files with 86 additions and 19 deletions.
1 change: 1 addition & 0 deletions bin/pyspark
@@ -46,6 +46,7 @@ export PYSPARK_PYTHON

# Add the PySpark classes to the Python path:
export PYTHONPATH=$SPARK_HOME/python/:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH

# Load the PySpark shell.py script when ./pyspark is used interactively:
export OLD_PYTHONSTARTUP=$PYTHONSTARTUP
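A note on why the added line is sufficient: CPython's zipimport machinery can load pure-Python packages directly from a `.zip` archive on the path, so the bundled Py4J sources never need to be extracted. A minimal sketch, assuming `SPARK_HOME` points at a Spark checkout that ships `python/lib/py4j-0.8.1-src.zip`:

```python
import os
import sys

# Mirror what bin/pyspark now does via PYTHONPATH: put the Py4J source zip
# itself on the import path, then import from inside it.
spark_home = os.environ["SPARK_HOME"]
py4j_zip = os.path.join(spark_home, "python", "lib", "py4j-0.8.1-src.zip")
sys.path.insert(0, py4j_zip)

import py4j  # resolved from inside the zip via zipimport, no extraction needed
print(py4j.__file__)  # the reported path points into the .zip archive
```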
1 change: 1 addition & 0 deletions bin/pyspark2.cmd
@@ -45,6 +45,7 @@ rem Figure out which Python to use.
if "x%PYSPARK_PYTHON%"=="x" set PYSPARK_PYTHON=python

set PYTHONPATH=%FWDIR%python;%PYTHONPATH%
set PYTHONPATH=%FWDIR%python\lib\py4j-0.8.1-src.zip;%PYTHONPATH%

set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
set PYTHONSTARTUP=%FWDIR%python\pyspark\shell.py
41 changes: 41 additions & 0 deletions core/pom.xml
@@ -294,6 +294,47 @@
</environmentVariables>
</configuration>
</plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>exec</goal>
</goals>
</execution>
</executions>
<configuration>
<executable>unzip</executable>
<workingDirectory>../python</workingDirectory>
<arguments>
<argument>-o</argument>
<argument>lib/py4j*.zip</argument>
<argument>-d</argument>
<argument>build</argument>
</arguments>
</configuration>
</plugin>
</plugins>

<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>../python</directory>
<includes>
<include>pyspark/*.py</include>
</includes>
</resource>
<resource>
<directory>../python/build</directory>
<includes>
<include>py4j/*.py</include>
</includes>
</resource>
</resources>
</build>
</project>
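For context on the build change above: the `exec-maven-plugin` execution unzips the bundled Py4J sources into `python/build/`, and the new `<resources>` entries copy `pyspark/*.py` and `py4j/*.py` into the core jar, so the Python sources ride along into the assembly. A hedged way to sanity-check the packaged result (the jar path below is a placeholder and depends on your build profile):

```python
import zipfile

# Hypothetical path; substitute the assembly jar your build actually produces.
jar_path = "assembly/target/scala-2.10/spark-assembly.jar"

# List the Python sources bundled into the jar by the resource entries above.
with zipfile.ZipFile(jar_path) as jar:
    py_entries = [name for name in jar.namelist()
                  if name.startswith(("pyspark/", "py4j/")) and name.endswith(".py")]
    print("\n".join(sorted(py_entries)))
```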
10 changes: 2 additions & 8 deletions core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -78,12 +78,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/worker.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
val worker = pb.start()

// Redirect the worker's stderr to ours
@@ -154,12 +151,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/daemon.py"))
val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars)
val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
workerEnv.put("PYTHONPATH", pythonPath)
daemon = pb.start()

// Redirect the stderr to ours
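The Scala change above stops hard-coding `$SPARK_HOME/python/pyspark/worker.py` and instead launches the worker and daemon with `python -m pyspark.worker` / `python -m pyspark.daemon`, so the interpreter resolves them from whatever `PYTHONPATH` the process inherits — a checkout's `python/` directory locally, or the assembly jar on YARN. A rough Python-side analogue of that `ProcessBuilder` call, only to illustrate the module-based lookup (the path and environment are assumptions, not part of the commit):

```python
import os
import subprocess

# Assumed environment: a PYTHONPATH from which the pyspark package is importable,
# whether that points at python/ in a source checkout or at a jar/zip containing it.
env = dict(os.environ)
env["PYTHONPATH"] = "/path/to/spark/python"  # hypothetical location

# The JVM side now does, in spirit: new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker")).
# The real worker is driven by the JVM over a socket, so instead of starting it we
# only check that "-m"-style resolution would find the modules on that path.
ret = subprocess.call(
    ["python", "-c", "import pyspark.worker, pyspark.daemon"], env=env)
print("pyspark.worker importable: %s" % (ret == 0))
```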
5 changes: 5 additions & 0 deletions docs/python-programming-guide.md
@@ -63,6 +63,11 @@ All of PySpark's library dependencies, including [Py4J](http://py4j.sourceforge.
Standalone PySpark applications should be run using the `bin/pyspark` script, which automatically configures the Java and Python environment using the settings in `conf/spark-env.sh` or `.cmd`.
The script automatically adds the `pyspark` package to the `PYTHONPATH`.

# Running PySpark on YARN

Running PySpark on a YARN-managed cluster requires a few extra steps. The client must reference a ZIP file containing PySpark and its dependencies. To create this file, run `make` inside the `python/` directory of the Spark source tree; this generates `pyspark-assembly.zip` under `python/build/`. Then set the `PYSPARK_ZIP` environment variable to point to the location of this file. Lastly, set `MASTER=yarn-client`.

`pyspark-assembly.zip` can be placed either on local disk or on HDFS. If it is in a publicly readable location on HDFS, YARN can cache it on each node so that it does not need to be transferred each time an application runs.
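A minimal end-to-end sketch of those steps, assuming a Spark source checkout and an already-configured YARN client; the HDFS location, application file name, and app name are placeholders, not values from this commit:

```python
# Hypothetical driver script, e.g. saved as yarn_example.py, launched with the
# steps described above (all paths are placeholders):
#
#   cd python && make                                   # builds python/build/pyspark-assembly.zip
#   export PYSPARK_ZIP=hdfs:///user/me/pyspark-assembly.zip
#   export MASTER=yarn-client
#   bin/pyspark yarn_example.py
import os

from pyspark import SparkContext

# The master URL comes from the MASTER variable exported above ("yarn-client").
sc = SparkContext(os.environ.get("MASTER", "local"), "PySparkOnYarnSketch")
print(sc.parallelize(range(100)).map(lambda x: x * x).sum())
sc.stop()
```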

# Interactive Use

3 changes: 3 additions & 0 deletions python/.gitignore
@@ -1,2 +1,5 @@
*.pyc
docs/
pyspark.egg-info
build/
dist/
1 change: 0 additions & 1 deletion python/lib/PY4J_VERSION.txt

This file was deleted.

7 changes: 0 additions & 7 deletions python/pyspark/__init__.py
@@ -49,13 +49,6 @@
Main entry point for accessing data stored in Apache Hive.
"""



import sys
import os
sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j-0.8.1-src.zip"))


from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
29 changes: 27 additions & 2 deletions python/pyspark/java_gateway.py
@@ -24,10 +24,11 @@
from py4j.java_gateway import java_import, JavaGateway, GatewayClient


SPARK_HOME = os.environ["SPARK_HOME"]
def launch_gateway():
SPARK_HOME = os.environ["SPARK_HOME"]

set_env_vars_for_yarn()

def launch_gateway():
# Launch the Py4j gateway using Spark's run command so that we pick up the
# proper classpath and settings from spark-env.sh
on_windows = platform.system() == "Windows"
@@ -70,3 +71,27 @@ def run(self):
java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext")
java_import(gateway.jvm, "scala.Tuple2")
return gateway

def set_env_vars_for_yarn():
# Add the spark jar, which includes the pyspark files, to the python path
env_map = parse_env(os.environ.get("SPARK_YARN_USER_ENV", ""))
if "PYTHONPATH" in env_map:
env_map["PYTHONPATH"] += ":spark.jar"
else:
env_map["PYTHONPATH"] = "spark.jar"

os.environ["SPARK_YARN_USER_ENV"] = ",".join(k + '=' + v for (k, v) in env_map.items())

def parse_env(env_str):
# Turns a comma-separated list of env settings into a dict that maps env vars to
# their values.
env = {}
for var_str in env_str.split(","):
parts = var_str.split("=")
if len(parts) == 2:
env[parts[0]] = parts[1]
elif len(var_str) > 0:
print "Invalid entry in SPARK_YARN_USER_ENV: " + var_str
sys.exit(1)

return env
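To make the `SPARK_YARN_USER_ENV` handling above concrete, here is a small standalone illustration of the format `parse_env` expects and the result `set_env_vars_for_yarn` produces. It re-creates the same string handling rather than importing `pyspark.java_gateway`, so it runs without a Spark install; the sample values are made up:

```python
# SPARK_YARN_USER_ENV is a comma-separated list of NAME=value pairs, e.g.:
raw = "PYTHONPATH=/opt/libs,FOO=bar"

# What parse_env() above would return for that string.
env_map = dict(entry.split("=") for entry in raw.split(",") if entry)
assert env_map == {"PYTHONPATH": "/opt/libs", "FOO": "bar"}

# set_env_vars_for_yarn() then appends the Spark jar entry so executors on YARN
# can import the pyspark files packaged into it.
if "PYTHONPATH" in env_map:
    env_map["PYTHONPATH"] += ":spark.jar"
else:
    env_map["PYTHONPATH"] = "spark.jar"

print(",".join(k + "=" + v for k, v in env_map.items()))
# -> PYTHONPATH=/opt/libs:spark.jar,FOO=bar   (entry order may vary)
```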
4 changes: 3 additions & 1 deletion python/pyspark/tests.py
@@ -30,10 +30,12 @@

from pyspark.context import SparkContext
from pyspark.files import SparkFiles
from pyspark.java_gateway import SPARK_HOME
from pyspark.serializers import read_int


SPARK_HOME = os.environ["SPARK_HOME"]


class PySparkTestCase(unittest.TestCase):

def setUp(self):
3 changes: 3 additions & 0 deletions sbin/spark-config.sh
@@ -34,3 +34,6 @@ this="$config_bin/$script"
export SPARK_PREFIX=`dirname "$this"`/..
export SPARK_HOME=${SPARK_PREFIX}
export SPARK_CONF_DIR="$SPARK_HOME/conf"
# Add the PySpark classes to the PYTHONPATH:
export PYTHONPATH=$SPARK_HOME/python:$PYTHONPATH
export PYTHONPATH=$SPARK_HOME/python/lib/py4j-0.8.1-src.zip:$PYTHONPATH
