From 9426d8e415c40264c191af44c434dc9f9952c150 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Fri, 15 Jun 2018 11:32:19 +0800 Subject: [PATCH] ZEPPELIN-3352. Improve RemoteInterpreterProcess creation timeout mechanism --- .../zeppelin/conf/ZeppelinConfiguration.java | 2 +- .../launcher/InterpreterClient.java | 4 +++- .../launcher/InterpreterLauncher.java | 13 +++++++++- .../launcher/ShellScriptLauncher.java | 3 +-- .../RemoteInterpreterManagedProcess.java | 10 +++++--- .../launcher/ShellScriptLauncherTest.java | 24 +++++++++++++++++++ .../SparkInterpreterLauncherTest.java | 22 +++++++++++++++++ 7 files changed, 70 insertions(+), 8 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index dc46dd0ac50..635adbffff3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -728,7 +728,7 @@ public enum ConfVars { ZEPPELIN_INTERPRETER_LOCALREPO("zeppelin.interpreter.localRepo", "local-repo"), ZEPPELIN_INTERPRETER_DEP_MVNREPO("zeppelin.interpreter.dep.mvnRepo", "http://repo1.maven.org/maven2/"), - ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), + ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 60000), ZEPPELIN_INTERPRETER_MAX_POOL_SIZE("zeppelin.interpreter.max.poolsize", 10), ZEPPELIN_INTERPRETER_GROUP_ORDER("zeppelin.interpreter.group.order", "spark,md,angular,sh," + "livy,alluxio,file,psql,flink,python,ignite,lens,cassandra,geode,kylin,elasticsearch," diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java index 26da27032f1..bfd3e446a2f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java @@ -17,6 +17,8 @@ package org.apache.zeppelin.interpreter.launcher; +import java.io.IOException; + /** * Interface to InterpreterClient which is created by InterpreterLauncher. This is the component * that is used to for the communication from zeppelin-server process to zeppelin interpreter @@ -26,7 +28,7 @@ public interface InterpreterClient { String getInterpreterSettingName(); - void start(String userName); + void start(String userName) throws IOException; void stop(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 1cee20e7a04..30cf995db01 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -37,5 +37,16 @@ public InterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage recovery this.recoveryStorage = recoveryStorage; } - public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException; + protected int getConnectTimeout() { + int connectTimeout = + zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + if (properties.containsKey( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())) { + connectTimeout = Integer.parseInt(properties.getProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName())); + } + return connectTimeout; + } + + public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException; } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java index b271d9bfe81..18a6ddecdf9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncher.java @@ -51,8 +51,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep InterpreterRunner runner = context.getRunner(); String groupName = context.getInterpreterSettingGroup(); String name = context.getInterpreterSettingName(); - int connectTimeout = - zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); + int connectTimeout = getConnectTimeout(); if (option.isExistingProcess()) { return new RemoteInterpreterRunningProcess( diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java index 5160522680b..db6d263a6b7 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java @@ -97,7 +97,7 @@ public int getPort() { } @Override - public void start(String userName) { + public void start(String userName) throws IOException { // start server process CommandLine cmdLine = CommandLine.parse(interpreterRunner); cmdLine.addArgument("-d", false); @@ -143,11 +143,15 @@ public void start(String userName) { try { synchronized (running) { if (!running.get()) { - running.wait(getConnectTimeout() * 2); + running.wait(getConnectTimeout()); } } if (!running.get()) { - throw new RuntimeException(new String(cmdOut.toByteArray())); + throw new IOException(new String( + String.format("Interpreter Process creation is time out in %d seconds", + getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " + + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + + cmdOut.toString())); } } catch (InterruptedException e) { logger.error("Remote interpreter is not accessible"); diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java index 6301d0d34d0..ace3f317401 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/ShellScriptLauncherTest.java @@ -53,10 +53,34 @@ public void testLauncher() throws IOException { assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getIntValue(), + interpreterProcess.getConnectTimeout()); assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); assertEquals(1, interpreterProcess.getEnv().size()); assertEquals("VALUE_1", interpreterProcess.getEnv().get("ENV_1")); assertEquals(true, interpreterProcess.isUserImpersonated()); } + @Test + public void testConnectTimeOut() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + ShellScriptLauncher launcher = new ShellScriptLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(10000, interpreterProcess.getConnectTimeout()); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertEquals(0, interpreterProcess.getEnv().size()); + assertEquals(true, interpreterProcess.isUserImpersonated()); + } + } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index 2e4b165da76..7aab026fc52 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -37,6 +37,28 @@ public void setUp() { } } + @Test + public void testConnectTimeOut() throws IOException { + ZeppelinConfiguration zConf = new ZeppelinConfiguration(); + SparkInterpreterLauncher launcher = new SparkInterpreterLauncher(zConf, null); + Properties properties = new Properties(); + properties.setProperty( + ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT.getVarName(), "10000"); + InterpreterOption option = new InterpreterOption(); + option.setUserImpersonate(true); + InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); + InterpreterClient client = launcher.launch(context); + assertTrue( client instanceof RemoteInterpreterManagedProcess); + RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + assertEquals("name", interpreterProcess.getInterpreterSettingName()); + assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); + assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); + assertEquals(10000, interpreterProcess.getConnectTimeout()); + assertEquals(zConf.getInterpreterRemoteRunnerPath(), interpreterProcess.getInterpreterRunner()); + assertTrue(interpreterProcess.getEnv().size() >= 2); + assertEquals(true, interpreterProcess.isUserImpersonated()); + } + @Test public void testLocalMode() throws IOException { ZeppelinConfiguration zConf = new ZeppelinConfiguration();