From f3d11a07a9e3ef405d2c7376f802c92a14ce6788 Mon Sep 17 00:00:00 2001 From: astroshim Date: Sun, 18 Sep 2016 22:26:13 +0900 Subject: [PATCH 1/7] add checking the type should be detected. --- .../apache/zeppelin/interpreter/InterpreterOutput.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java index df0c210b8d9..e7462d8b23b 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -213,11 +213,17 @@ public byte[] toByteArray() throws IOException { return out.toByteArray(); } + private boolean typeShouldBeDetected() { + return (outList.size() > 0) ? false : true; + } + public void flush() throws IOException { synchronized (outList) { buffer.flush(); byte[] bytes = buffer.toByteArray(); - bytes = detectTypeFromLine(bytes); + if (typeShouldBeDetected()) { + bytes = detectTypeFromLine(bytes); + } if (bytes != null) { outList.add(bytes); if (type == InterpreterResult.Type.TEXT) { From 89a40b33dd11534ab44bd840774ecb669d365b5e Mon Sep 17 00:00:00 2001 From: astroshim Date: Thu, 29 Sep 2016 21:49:30 +0900 Subject: [PATCH 2/7] change condition of parsing --- .../java/org/apache/zeppelin/interpreter/InterpreterOutput.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java index e7462d8b23b..af6dbd680de 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterOutput.java @@ -214,7 +214,7 @@ public byte[] toByteArray() throws IOException { } private boolean typeShouldBeDetected() { - return (outList.size() > 0) ? false : true; + return getType() == InterpreterResult.Type.TABLE ? false : true; } public void flush() throws IOException { From 1d0ab93d266cc38c2758491baad6be1a3505010d Mon Sep 17 00:00:00 2001 From: astroshim Date: Fri, 30 Sep 2016 11:21:21 +0900 Subject: [PATCH 3/7] add testcase --- .../apache/zeppelin/interpreter/InterpreterOutputTest.java | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java index f8f4809a5eb..28a2a86bf2b 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/InterpreterOutputTest.java @@ -115,6 +115,13 @@ public void testType2() throws IOException { assertEquals(InterpreterResult.Type.HTML, out.getType()); } + @Test + public void testMagicData() throws IOException { + out.write("%table col1\tcol2\n%html

This is a hack

\t234\n".getBytes()); + assertEquals(InterpreterResult.Type.TABLE, out.getType()); + assertEquals("col1\tcol2\n%html

This is a hack

\t234\n", new String(out.toByteArray())); + } + @Override public void onAppend(InterpreterOutput out, byte[] line) { numAppendEvent++; From 499aa6bf3439b9e2524de59067f6d88eb6aec9bf Mon Sep 17 00:00:00 2001 From: astroshim Date: Thu, 20 Oct 2016 08:31:04 +0900 Subject: [PATCH 4/7] add PySparkInterpreter testcase --- .../zeppelin/spark/PySparkInterpreter.java | 54 ++++++- .../spark/PySparkInterpreterTest.java | 138 ++++++++++++++++++ 2 files changed, 184 insertions(+), 8 deletions(-) create mode 100644 spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index ed85558028d..30579e8982a 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -45,13 +45,8 @@ import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SQLContext; -import org.apache.zeppelin.interpreter.Interpreter; -import org.apache.zeppelin.interpreter.InterpreterContext; -import org.apache.zeppelin.interpreter.InterpreterException; -import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.LazyOpenInterpreter; -import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; @@ -165,6 +160,49 @@ public void open() { } } + private Map setupPySparkEnv() throws IOException{ + Map env = EnvironmentUtils.getProcEnvironment(); + + String pysparkBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue(); + File pysparkPath; + if (null == pysparkBasePath) { + pysparkBasePath = + new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue(); + pysparkPath = new File(pysparkBasePath, + "interpreter" + File.separator + "spark" + File.separator + "pyspark"); + } else { + pysparkPath = new File(pysparkBasePath, + "python" + File.separator + "lib"); + } + + String pythonPath = (String) env.get("PYTHONPATH"); +/* + if (pythonPath == null) { + pythonPath = ""; + } else { + pythonPath += ":"; + } +*/ + //Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist + String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip", + "py4j-0.10.1-src.zip"}; + //ArrayList pythonLibUris = new ArrayList<>(); + for (String lib : pythonLibs) { + File libFile = new File(pysparkPath, lib); + if (libFile.exists()) { + if (pythonPath == null) { + pythonPath = ""; + } else { + pythonPath += ":"; + } + pythonPath += libFile.getAbsolutePath(); + //pythonLibUris.add(libFile.toURI().toString()); + } + } + env.put("PYTHONPATH", pythonPath); + return env; + } + private void createGatewayServerAndStartScript() { // create python script createPythonScript(); @@ -198,8 +236,8 @@ private void createGatewayServerAndStartScript() { try { - Map env = EnvironmentUtils.getProcEnvironment(); - + //Map env = EnvironmentUtils.getProcEnvironment(); + Map env = setupPySparkEnv(); executor.execute(cmd, env, this); pythonscriptRunning = true; } catch (IOException e) { diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java new file mode 100644 index 00000000000..90524ba6b91 --- /dev/null +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -0,0 +1,138 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zeppelin.spark; +import org.apache.zeppelin.interpreter.*; +import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.junit.After; +import org.junit.Before; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.File; +import java.util.*; + +import static org.junit.Assert.*; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class PySparkInterpreterTest { + public static LazyOpenInterpreter sparkInterpreter; + //public static SparkInterpreter sparkInterpreter; + public static PySparkInterpreter pySparkInterpreter; + public static InterpreterGroup intpGroup; + private File tmpDir; + public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class); + + private static final String INTERPRETER_SCRIPT = + System.getProperty("os.name").startsWith("Windows") ? + "../bin/interpreter.cmd" : + "../bin/interpreter.sh"; + + public static Properties getPySparkTestProperties() { + Properties p = new Properties(); + p.setProperty("master", "local[*]"); + p.setProperty("spark.app.name", "Zeppelin Test"); + p.setProperty("zeppelin.spark.useHiveContext", "true"); + p.setProperty("zeppelin.spark.maxResult", "1000"); + p.setProperty("zeppelin.spark.importImplicit", "true"); + p.setProperty("zeppelin.pyspark.python", "python"); + //p.setProperty("zeppelin.interpreter.localRepo", "/home/nflabs/zeppelin/local-repo"); + + return p; + } + + @Before + public void setUp() throws Exception { + tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); + System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); + //System.setProperty("zeppelin.interpreter.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); + tmpDir.mkdirs(); + + +// String s = System.getProperty("PATH"); + //System.setProperty("PYTHONPATH", "/home/nflabs/zeppelin/spark-dependencies/target/spark-dist/spark-2.0.0/python:/home/nflabs/zeppelin/spark-dependencies/target/spark-dist/spark-2.0.0/python/lib/py4j-0.10.1-src.zip"); +/* + //System.setProperty("PATH", "/home/nflabs/zeppelin/spark-dependencies/target/spark-dist/spark-2.0.0/python/lib/py4j-0.10.1-src.zip"); + s = System.getProperty("PATH"); +*/ + + intpGroup = new InterpreterGroup(); + intpGroup.put("note", new LinkedList()); + +// SparkConf conf = sparkInterpreter.getSparkContext().getConf(); +// String zip = conf.get("spark.files"); +/* + + pysparkBasePath = + new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue(); + pysparkPath = new File(pysparkBasePath, + "interpreter" + File.separator + "spark" + File.separator + "pyspark"); +*/ + + + /* + RemoteInterpreter remoteInterpreter = createPysparkInterpreter(getPySparkTestProperties(), "note"); + intpGroup.get("note").add(remoteInterpreter); + remoteInterpreter.setInterpreterGroup(intpGroup); + remoteInterpreter.open(); +*/ + + if (sparkInterpreter == null) { + sparkInterpreter = new LazyOpenInterpreter(new SparkInterpreter(getPySparkTestProperties())); + //sparkInterpreter = new SparkInterpreter(getPySparkTestProperties()); + intpGroup.get("note").add(sparkInterpreter); + sparkInterpreter.setInterpreterGroup(intpGroup); + //sparkInterpreter.open(); + } + + if (pySparkInterpreter == null) { + //pySparkInterpreter = new LazyOpenInterpreter(new PySparkInterpreter(getPySparkTestProperties())); + pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties()); + intpGroup.get("note").add(pySparkInterpreter); + pySparkInterpreter.setInterpreterGroup(intpGroup); + pySparkInterpreter.open(); + } + } + + @After + public void tearDown() throws Exception { + delete(tmpDir); + } + + private void delete(File file) { + if (file.isFile()) file.delete(); + else if (file.isDirectory()) { + File[] files = file.listFiles(); + if (files != null && files.length > 0) { + for (File f : files) { + delete(f); + } + } + file.delete(); + } + } + + @Test + public void testPySparkCompletion() { + //pySparkInterpreter.interpret("int(\"123\")", context).code(); + //List completions = pySparkInterpreter.completion("sc.", "sc.".length()); + List completions = pySparkInterpreter.completion("sc.", "sc.".length()); + assertTrue(completions.size() > 0); + } +} From 51aa813f0e4a9c2000f72e1ec10279f9f9328915 Mon Sep 17 00:00:00 2001 From: astroshim Date: Fri, 21 Oct 2016 23:43:45 +0900 Subject: [PATCH 5/7] add PySparkInterpreter testcase. --- .../zeppelin/spark/PySparkInterpreter.java | 44 ++---------- .../spark/PySparkInterpreterTest.java | 72 ++++++++----------- 2 files changed, 32 insertions(+), 84 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 30579e8982a..9e95c29b36e 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -160,46 +160,12 @@ public void open() { } } - private Map setupPySparkEnv() throws IOException{ + public Map setupPySparkEnv() throws IOException{ Map env = EnvironmentUtils.getProcEnvironment(); - - String pysparkBasePath = new InterpreterProperty("SPARK_HOME", null, null, null).getValue(); - File pysparkPath; - if (null == pysparkBasePath) { - pysparkBasePath = - new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue(); - pysparkPath = new File(pysparkBasePath, - "interpreter" + File.separator + "spark" + File.separator + "pyspark"); - } else { - pysparkPath = new File(pysparkBasePath, - "python" + File.separator + "lib"); + if (!env.containsKey("PYTHONPATH")) { + SparkConf conf = getSparkConf(); + env.put("PYTHONPATH", conf.get("spark.files").replaceAll(",", ":")); } - - String pythonPath = (String) env.get("PYTHONPATH"); -/* - if (pythonPath == null) { - pythonPath = ""; - } else { - pythonPath += ":"; - } -*/ - //Only one of py4j-0.9-src.zip and py4j-0.8.2.1-src.zip should exist - String[] pythonLibs = new String[]{"pyspark.zip", "py4j-0.9-src.zip", "py4j-0.8.2.1-src.zip", - "py4j-0.10.1-src.zip"}; - //ArrayList pythonLibUris = new ArrayList<>(); - for (String lib : pythonLibs) { - File libFile = new File(pysparkPath, lib); - if (libFile.exists()) { - if (pythonPath == null) { - pythonPath = ""; - } else { - pythonPath += ":"; - } - pythonPath += libFile.getAbsolutePath(); - //pythonLibUris.add(libFile.toURI().toString()); - } - } - env.put("PYTHONPATH", pythonPath); return env; } @@ -234,9 +200,7 @@ private void createGatewayServerAndStartScript() { executor.setStreamHandler(streamHandler); executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); - try { - //Map env = EnvironmentUtils.getProcEnvironment(); Map env = setupPySparkEnv(); executor.execute(cmd, env, this); pythonscriptRunning = true; diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 90524ba6b91..a5f7d1fcafe 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -16,8 +16,11 @@ */ package org.apache.zeppelin.spark; +import org.apache.zeppelin.display.AngularObjectRegistry; +import org.apache.zeppelin.display.GUI; import org.apache.zeppelin.interpreter.*; -import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; +import org.apache.zeppelin.resource.LocalResourcePool; +import org.apache.zeppelin.user.AuthenticationInfo; import org.junit.After; import org.junit.Before; import org.junit.FixMethodOrder; @@ -33,16 +36,11 @@ @FixMethodOrder(MethodSorters.NAME_ASCENDING) public class PySparkInterpreterTest { public static LazyOpenInterpreter sparkInterpreter; - //public static SparkInterpreter sparkInterpreter; public static PySparkInterpreter pySparkInterpreter; public static InterpreterGroup intpGroup; private File tmpDir; public static Logger LOGGER = LoggerFactory.getLogger(PySparkInterpreterTest.class); - - private static final String INTERPRETER_SCRIPT = - System.getProperty("os.name").startsWith("Windows") ? - "../bin/interpreter.cmd" : - "../bin/interpreter.sh"; + private InterpreterContext context; public static Properties getPySparkTestProperties() { Properties p = new Properties(); @@ -52,8 +50,6 @@ public static Properties getPySparkTestProperties() { p.setProperty("zeppelin.spark.maxResult", "1000"); p.setProperty("zeppelin.spark.importImplicit", "true"); p.setProperty("zeppelin.pyspark.python", "python"); - //p.setProperty("zeppelin.interpreter.localRepo", "/home/nflabs/zeppelin/local-repo"); - return p; } @@ -61,53 +57,42 @@ public static Properties getPySparkTestProperties() { public void setUp() throws Exception { tmpDir = new File(System.getProperty("java.io.tmpdir") + "/ZeppelinLTest_" + System.currentTimeMillis()); System.setProperty("zeppelin.dep.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); - //System.setProperty("zeppelin.interpreter.localrepo", tmpDir.getAbsolutePath() + "/local-repo"); tmpDir.mkdirs(); - -// String s = System.getProperty("PATH"); - //System.setProperty("PYTHONPATH", "/home/nflabs/zeppelin/spark-dependencies/target/spark-dist/spark-2.0.0/python:/home/nflabs/zeppelin/spark-dependencies/target/spark-dist/spark-2.0.0/python/lib/py4j-0.10.1-src.zip"); -/* - //System.setProperty("PATH", "/home/nflabs/zeppelin/spark-dependencies/target/spark-dist/spark-2.0.0/python/lib/py4j-0.10.1-src.zip"); - s = System.getProperty("PATH"); -*/ - intpGroup = new InterpreterGroup(); intpGroup.put("note", new LinkedList()); -// SparkConf conf = sparkInterpreter.getSparkContext().getConf(); -// String zip = conf.get("spark.files"); -/* - - pysparkBasePath = - new InterpreterProperty("ZEPPELIN_HOME", "zeppelin.home", "../", null).getValue(); - pysparkPath = new File(pysparkBasePath, - "interpreter" + File.separator + "spark" + File.separator + "pyspark"); -*/ - - - /* - RemoteInterpreter remoteInterpreter = createPysparkInterpreter(getPySparkTestProperties(), "note"); - intpGroup.get("note").add(remoteInterpreter); - remoteInterpreter.setInterpreterGroup(intpGroup); - remoteInterpreter.open(); -*/ - if (sparkInterpreter == null) { sparkInterpreter = new LazyOpenInterpreter(new SparkInterpreter(getPySparkTestProperties())); - //sparkInterpreter = new SparkInterpreter(getPySparkTestProperties()); intpGroup.get("note").add(sparkInterpreter); sparkInterpreter.setInterpreterGroup(intpGroup); - //sparkInterpreter.open(); } if (pySparkInterpreter == null) { - //pySparkInterpreter = new LazyOpenInterpreter(new PySparkInterpreter(getPySparkTestProperties())); pySparkInterpreter = new PySparkInterpreter(getPySparkTestProperties()); intpGroup.get("note").add(pySparkInterpreter); pySparkInterpreter.setInterpreterGroup(intpGroup); pySparkInterpreter.open(); } + + context = new InterpreterContext("note", "id", "title", "text", + new AuthenticationInfo(), + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LocalResourcePool("id"), + new LinkedList(), + new InterpreterOutput(new InterpreterOutputListener() { + @Override + public void onAppend(InterpreterOutput out, byte[] line) { + + } + + @Override + public void onUpdate(InterpreterOutput out, byte[] output) { + + } + })); } @After @@ -129,10 +114,9 @@ else if (file.isDirectory()) { } @Test - public void testPySparkCompletion() { - //pySparkInterpreter.interpret("int(\"123\")", context).code(); - //List completions = pySparkInterpreter.completion("sc.", "sc.".length()); - List completions = pySparkInterpreter.completion("sc.", "sc.".length()); - assertTrue(completions.size() > 0); + public void testBasicIntp() { + assertEquals(InterpreterResult.Code.SUCCESS, + pySparkInterpreter.interpret("a = 1\n", context).code()); } + } From 265a82b93a66ec568bc3550c89b89fd562a403cb Mon Sep 17 00:00:00 2001 From: astroshim Date: Sat, 22 Oct 2016 00:50:04 +0900 Subject: [PATCH 6/7] change scope --- .../main/java/org/apache/zeppelin/spark/PySparkInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 9e95c29b36e..7795f95d1a7 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -160,7 +160,7 @@ public void open() { } } - public Map setupPySparkEnv() throws IOException{ + private Map setupPySparkEnv() throws IOException{ Map env = EnvironmentUtils.getProcEnvironment(); if (!env.containsKey("PYTHONPATH")) { SparkConf conf = getSparkConf(); From fcdd7e5b5d1e9146186306a4e7f7732a8c137aa3 Mon Sep 17 00:00:00 2001 From: astroshim Date: Fri, 4 Nov 2016 09:45:34 +0900 Subject: [PATCH 7/7] rebase --- .../zeppelin/spark/PySparkInterpreter.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index ed85558028d..420ebd5fda9 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -49,9 +49,9 @@ import org.apache.zeppelin.interpreter.InterpreterContext; import org.apache.zeppelin.interpreter.InterpreterException; import org.apache.zeppelin.interpreter.InterpreterResult; -import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.LazyOpenInterpreter; import org.apache.zeppelin.interpreter.WrappedInterpreter; +import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.thrift.InterpreterCompletion; import org.apache.zeppelin.spark.dep.SparkDependencyContext; import org.slf4j.Logger; @@ -115,7 +115,7 @@ public void open() { // load libraries from Dependency Interpreter URL [] urls = new URL[0]; - List urlList = new LinkedList(); + List urlList = new LinkedList<>(); if (depInterpreter != null) { SparkDependencyContext depc = depInterpreter.getDependencyContext(); @@ -165,6 +165,15 @@ public void open() { } } + private Map setupPySparkEnv() throws IOException{ + Map env = EnvironmentUtils.getProcEnvironment(); + if (!env.containsKey("PYTHONPATH")) { + SparkConf conf = getSparkConf(); + env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":")); + } + return env; + } + private void createGatewayServerAndStartScript() { // create python script createPythonScript(); @@ -196,10 +205,8 @@ private void createGatewayServerAndStartScript() { executor.setStreamHandler(streamHandler); executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); - try { - Map env = EnvironmentUtils.getProcEnvironment(); - + Map env = setupPySparkEnv(); executor.execute(cmd, env, this); pythonscriptRunning = true; } catch (IOException e) {