From 66b93b99a128cfa745d7c2fc85f36a1458999f30 Mon Sep 17 00:00:00 2001
From: Jongyoul Lee
Date: Sat, 4 Jul 2015 22:37:03 +0900
Subject: [PATCH] [ZEPPELIN-18] Running pyspark without deploying python
 libraries to every yarn node

- Spark supports pyspark on yarn cluster without deploying python libraries from Spark 1.4
- https://issues.apache.org/jira/browse/SPARK-6869
- apache/spark#5580, apache/spark#5478

Author: Jongyoul Lee

Closes #118 from jongyoul/ZEPPELIN-18 and squashes the following commits:

a47e27c [Jongyoul Lee] - Fixed test script for spark 1.4.0
72a65fd [Jongyoul Lee] - Fixed test script for spark 1.4.0
ee6d100 [Jongyoul Lee] - Cleanup codes
47fd9c9 [Jongyoul Lee] - Cleanup codes
248e330 [Jongyoul Lee] - Cleanup codes
4cd10b5 [Jongyoul Lee] - Removed meaningless codes comments
c9cda29 [Jongyoul Lee] - Removed setting SPARK_HOME - Changed the location of pyspark's directory into interpreter/spark
ef240f5 [Jongyoul Lee] - Fixed typo
06002fd [Jongyoul Lee] - Fixed typo
4b35c8d [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - Dummy for trigger
682986e [Jongyoul Lee] rebased
8a7bf47 [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - rebasing
ad610fb [Jongyoul Lee] rebased
94bdf30 [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - Fixed checkstyle
929333d [Jongyoul Lee] rebased
64b8195 [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - rebasing
0a2d90e [Jongyoul Lee] rebased
b05ae6e [Jongyoul Lee] [ZEPPELIN-18] Remove setting SPARK_HOME for PySpark - Excludes python/** from apache-rat
71e2a92 [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - Removed verbose setting
0ddb436 [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - Followed spark's way to support pyspark - https://issues.apache.org/jira/browse/SPARK-6869 - https://github.com/apache/spark/pull/5580 - https://github.com/apache/spark/pull/5478/files
1b192f6 [Jongyoul Lee] [ZEPPELIN-18] Remove setting SPARK_HOME for PySpark - Removed redundant dependency setting
32fd9e1 [Jongyoul Lee] [ZEPPELIN-18] Running pyspark without deploying python libraries to every yarn node - rebasing

(cherry picked from commit 3bd2b2122acc0920c98627e147f3893898833889)
Signed-off-by: Lee moon soo
---
 .gitignore                                 |  1 +
 .travis.yml                                |  6 +-
 bin/interpreter.sh                         | 13 ++++
 spark/pom.xml                              | 77 +++++++++++++++++--
 .../zeppelin/spark/PySparkInterpreter.java | 12 ---
 .../zeppelin/spark/SparkInterpreter.java   | 35 +++++++--
 6 files changed, 117 insertions(+), 27 deletions(-)

diff --git a/.gitignore b/.gitignore
index 93962a16b7e..9bbf4e0852a 100644
--- a/.gitignore
+++ b/.gitignore
@@ -73,3 +73,4 @@ auto-save-list
 tramp
 .\#*
 *.swp
+**/dependency-reduced-pom.xml
diff --git a/.travis.yml b/.travis.yml
index 4031e781edd..ac80e8a6959 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -22,16 +22,16 @@ before_install:
   - "sh -e /etc/init.d/xvfb start"
 
 install:
-  - mvn package -DskipTests -Phadoop-2.3 -B
+  - mvn package -DskipTests -Phadoop-2.3 -Ppyspark -B
 
 before_script:
   -
 
 script:
 # spark 1.4
-  - mvn package -Pbuild-distr -Phadoop-2.3 -B
+  - mvn package -Pbuild-distr -Phadoop-2.3 -Ppyspark -B
   - ./testing/startSparkCluster.sh 1.4.0 2.3
-  - SPARK_HOME=./spark-1.4.1-bin-hadoop2.3 mvn verify -Pusing-packaged-distr -Phadoop-2.3 -B
+  - SPARK_HOME=`pwd`/spark-1.4.0-bin-hadoop2.3 mvn verify -Pusing-packaged-distr -Phadoop-2.3 -Ppyspark -B
   - ./testing/stopSparkCluster.sh 1.4.0 2.3
 # spark 1.3
   - mvn clean package -DskipTests -Pspark-1.3 -Phadoop-2.3 -B -pl 'zeppelin-interpreter,spark'
diff --git a/bin/interpreter.sh b/bin/interpreter.sh
index c214c3081d0..62bc514ef74 100755
--- a/bin/interpreter.sh
+++ b/bin/interpreter.sh
@@ -73,6 +73,19 @@ if [[ ! -d "${ZEPPELIN_LOG_DIR}" ]]; then
   $(mkdir -p "${ZEPPELIN_LOG_DIR}")
 fi
 
+if [[ ! -z "${SPARK_HOME}" ]]; then
+  PYSPARKPATH="${SPARK_HOME}/python/lib/pyspark.zip:${SPARK_HOME}/python/lib/py4j-0.8.2.1-src.zip"
+else
+  PYSPARKPATH="${ZEPPELIN_HOME}/interpreter/spark/pyspark/pyspark.zip:${ZEPPELIN_HOME}/interpreter/spark/pyspark/py4j-0.8.2.1-src.zip"
+fi
+
+if [[ x"" == x"${PYTHONPATH}" ]]; then
+  export PYTHONPATH="${PYSPARKPATH}"
+else
+  export PYTHONPATH="${PYTHONPATH}:${PYSPARKPATH}"
+fi
+
+unset PYSPARKPATH
 
 ${ZEPPELIN_RUNNER} ${JAVA_INTP_OPTS} -cp ${CLASSPATH} ${ZEPPELIN_SERVER} ${PORT} &
 pid=$!
diff --git a/spark/pom.xml b/spark/pom.xml
index ea468bbc808..4ee89c82196 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -48,6 +48,8 @@
     org.spark-project.akka
     2.3.4-spark
+
+    http://www.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz
@@ -473,13 +475,6 @@
-
-      net.sf.py4j
-      py4j
-      0.8.2.1
-
-
       org.apache.commons
       commons-exec
@@ -723,6 +718,74 @@
+
+    pyspark
+
+      http://www.apache.org/dist/spark/spark-${spark.version}/spark-${spark.version}.tgz
+
+
+
+        com.googlecode.maven-download-plugin
+        download-maven-plugin
+        1.2.1
+
+            download-pyspark-files
+            validate
+              wget
+
+              ${spark.download.url}
+              true
+              ${project.build.directory}/spark-dist
+
+
+
+        maven-clean-plugin
+
+
+              ${basedir}/../python/build
+
+              ${project.build.direcoty}/spark-dist
+
+
+
+        org.apache.maven.plugins
+        maven-antrun-plugin
+        1.7
+
+            download-and-zip-pyspark-files
+            generate-resources
+              run
+
+
+
+
+
+
 
     hadoop-provided
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
index 092b077359c..852dd335183 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java
@@ -159,18 +159,6 @@ public void open() {
     try {
       Map env = EnvironmentUtils.getProcEnvironment();
 
-      String pythonPath = (String) env.get("PYTHONPATH");
-      if (pythonPath == null) {
-        pythonPath = "";
-      } else {
-        pythonPath += ":";
-      }
-
-      pythonPath += getSparkHome() + "/python/lib/py4j-0.8.2.1-src.zip:"
-          + getSparkHome() + "/python";
-
-      env.put("PYTHONPATH", pythonPath);
-
       executor.execute(cmd, env, this);
       pythonscriptRunning = true;
     } catch (IOException e) {
diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
index 935b2a59c19..aed39d63b98 100644
--- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
+++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java
@@ -26,12 +26,9 @@
 import java.lang.reflect.Method;
 import java.net.URL;
 import java.net.URLClassLoader;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
+import java.util.*;
 
+import com.google.common.base.Joiner;
 import org.apache.spark.HttpServer;
 import org.apache.spark.SparkConf;
 import org.apache.spark.SparkContext;
@@ -273,6 +270,34 @@ public SparkContext createSparkContext() {
       }
     }
 
+    //TODO(jongyoul): Move these codes into PySparkInterpreter.java
+
+    String pysparkBasePath = getSystemDefault("SPARK_HOME", "spark.home", null);
+    File pysparkPath;
+    if (null == pysparkBasePath) {
+      pysparkBasePath = getSystemDefault("ZEPPELIN_HOME", "zeppelin.home", "../");
+      pysparkPath = new File(pysparkBasePath,
+          "interpreter" + File.separator + "spark" + File.separator + "pyspark");
+    } else {
+      pysparkPath = new File(pysparkBasePath,
+          "python" + File.separator + "lib");
+    }
+
+    String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.8.2.1-src.zip"};
+    ArrayList<String> pythonLibUris = new ArrayList<>();
+    for (String lib : pythonLibs) {
+      File libFile = new File(pysparkPath, lib);
+      if (libFile.exists()) {
+        pythonLibUris.add(libFile.toURI().toString());
+      }
+    }
+    pythonLibUris.trimToSize();
+    if (pythonLibs.length == pythonLibUris.size()) {
+      conf.set("spark.yarn.dist.files", Joiner.on(",").join(pythonLibUris));
+      conf.set("spark.files", conf.get("spark.yarn.dist.files"));
+      conf.set("spark.submit.pyArchives", Joiner.on(":").join(pythonLibs));
+    }
+
     SparkContext sparkContext = new SparkContext(conf);
     return sparkContext;
   }
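
For reference, a minimal sketch of how to exercise this change, assuming a standard Zeppelin source checkout; the SPARK_HOME path below is only illustrative:

    # Build Zeppelin with the new pyspark profile (as .travis.yml above does).
    # The profile downloads a Spark distribution and bundles pyspark.zip and
    # py4j-0.8.2.1-src.zip under interpreter/spark/pyspark, so pyspark does not
    # have to be pre-installed on every YARN node.
    mvn package -DskipTests -Phadoop-2.3 -Ppyspark -B

    # Alternatively, point Zeppelin at an existing Spark 1.4+ installation;
    # bin/interpreter.sh then puts ${SPARK_HOME}/python/lib/pyspark.zip and the
    # py4j zip on PYTHONPATH, and SparkInterpreter ships them to the cluster via
    # spark.yarn.dist.files / spark.submit.pyArchives.
    export SPARK_HOME=/path/to/spark-1.4.0-bin-hadoop2.3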