From 6bd112811d81aa871b077c655fa0980802e3bc71 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 2 Aug 2016 04:30:23 -0500 Subject: [PATCH 1/3] Create and inject spark session into sparkr interpreter --- .../apache/zeppelin/spark/SparkRInterpreter.java | 14 +++++++++++--- .../java/org/apache/zeppelin/spark/ZeppelinR.java | 6 +++++- .../apache/zeppelin/spark/ZeppelinRContext.java | 8 ++++++++ spark/src/main/resources/R/zeppelin_sparkr.R | 10 ++++++++-- .../zeppelin/rest/ZeppelinSparkClusterTest.java | 7 +++++-- 5 files changed, 37 insertions(+), 8 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 8329641d8f3..29328a55da7 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.spark.SparkContext; import org.apache.spark.SparkRBackend; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; @@ -70,11 +71,18 @@ public void open() { int port = SparkRBackend.port(); SparkInterpreter sparkInterpreter = getSparkInterpreter(); - ZeppelinRContext.setSparkContext(sparkInterpreter.getSparkContext()); - ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); + SparkContext sc = sparkInterpreter.getSparkContext(); + SparkVersion sparkVersion = new SparkVersion(sc.version()); + ZeppelinRContext.setSparkContext(sc); + if (Utils.isSpark2()) { + ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); + } else { + ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); + } + ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext()); - zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port); + zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion); try { zeppelinR.open(); } catch (IOException e) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 0ff07403911..24238983595 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -36,6 +36,7 @@ public class ZeppelinR implements ExecuteResultHandler { Logger logger = LoggerFactory.getLogger(ZeppelinR.class); private final String rCmdPath; + private final SparkVersion sparkVersion; private DefaultExecutor executor; private SparkOutputStream outputStream; private PipedOutputStream input; @@ -107,9 +108,11 @@ public Object getValue() { * @param rCmdPath R repl commandline path * @param libPath sparkr library path */ - public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort) { + public ZeppelinR(String rCmdPath, String libPath, + int sparkRBackendPort, SparkVersion sparkVersion) { this.rCmdPath = rCmdPath; this.libPath = libPath; + this.sparkVersion = sparkVersion; this.port = sparkRBackendPort; try { File scriptFile = File.createTempFile("zeppelin_sparkr-", ".R"); @@ -137,6 +140,7 @@ public void open() throws IOException { cmd.addArgument(Integer.toString(hashCode())); cmd.addArgument(Integer.toString(port)); cmd.addArgument(libPath); + cmd.addArgument(Integer.toString(sparkVersion.toNumber())); executor = new DefaultExecutor(); outputStream = new SparkOutputStream(); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java index 82c320d7f21..9ad156efb4e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinRContext.java @@ -27,6 +27,7 @@ public class ZeppelinRContext { private static SparkContext sparkContext; private static SQLContext sqlContext; private static ZeppelinContext zeppelinContext; + private static Object sparkSession; public static void setSparkContext(SparkContext sparkContext) { ZeppelinRContext.sparkContext = sparkContext; @@ -40,6 +41,10 @@ public static void setSqlContext(SQLContext sqlContext) { ZeppelinRContext.sqlContext = sqlContext; } + public static void setSparkSession(Object sparkSession) { + ZeppelinRContext.sparkSession = sparkSession; + } + public static SparkContext getSparkContext() { return sparkContext; } @@ -52,4 +57,7 @@ public static ZeppelinContext getZeppelinContext() { return zeppelinContext; } + public static Object getSparkSession() { + return sparkSession; + } } diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R index fe2a16b973f..daea69f6c3b 100644 --- a/spark/src/main/resources/R/zeppelin_sparkr.R +++ b/spark/src/main/resources/R/zeppelin_sparkr.R @@ -21,6 +21,7 @@ args <- commandArgs(trailingOnly = TRUE) hashCode <- as.integer(args[1]) port <- as.integer(args[2]) libPath <- args[3] +version <- as.integer(args[4]) rm(args) print(paste("Port ", toString(port))) @@ -41,8 +42,13 @@ assign(".scStartTime", as.integer(Sys.time()), envir = SparkR:::.sparkREnv) # setup spark env assign(".sc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkContext"), envir = SparkR:::.sparkREnv) assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) -assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) -assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) +if (version >= 200) { + assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) + assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) +} else { + assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) + assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) +} assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) z.put <- function(name, object) { diff --git a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java index a65ccbcd762..61dc6d1b928 100644 --- a/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java +++ b/zeppelin-server/src/test/java/org/apache/zeppelin/rest/ZeppelinSparkClusterTest.java @@ -100,13 +100,16 @@ public void sparkRTest() throws IOException { } } - // run markdown paragraph, again + String sqlContextName = "sqlContext"; + if (sparkVersion >= 20) { + sqlContextName = "spark"; + } Paragraph p = note.addParagraph(); Map config = p.getConfig(); config.put("enabled", true); p.setConfig(config); p.setText("%r localDF <- data.frame(name=c(\"a\", \"b\", \"c\"), age=c(19, 23, 18))\n" + - "df <- createDataFrame(sqlContext, localDF)\n" + + "df <- createDataFrame(" + sqlContextName + ", localDF)\n" + "count(df)" ); note.run(p.getId()); From 02822acb2618c24c20dcbc11cc9bf81c32292070 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 2 Aug 2016 06:30:28 -0500 Subject: [PATCH 2/3] Change indent --- spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java index 24238983595..e0a47b760c3 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/ZeppelinR.java @@ -108,8 +108,8 @@ public Object getValue() { * @param rCmdPath R repl commandline path * @param libPath sparkr library path */ - public ZeppelinR(String rCmdPath, String libPath, - int sparkRBackendPort, SparkVersion sparkVersion) { + public ZeppelinR(String rCmdPath, String libPath, int sparkRBackendPort, + SparkVersion sparkVersion) { this.rCmdPath = rCmdPath; this.libPath = libPath; this.sparkVersion = sparkVersion; From b3df11f2f0681f7fdf09d18506a8a2631217d584 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Tue, 2 Aug 2016 07:59:36 -0500 Subject: [PATCH 3/3] inject sqlContext as well --- .../java/org/apache/zeppelin/spark/SparkRInterpreter.java | 4 +--- spark/src/main/resources/R/zeppelin_sparkr.R | 5 ++--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java index 29328a55da7..5598f098b7d 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkRInterpreter.java @@ -76,10 +76,8 @@ public void open() { ZeppelinRContext.setSparkContext(sc); if (Utils.isSpark2()) { ZeppelinRContext.setSparkSession(sparkInterpreter.getSparkSession()); - } else { - ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); } - + ZeppelinRContext.setSqlContext(sparkInterpreter.getSQLContext()); ZeppelinRContext.setZepplinContext(sparkInterpreter.getZeppelinContext()); zeppelinR = new ZeppelinR(rCmdPath, sparkRLibPath, port, sparkVersion); diff --git a/spark/src/main/resources/R/zeppelin_sparkr.R b/spark/src/main/resources/R/zeppelin_sparkr.R index daea69f6c3b..d9517749bbf 100644 --- a/spark/src/main/resources/R/zeppelin_sparkr.R +++ b/spark/src/main/resources/R/zeppelin_sparkr.R @@ -45,10 +45,9 @@ assign("sc", get(".sc", envir = SparkR:::.sparkREnv), envir=.GlobalEnv) if (version >= 200) { assign(".sparkRsession", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSparkSession"), envir = SparkR:::.sparkREnv) assign("spark", get(".sparkRsession", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) -} else { - assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) - assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) } +assign(".sqlc", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getSqlContext"), envir = SparkR:::.sparkREnv) +assign("sqlContext", get(".sqlc", envir = SparkR:::.sparkREnv), envir = .GlobalEnv) assign(".zeppelinContext", SparkR:::callJStatic("org.apache.zeppelin.spark.ZeppelinRContext", "getZeppelinContext"), envir = .GlobalEnv) z.put <- function(name, object) {