From 3a1d8a737b22d9f6b32b3c417f8d373d4f617021 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 13 Jun 2018 10:14:14 +0800 Subject: [PATCH] ZEPPELIN-3538. Fail to bootstrap PySpark in yarn cluster mode --- .../zeppelin/python/PythonInterpreter.java | 16 ++++++++++++++-- .../src/main/resources/python/zeppelin_python.py | 11 ++++++++--- .../zeppelin/spark/PySparkInterpreter.java | 6 +++--- .../zeppelin/interpreter/MiniZeppelin.java | 1 + .../src/test/resources/log4j.properties | 2 ++ 5 files changed, 28 insertions(+), 8 deletions(-) diff --git a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java index 95cfc824053..8805dc1b989 100644 --- a/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java +++ b/python/src/main/java/org/apache/zeppelin/python/PythonInterpreter.java @@ -68,7 +68,7 @@ */ public class PythonInterpreter extends Interpreter implements ExecuteResultHandler { private static final Logger LOGGER = LoggerFactory.getLogger(PythonInterpreter.class); - private static final int MAX_TIMEOUT_SEC = 10; + private static final int MAX_TIMEOUT_SEC = 30; private GatewayServer gatewayServer; private DefaultExecutor executor; @@ -292,10 +292,16 @@ public void setPythonExec(String pythonExec) { public class PythonInterpretRequest { public String statements; public boolean isForCompletion; + public boolean isCallHooks; public PythonInterpretRequest(String statements, boolean isForCompletion) { + this(statements, isForCompletion, true); + } + + public PythonInterpretRequest(String statements, boolean isForCompletion, boolean isCallHooks) { this.statements = statements; this.isForCompletion = isForCompletion; + this.isCallHooks = isCallHooks; } public String statements() { @@ -305,6 +311,10 @@ public String statements() { public boolean isForCompletion() { return isForCompletion; } + + public boolean isCallHooks() { + return isCallHooks; + } } // called by Python Process @@ -602,7 +612,9 @@ protected void bootstrapInterpreter(String resourceName) throws IOException { String bootstrapCode = IOUtils.toString(getClass().getClassLoader().getResourceAsStream(resourceName)); try { - InterpreterResult result = interpret(bootstrapCode, InterpreterContext.get()); + // Add hook explicitly, otherwise python will fail to execute the statement + InterpreterResult result = interpret(bootstrapCode + "\n" + "__zeppelin__._displayhook()", + InterpreterContext.get()); if (result.code() != Code.SUCCESS) { throw new IOException("Fail to run bootstrap script: " + resourceName); } diff --git a/python/src/main/resources/python/zeppelin_python.py b/python/src/main/resources/python/zeppelin_python.py index 19fa2201a40..5ad16a496f1 100644 --- a/python/src/main/resources/python/zeppelin_python.py +++ b/python/src/main/resources/python/zeppelin_python.py @@ -111,12 +111,18 @@ def getCompletion(self, text_value): # Get post-execute hooks try: - global_hook = intp.getHook('post_exec_dev') + if req.isCallHooks(): + global_hook = intp.getHook('post_exec_dev') + else: + global_hook = None except: global_hook = None try: - user_hook = __zeppelin__.getHook('post_exec') + if req.isCallHooks(): + user_hook = __zeppelin__.getHook('post_exec') + else: + user_hook = None except: user_hook = None @@ -133,7 +139,6 @@ def getCompletion(self, text_value): to_run_hooks = [] if (nhooks > 0): to_run_hooks = code.body[-nhooks:] - to_run_exec, to_run_single = (code.body[:-(nhooks + 1)], [code.body[-(nhooks + 1)]]) try: diff --git a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 1df6e2edfe6..f3fee2111fb 100644 --- a/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/interpreter/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -130,11 +130,11 @@ public void open() throws InterpreterException { try { URLClassLoader newCl = new URLClassLoader(urls, oldCl); Thread.currentThread().setContextClassLoader(newCl); - // create Python Process and JVM gateway - super.open(); // must create spark interpreter after ClassLoader is set, otherwise the additional jars // can not be loaded by spark repl. this.sparkInterpreter = getSparkInterpreter(); + // create Python Process and JVM gateway + super.open(); } finally { Thread.currentThread().setContextClassLoader(oldCl); } @@ -175,7 +175,7 @@ protected void preCallPython(InterpreterContext context) { String jobDesc = "Started by: " + Utils.getUserName(context.getAuthenticationInfo()); callPython(new PythonInterpretRequest( String.format("if 'sc' in locals():\n\tsc.setJobGroup('%s', '%s')", jobGroup, jobDesc), - false)); + false, false)); } // Run python shell diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java index 923ae5a022f..14d0166e47d 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/MiniZeppelin.java @@ -38,6 +38,7 @@ public void start() throws IOException { FileUtils.copyFile(new File(zeppelinHome, "conf/log4j_yarn_cluster.properties"), new File(confDir, "log4j_yarn_cluster.properties")); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_CONF_DIR.getVarName(), confDir.getAbsolutePath()); System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_NOTEBOOK_DIR.getVarName(), notebookDir.getAbsolutePath()); + System.setProperty(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "60000"); conf = new ZeppelinConfiguration(); interpreterSettingManager = new InterpreterSettingManager(conf, mock(AngularObjectRegistryListener.class), mock(RemoteInterpreterProcessListener.class), mock(ApplicationEventListener.class)); diff --git a/zeppelin-zengine/src/test/resources/log4j.properties b/zeppelin-zengine/src/test/resources/log4j.properties index e45936ebbe7..c8e4342ab6d 100644 --- a/zeppelin-zengine/src/test/resources/log4j.properties +++ b/zeppelin-zengine/src/test/resources/log4j.properties @@ -44,3 +44,5 @@ log4j.logger.org.hibernate.type=ALL log4j.logger.org.apache.hadoop=WARN log4j.logger.org.apache.zeppelin.plugin=DEBUG +log4j.logger.org.apache.zeppelin.spark=DEBUG +log4j.logger.org.apache.zeppelin.python=DEBUG \ No newline at end of file