From e886d5ae08b0358710ad2cf6b8b6da80cd7365e8 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:18:00 +0900 Subject: [PATCH 01/25] TAJO-1596 --- .../function/python/PythonScriptEngine.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 74c0e5aef3..1ba013bddc 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -27,12 +27,16 @@ import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.Datum; -import org.apache.tajo.function.*; +import org.apache.tajo.function.FunctionInvocation; +import org.apache.tajo.function.FunctionSignature; +import org.apache.tajo.function.FunctionSupplement; +import org.apache.tajo.function.PythonInvocationDesc; import org.apache.tajo.plan.function.FunctionContext; import org.apache.tajo.plan.function.PythonAggFunctionInvoke.PythonAggFunctionContext; import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -310,12 +314,25 @@ public void start(Configuration systemConf) throws IOException { @Override public void shutdown() { - process.destroy(); FileUtil.cleanup(LOG, stdin, stdout, stderr, inputHandler, outputHandler); stdin = null; stdout = stderr = null; inputHandler = null; outputHandler = null; + + try { + int exitCode = process.waitFor(); + if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { + LOG.warn("Process exit code: " + exitCode); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Process exit code: " + exitCode); + } + } + } catch (InterruptedException e) { + LOG.warn(e.getMessage(), e); + } + if (LOG.isDebugEnabled()) { LOG.debug("PythonScriptExecutor shuts down"); } From 08f23103fee90aede6f602922ef3cdd0689c76fb Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:19:50 +0900 Subject: [PATCH 02/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 7f2dcca6dc8226a08e0b6f78492738ab2e1790ae Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:20:42 +0900 Subject: [PATCH 03/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 0cc702f55cd60e0175a44ad77165d6c48b89db14 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 04/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 53f9892ab82e64cf3cc41a6a55e96e79883bd9c1 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:22:11 +0900 Subject: [PATCH 05/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 43c0bc4247e562d6c9088764b23471d220912029 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 18:29:04 +0900 Subject: [PATCH 06/25] apply std error --- .../src/main/resources/python/controller.py | 1 - .../plan/function/python/PythonScriptEngine.java | 16 ++++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py index 126ccdc8f0..d99a5e3ecc 100644 --- a/tajo-core/src/main/resources/python/controller.py +++ b/tajo-core/src/main/resources/python/controller.py @@ -115,7 +115,6 @@ def main(self, self.input_stream = sys.stdin # TODO: support controller logging self.log_stream = open(output_stream_path, 'a') - sys.stderr = open(error_stream_path, 'w') sys.path.append(file_path) sys.path.append(cache_path) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 1ba013bddc..2b615228f4 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -19,6 +19,7 @@ package org.apache.tajo.plan.function.python; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -322,6 +323,7 @@ public void shutdown() { try { int exitCode = process.waitFor(); + if (systemConf.get(CommonTestingUtil.TAJO_TEST_KEY, "FALSE").equalsIgnoreCase("TRUE")) { LOG.warn("Process exit code: " + exitCode); } else { @@ -523,8 +525,18 @@ public void callAggFunc(FunctionContext functionContext, Tuple input) { try { inputHandler.putNext(methodName, input, inSchema); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue while executing " + methodName + " with " + input, e); + } catch (Throwable e) { + byte[] bytes; + try { + bytes = new byte[stderr.available()]; + IOUtils.readFully(stderr, bytes); + String message = new String(bytes, Charset.defaultCharset()); + throw new RuntimeException("Failed adding input to inputQueue while executing " + + methodName + " with " + input + ", caused by :" + message, e); + } catch (IOException e1) { + throw new RuntimeException("Failed adding input to inputQueue while executing " + + methodName + " with " + input, e1); + } } try { From 3ad64b1a44d03cd4c128398fbfa0ea7038b5235a Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 19:47:38 +0900 Subject: [PATCH 07/25] add exception handler --- .../function/python/PythonScriptEngine.java | 48 ++++++++++++------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index 2b615228f4..f105889b88 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -37,6 +37,7 @@ import org.apache.tajo.plan.function.stream.*; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.FileUtil; import org.apache.tajo.util.TUtil; @@ -491,14 +492,15 @@ public Datum callScalarFunc(Tuple input) { try { inputHandler.putNext(input, inSchema); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } - Datum result; + + Datum result = null; try { result = outputHandler.getNext().asDatum(0); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } return result; @@ -526,23 +528,35 @@ public void callAggFunc(FunctionContext functionContext, Tuple input) { inputHandler.putNext(methodName, input, inSchema); stdin.flush(); } catch (Throwable e) { - byte[] bytes; - try { - bytes = new byte[stderr.available()]; - IOUtils.readFully(stderr, bytes); - String message = new String(bytes, Charset.defaultCharset()); - throw new RuntimeException("Failed adding input to inputQueue while executing " - + methodName + " with " + input + ", caused by :" + message, e); - } catch (IOException e1) { - throw new RuntimeException("Failed adding input to inputQueue while executing " - + methodName + " with " + input, e1); - } + throwException(stderr, new RuntimeException("Failed adding input to inputQueue while executing " + + methodName + " with " + input, e)); } try { outputHandler.getNext(); } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); + } + } + + /** + * Get the standard error streams of the external process and throw the exception + * + * @throws RuntimeException + */ + private void throwException(InputStream stderr, RuntimeException e) { + try { + if (stderr.available() > 0) { + byte[] bytes = new byte[Math.min(stderr.available(), 100 * StorageUnit.KB)]; + IOUtils.readFully(stderr, bytes); + String message = new String(bytes, Charset.defaultCharset()); + + throw new RuntimeException("Python exception caused by: " + message, e); + } else { + throw e; + } + } catch (IOException ioe) { + throw new RuntimeException(ioe.getMessage(), ioe); } } From 539dd12f39cfa9a0cee55d2222d2fd5d68a5d394 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 08/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 1556adaeedc656dc3c6ea043633be6204d63b0cf Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 09/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 46c758cea299e73fadbe6cf8470ec80808de25f4 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 10/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 649cd23da1c944ef06952e1e9f95c121cafca11d Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 11/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From ea60278fb820cc6e18cbe721fff018dcf3c6f0bc Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 12/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 1412a7c2f726b39c3fba0c5e41edcfd0c46dfcbd Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 13/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 08dd616f3e77062990799764a763a3826976cfd9 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 14/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 256b7bcbb4c94c7abd40073190ad5962ef2b8ddb Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 15/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From ddf4792b14b5da8a63d6b4f4fc1441af7934edf0 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 16/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From ed7881c4bb35775e84fa5e1a164276f0f0280364 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 20:22:19 +0900 Subject: [PATCH 17/25] Revert "trigger CI" This reverts commit ddf4792b14b5da8a63d6b4f4fc1441af7934edf0. --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 25739ade04bcde4f95f0ef84a3644edced79b247 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 18/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From f4f79fd3b61fa5d62c674d32656acff5ae4515ed Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 19/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From d904adc3c9fb2d4c876458df1931cddaf8f049a7 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 20/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From 03425671734908b2378f62746c947a1e15307f82 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 21/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 1cd5088012c0ca71990a34d04b0d4d80274fe009 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 22/25] trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 1c2a9e76d7..f74a075885 100644 --- a/CHANGES +++ b/CHANGES @@ -2,6 +2,7 @@ Tajo Change Log Release 0.12.0 - unreleased + Release 0.11.0 - unreleased NEW FEATURES From a7ce98c661786eae1efda502707b8b7af1533c3c Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 17:21:18 +0900 Subject: [PATCH 23/25] trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index f74a075885..1c2a9e76d7 100644 --- a/CHANGES +++ b/CHANGES @@ -2,7 +2,6 @@ Tajo Change Log Release 0.12.0 - unreleased - Release 0.11.0 - unreleased NEW FEATURES From 621a2198664db2f0641d339e4b76cba997e151fa Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 20 Aug 2015 22:04:49 +0900 Subject: [PATCH 24/25] remove unused logger --- .../src/main/resources/python/controller.py | 2 -- .../function/python/PythonScriptEngine.java | 36 ++++++++++--------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/tajo-core/src/main/resources/python/controller.py b/tajo-core/src/main/resources/python/controller.py index d99a5e3ecc..8d0f995469 100644 --- a/tajo-core/src/main/resources/python/controller.py +++ b/tajo-core/src/main/resources/python/controller.py @@ -113,8 +113,6 @@ def main(self, self.stream_error = os.fdopen(sys.stderr.fileno(), 'wb', 0) self.input_stream = sys.stdin - # TODO: support controller logging - self.log_stream = open(output_stream_path, 'a') sys.path.append(file_path) sys.path.append(cache_path) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java index f105889b88..083ad038fd 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/function/python/PythonScriptEngine.java @@ -544,7 +544,7 @@ public void callAggFunc(FunctionContext functionContext, Tuple input) { * * @throws RuntimeException */ - private void throwException(InputStream stderr, RuntimeException e) { + private void throwException(InputStream stderr, RuntimeException e) throws RuntimeException { try { if (stderr.available() > 0) { byte[] bytes = new byte[Math.min(stderr.available(), 100 * StorageUnit.KB)]; @@ -570,13 +570,13 @@ public void updatePythonSideContext(PythonAggFunctionContext functionContext) th try { inputHandler.putNext("update_context", functionContext); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } try { outputHandler.getNext(); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } } @@ -590,13 +590,13 @@ public void updateJavaSideContext(PythonAggFunctionContext functionContext) thro try { inputHandler.putNext("get_context", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } try { outputHandler.getNext(functionContext); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } } @@ -611,14 +611,16 @@ public String getPartialResult(FunctionContext functionContext) { try { inputHandler.putNext("get_partial_result", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); - } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } + String result = null; try { - return outputHandler.getPartialResultString(); - } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + result = outputHandler.getPartialResultString(); + } catch (Throwable e) { + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } + return result; } /** @@ -633,13 +635,13 @@ public Datum getFinalResult(FunctionContext functionContext) { inputHandler.putNext("get_final_result", EMPTY_INPUT, EMPTY_SCHEMA); stdin.flush(); } catch (Exception e) { - throw new RuntimeException("Failed adding input to inputQueue", e); + throwException(stderr, new RuntimeException("Failed adding input to inputQueue", e)); } - Datum result; + Datum result = null; try { result = outputHandler.getNext().asDatum(0); } catch (Exception e) { - throw new RuntimeException("Problem getting output: " + e.getMessage(), e); + throwException(stderr, new RuntimeException("Problem getting output: " + e.getMessage(), e)); } return result; From 4db25479bc8fc82835eb2c4900ed6dc8d7e8743e Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 21 Aug 2015 10:28:00 +0900 Subject: [PATCH 25/25] remove race condition --- .../src/main/java/org/apache/tajo/master/QueryManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 88389868d9..95562ed2da 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -226,8 +226,9 @@ public QueryInfo scheduleQuery(Session session, QueryContext queryContext, Strin public boolean startQueryJob(QueryId queryId, AllocationResourceProto allocation) { if (submittedQueries.get(queryId).allocateToQueryMaster(allocation)) { - QueryInProgress queryInProgress = submittedQueries.remove(queryId); + QueryInProgress queryInProgress = submittedQueries.get(queryId); runningQueries.put(queryInProgress.getQueryId(), queryInProgress); + submittedQueries.remove(queryId); dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInProgress.getQueryInfo())); return true;