From 6731e5628c5c139bd9f7189416a0312313cfda7a Mon Sep 17 00:00:00 2001 From: astroshim Date: Tue, 7 Feb 2017 23:47:45 +0900 Subject: [PATCH 1/7] add signal to cancel job --- .../zeppelin/spark/PySparkInterpreter.java | 19 ++++++++++++++++++- .../main/resources/python/zeppelin_pyspark.py | 8 ++++++-- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 0679fcc7b12..1f73d9a47f8 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -73,6 +73,7 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private String scriptPath; boolean pythonscriptRunning = false; private static final int MAX_TIMEOUT_SEC = 10; + private long pythonPid = 0; public PySparkInterpreter(Properties property) { super(property); @@ -310,7 +311,8 @@ public void setStatementsFinished(String out, boolean error) { boolean pythonScriptInitialized = false; Integer pythonScriptInitializeNotifier = new Integer(0); - public void onPythonScriptInitialized() { + public void onPythonScriptInitialized(long pid) { + pythonPid = pid; synchronized (pythonScriptInitializeNotifier) { pythonScriptInitialized = true; pythonScriptInitializeNotifier.notifyAll(); @@ -411,10 +413,25 @@ public InterpreterResult interpret(String st, InterpreterContext context) { } } + public void interrupt() throws IOException { + if (pythonPid > -1) { + logger.info("Sending SIGINT signal to PID : " + pythonPid); + Runtime.getRuntime().exec("kill -SIGINT " + pythonPid); + } else { + logger.warn("Non UNIX/Linux system, close the interpreter"); + close(); + } + } + @Override public void cancel(InterpreterContext context) { SparkInterpreter sparkInterpreter = getSparkInterpreter(); sparkInterpreter.cancel(context); + try { + interrupt(); + } catch (IOException e) { + e.printStackTrace(); + } } @Override diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index c59d2f4e357..6cfff107dd6 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -15,7 +15,7 @@ # limitations under the License. # -import os, sys, getopt, traceback, json, re +import os, sys, signal, getopt, traceback, json, re from py4j.java_gateway import java_import, JavaGateway, GatewayClient from py4j.protocol import Py4JJavaError @@ -226,7 +226,11 @@ def getCompletion(self, text_value): result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList)))) self.interpreterObject.setStatementsFinished(result, False) +def handler_stop_signals(signum, frame): + sys.exit("Got signal. " + str(signum)) + +signal.signal(signal.SIGINT, handler_stop_signals) output = Logger() sys.stdout = output sys.stderr = output @@ -252,7 +256,7 @@ def getCompletion(self, text_value): java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") intp = gateway.entry_point -intp.onPythonScriptInitialized() +intp.onPythonScriptInitialized(os.getpid()) jsc = intp.getJavaSparkContext() From 65d8cc697179d1e12f18e95cc04e30a43608e7cb Mon Sep 17 00:00:00 2001 From: astroshim Date: Tue, 7 Feb 2017 23:54:23 +0900 Subject: [PATCH 2/7] init python pid variable --- .../java/org/apache/zeppelin/spark/PySparkInterpreter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 1f73d9a47f8..96d4ac100d2 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -73,11 +73,12 @@ public class PySparkInterpreter extends Interpreter implements ExecuteResultHand private String scriptPath; boolean pythonscriptRunning = false; private static final int MAX_TIMEOUT_SEC = 10; - private long pythonPid = 0; + private long pythonPid; public PySparkInterpreter(Properties property) { super(property); + pythonPid = -1; try { File scriptFile = File.createTempFile("zeppelin_pyspark-", ".py"); scriptPath = scriptFile.getAbsolutePath(); From 678c183000ce194cfd17c8a6979ccedf6611a83f Mon Sep 17 00:00:00 2001 From: astroshim Date: Wed, 8 Feb 2017 13:23:58 +0900 Subject: [PATCH 3/7] remove signal handler --- spark/src/main/resources/python/zeppelin_pyspark.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index 6cfff107dd6..c59d2f4e357 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -15,7 +15,7 @@ # limitations under the License. # -import os, sys, signal, getopt, traceback, json, re +import os, sys, getopt, traceback, json, re from py4j.java_gateway import java_import, JavaGateway, GatewayClient from py4j.protocol import Py4JJavaError @@ -226,11 +226,7 @@ def getCompletion(self, text_value): result = json.dumps(list(filter(lambda x : not re.match("^__.*", x), list(completionList)))) self.interpreterObject.setStatementsFinished(result, False) -def handler_stop_signals(signum, frame): - sys.exit("Got signal. " + str(signum)) - -signal.signal(signal.SIGINT, handler_stop_signals) output = Logger() sys.stdout = output sys.stderr = output @@ -256,7 +252,7 @@ def handler_stop_signals(signum, frame): java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") intp = gateway.entry_point -intp.onPythonScriptInitialized(os.getpid()) +intp.onPythonScriptInitialized() jsc = intp.getJavaSparkContext() From c0cac4ee0df9bf3140c3465da5a4679dca87f6c1 Mon Sep 17 00:00:00 2001 From: astroshim Date: Wed, 8 Feb 2017 13:36:03 +0900 Subject: [PATCH 4/7] fix logging --- .../main/java/org/apache/zeppelin/spark/PySparkInterpreter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 96d4ac100d2..92960eaad2f 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -431,7 +431,7 @@ public void cancel(InterpreterContext context) { try { interrupt(); } catch (IOException e) { - e.printStackTrace(); + logger.error("Error", e); } } From f26eacf5041eb266d04c237df50be42fdd86f87b Mon Sep 17 00:00:00 2001 From: astroshim Date: Sun, 12 Feb 2017 12:30:22 +0900 Subject: [PATCH 5/7] add test-case for canceling. --- .../spark/PySparkInterpreterTest.java | 31 +++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index 35b876dc322..c186a551c78 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -36,6 +36,8 @@ import java.util.LinkedList; import java.util.List; import java.util.Properties; +import java.util.regex.Matcher; +import java.util.regex.Pattern; import static org.junit.Assert.*; @@ -121,4 +123,33 @@ public void testCompletion() { assertTrue(completions.size() > 0); } } + + private class infinityPythonJob implements Runnable { + @Override + public void run() { + String code = "import time\nwhile True:\n time.sleep(1)" ; + InterpreterResult ret = pySparkInterpreter.interpret(code, context); + assertNotNull(ret); + Pattern expectedMessage = Pattern.compile("KeyboardInterrupt"); + Matcher m = expectedMessage.matcher(ret.message().toString()); + assertTrue(m.find()); + } + } + + @Test + public void testCancelIntp() { + if (getSparkVersionNumber() > 11) { + + assertEquals(InterpreterResult.Code.SUCCESS, + pySparkInterpreter.interpret("a = 1\n", context).code()); + + new Thread(new infinityPythonJob()).start(); + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + pySparkInterpreter.cancel(context); + } + } } From bc12eaaec6dcfe06911359ed5cae7f9d663d66e0 Mon Sep 17 00:00:00 2001 From: astroshim Date: Wed, 15 Feb 2017 17:11:14 +0900 Subject: [PATCH 6/7] pass pid to java --- spark/src/main/resources/python/zeppelin_pyspark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark/src/main/resources/python/zeppelin_pyspark.py b/spark/src/main/resources/python/zeppelin_pyspark.py index c59d2f4e357..d9c68c28970 100644 --- a/spark/src/main/resources/python/zeppelin_pyspark.py +++ b/spark/src/main/resources/python/zeppelin_pyspark.py @@ -252,7 +252,7 @@ def getCompletion(self, text_value): java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") intp = gateway.entry_point -intp.onPythonScriptInitialized() +intp.onPythonScriptInitialized(os.getpid()) jsc = intp.getJavaSparkContext() From 84bf09ae2eec0efefaa4c0a34fda21345ccbf06c Mon Sep 17 00:00:00 2001 From: astroshim Date: Sun, 19 Feb 2017 00:36:45 +0900 Subject: [PATCH 7/7] fix testcase --- .../zeppelin/spark/PySparkInterpreterTest.java | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java index faeb0a72ab2..36975126418 100644 --- a/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java +++ b/spark/src/test/java/org/apache/zeppelin/spark/PySparkInterpreterTest.java @@ -136,19 +136,18 @@ public void run() { } @Test - public void testCancelIntp() { + public void testCancelIntp() throws InterruptedException { if (getSparkVersionNumber() > 11) { - assertEquals(InterpreterResult.Code.SUCCESS, pySparkInterpreter.interpret("a = 1\n", context).code()); - new Thread(new infinityPythonJob()).start(); - try { - Thread.sleep(5000); - } catch (InterruptedException e) { - e.printStackTrace(); - } + Thread t = new Thread(new infinityPythonJob()); + t.start(); + Thread.sleep(5000); pySparkInterpreter.cancel(context); + assertTrue(t.isAlive()); + t.join(2000); + assertFalse(t.isAlive()); } } }