From e1221712f1964ff56884aba781a09e56b5edde33 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Mon, 7 Sep 2015 11:43:45 -0700 Subject: [PATCH 1/6] Better handling of RemoteInterpreter shutdown. Share scheduler instance among RemoteInterpreter in the same group --- .../interpreter/remote/RemoteInterpreter.java | 30 +++++++++++++++---- .../remote/RemoteInterpreterProcess.java | 15 ++++++---- .../remote/RemoteInterpreterServer.java | 18 ++++++++++- .../remote/RemoteInterpreterTest.java | 2 +- 4 files changed, 51 insertions(+), 14 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index d5d92c83c01..6b613ad0d48 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; import org.apache.zeppelin.interpreter.InterpreterResult.Type; +import org.apache.zeppelin.interpreter.WrappedInterpreter; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; @@ -86,7 +87,7 @@ public RemoteInterpreter(Properties property, this.interpreterRunner = interpreterRunner; this.interpreterPath = interpreterPath; this.env = env; - this.connectTimeout = connectTimeout; + this.connectTimeout = connectTimeout; } @Override @@ -320,11 +321,28 @@ public List completion(String buf, int cursor) { @Override public Scheduler getScheduler() { int maxConcurrency = 10; - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - return 
SchedulerFactory.singleton().createOrGetRemoteScheduler( - "remoteinterpreter_" + interpreterProcess.hashCode(), - getInterpreterProcess(), - maxConcurrency); + // Share a single RemoteScheduler instance among all RemoteInterpreter in + // the same InterpreterGroup. + // If each RemoteInterpreter use each RemoteScheduler instance, can not guarantee + // job submit sequence when Two or more interpreter shares single Scheduler. + + InterpreterGroup intpGroup = getInterpreterGroup(); + Interpreter firstInterpreter = intpGroup.get(0); + + Interpreter innerInterpreter = firstInterpreter; + while (innerInterpreter instanceof WrappedInterpreter) { + innerInterpreter = ((WrappedInterpreter) innerInterpreter).getInnerInterpreter(); + } + + if (innerInterpreter.equals(this)) { + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + return SchedulerFactory.singleton().createOrGetRemoteScheduler( + "remoteinterpreter_" + interpreterProcess.hashCode(), + getInterpreterProcess(), + maxConcurrency); + } else { + return innerInterpreter.getScheduler(); + } } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index 534af271d2f..0c9e877e4ea 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -37,7 +37,7 @@ */ public class RemoteInterpreterProcess implements ExecuteResultHandler { private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); - + private final AtomicInteger referenceCount; private DefaultExecutor executor; private ExecuteWatchdog watchdog; @@ -124,7 +124,7 @@ public int reference(InterpreterGroup interpreterGroup) { } } } - + clientPool = new GenericObjectPool(new 
ClientFactory("localhost", port)); remoteInterpreterEventPoller.setInterpreterGroup(interpreterGroup); @@ -151,13 +151,16 @@ public int dereference() { remoteInterpreterEventPoller.shutdown(); // first try shutdown + Client client = null; try { - Client client = getClient(); + client = getClient(); client.shutdown(); - releaseClient(client); } catch (Exception e) { - logger.error("Error", e); - watchdog.destroyProcess(); + // safely ignore exception while client.shutdown() may terminates remote process + } finally { + if (client != null) { + releaseClient(client); + } } clientPool.clear(); diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index 16b188394d0..7405a6660bf 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -100,10 +100,26 @@ public void run() { @Override public void shutdown() throws TException { + interpreterGroup.close(); + interpreterGroup.destroy(); + + server.stop(); + // server.stop() does not always finish server.serve() loop // sometimes server.serve() is hanging even after server.stop() call. 
// this case, need to force kill the process - server.stop(); + + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < 2000 && server.isServing()) { + try { + Thread.sleep(300); + } catch (InterruptedException e) { + } + } + + if (server.isServing()) { + System.exit(0); + } } public int getPort() { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 0c74cea54d4..7552b88d06f 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -63,7 +63,7 @@ public void setUp() throws Exception { @After public void tearDown() throws Exception { - intpGroup.clone(); + intpGroup.close(); intpGroup.destroy(); } From cdf3c4b8648e48cc0d5c665c9ab1e95fde1da344 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Fri, 13 Nov 2015 21:51:59 +0900 Subject: [PATCH 2/6] Add test for sharing scheduler instance --- .../interpreter/remote/RemoteInterpreter.java | 26 +++----------- .../remote/RemoteInterpreterTest.java | 35 ++++++++++++++++++- 2 files changed, 38 insertions(+), 23 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 6b613ad0d48..9ae6f2e1926 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -321,28 +321,10 @@ public List completion(String buf, int cursor) { @Override public Scheduler getScheduler() { int maxConcurrency = 10; - // Share a single RemoteScheduler instance among 
all RemoteInterpreter in - // the same InterpreterGroup. - // If each RemoteInterpreter use each RemoteScheduler instance, can not guarantee - // job submit sequence when Two or more interpreter shares single Scheduler. - - InterpreterGroup intpGroup = getInterpreterGroup(); - Interpreter firstInterpreter = intpGroup.get(0); - - Interpreter innerInterpreter = firstInterpreter; - while (innerInterpreter instanceof WrappedInterpreter) { - innerInterpreter = ((WrappedInterpreter) innerInterpreter).getInnerInterpreter(); - } - - if (innerInterpreter.equals(this)) { - RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); - return SchedulerFactory.singleton().createOrGetRemoteScheduler( - "remoteinterpreter_" + interpreterProcess.hashCode(), - getInterpreterProcess(), - maxConcurrency); - } else { - return innerInterpreter.getScheduler(); - } + RemoteInterpreterProcess interpreterProcess = getInterpreterProcess(); + return SchedulerFactory.singleton().createOrGetRemoteScheduler( + "remoteinterpreter_" + interpreterProcess.hashCode(), getInterpreterProcess(), + maxConcurrency); } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 7552b88d06f..a0b888becc5 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -38,7 +38,6 @@ import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterServer.InterpretJob; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterA; import org.apache.zeppelin.interpreter.remote.mock.MockInterpreterB; 
import org.apache.zeppelin.scheduler.Job; @@ -579,4 +578,38 @@ public void testInterpreterGroupResetDuringProcessRunning() { processA.dereference(); // intpA.close(); } + + @Test + public void testRemoteInterpreterSharesTheSameSchedulerInstanceInTheSameGroup() { + Properties p = new Properties(); + + RemoteInterpreter intpA = new RemoteInterpreter( + p, + MockInterpreterA.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000 + ); + + intpGroup.add(intpA); + intpA.setInterpreterGroup(intpGroup); + + RemoteInterpreter intpB = new RemoteInterpreter( + p, + MockInterpreterB.class.getName(), + new File("../bin/interpreter.sh").getAbsolutePath(), + "fake", + env, + 10 * 1000 + ); + + intpGroup.add(intpB); + intpB.setInterpreterGroup(intpGroup); + + intpA.open(); + intpB.open(); + + assertEquals(intpA.getScheduler(), intpB.getScheduler()); + } } From e4a306f35731c2f4c29d6c0b582987ab48f6ac9c Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Fri, 13 Nov 2015 23:00:04 +0900 Subject: [PATCH 3/6] Clear reference to interpreterProcess from interpreterGroupReference within close() to make sure not reusing after restart --- .../interpreter/remote/RemoteInterpreter.java | 8 ++- .../remote/RemoteInterpreterTest.java | 54 ++++++++++++++++--- 2 files changed, 55 insertions(+), 7 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 9ae6f2e1926..91dae3a0b49 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -181,7 +181,13 @@ public void close() { interpreterProcess.releaseClient(client); } - interpreterProcess.dereference(); + int r = interpreterProcess.dereference(); + if (r == 0) { + synchronized
(interpreterGroupReference) { + InterpreterGroup intpGroup = getInterpreterGroup(); + interpreterGroupReference.remove(getInterpreterGroupKey(intpGroup)); + } + } } @Override diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index a0b888becc5..2e7ee0ce799 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -554,10 +554,10 @@ public void testInterpreterGroupResetAfterProcessFinished() { } @Test - public void testInterpreterGroupResetDuringProcessRunning() { + public void testInterpreterGroupResetDuringProcessRunning() throws InterruptedException { Properties p = new Properties(); - RemoteInterpreter intpA = new RemoteInterpreter( + final RemoteInterpreter intpA = new RemoteInterpreter( p, MockInterpreterA.class.getName(), new File("../bin/interpreter.sh").getAbsolutePath(), @@ -566,16 +566,58 @@ public void testInterpreterGroupResetDuringProcessRunning() { 10 * 1000 ); + intpGroup.add(intpA); intpA.setInterpreterGroup(intpGroup); - RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.open(); + Job jobA = new Job("jobA", null) { + + @Override + public int progress() { + return 0; + } + + @Override + public Map info() { + return null; + } + + @Override + protected Object jobRun() throws Throwable { + return intpA.interpret("2000", + new InterpreterContext( + "note", + "jobA", + "title", + "text", + new HashMap(), + new GUI(), + new AngularObjectRegistry(intpGroup.getId(), null), + new LinkedList())); + } + + @Override + protected boolean jobAbort() { + return false; + } + + }; + intpA.getScheduler().submit(jobA); + + // wait for job started + while (intpA.getScheduler().getJobsRunning().size() == 0) { + 
Thread.sleep(100); + } + + // restart interpreter + RemoteInterpreterProcess processA = intpA.getInterpreterProcess(); + intpA.close(); intpA.setInterpreterGroup(new InterpreterGroup(intpA.getInterpreterGroup().getId())); + intpA.open(); RemoteInterpreterProcess processB = intpA.getInterpreterProcess(); - assertEquals(processA.hashCode(), processB.hashCode()); - - processA.dereference(); // intpA.close(); + assertNotSame(processA.hashCode(), processB.hashCode()); } From dddca9bfb2c48f71ecca03d943745d2d425ca82d Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Fri, 13 Nov 2015 23:20:14 +0900 Subject: [PATCH 4/6] Shutdown event poller when interpreter process dies --- .../interpreter/remote/RemoteInterpreterEventPoller.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java index f39f6a6aa10..1b734b73202 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterEventPoller.java @@ -56,7 +56,7 @@ public void setInterpreterGroup(InterpreterGroup interpreterGroup) { public void run() { Client client = null; - while (!shutdown) { + while (!shutdown && interpreterProcess.isRunning()) { try { client = interpreterProcess.getClient(); } catch (Exception e1) { From e289206267be819e3267cda027a9a3869edeaf97 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Fri, 13 Nov 2015 23:20:51 +0900 Subject: [PATCH 5/6] Close all interpreter processes when server stops --- .../main/java/org/apache/zeppelin/server/ZeppelinServer.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java 
b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java index a6e944da8be..3717eccf142 100644 --- a/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java +++ b/zeppelin-server/src/main/java/org/apache/zeppelin/server/ZeppelinServer.java @@ -112,6 +112,7 @@ public static void main(String[] args) throws Exception { LOG.info("Shutting down Zeppelin Server ... "); try { jettyServer.stop(); + ZeppelinServer.notebook.getInterpreterFactory().close(); } catch (Exception e) { LOG.error("Error while stopping servlet container", e); } @@ -131,6 +132,7 @@ public static void main(String[] args) throws Exception { } jettyServer.join(); + ZeppelinServer.notebook.getInterpreterFactory().close(); } private static Server setupJettyServer(ZeppelinConfiguration conf) From 98eaad7f91d55fdbb866012e5ba8bd8e98788e98 Mon Sep 17 00:00:00 2001 From: Lee moon soo Date: Sat, 14 Nov 2015 00:05:38 +0900 Subject: [PATCH 6/6] Allow null return on getInterpreterProcess --- .../zeppelin/interpreter/remote/RemoteInterpreter.java | 3 ++- .../interpreter/remote/RemoteInterpreterTest.java | 9 +++------ 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 91dae3a0b49..9d01561a61f 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -106,7 +106,8 @@ public RemoteInterpreterProcess getInterpreterProcess() { throw new InterpreterException(e); } } else { - throw new InterpreterException("Unexpected error"); + // closed or not opened yet + return null; } } } diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java 
b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 2e7ee0ce799..bbda252ed21 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -17,10 +17,7 @@ package org.apache.zeppelin.interpreter.remote; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotSame; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.File; import java.io.IOException; @@ -224,7 +221,7 @@ public void testRemoteSchedulerSharing() throws TTransportException, IOException intpB.close(); RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertFalse(process.isRunning()); + assertNull(process); } @Test @@ -342,7 +339,7 @@ protected boolean jobAbort() { intpB.close(); RemoteInterpreterProcess process = intpA.getInterpreterProcess(); - assertFalse(process.isRunning()); + assertNull(process); } @Test