From 8e8c219e5fe445aee8e2abbbe49cf09a4a8603fa Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 14 Mar 2018 16:05:50 +0800 Subject: [PATCH] ZEPPELIN-3330. Add more test for RemoteInterpreterServer --- .../interpreter/BaseZeppelinContext.java | 6 +- .../interpreter/InterpreterContext.java | 10 +- .../remote/RemoteInterpreterServer.java | 29 +- .../remote/RemoteInterpreterServerTest.java | 284 +++++++++++++----- 4 files changed, 234 insertions(+), 95 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java index e38a29f82b8..2e9a9de55d4 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/BaseZeppelinContext.java @@ -772,7 +772,7 @@ public void registerHook(String event, String cmd, String replName) { */ @Experimental public void registerHook(String event, String cmd) { - String className = interpreterContext.getClassName(); + String className = interpreterContext.getInterpreterClassName(); registerHook(event, cmd, className); } @@ -794,7 +794,7 @@ public String getHook(String event, String replName) { */ @Experimental public String getHook(String event) { - String className = interpreterContext.getClassName(); + String className = interpreterContext.getInterpreterClassName(); return getHook(event, className); } @@ -816,7 +816,7 @@ public void unregisterHook(String event, String replName) { */ @Experimental public void unregisterHook(String event) { - String className = interpreterContext.getClassName(); + String className = interpreterContext.getInterpreterClassName(); unregisterHook(event, className); } diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java index 8fa09049773..6157d69a25a 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/InterpreterContext.java @@ -62,7 +62,7 @@ public static void remove() { private AngularObjectRegistry angularObjectRegistry; private ResourcePool resourcePool; private List runners = new ArrayList<>(); - private String className; + private String interpreterClassName; private RemoteEventClientWrapper client; private RemoteWorksController remoteWorksController; private Map progressMap; @@ -214,12 +214,12 @@ public List getRunners() { return runners; } - public String getClassName() { - return className; + public String getInterpreterClassName() { + return interpreterClassName; } - public void setClassName(String className) { - this.className = className; + public void setInterpreterClassName(String className) { + this.interpreterClassName = className; } public RemoteEventClientWrapper getClient() { diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index d50d0ed3e2b..91094f0d2c2 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -199,7 +199,6 @@ public void run() { } }).start(); } - logger.info("Starting remote interpreter server on port {}", port); server.serve(); } @@ -420,21 +419,21 @@ public void close(String sessionId, String className) throws TException { @Override - public RemoteInterpreterResult interpret(String noteId, String className, String st, + public RemoteInterpreterResult interpret(String sessionId, String className, String st, RemoteInterpreterContext interpreterContext) throws TException { if (logger.isDebugEnabled()) { logger.debug("st:\n{}", st); } - Interpreter intp = getInterpreter(noteId, className); + Interpreter intp = getInterpreter(sessionId, className); InterpreterContext context = convert(interpreterContext); - context.setClassName(intp.getClassName()); + context.setInterpreterClassName(intp.getClassName()); Scheduler scheduler = intp.getScheduler(); InterpretJobListener jobListener = new InterpretJobListener(); InterpretJob job = new InterpretJob( interpreterContext.getParagraphId(), - "remoteInterpretJob_" + System.currentTimeMillis(), + "RemoteInterpretJob_" + System.currentTimeMillis(), jobListener, JobProgressPoller.DEFAULT_INTERVAL_MSEC, intp, @@ -642,8 +641,7 @@ protected Object jobRun() throws Throwable { // put result into resource pool if (resultMessages.size() > 0) { int lastMessageIndex = resultMessages.size() - 1; - if (resultMessages.get(lastMessageIndex).getType() == - InterpreterResult.Type.TABLE) { + if (resultMessages.get(lastMessageIndex).getType() == InterpreterResult.Type.TABLE) { context.getResourcePool().put( context.getNoteId(), context.getParagraphId(), @@ -671,10 +669,11 @@ public void setResult(Object results) { @Override - public void cancel(String noteId, String className, RemoteInterpreterContext interpreterContext) - throws TException { + public void cancel(String sessionId, + String className, + RemoteInterpreterContext interpreterContext) throws TException { logger.info("cancel {} {}", className, interpreterContext.getParagraphId()); - Interpreter intp = getInterpreter(noteId, className); + Interpreter intp = getInterpreter(sessionId, className); String jobId = interpreterContext.getParagraphId(); Job job = intp.getScheduler().removeFromWaitingQueue(jobId); @@ -746,8 +745,10 @@ private InterpreterContext convert(RemoteInterpreterContext ric, InterpreterOutp new TypeToken>() { }.getType()); - for (InterpreterContextRunner r : runners) { - contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId())); + if (runners != null) { + for (InterpreterContextRunner r : runners) { + contextRunners.add(new ParagraphRunner(this, r.getNoteId(), r.getParagraphId())); + } } return new InterpreterContext( @@ -794,7 +795,7 @@ public void onUpdate(int index, InterpreterResultMessageOutput out) { String output; try { output = new String(out.toByteArray()); - logger.debug("Output Update: {}", output); + logger.debug("Output Update for index {}: {}", index, output); eventClient.onInterpreterOutputUpdate( noteId, paragraphId, index, out.getType(), output); } catch (IOException e) { @@ -923,7 +924,7 @@ public String getStatus(String sessionId, String jobId) if (interpreters == null) { return Status.UNKNOWN.name(); } - + //TODO(zjffdu) ineffient for loop interpreter and its jobs for (Interpreter intp : interpreters) { for (Job job : intp.getScheduler().getJobsRunning()) { if (jobId.equals(job.getId())) { diff --git a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java index 79a2331a0a3..2ae136216a3 100644 --- a/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java +++ b/zeppelin-interpreter/src/test/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServerTest.java @@ -18,128 +18,266 @@ package org.apache.zeppelin.interpreter.remote; import org.apache.thrift.TException; -import org.junit.After; -import org.junit.Before; +import org.apache.zeppelin.interpreter.Interpreter; +import org.apache.zeppelin.interpreter.InterpreterContext; +import org.apache.zeppelin.interpreter.InterpreterException; +import org.apache.zeppelin.interpreter.InterpreterResult; +import org.apache.zeppelin.interpreter.LazyOpenInterpreter; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterContext; +import org.apache.zeppelin.interpreter.thrift.RemoteInterpreterResult; import org.junit.Test; import java.io.IOException; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicBoolean; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class RemoteInterpreterServerTest { - @Before - public void setUp() throws Exception { - } - @After - public void tearDown() throws Exception { + @Test + public void testStartStop() throws InterruptedException, IOException, TException { + RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", + RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true); + + startRemoteInterpreterServer(server, 10 * 1000); + stopRemoteInterpreterServer(server, 10 * 10000); } @Test - public void testStartStop() throws InterruptedException, IOException, TException { + public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException { RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true); - assertEquals(false, server.isRunning()); + startRemoteInterpreterServer(server, 10 * 1000); + //just send an event on the client queue + server.eventClient.onAppStatusUpdate("", "", "", ""); + stopRemoteInterpreterServer(server, 10 * 10000); + } + + private void startRemoteInterpreterServer(RemoteInterpreterServer server, int timeout) + throws InterruptedException { + assertEquals(false, server.isRunning()); server.start(); long startTime = System.currentTimeMillis(); - boolean running = false; - - while (System.currentTimeMillis() - startTime < 10 * 1000) { + while (System.currentTimeMillis() - startTime < timeout) { if (server.isRunning()) { - running = true; break; - } else { - Thread.sleep(200); } + Thread.sleep(200); } - - assertEquals(true, running); + assertEquals(true, server.isRunning()); assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", server.getPort())); + } + private void stopRemoteInterpreterServer(RemoteInterpreterServer server, int timeout) + throws TException, InterruptedException { + assertEquals(true, server.isRunning()); server.shutdown(); - - while (System.currentTimeMillis() - startTime < 10 * 1000) { - if (server.isRunning()) { - Thread.sleep(200); - } else { - running = false; + long startTime = System.currentTimeMillis(); + while (System.currentTimeMillis() - startTime < timeout) { + if (!server.isRunning()) { break; } + Thread.sleep(200); } - assertEquals(false, running); + assertEquals(false, server.isRunning()); + assertEquals(false, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", + server.getPort())); } - class ShutdownRun implements Runnable { - private RemoteInterpreterServer serv = null; + @Test + public void testInterpreter() throws IOException, TException, InterruptedException { + final RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", + RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true); + + Map intpProperties = new HashMap<>(); + intpProperties.put("property_1", "value_1"); + intpProperties.put("zeppelin.interpreter.localRepo", "/tmp"); + + // create Test1Interpreter in session_1 + server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(), + intpProperties, "user_1"); + Test1Interpreter interpreter1 = (Test1Interpreter) + ((LazyOpenInterpreter) server.interpreterGroup.get("session_1").get(0)) + .getInnerInterpreter(); + assertEquals(1, server.interpreterGroup.getSessionNum()); + assertEquals(1, server.interpreterGroup.get("session_1").size()); + assertEquals(2, interpreter1.getProperties().size()); + assertEquals("value_1", interpreter1.getProperty("property_1")); + + // create Test2Interpreter in session_1 + server.createInterpreter("group_1", "session_1", Test1Interpreter.class.getName(), + intpProperties, "user_1"); + assertEquals(2, server.interpreterGroup.get("session_1").size()); + + // create Test1Interpreter in session_2 + server.createInterpreter("group_1", "session_2", Test1Interpreter.class.getName(), + intpProperties, "user_1"); + assertEquals(2, server.interpreterGroup.getSessionNum()); + assertEquals(2, server.interpreterGroup.get("session_1").size()); + assertEquals(1, server.interpreterGroup.get("session_2").size()); + + final RemoteInterpreterContext intpContext = new RemoteInterpreterContext(); + intpContext.setNoteId("note_1"); + intpContext.setParagraphId("paragraph_1"); + intpContext.setGui("{}"); + intpContext.setNoteGui("{}"); + + // single output of SUCCESS + RemoteInterpreterResult result = server.interpret("session_1", Test1Interpreter.class.getName(), + "SINGLE_OUTPUT_SUCCESS", intpContext); + assertEquals("SUCCESS", result.code); + assertEquals(1, result.getMsg().size()); + assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(0).getData()); + + // combo output of SUCCESS + result = server.interpret("session_1", Test1Interpreter.class.getName(), "COMBO_OUTPUT_SUCCESS", + intpContext); + assertEquals("SUCCESS", result.code); + assertEquals(2, result.getMsg().size()); + assertEquals("INTERPRETER_OUT", result.getMsg().get(0).getData()); + assertEquals("SINGLE_OUTPUT_SUCCESS", result.getMsg().get(1).getData()); + + // single output of ERROR + result = server.interpret("session_1", Test1Interpreter.class.getName(), "SINGLE_OUTPUT_ERROR", + intpContext); + assertEquals("ERROR", result.code); + assertEquals(1, result.getMsg().size()); + assertEquals("SINGLE_OUTPUT_ERROR", result.getMsg().get(0).getData()); + + // getFormType + String formType = server.getFormType("session_1", Test1Interpreter.class.getName()); + assertEquals("NATIVE", formType); + + // cancel + Thread sleepThread = new Thread() { + @Override + public void run() { + try { + server.interpret("session_1", Test1Interpreter.class.getName(), "SLEEP", intpContext); + } catch (TException e) { + e.printStackTrace(); + } + } + }; + sleepThread.start(); + + Thread.sleep(1000); + assertFalse(interpreter1.cancelled.get()); + server.cancel("session_1", Test1Interpreter.class.getName(), intpContext); + assertTrue(interpreter1.cancelled.get()); + + // getProgress + assertEquals(10, server.getProgress("session_1", Test1Interpreter.class.getName(), + intpContext)); + + // close + server.close("session_1", Test1Interpreter.class.getName()); + assertTrue(interpreter1.closed.get()); + } + + public static class Test1Interpreter extends Interpreter { + + AtomicBoolean cancelled = new AtomicBoolean(); + AtomicBoolean closed = new AtomicBoolean(); + + public Test1Interpreter(Properties properties) { + super(properties); + } + + @Override + public void open() { - ShutdownRun(RemoteInterpreterServer serv) { - this.serv = serv; } @Override - public void run() { - try { - serv.shutdown(); - } catch (Exception ex) { - // ignore exception + public InterpreterResult interpret(String st, InterpreterContext context) { + if (st.equals("SINGLE_OUTPUT_SUCCESS")) { + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SINGLE_OUTPUT_SUCCESS"); + } else if (st.equals("SINGLE_OUTPUT_ERROR")) { + return new InterpreterResult(InterpreterResult.Code.ERROR, "SINGLE_OUTPUT_ERROR"); + } else if (st.equals("COMBO_OUTPUT_SUCCESS")) { + try { + context.out.write("INTERPRETER_OUT"); + } catch (IOException e) { + e.printStackTrace(); + } + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SINGLE_OUTPUT_SUCCESS"); + } else if (st.equals("SLEEP")) { + try { + Thread.sleep(3 * 1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + return new InterpreterResult(InterpreterResult.Code.SUCCESS, "SLEEP_SUCCESS"); } + return null; } - } - ; + @Override + public void cancel(InterpreterContext context) throws InterpreterException { + cancelled.set(true); + } - @Test - public void testStartStopWithQueuedEvents() throws InterruptedException, IOException, TException { - RemoteInterpreterServer server = new RemoteInterpreterServer("localhost", - RemoteInterpreterUtils.findRandomAvailablePortOnAllLocalInterfaces(), ":", true); - assertEquals(false, server.isRunning()); + @Override + public FormType getFormType() throws InterpreterException { + return FormType.NATIVE; + } - server.start(); - long startTime = System.currentTimeMillis(); - boolean running = false; + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 10; + } - while (System.currentTimeMillis() - startTime < 10 * 1000) { - if (server.isRunning()) { - running = true; - break; - } else { - Thread.sleep(200); - } + @Override + public void close() { + closed.set(true); } - assertEquals(true, running); - assertEquals(true, RemoteInterpreterUtils.checkIfRemoteEndpointAccessible("localhost", - server.getPort())); + } - //just send an event on the client queue - server.eventClient.onAppStatusUpdate("", "", "", ""); + public static class Test2Interpreter extends Interpreter { - ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); - Runnable task = new ShutdownRun(server); + public Test2Interpreter(Properties properties) { + super(properties); + } - executor.schedule(task, 0, TimeUnit.MILLISECONDS); + @Override + public void open() { - while (System.currentTimeMillis() - startTime < 10 * 1000) { - if (server.isRunning()) { - Thread.sleep(200); - } else { - running = false; - break; - } } - executor.shutdown(); + @Override + public InterpreterResult interpret(String st, InterpreterContext context) { + return null; + } - //cleanup environment for next tests - server.shutdown(); + @Override + public void cancel(InterpreterContext context) throws InterpreterException { - assertEquals(false, running); - } + } + @Override + public FormType getFormType() throws InterpreterException { + return FormType.NATIVE; + } + + @Override + public int getProgress(InterpreterContext context) throws InterpreterException { + return 0; + } + + @Override + public void close() { + + } + + } }