From bce3c1d33e5ab48146c2d70e81935e361fcff9c2 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 29 Jun 2015 12:53:10 -0700 Subject: [PATCH 1/2] Print more stacktrace --- spark/src/main/resources/python/zeppelin_pyspark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index e29544e54f2..925a791557c 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -117,6 +117,6 @@ def reset(self): excInnerError = excInnerError[innerErrorStart:] intp.setStatementsFinished(excInnerError + str(sys.exc_info()), True) except: - intp.setStatementsFinished(str(sys.exc_info()), True) + intp.setStatementsFinished(traceback.format_exc(), True) output.reset() From 1fa4bf6035d276124b75f55008532cd1a91b2427 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 29 Jun 2015 14:07:37 -0700 Subject: [PATCH 2/2] apply auto_convert for spark 1.4 --- .../zeppelin/spark/PySparkInterpreter.java | 1 + .../main/resources/python/zeppelin_pyspark.py | 14 ++++++++++---- .../rest/ZeppelinSparkClusterTest.java | 19 +++++++++++++++++++ 3 files changed, 30 insertions(+), 4 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 95eefd809c8..092b077359c 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -137,6 +137,7 @@ public void open() { CommandLine cmd = CommandLine.parse(getProperty("zeppelin.pyspark.python")); cmd.addArgument(scriptPath, false); cmd.addArgument(Integer.toString(port), false); + cmd.addArgument(getJavaSparkContext().version(), false); executor = new DefaultExecutor(); outputStream = new ByteArrayOutputStream(); PipedOutputStream ps = new PipedOutputStream(); diff 
--git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 925a791557c..802015d7eb8 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -32,7 +32,12 @@ from pyspark.sql import SQLContext, HiveContext, SchemaRDD, Row client = GatewayClient(port=int(sys.argv[1])) -gateway = JavaGateway(client) +sparkVersion = sys.argv[2] + +if sparkVersion.startswith("1.4"): + gateway = JavaGateway(client, auto_convert = True) +else: + gateway = JavaGateway(client) java_import(gateway.jvm, "org.apache.spark.SparkEnv") java_import(gateway.jvm, "org.apache.spark.SparkConf") @@ -45,15 +50,15 @@ jsc = intp.getJavaSparkContext() -if jsc.version().startswith("1.2"): +if sparkVersion.startswith("1.2"): java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") -elif jsc.version().startswith("1.3"): +elif sparkVersion.startswith("1.3"): java_import(gateway.jvm, "org.apache.spark.sql.*") java_import(gateway.jvm, "org.apache.spark.sql.hive.*") -elif jsc.version().startswith("1.4"): +elif sparkVersion.startswith("1.4"): java_import(gateway.jvm, "org.apache.spark.sql.*") java_import(gateway.jvm, "org.apache.spark.sql.hive.*") @@ -64,6 +69,7 @@ conf = SparkConf(_jvm = gateway.jvm, _jconf = jconf) sc = SparkContext(jsc=jsc, gateway=gateway, conf=conf) sqlc = SQLContext(sc, intp.getSQLContext()) +sqlContext = sqlc z = intp.getZeppelinContext() diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index 758a1e4b813..fd4a8b301bf 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ 
b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -88,6 +88,25 @@ public void pySparkTest() throws IOException { } ZeppelinServer.notebook.removeNote(note.id()); } + + @Test + public void pySparkAutoConvertOptionTest() throws IOException { + // create new note + Note note = ZeppelinServer.notebook.createNote(); + + int sparkVersion = getSparkVersionNumber(note); + + if (isPyspark() && sparkVersion >= 14) { // auto_convert enabled from spark 1.4 + // run a pyspark paragraph that relies on py4j auto_convert + Paragraph p = note.addParagraph(); + p.setText("%pyspark\nfrom pyspark.sql.functions import *\n" + + "print(sqlContext.range(0, 10).withColumn('uniform', rand(seed=10) * 3.14).count())"); + note.run(p.getId()); + waitForFinish(p); + assertEquals("10\n", p.getResult().message()); + } + ZeppelinServer.notebook.removeNote(note.id()); + } @Test public void zRunTest() throws IOException {