From ed65d8543c99d88dfbad141a0c6a0a82da408a1b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Thu, 19 Jun 2014 10:46:13 +0900 Subject: [PATCH] TAJO-882: CLI hangs when a error occurs in the GlobalPlanner. --- .../tajo/engine/planner/LogicalOptimizer.java | 23 ++++++++-- .../utils/test/ErrorInjectionRewriter.java | 40 ++++++++++++++++ .../master/querymaster/QueryMasterTask.java | 4 ++ .../tajo/worker/TajoWorkerClientService.java | 9 ++++ .../org/apache/tajo/TajoTestingCluster.java | 4 ++ .../tajo/engine/query/TestSelectQuery.java | 46 ++++++++++++++++++- 6 files changed, 120 insertions(+), 6 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java index 3bf70a7b7b..c882bdf8b3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/LogicalOptimizer.java @@ -20,6 +20,8 @@ import com.google.common.collect.Lists; import com.google.common.collect.Sets; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceStability; import org.apache.tajo.algebra.JoinType; import org.apache.tajo.conf.TajoConf; @@ -32,10 +34,7 @@ import org.apache.tajo.engine.planner.logical.join.GreedyHeuristicJoinOrderAlgorithm; import org.apache.tajo.engine.planner.logical.join.JoinGraph; import org.apache.tajo.engine.planner.logical.join.JoinOrderAlgorithm; -import org.apache.tajo.engine.planner.rewrite.BasicQueryRewriteEngine; -import org.apache.tajo.engine.planner.rewrite.FilterPushDownRule; -import org.apache.tajo.engine.planner.rewrite.PartitionedTableRewriter; -import org.apache.tajo.engine.planner.rewrite.ProjectionPushDownRule; +import org.apache.tajo.engine.planner.rewrite.*; import java.util.LinkedHashSet; import java.util.Set; @@ -49,6 +48,8 @@ */ @InterfaceStability.Evolving public class LogicalOptimizer { + private static final Log LOG = LogFactory.getLog(LogicalOptimizer.class.getName()); + private BasicQueryRewriteEngine rulesBeforeJoinOpt; private BasicQueryRewriteEngine rulesAfterToJoinOpt; private JoinOrderAlgorithm joinOrderAlgorithm = new GreedyHeuristicJoinOrderAlgorithm(); @@ -62,6 +63,20 @@ public LogicalOptimizer(TajoConf systemConf) { rulesAfterToJoinOpt = new BasicQueryRewriteEngine(); rulesAfterToJoinOpt.addRewriteRule(new ProjectionPushDownRule()); rulesAfterToJoinOpt.addRewriteRule(new PartitionedTableRewriter(systemConf)); + + // Currently only use for injecting exception to the testcase. + String userDefinedRewriterClass = systemConf.get("tajo.plan.rewriter.classes"); + if (userDefinedRewriterClass != null && !userDefinedRewriterClass.isEmpty()) { + for (String eachRewriterClass : userDefinedRewriterClass.split(",")) { + try { + RewriteRule rule = (RewriteRule) Class.forName(eachRewriterClass).newInstance(); + rulesAfterToJoinOpt.addRewriteRule(rule); + } catch (Exception e) { + LOG.error("Can't initiate a Rewriter object: " + eachRewriterClass, e); + continue; + } + } + } } public LogicalNode optimize(LogicalPlan plan) throws PlanningException { diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java new file mode 100644 index 0000000000..333df11a54 --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/engine/utils/test/ErrorInjectionRewriter.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.utils.test; + +import org.apache.tajo.engine.planner.LogicalPlan; +import org.apache.tajo.engine.planner.PlanningException; +import org.apache.tajo.engine.planner.rewrite.RewriteRule; + +public class ErrorInjectionRewriter implements RewriteRule { + @Override + public String getName() { + return "ErrorInjectionRewriter"; + } + + @Override + public boolean isEligible(LogicalPlan plan) { + return true; + } + + @Override + public LogicalPlan rewrite(LogicalPlan plan) throws PlanningException { + throw new NullPointerException(); + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index ecf2202c4e..e5de7e6ce9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -496,6 +496,10 @@ public QueryState getState() { } } + public Throwable getInitError() { + return initError; + } + public String getErrorMessage() { if (isInitError()) { return StringUtils.stringifyException(initError); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 2b947fe6fb..abd4e98ec8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoIdProtos; @@ -210,6 +211,14 @@ public ClientProtos.GetQueryStatusResponse getQueryStatus( builder.setErrorMessage(firstError.getErrorMessage()); builder.setErrorTrace(firstError.getErrorTrace()); } + + if (queryMasterTask.isInitError()) { + Throwable initError = queryMasterTask.getInitError(); + builder.setErrorMessage( + initError.getMessage() == null ? initError.getClass().getName() : initError.getMessage()); + builder.setErrorTrace(StringUtils.stringifyException(initError)); + builder.setState(queryMasterTask.getState()); + } } return builder.build(); } diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 06d2bab2d4..8b00c25f3f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -617,6 +617,10 @@ private static void writeLines(File file, String... lines) public void setAllTajoDaemonConfValue(String key, String value) { tajoMaster.getContext().getConf().set(key, value); + setAllWorkersConfValue(key, value); + } + + public void setAllWorkersConfValue(String key, String value) { for (TajoWorker eachWorker: tajoWorkers) { eachWorker.getConfig().set(key, value); } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java index 96c51fef1b..372c5958d0 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestSelectQuery.java @@ -21,17 +21,20 @@ import org.apache.tajo.IntegrationTest; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoProtos.QueryState; import org.apache.tajo.TajoTestingCluster; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.client.QueryStatus; +import org.apache.tajo.engine.utils.test.ErrorInjectionRewriter; +import org.apache.tajo.jdbc.TajoResultSet; import org.junit.Test; import org.junit.experimental.categories.Category; import java.sql.ResultSet; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; @Category(IntegrationTest.class) public class TestSelectQuery extends QueryTestCaseBase { @@ -303,6 +306,7 @@ public final void testSelectWithJson() throws Exception { cleanupQuery(res); } + @Test public final void testDatabaseRef() throws Exception { if (!testingCluster.isHCatalogStoreRunning()) { executeString("CREATE DATABASE \"TestSelectQuery\"").close(); @@ -345,4 +349,42 @@ public final void testSumFloatOverflow() throws Exception { assertResultSet(res); cleanupQuery(res); } + + @Test + public final void testQueryMasterTaskInitError() throws Exception { + // In this testcase we can check that a TajoClient receives QueryMasterTask's init error message. + testingCluster.setAllWorkersConfValue("tajo.plan.rewriter.classes", + ErrorInjectionRewriter.class.getCanonicalName()); + + try { + // If client can't receive error status, thread runs forever. + Thread t = new Thread() { + public void run() { + try { + TajoResultSet res = (TajoResultSet) client.executeQueryAndGetResult("select l_orderkey from lineitem"); + QueryStatus status = client.getQueryStatus(res.getQueryId()); + assertEquals(QueryState.QUERY_ERROR, status.getState()); + assertEquals(NullPointerException.class.getName(), status.getErrorMessage()); + cleanupQuery(res); + } catch (Exception e) { + fail(e.getMessage()); + } + } + }; + + t.start(); + + for (int i = 0; i < 10; i++) { + Thread.sleep(1 * 1000); + if (!t.isAlive()) { + break; + } + } + + // If query runs more than 10 secs, test is fail. + assertFalse(t.isAlive()); + } finally { + testingCluster.setAllWorkersConfValue("tajo.plan.rewriter.classes", ""); + } + } } \ No newline at end of file