From 7e70289a6bfb17afa4329d122d3f523c36788aca Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 22 Jun 2015 16:38:17 -0700 Subject: [PATCH] Increase interpreter connection timeout and make it configurable --- conf/zeppelin-site.xml.template | 7 ++++ .../interpreter/remote/RemoteInterpreter.java | 11 +++++-- .../remote/RemoteInterpreterProcess.java | 13 +++++--- .../remote/RemoteAngularObjectTest.java | 3 +- .../remote/RemoteInterpreterProcessTest.java | 4 +-- .../remote/RemoteInterpreterTest.java | 33 ++++++++++++------- .../scheduler/RemoteSchedulerTest.java | 3 +- .../zeppelin/conf/ZeppelinConfiguration.java | 1 + .../interpreter/InterpreterFactory.java | 4 ++- 9 files changed, 55 insertions(+), 24 deletions(-) diff --git a/conf/zeppelin-site.xml.template b/conf/zeppelin-site.xml.template index b467a825b70..8f8bf065f3e 100644 --- a/conf/zeppelin-site.xml.template +++ b/conf/zeppelin-site.xml.template @@ -70,6 +70,13 @@ Comma separated interpreter configurations. First interpreter become a default + + zeppelin.interpreter.connect.timeout + 30000 + Interpreter process connect timeout in msec. + + + zeppelin.ssl false diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 1637e9c0430..22818fce872 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -58,11 +58,13 @@ public class RemoteInterpreter extends Interpreter { = new HashMap(); private InterpreterContextRunnerPool interpreterContextRunnerPool; + private int connectTimeout; public RemoteInterpreter(Properties property, String className, String interpreterRunner, - String interpreterPath) { + String interpreterPath, + int connectTimeout) { super(property); this.className = className; @@ -71,18 +73,21 @@ public RemoteInterpreter(Properties property, this.interpreterPath = interpreterPath; env = new HashMap(); interpreterContextRunnerPool = new InterpreterContextRunnerPool(); + this.connectTimeout = connectTimeout; } public RemoteInterpreter(Properties property, String className, String interpreterRunner, String interpreterPath, - Map env) { + Map env, + int connectTimeout) { super(property); this.className = className; this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; this.env = env; + this.connectTimeout = connectTimeout; } @Override @@ -333,7 +338,7 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { || (!intpProcess.isRunning() && intpProcess.getPort() == -1)) { interpreterGroupReference.put(getInterpreterGroupKey(interpreterGroup), new RemoteInterpreterProcess(interpreterRunner, - interpreterPath, env, interpreterContextRunnerPool)); + interpreterPath, env, interpreterContextRunnerPool, connectTimeout)); logger.info("setInterpreterGroup = " + getInterpreterGroupKey(interpreterGroup) + " class=" + className diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 61fcb706931..5dd2a653982 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -50,26 +50,29 @@ public class RemoteInterpreterProcess implements ExecuteResultHandler { private Map env; private final RemoteInterpreterEventPoller remoteInterpreterEventPoller; private final InterpreterContextRunnerPool interpreterContextRunnerPool; + private int connectTimeout; public RemoteInterpreterProcess(String intpRunner, String intpDir, Map env, - InterpreterContextRunnerPool interpreterContextRunnerPool) { + InterpreterContextRunnerPool interpreterContextRunnerPool, int connectTimeout) { this(intpRunner, intpDir, env, interpreterContextRunnerPool, - new RemoteInterpreterEventPoller()); + new RemoteInterpreterEventPoller(), connectTimeout); } RemoteInterpreterProcess(String intpRunner, String intpDir, Map env, InterpreterContextRunnerPool interpreterContextRunnerPool, - RemoteInterpreterEventPoller remoteInterpreterEventPoller) { + RemoteInterpreterEventPoller remoteInterpreterEventPoller, + int connectTimeout) { this.interpreterRunner = intpRunner; this.interpreterDir = intpDir; this.env = env; this.interpreterContextRunnerPool = interpreterContextRunnerPool; referenceCount = new AtomicInteger(0); this.remoteInterpreterEventPoller = remoteInterpreterEventPoller; + this.connectTimeout = connectTimeout; } @@ -113,7 +116,7 @@ public int reference(InterpreterGroup interpreterGroup) { long startTime = System.currentTimeMillis(); - while (System.currentTimeMillis() - startTime < 5 * 1000) { + while (System.currentTimeMillis() - startTime < connectTimeout) { if (RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", port)) { break; } else { @@ -123,7 +126,7 @@ public int reference(InterpreterGroup interpreterGroup) { } } } - + clientPool = new GenericObjectPool(new ClientFactory("localhost", port)); remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java index d4909e346ec..e6da1ec1df0 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectTest.java @@ -68,7 +68,8 @@ public void setUp() throws Exception { MockInterpreterAngular.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intp); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java index 4ea9a304610..004327292c5 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessTest.java @@ -33,7 +33,7 @@ public class RemoteInterpreterProcessTest { public void testStartStop() { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap(), - new InterpreterContextRunnerPool()); + new InterpreterContextRunnerPool(), 10 * 1000); assertFalse(rip.isRunning()); assertEquals(0, rip.referenceCount()); assertEquals(1, rip.reference(intpGroup)); @@ -49,7 +49,7 @@ public void testStartStop() { public void testClientFactory() throws Exception { InterpreterGroup intpGroup = new InterpreterGroup(); RemoteInterpreterProcess rip = new RemoteInterpreterProcess("../bin/interpreter.sh", "nonexists", new HashMap(), - new InterpreterContextRunnerPool(), mock(RemoteInterpreterEventPoller.class)); + new InterpreterContextRunnerPool(), mock(RemoteInterpreterEventPoller.class), 10 * 1000); rip.reference(intpGroup); assertEquals(0, rip.getNumActiveClient()); assertEquals(0, rip.getNumIdleClient()); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 4d5636d36b2..b49f86d2eda 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -74,7 +74,8 @@ public void testRemoteInterperterCall() throws TTransportException, IOException MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpA); @@ -85,7 +86,8 @@ public void testRemoteInterperterCall() throws TTransportException, IOException MockInterpreterB.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpB); @@ -135,7 +137,8 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpA); @@ -146,7 +149,8 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException MockInterpreterB.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpB); @@ -197,7 +201,8 @@ public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOExc MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpA); @@ -208,7 +213,8 @@ public void testRemoteSchedulerSharingSubmit() throws TTransportException, IOExc MockInterpreterB.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpB); @@ -311,7 +317,8 @@ public void testRunOrderPreserved() throws InterruptedException { MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpA); @@ -390,7 +397,8 @@ public void testRunParallel() throws InterruptedException { MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpA); @@ -468,7 +476,8 @@ public void testInterpreterGroupResetBeforeProcessStarts() { MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpA.setInterpreterGroup(intpGroup); @@ -489,7 +498,8 @@ public void testInterpreterGroupResetAfterProcessFinished() { MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpA.setInterpreterGroup(intpGroup); @@ -513,7 +523,8 @@ public void testInterpreterGroupResetDuringProcessRunning() { MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpA.setInterpreterGroup(intpGroup); diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java index bb0fb8074f8..2a1075a4173 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/scheduler/RemoteSchedulerTest.java @@ -62,7 +62,8 @@ public void test() throws Exception { MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), "fake", - env + env, + 10 * 1000 ); intpGroup.add(intpA); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java index 6bc8a6cea1e..9f057425251 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/conf/ZeppelinConfiguration.java @@ -401,6 +401,7 @@ public static enum ConfVars { + "org.apache.zeppelin.ignite.IgniteInterpreter," + "org.apache.zeppelin.ignite.IgniteSqlInterpreter"), ZEPPELIN_INTERPRETER_DIR("zeppelin.interpreter.dir", "interpreter"), + ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT("zeppelin.interpreter.connect.timeout", 30000), ZEPPELIN_ENCODING("zeppelin.encoding", "UTF-8"), ZEPPELIN_NOTEBOOK_DIR("zeppelin.notebook.dir", "notebook"), ZEPPELIN_NOTEBOOK_STORAGE("zeppelin.notebook.storage", VFSNotebookRepo.class.getName()), diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java index c8fc485d281..77df7c51d8c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterFactory.java @@ -621,8 +621,10 @@ private Interpreter createRepl(String dirName, String className, private Interpreter createRemoteRepl(String interpreterPath, String className, Properties property) { + int connectTimeout = conf.getInt(ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); LazyOpenInterpreter intp = new LazyOpenInterpreter(new RemoteInterpreter( - property, className, conf.getInterpreterRemoteRunnerPath(), interpreterPath)); + property, className, conf.getInterpreterRemoteRunnerPath(), + interpreterPath, connectTimeout)); return intp; }