From 36306d55971943ca52bf6adde04496ddd29b4bc4 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Sat, 29 Sep 2018 22:32:26 +0800 Subject: [PATCH] ZEPPELIN-3801. Code refactoring of org.apache.zeppelin.interpreter.remote --- .../launcher/InterpreterClient.java | 40 ---- .../launcher/InterpreterLauncher.java | 3 +- .../interpreter/recovery/RecoveryStorage.java | 16 +- .../interpreter/remote/ClientFactory.java | 4 +- .../remote/RemoteInterpreterProcess.java | 94 ++++----- .../SparkInterpreterLauncherTest.java | 45 ++-- .../launcher/StandardInterpreterLauncher.java | 13 +- .../StandardInterpreterLauncherTest.java | 15 +- .../interpreter/InterpreterSetting.java | 8 +- .../RemoteInterpreterEventServer.java | 3 +- .../recovery/FileSystemRecoveryStorage.java | 13 +- .../recovery/NullRecoveryStorage.java | 8 +- .../interpreter/recovery/StopInterpreter.java | 6 +- .../remote/RemoteAngularObject.java | 1 - .../remote/RemoteAngularObjectRegistry.java | 40 ++-- .../interpreter/remote/RemoteInterpreter.java | 196 ++++++++---------- .../RemoteInterpreterProcessListener.java | 16 +- ...emoteRemoteInterpreterManagedProcess.java} | 62 +++--- ...emoteRemoteInterpreterRunningProcess.java} | 15 +- .../remote/RemoteInterpreterTest.java | 5 - 20 files changed, 260 insertions(+), 343 deletions(-) delete mode 100644 zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java rename {zeppelin-zengine => zeppelin-interpreter}/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java (96%) rename {zeppelin-zengine => zeppelin-interpreter}/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java (59%) rename zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/{RemoteInterpreterManagedProcess.java => RemoteRemoteInterpreterManagedProcess.java} (83%) rename zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/{RemoteInterpreterRunningProcess.java => RemoteRemoteInterpreterRunningProcess.java} (86%) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java deleted file mode 100644 index bfd3e446a2f..00000000000 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterClient.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.zeppelin.interpreter.launcher; - -import java.io.IOException; - -/** - * Interface to InterpreterClient which is created by InterpreterLauncher. This is the component - * that is used to for the communication from zeppelin-server process to zeppelin interpreter - * process. - */ -public interface InterpreterClient { - - String getInterpreterSettingName(); - - void start(String userName) throws IOException; - - void stop(); - - String getHost(); - - int getPort(); - - boolean isRunning(); -} diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java index 30cf995db01..f1cfe5a91f3 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/launcher/InterpreterLauncher.java @@ -19,6 +19,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import java.io.IOException; import java.util.Properties; @@ -48,5 +49,5 @@ protected int getConnectTimeout() { return connectTimeout; } - public abstract InterpreterClient launch(InterpreterLaunchContext context) throws IOException; + public abstract RemoteInterpreterProcess launch(InterpreterLaunchContext context) throws IOException; } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java index 8bbe8302fcf..0a2bb12b8aa 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/recovery/RecoveryStorage.java @@ -18,7 +18,7 @@ package org.apache.zeppelin.interpreter.recovery; import org.apache.zeppelin.conf.ZeppelinConfiguration; -import org.apache.zeppelin.interpreter.launcher.InterpreterClient; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import java.io.IOException; import java.util.Map; @@ -31,25 +31,25 @@ public abstract class RecoveryStorage { protected ZeppelinConfiguration zConf; - protected Map restoredClients; + protected Map restoredClients; public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { this.zConf = zConf; } /** - * Update RecoveryStorage when new InterpreterClient is started + * Update RecoveryStorage when new RemoteInterpreterProcess is started * @param client * @throws IOException */ - public abstract void onInterpreterClientStart(InterpreterClient client) throws IOException; + public abstract void onInterpreterClientStart(RemoteInterpreterProcess client) throws IOException; /** - * Update RecoveryStorage when InterpreterClient is stopped + * Update RecoveryStorage when RemoteInterpreterProcess is stopped * @param client * @throws IOException */ - public abstract void onInterpreterClientStop(InterpreterClient client) throws IOException; + public abstract void onInterpreterClientStop(RemoteInterpreterProcess client) throws IOException; /** * @@ -58,7 +58,7 @@ public RecoveryStorage(ZeppelinConfiguration zConf) throws IOException { * @return * @throws IOException */ - public abstract Map restore() throws IOException; + public abstract Map restore() throws IOException; /** @@ -70,7 +70,7 @@ public void init() throws IOException { this.restoredClients = restore(); } - public InterpreterClient getInterpreterClient(String interpreterGroupId) { + public RemoteInterpreterProcess getInterpreterClient(String interpreterGroupId) { if (restoredClients.containsKey(interpreterGroupId)) { return restoredClients.get(interpreterGroupId); } else { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java similarity index 96% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java index b2cb78f810e..171e3a06ccf 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/ClientFactory.java @@ -32,7 +32,7 @@ import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; /** - * + * Factory class for Thrift Client */ public class ClientFactory extends BasePooledObjectFactory{ private String host; @@ -53,7 +53,7 @@ public Client create() throws Exception { throw new InterpreterException(e); } - TProtocol protocol = new TBinaryProtocol(transport); + TProtocol protocol = new TBinaryProtocol(transport); Client client = new RemoteInterpreterService.Client(protocol); synchronized (clientSocketMap) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java similarity index 59% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java rename to zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java index c76814319ac..86f844c47a6 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcess.java @@ -14,27 +14,31 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; import org.apache.commons.pool2.impl.GenericObjectPool; import org.apache.thrift.TException; -import org.apache.zeppelin.interpreter.launcher.InterpreterClient; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; + /** - * Abstract class for interpreter process + * Interface to RemoteInterpreterProcess which is created by InterpreterLauncher. This is the component + * that is used to for the communication from zeppelin-server process to zeppelin interpreter + * process. */ -public abstract class RemoteInterpreterProcess implements InterpreterClient { - private static final Logger logger = LoggerFactory.getLogger(RemoteInterpreterProcess.class); +public abstract class RemoteInterpreterProcess { - private GenericObjectPool clientPool; + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteInterpreterProcess.class); + + private GenericObjectPool clientPool; private int connectTimeout; - public RemoteInterpreterProcess( - int connectTimeout) { + public RemoteInterpreterProcess(int connectTimeout) { this.connectTimeout = connectTimeout; } @@ -42,75 +46,67 @@ public int getConnectTimeout() { return connectTimeout; } - public synchronized Client getClient() throws Exception { + public abstract String getInterpreterSettingName(); + + public abstract void start(String userName) throws IOException; + + public abstract void stop(); + + public abstract String getHost(); + + public abstract int getPort(); + + public abstract boolean isRunning(); + + private synchronized RemoteInterpreterService.Client getClient() throws Exception { if (clientPool == null || clientPool.isClosed()) { clientPool = new GenericObjectPool<>(new ClientFactory(getHost(), getPort())); } return clientPool.borrowObject(); } - private void releaseClient(Client client) { - releaseClient(client, false); - } - - private void releaseClient(Client client, boolean broken) { + private void releaseClient(RemoteInterpreterService.Client client, boolean broken) { if (broken) { releaseBrokenClient(client); } else { try { clientPool.returnObject(client); } catch (Exception e) { - logger.warn("exception occurred during releasing thrift client", e); + LOGGER.warn("exception occurred during releasing thrift client", e); } } } - private void releaseBrokenClient(Client client) { + private void releaseBrokenClient(RemoteInterpreterService.Client client) { try { clientPool.invalidateObject(client); } catch (Exception e) { - logger.warn("exception occurred during releasing thrift client", e); + LOGGER.warn("exception occurred during releasing thrift client", e); } } /** * Called when angular object is updated in client side to propagate * change to the remote process + * * @param name + * @param sessionId + * @param paragraphId * @param o */ - public void updateRemoteAngularObject(String name, String noteId, String paragraphId, Object o) { - Client client = null; - try { - client = getClient(); - } catch (NullPointerException e) { - // remote process not started - logger.info("NullPointerException in RemoteInterpreterProcess while " + - "updateRemoteAngularObject getClient, remote process not started", e); - return; - } catch (Exception e) { - logger.error("Can't update angular object", e); - } - - boolean broken = false; - try { + public void updateRemoteAngularObject(String name, + String sessionId, + String paragraphId, + Object o) { + callRemoteFunction(client -> { Gson gson = new Gson(); - client.angularObjectUpdate(name, noteId, paragraphId, gson.toJson(o)); - } catch (TException e) { - broken = true; - logger.error("Can't update angular object", e); - } catch (NullPointerException e) { - logger.error("Remote interpreter process not started", e); - return; - } finally { - if (client != null) { - releaseClient(client, broken); - } - } + client.angularObjectUpdate(name, sessionId, paragraphId, gson.toJson(o)); + return null; + }); } public T callRemoteFunction(RemoteFunction func) { - Client client = null; + RemoteInterpreterService.Client client = null; boolean broken = false; try { client = getClient(); @@ -120,8 +116,8 @@ public T callRemoteFunction(RemoteFunction func) { } catch (TException e) { broken = true; throw new RuntimeException(e); - } catch (Exception e1) { - throw new RuntimeException(e1); + } catch (Exception e) { + throw new RuntimeException(e); } finally { if (client != null) { releaseClient(client, broken); @@ -131,11 +127,11 @@ public T callRemoteFunction(RemoteFunction func) { } /** - * * @param */ + @FunctionalInterface public interface RemoteFunction { - T call(Client client) throws Exception; + T call(RemoteInterpreterService.Client client) throws Exception; } /** diff --git a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java index d7dcd0a91cd..ce0e233885c 100644 --- a/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/spark/src/test/java/org/apache/zeppelin/interpreter/launcher/SparkInterpreterLauncherTest.java @@ -23,7 +23,8 @@ import org.apache.commons.io.FileUtils; import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteRemoteInterpreterManagedProcess; import org.junit.Before; import org.junit.Test; @@ -52,9 +53,9 @@ public void testConnectTimeOut() throws IOException { InterpreterOption option = new InterpreterOption(); option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); @@ -77,9 +78,9 @@ public void testLocalMode() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -102,9 +103,9 @@ public void testYarnClientMode_1() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -128,9 +129,9 @@ public void testYarnClientMode_2() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -153,9 +154,9 @@ public void testYarnClusterMode_1() throws IOException { InterpreterOption option = new InterpreterOption(); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "spark", "spark", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -186,9 +187,9 @@ public void testYarnClusterMode_2() throws IOException { Files.createDirectories(localRepoPath); Files.createFile(Paths.get(localRepoPath.toAbsolutePath().toString(), "test.jar")); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); @@ -220,9 +221,9 @@ public void testYarnClusterMode_3() throws IOException { FileUtils.deleteDirectory(localRepoPath.toFile()); Files.createDirectories(localRepoPath); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("spark", interpreterProcess.getInterpreterSettingName()); assertTrue(interpreterProcess.getInterpreterDir().endsWith("/interpreter/spark")); assertTrue(interpreterProcess.getLocalRepoDir().endsWith("/local-repo/groupId")); diff --git a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java index 47756c9901f..71c884d5445 100644 --- a/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java +++ b/zeppelin-plugins/launcher/standard/src/main/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncher.java @@ -22,8 +22,9 @@ import org.apache.zeppelin.interpreter.InterpreterOption; import org.apache.zeppelin.interpreter.InterpreterRunner; import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteRemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteRemoteInterpreterRunningProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -44,7 +45,7 @@ public StandardInterpreterLauncher(ZeppelinConfiguration zConf, RecoveryStorage } @Override - public InterpreterClient launch(InterpreterLaunchContext context) throws IOException { + public RemoteInterpreterProcess launch(InterpreterLaunchContext context) throws IOException { LOGGER.info("Launching Interpreter: " + context.getInterpreterSettingGroup()); this.properties = context.getProperties(); InterpreterOption option = context.getOption(); @@ -54,7 +55,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep int connectTimeout = getConnectTimeout(); if (option.isExistingProcess()) { - return new RemoteInterpreterRunningProcess( + return new RemoteRemoteInterpreterRunningProcess( context.getInterpreterSettingName(), connectTimeout, option.getHost(), @@ -62,7 +63,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep } else { // try to recover it first if (zConf.isRecoveryEnabled()) { - InterpreterClient recoveredClient = + RemoteInterpreterProcess recoveredClient = recoveryStorage.getInterpreterClient(context.getInterpreterGroupId()); if (recoveredClient != null) { if (recoveredClient.isRunning()) { @@ -79,7 +80,7 @@ public InterpreterClient launch(InterpreterLaunchContext context) throws IOExcep // create new remote process String localRepoPath = zConf.getInterpreterLocalRepoPath() + "/" + context.getInterpreterSettingId(); - return new RemoteInterpreterManagedProcess( + return new RemoteRemoteInterpreterManagedProcess( runner != null ? runner.getPath() : zConf.getInterpreterRemoteRunnerPath(), context.getZeppelinServerRPCPort(), context.getZeppelinServerHost(), zConf.getInterpreterPortRange(), zConf.getInterpreterDir() + "/" + groupName, localRepoPath, diff --git a/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java index 60bf915b4d2..385550cfe65 100644 --- a/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java +++ b/zeppelin-plugins/launcher/standard/src/test/java/org/apache/zeppelin/interpreter/launcher/StandardInterpreterLauncherTest.java @@ -19,7 +19,8 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterOption; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; +import org.apache.zeppelin.interpreter.remote.RemoteRemoteInterpreterManagedProcess; import org.junit.Before; import org.junit.Test; @@ -47,9 +48,9 @@ public void testLauncher() throws IOException { InterpreterOption option = new InterpreterOption(); option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); @@ -71,9 +72,9 @@ public void testConnectTimeOut() throws IOException { InterpreterOption option = new InterpreterOption(); option.setUserImpersonate(true); InterpreterLaunchContext context = new InterpreterLaunchContext(properties, option, null, "user1", "intpGroupId", "groupId", "groupName", "name", 0, "host"); - InterpreterClient client = launcher.launch(context); - assertTrue( client instanceof RemoteInterpreterManagedProcess); - RemoteInterpreterManagedProcess interpreterProcess = (RemoteInterpreterManagedProcess) client; + RemoteInterpreterProcess client = launcher.launch(context); + assertTrue( client instanceof RemoteRemoteInterpreterManagedProcess); + RemoteRemoteInterpreterManagedProcess interpreterProcess = (RemoteRemoteInterpreterManagedProcess) client; assertEquals("name", interpreterProcess.getInterpreterSettingName()); assertEquals(".//interpreter/groupName", interpreterProcess.getInterpreterDir()); assertEquals(".//local-repo/groupId", interpreterProcess.getLocalRepoDir()); diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java index 95530b074f0..d1d6a0a53d8 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/InterpreterSetting.java @@ -32,6 +32,7 @@ import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; import org.apache.zeppelin.helium.ApplicationEventListener; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.launcher.InterpreterLaunchContext; import org.apache.zeppelin.interpreter.launcher.InterpreterLauncher; import org.apache.zeppelin.interpreter.lifecycle.NullLifecycleManager; @@ -39,7 +40,6 @@ import org.apache.zeppelin.interpreter.recovery.RecoveryStorage; import org.apache.zeppelin.interpreter.remote.RemoteAngularObjectRegistry; import org.apache.zeppelin.interpreter.remote.RemoteInterpreter; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.plugin.PluginManager; import org.slf4j.Logger; @@ -716,9 +716,9 @@ synchronized RemoteInterpreterProcess createInterpreterProcess(String interprete InterpreterLaunchContext launchContext = new InterpreterLaunchContext(properties, option, interpreterRunner, userName, interpreterGroupId, id, group, name, interpreterEventServer.getPort(), interpreterEventServer.getHost()); - RemoteInterpreterProcess process = (RemoteInterpreterProcess) launcher.launch(launchContext); - recoveryStorage.onInterpreterClientStart(process); - return process; + RemoteInterpreterProcess intpClient = launcher.launch(launchContext); + recoveryStorage.onInterpreterClientStart(intpClient); + return intpClient; } List getOrCreateSession(String user, String noteId) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java index d7dd304c3d5..5105b3f76c1 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/RemoteInterpreterEventServer.java @@ -28,7 +28,7 @@ import org.apache.zeppelin.interpreter.remote.AppendOutputRunner; import org.apache.zeppelin.interpreter.remote.InvokeResourceMethodEventMessage; import org.apache.zeppelin.interpreter.remote.RemoteAngularObject; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterManagedProcess; +import org.apache.zeppelin.interpreter.remote.RemoteRemoteInterpreterManagedProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcessListener; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterUtils; @@ -165,7 +165,6 @@ public void registerInterpreterProcess(RegisterInfo registerInfo) throws TExcept LOGGER.warn("Interpreter process does not existed yet for InterpreterGroup: " + registerInfo.getInterpreterGroupId()); } - interpreterProcess.processStarted(registerInfo.port, registerInfo.host); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java index 4dffff10ee2..33cda5bd015 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/FileSystemRecoveryStorage.java @@ -23,9 +23,8 @@ import org.apache.zeppelin.interpreter.InterpreterSetting; import org.apache.zeppelin.interpreter.InterpreterSettingManager; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -import org.apache.zeppelin.interpreter.launcher.InterpreterClient; import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; -import org.apache.zeppelin.interpreter.remote.RemoteInterpreterRunningProcess; +import org.apache.zeppelin.interpreter.remote.RemoteRemoteInterpreterRunningProcess; import org.apache.zeppelin.notebook.FileSystemStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,12 +65,12 @@ public FileSystemRecoveryStorage(ZeppelinConfiguration zConf, } @Override - public void onInterpreterClientStart(InterpreterClient client) throws IOException { + public void onInterpreterClientStart(RemoteInterpreterProcess client) throws IOException { save(client.getInterpreterSettingName()); } @Override - public void onInterpreterClientStop(InterpreterClient client) throws IOException { + public void onInterpreterClientStop(RemoteInterpreterProcess client) throws IOException { save(client.getInterpreterSettingName()); } @@ -93,8 +92,8 @@ private void save(String interpreterSettingName) throws IOException { } @Override - public Map restore() throws IOException { - Map clients = new HashMap<>(); + public Map restore() throws IOException { + Map clients = new HashMap<>(); List paths = fs.list(new Path(recoveryDir + "/*.recovery")); for (Path path : paths) { @@ -109,7 +108,7 @@ public Map restore() throws IOException { String[] hostPort = tokens[1].split(":"); int connectTimeout = zConf.getInt(ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_CONNECT_TIMEOUT); - RemoteInterpreterRunningProcess client = new RemoteInterpreterRunningProcess( + RemoteRemoteInterpreterRunningProcess client = new RemoteRemoteInterpreterRunningProcess( interpreterSettingName, connectTimeout, hostPort[0], Integer.parseInt(hostPort[1])); // interpreterSettingManager may be null when this class is used when it is used // stop-interpreter.sh diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java index 3a7d12c70f3..8195e10b39c 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/NullRecoveryStorage.java @@ -19,7 +19,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.launcher.InterpreterClient; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import java.io.IOException; import java.util.Map; @@ -38,17 +38,17 @@ public NullRecoveryStorage(ZeppelinConfiguration zConf, } @Override - public void onInterpreterClientStart(InterpreterClient client) throws IOException { + public void onInterpreterClientStart(RemoteInterpreterProcess client) throws IOException { } @Override - public void onInterpreterClientStop(InterpreterClient client) throws IOException { + public void onInterpreterClientStop(RemoteInterpreterProcess client) throws IOException { } @Override - public Map restore() throws IOException { + public Map restore() throws IOException { return null; } } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java index d74b1621e7e..d29430a0fbd 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/recovery/StopInterpreter.java @@ -2,7 +2,7 @@ import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.interpreter.InterpreterSettingManager; -import org.apache.zeppelin.interpreter.launcher.InterpreterClient; +import org.apache.zeppelin.interpreter.remote.RemoteInterpreterProcess; import org.apache.zeppelin.util.ReflectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,9 +29,9 @@ public static void main(String[] args) throws IOException { new Object[] {zConf, null}); LOGGER.info("Using RecoveryStorage: " + recoveryStorage.getClass().getName()); - Map restoredClients = recoveryStorage.restore(); + Map restoredClients = recoveryStorage.restore(); if (restoredClients != null) { - for (InterpreterClient client : restoredClients.values()) { + for (RemoteInterpreterProcess client : restoredClients.values()) { LOGGER.info("Stop Interpreter Process: " + client.getHost() + ":" + client.getPort()); client.stop(); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java index 62c8efd2010..aca5cda4c6e 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObject.java @@ -19,7 +19,6 @@ import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectListener; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; /** diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java index 7458ce5a589..256ddbf0c20 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteAngularObjectRegistry.java @@ -18,15 +18,10 @@ package org.apache.zeppelin.interpreter.remote; import com.google.gson.Gson; -import org.apache.thrift.TException; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.AngularObjectRegistryListener; -import org.apache.zeppelin.interpreter.InterpreterGroup; import org.apache.zeppelin.interpreter.ManagedInterpreterGroup; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService.Client; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.List; @@ -34,7 +29,7 @@ * Proxy for AngularObjectRegistry that exists in remote interpreter process */ public class RemoteAngularObjectRegistry extends AngularObjectRegistry { - Logger logger = LoggerFactory.getLogger(RemoteAngularObjectRegistry.class); + private static Gson gson = new Gson(); private ManagedInterpreterGroup interpreterGroup; public RemoteAngularObjectRegistry(String interpreterId, @@ -44,13 +39,14 @@ public RemoteAngularObjectRegistry(String interpreterId, this.interpreterGroup = interpreterGroup; } - private RemoteInterpreterProcess getRemoteInterpreterProcess() { + private RemoteInterpreterProcess getInterpreterClient() { return interpreterGroup.getRemoteInterpreterProcess(); } /** * When ZeppelinServer side code want to add angularObject to the registry, * this method should be used instead of add() + * * @param name * @param o * @param noteId @@ -61,21 +57,16 @@ public AngularObject addAndNotifyRemoteProcess(final String name, final String noteId, final String paragraphId) { - RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + RemoteInterpreterProcess remoteInterpreterProcess = getInterpreterClient(); if (!remoteInterpreterProcess.isRunning()) { return super.add(name, o, noteId, paragraphId, true); } remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Void call(Client client) throws Exception { - Gson gson = new Gson(); - client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); - return null; - } - } - ); + client -> { + client.angularObjectAdd(name, noteId, paragraphId, gson.toJson(o)); + return null; + }); return super.add(name, o, noteId, paragraphId, true); @@ -84,6 +75,7 @@ public Void call(Client client) throws Exception { /** * When ZeppelinServer side code want to remove angularObject from the registry, * this method should be used instead of remove() + * * @param name * @param noteId * @param paragraphId @@ -92,23 +84,19 @@ public Void call(Client client) throws Exception { public AngularObject removeAndNotifyRemoteProcess(final String name, final String noteId, final String paragraphId) { - RemoteInterpreterProcess remoteInterpreterProcess = getRemoteInterpreterProcess(); + RemoteInterpreterProcess remoteInterpreterProcess = getInterpreterClient(); if (remoteInterpreterProcess == null || !remoteInterpreterProcess.isRunning()) { return super.remove(name, noteId, paragraphId); } remoteInterpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Void call(Client client) throws Exception { + client -> { client.angularObjectRemove(name, noteId, paragraphId); return null; - } - } - ); + }); return super.remove(name, noteId, paragraphId); } - + public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { List all = getAll(noteId, paragraphId); for (AngularObject ao : all) { @@ -118,7 +106,7 @@ public void removeAllAndNotifyRemoteProcess(String noteId, String paragraphId) { @Override protected AngularObject createNewAngularObject(String name, Object o, String noteId, String - paragraphId) { + paragraphId) { return new RemoteAngularObject(name, o, noteId, paragraphId, interpreterGroup, getAngularObjectListener()); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java index 36f6021ea5b..444c7fe6cc9 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreter.java @@ -21,7 +21,6 @@ import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import org.apache.thrift.TException; -import org.apache.zeppelin.conf.ZeppelinConfiguration; import org.apache.zeppelin.display.AngularObject; import org.apache.zeppelin.display.AngularObjectRegistry; import org.apache.zeppelin.display.GUI; @@ -46,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.Properties; @@ -125,7 +123,7 @@ public void open() throws InterpreterException { // depends on other interpreter. e.g. PySparkInterpreter depends on SparkInterpreter. // also see method Interpreter.getInterpreterInTheSameSessionByClassName for (Interpreter interpreter : getInterpreterGroup() - .getOrCreateSession(this.getUserName(), sessionId)) { + .getOrCreateSession(this.getUserName(), sessionId)) { try { if (!(interpreter instanceof ConfInterpreter)) { ((RemoteInterpreter) interpreter).internal_create(); @@ -135,22 +133,20 @@ public void open() throws InterpreterException { } } - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Open RemoteInterpreter {}", getClassName()); - // open interpreter here instead of in the jobRun method in RemoteInterpreterServer - // client.open(sessionId, className); - // Push angular object loaded from JSON file to remote interpreter - synchronized (getInterpreterGroup()) { - if (!getInterpreterGroup().isAngularRegistryPushed()) { - pushAngularObjectRegistryToRemote(client); - getInterpreterGroup().setAngularRegistryPushed(true); + interpreterProcess.callRemoteFunction( + client -> { + LOGGER.info("Open RemoteInterpreter {}", getClassName()); + // open interpreter here instead of in the jobRun method in RemoteInterpreterServer + // client.open(sessionId, className); + // Push angular object loaded from JSON file to remote interpreter + synchronized (getInterpreterGroup()) { + if (!getInterpreterGroup().isAngularRegistryPushed()) { + pushAngularObjectRegistryToRemote(client); + getInterpreterGroup().setAngularRegistryPushed(true); + } } - } - return null; - } - }); + return null; + }); isOpened = true; this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } @@ -161,15 +157,13 @@ private void internal_create() throws IOException { synchronized (this) { if (!isCreated) { this.interpreterProcess = getOrCreateInterpreterProcess(); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Void call(Client client) throws Exception { - LOGGER.info("Create RemoteInterpreter {}", getClassName()); - client.createInterpreter(getInterpreterGroup().getId(), sessionId, - className, (Map) getProperties(), getUserName()); - return null; - } - }); + interpreterProcess.callRemoteFunction( + client -> { + LOGGER.info("Create RemoteInterpreter {}", getClassName()); + client.createInterpreter(getInterpreterGroup().getId(), sessionId, + className, (Map) getProperties(), getUserName()); + return null; + }); isCreated = true; } } @@ -185,13 +179,13 @@ public void close() throws InterpreterException { } catch (IOException e) { throw new InterpreterException(e); } - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Void call(Client client) throws Exception { - client.close(sessionId, className); - return null; - } - }); + if (interpreterProcess.isRunning()) { + interpreterProcess.callRemoteFunction( + client -> { + client.close(sessionId, className); + return null; + }); + } isOpened = false; this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); } else { @@ -213,46 +207,44 @@ public InterpreterResult interpret(final String st, final InterpreterContext con } catch (IOException e) { throw new InterpreterException(e); } + if (!interpreterProcess.isRunning()) { + throw new InterpreterException("Interpreter Process has not been launched"); + } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction() { - @Override - public InterpreterResult call(Client client) throws Exception { - - RemoteInterpreterResult remoteResult = client.interpret( - sessionId, className, st, convert(context)); - Map remoteConfig = (Map) gson.fromJson( - remoteResult.getConfig(), new TypeToken>() { - }.getType()); - context.getConfig().clear(); - if (remoteConfig != null) { - context.getConfig().putAll(remoteConfig); - } - GUI currentGUI = context.getGui(); - GUI currentNoteGUI = context.getNoteGui(); - if (form == FormType.NATIVE) { - GUI remoteGui = GUI.fromJson(remoteResult.getGui()); - GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui()); - currentGUI.clear(); - currentGUI.setParams(remoteGui.getParams()); - currentGUI.setForms(remoteGui.getForms()); - currentNoteGUI.setParams(remoteNoteGui.getParams()); - currentNoteGUI.setForms(remoteNoteGui.getForms()); - } else if (form == FormType.SIMPLE) { - final Map currentForms = currentGUI.getForms(); - final Map currentParams = currentGUI.getParams(); - final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); - final Map remoteForms = remoteGUI.getForms(); - final Map remoteParams = remoteGUI.getParams(); - currentForms.putAll(remoteForms); - currentParams.putAll(remoteParams); - } - - InterpreterResult result = convert(remoteResult); - return result; + client -> { + RemoteInterpreterResult remoteResult = client.interpret( + sessionId, className, st, convert(context)); + Map remoteConfig = (Map) gson.fromJson( + remoteResult.getConfig(), new TypeToken>() { + }.getType()); + context.getConfig().clear(); + if (remoteConfig != null) { + context.getConfig().putAll(remoteConfig); } - } - ); + GUI currentGUI = context.getGui(); + GUI currentNoteGUI = context.getNoteGui(); + if (form == FormType.NATIVE) { + GUI remoteGui = GUI.fromJson(remoteResult.getGui()); + GUI remoteNoteGui = GUI.fromJson(remoteResult.getNoteGui()); + currentGUI.clear(); + currentGUI.setParams(remoteGui.getParams()); + currentGUI.setForms(remoteGui.getForms()); + currentNoteGUI.setParams(remoteNoteGui.getParams()); + currentNoteGUI.setForms(remoteNoteGui.getForms()); + } else if (form == FormType.SIMPLE) { + final Map currentForms = currentGUI.getForms(); + final Map currentParams = currentGUI.getParams(); + final GUI remoteGUI = GUI.fromJson(remoteResult.getGui()); + final Map remoteForms = remoteGUI.getForms(); + final Map remoteParams = remoteGUI.getParams(); + currentForms.putAll(remoteForms); + currentParams.putAll(remoteParams); + } + + InterpreterResult result = convert(remoteResult); + return result; + }); } @@ -268,14 +260,16 @@ public void cancel(final InterpreterContext context) throws InterpreterException } catch (IOException e) { throw new InterpreterException(e); } + if (!interpreterProcess.isRunning()) { + interpreterProcess.stop(); + } else { + interpreterProcess.callRemoteFunction( + client -> { + client.cancel(sessionId, className, convert(context)); + return null; + }); + } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); - interpreterProcess.callRemoteFunction(new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Void call(Client client) throws Exception { - client.cancel(sessionId, className, convert(context)); - return null; - } - }); } @Override @@ -298,12 +292,9 @@ public FormType getFormType() throws InterpreterException { } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); FormType type = interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction() { - @Override - public FormType call(Client client) throws Exception { - formType = FormType.valueOf(client.getFormType(sessionId, className)); - return formType; - } + client -> { + formType = FormType.valueOf(client.getFormType(sessionId, className)); + return formType; }); return type; } @@ -321,14 +312,15 @@ public int getProgress(final InterpreterContext context) throws InterpreterExcep } catch (IOException e) { throw new InterpreterException(e); } - this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); - return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction() { - @Override - public Integer call(Client client) throws Exception { - return client.getProgress(sessionId, className, convert(context)); - } - }); + if (interpreterProcess.isRunning()) { + this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); + return interpreterProcess.callRemoteFunction( + client -> + client.getProgress(sessionId, className, convert(context)) + ); + } else { + return 0; + } } @@ -347,13 +339,8 @@ public List completion(final String buf, final int cursor } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction>() { - @Override - public List call(Client client) throws Exception { - return client.completion(sessionId, className, buf, cursor, - convert(interpreterContext)); - } - }); + client -> client.completion(sessionId, className, buf, cursor, convert(interpreterContext)) + ); } public String getStatus(final String jobId) { @@ -369,20 +356,13 @@ public String getStatus(final String jobId) { } this.lifecycleManager.onInterpreterUse(this.getInterpreterGroup(), sessionId); return interpreterProcess.callRemoteFunction( - new RemoteInterpreterProcess.RemoteFunction() { - @Override - public String call(Client client) throws Exception { - return client.getStatus(sessionId, jobId); - } - }); + client -> client.getStatus(sessionId, jobId) + ); } @Override public Scheduler getScheduler() { - int maxConcurrency = Integer.parseInt( - getProperty("zeppelin.interpreter.max.poolsize", - ZeppelinConfiguration.ConfVars.ZEPPELIN_INTERPRETER_MAX_POOL_SIZE.getIntValue() + "")); // one session own one Scheduler, so that when one session is closed, all the jobs/paragraphs // running under the scheduler of this session will be aborted. Scheduler s = new RemoteScheduler( diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java index ac7fa1e19f1..c229a6077f4 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterProcessListener.java @@ -23,17 +23,15 @@ import java.util.Map; /** - * Event from remoteInterpreterProcess + * Listener interface for handling event from RemoteInterpreterProcess */ public interface RemoteInterpreterProcessListener { - public void onOutputAppend(String noteId, String paragraphId, int index, String output); - public void onOutputUpdated( - String noteId, String paragraphId, int index, InterpreterResult.Type type, String output); - public void onOutputClear(String noteId, String paragraphId); + void onOutputAppend(String noteId, String paragraphId, int index, String output); + void onOutputUpdated(String noteId, String paragraphId, int index, + InterpreterResult.Type type, String output); + void onOutputClear(String noteId, String paragraphId); void runParagraphs(String noteId, List paragraphIndices, List paragraphIds, - String curParagraphId) - throws IOException; - - public void onParaInfosReceived(String noteId, String paragraphId, + String curParagraphId) throws IOException; + void onParaInfosReceived(String noteId, String paragraphId, String interpreterSettingId, Map metaInfos); } diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteRemoteInterpreterManagedProcess.java similarity index 83% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java rename to zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteRemoteInterpreterManagedProcess.java index a990808f1c3..ffc6b686d32 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterManagedProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteRemoteInterpreterManagedProcess.java @@ -26,7 +26,6 @@ import org.apache.commons.exec.LogOutputStream; import org.apache.commons.exec.PumpStreamHandler; import org.apache.commons.exec.environment.EnvironmentUtils; -import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,10 +38,10 @@ /** * This class manages start / stop of remote interpreter process */ -public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess +public class RemoteRemoteInterpreterManagedProcess extends RemoteInterpreterProcess implements ExecuteResultHandler { - private static final Logger logger = LoggerFactory.getLogger( - RemoteInterpreterManagedProcess.class); + private static final Logger LOGGER = LoggerFactory.getLogger( + RemoteRemoteInterpreterManagedProcess.class); private final String interpreterRunner; private final int zeppelinServerRPCPort; @@ -58,10 +57,9 @@ public class RemoteInterpreterManagedProcess extends RemoteInterpreterProcess private final String interpreterSettingName; private final String interpreterGroupId; private final boolean isUserImpersonated; - private Map env; - public RemoteInterpreterManagedProcess( + public RemoteRemoteInterpreterManagedProcess( String intpRunner, int zeppelinServerRPCPort, String zeppelinServerRPCHost, @@ -98,7 +96,7 @@ public int getPort() { @Override public void start(String userName) throws IOException { - // start server process + // start interpreter process CommandLine cmdLine = CommandLine.parse(interpreterRunner); cmdLine.addArgument("-d", false); cmdLine.addArgument(interpreterDir, false); @@ -122,7 +120,7 @@ public void start(String userName) throws IOException { executor = new DefaultExecutor(); ByteArrayOutputStream cmdOut = new ByteArrayOutputStream(); - ProcessLogOutputStream processOutput = new ProcessLogOutputStream(logger); + ProcessLogOutputStream processOutput = new ProcessLogOutputStream(LOGGER); processOutput.setOutputStream(cmdOut); executor.setStreamHandler(new PumpStreamHandler(processOutput)); @@ -133,8 +131,8 @@ public void start(String userName) throws IOException { Map procEnv = EnvironmentUtils.getProcEnvironment(); procEnv.putAll(env); - logger.info("Run interpreter process {}", cmdLine); - executor.execute(cmdLine, procEnv, this); + LOGGER.info("Run interpreter process {}", cmdLine); + executor.execute(cmdLine, procEnv, RemoteRemoteInterpreterManagedProcess.this); } catch (IOException e) { running.set(false); throw new RuntimeException(e); @@ -147,46 +145,45 @@ public void start(String userName) throws IOException { } } if (!running.get()) { - throw new IOException(new String( + LOGGER.error(new String( String.format("Interpreter Process creation is time out in %d seconds", - getConnectTimeout()/1000) + "\n" + "You can increase timeout threshold via " + + getConnectTimeout() / 1000) + "\n" + "You can increase timeout threshold via " + "setting zeppelin.interpreter.connect.timeout of this interpreter.\n" + cmdOut.toString())); + } else { + LOGGER.info("Interpreter Process for InterpreterGroup: " + interpreterGroupId + + " is launched successfully"); } } catch (InterruptedException e) { - logger.error("Remote interpreter is not accessible"); + LOGGER.error("Remote interpreter is not accessible"); } processOutput.setOutputStream(null); } public void stop() { if (isRunning()) { - logger.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process"); try { - callRemoteFunction(new RemoteFunction() { - @Override - public Void call(RemoteInterpreterService.Client client) throws Exception { - client.shutdown(); - return null; - } - }); + callRemoteFunction( + client -> { + client.shutdown(); + return null; + }); } catch (Exception e) { - logger.warn("ignore the exception when shutting down"); + LOGGER.warn("ignore the exception when shutting down"); } - watchdog.destroyProcess(); } - + watchdog.destroyProcess(); executor = null; watchdog = null; running.set(false); - logger.info("Remote process terminated"); + LOGGER.info("Remote process terminated"); } @Override public void onProcessComplete(int exitValue) { - logger.info("Interpreter process exited {}", exitValue); + LOGGER.info("Interpreter process exited {}", exitValue); running.set(false); - } @Override @@ -201,8 +198,11 @@ public void processStarted(int port, String host) { @Override public void onProcessFailed(ExecuteException e) { - logger.info("Interpreter process failed {}", e); - running.set(false); + LOGGER.info("Interpreter process failed {}", e); + synchronized (running) { + running.set(false); + running.notify(); + } } @VisibleForTesting @@ -253,7 +253,7 @@ protected void processLine(String s, int i) { } @Override - public void write(byte [] b) throws IOException { + public void write(byte[] b) throws IOException { super.write(b); if (out != null) { @@ -266,7 +266,7 @@ public void write(byte [] b) throws IOException { } @Override - public void write(byte [] b, int offset, int len) throws IOException { + public void write(byte[] b, int offset, int len) throws IOException { super.write(b, offset, len); if (out != null) { diff --git a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteRemoteInterpreterRunningProcess.java similarity index 86% rename from zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java rename to zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteRemoteInterpreterRunningProcess.java index 19da682f971..df631b1ed48 100644 --- a/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterRunningProcess.java +++ b/zeppelin-zengine/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteRemoteInterpreterRunningProcess.java @@ -16,7 +16,6 @@ */ package org.apache.zeppelin.interpreter.remote; -import org.apache.zeppelin.helium.ApplicationEventListener; import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterService; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -24,18 +23,18 @@ /** * This class connects to existing process */ -public class RemoteInterpreterRunningProcess extends RemoteInterpreterProcess { - private final Logger logger = LoggerFactory.getLogger(RemoteInterpreterRunningProcess.class); +public class RemoteRemoteInterpreterRunningProcess extends RemoteInterpreterProcess { + private final Logger LOGGER = LoggerFactory.getLogger(RemoteRemoteInterpreterRunningProcess.class); + private final String host; private final int port; private final String interpreterSettingName; - public RemoteInterpreterRunningProcess( + public RemoteRemoteInterpreterRunningProcess( String interpreterSettingName, int connectTimeout, String host, - int port - ) { + int port) { super(connectTimeout); this.interpreterSettingName = interpreterSettingName; this.host = host; @@ -68,7 +67,7 @@ public void stop() { // when you want to force stop it. ENV ZEPPELIN_FORCE_STOP control that. if (System.getenv("ZEPPELIN_FORCE_STOP") != null) { if (isRunning()) { - logger.info("Kill interpreter process"); + LOGGER.info("Kill interpreter process"); try { callRemoteFunction(new RemoteFunction() { @Override @@ -78,7 +77,7 @@ public Void call(RemoteInterpreterService.Client client) throws Exception { } }); } catch (Exception e) { - logger.warn("ignore the exception when shutting down interpreter process.", e); + LOGGER.warn("ignore the exception when shutting down interpreter process.", e); } } } diff --git a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java index 5b059ef90f4..c203079e843 100644 --- a/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java +++ b/zeppelin-zengine/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterTest.java @@ -24,16 +24,11 @@ import org.apache.zeppelin.display.ui.OptionInput; import org.apache.zeppelin.interpreter.*; import org.apache.zeppelin.interpreter.InterpreterResult.Code; -import org.apache.zeppelin.interpreter.remote.mock.GetAngularObjectSizeInterpreter; -import org.apache.zeppelin.interpreter.remote.mock.GetEnvPropertyInterpreter; -import org.apache.zeppelin.user.AuthenticationInfo; -import org.junit.After; import org.junit.Before; import org.junit.Test; import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.List; import java.util.Map;