From 09441ad1356fbab50d7ad25057653d0388ad6c41 Mon Sep 17 00:00:00 2001 From: Jeff Zhang Date: Wed, 11 Apr 2018 22:53:48 +0800 Subject: [PATCH] ZEPPELIN-3391. Incorrect status shown for '%livy2.conf' and %spark2.conf' interpreters --- .../remote/RemoteInterpreterServer.java | 14 ++++-------- .../zeppelin/scheduler/FIFOScheduler.java | 22 ++++++++++++++----- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java index b5c7aef511b..401be36b28c 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/interpreter/remote/RemoteInterpreterServer.java @@ -453,16 +453,10 @@ public RemoteInterpreterResult interpret(String sessionId, String className, Str progressMap.remove(interpreterContext.getParagraphId()); - InterpreterResult result; - if (job.getStatus() == Status.ERROR) { - result = new InterpreterResult(Code.ERROR, Job.getStack(job.getException())); - } else { - result = (InterpreterResult) job.getReturn(); - - // in case of job abort in PENDING status, result can be null - if (result == null) { - result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT); - } + InterpreterResult result = (InterpreterResult) job.getReturn(); + // in case of job abort in PENDING status, result can be null + if (result == null) { + result = new InterpreterResult(Code.KEEP_PREVIOUS_RESULT); } return convert(result, context.getConfig(), diff --git a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java index 7ca4a0e894a..fd467b6e6b6 100644 --- a/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java +++ b/zeppelin-interpreter/src/main/java/org/apache/zeppelin/scheduler/FIFOScheduler.java @@ -23,6 +23,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; +import org.apache.zeppelin.interpreter.InterpreterResult; import org.apache.zeppelin.scheduler.Job.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -137,15 +138,26 @@ public void run() { listener.jobStarted(scheduler, runningJob); } runningJob.run(); + Object jobResult = runningJob.getReturn(); if (runningJob.isAborted()) { runningJob.setStatus(Status.ABORT); + LOGGER.debug("Job Aborted, " + runningJob.getId() + ", " + + runningJob.getErrorMessage()); + } else if (runningJob.getException() != null) { + LOGGER.debug("Job Error, " + runningJob.getId() + ", " + + runningJob.getReturn()); + runningJob.setStatus(Status.ERROR); + } else if (jobResult != null && jobResult instanceof InterpreterResult + && ((InterpreterResult) jobResult).code() == InterpreterResult.Code.ERROR) { + LOGGER.debug("Job Error, " + runningJob.getId() + ", " + + runningJob.getReturn()); + runningJob.setStatus(Status.ERROR); } else { - if (runningJob.getException() != null) { - runningJob.setStatus(Status.ERROR); - } else { - runningJob.setStatus(Status.FINISHED); - } + LOGGER.debug("Job Finished, " + runningJob.getId() + ", Result: " + + runningJob.getReturn()); + runningJob.setStatus(Status.FINISHED); } + if (listener != null) { listener.jobFinished(scheduler, runningJob); }