From 3b5cc25a5926c0752db76f9fc9cffbbf4dacbad1 Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 12 Sep 2014 20:07:59 +0900 Subject: [PATCH] TAJO-1037: KillQuery hang in subquery init state --- .../tajo/master/querymaster/QueryMaster.java | 6 +- .../tajo/master/querymaster/SubQuery.java | 23 ++-- .../org/apache/tajo/TajoTestingCluster.java | 38 +++++- .../master/querymaster/TestKillQuery.java | 108 ++++++++++++++++++ 4 files changed, 159 insertions(+), 16 deletions(-) create mode 100644 tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 4af929e9c9..b54675c847 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -34,8 +34,6 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol.ExecutionBlockReport; -import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.rpc.CallFuture; @@ -47,7 +45,6 @@ import org.apache.tajo.storage.StorageManagerFactory; import org.apache.tajo.util.HAServiceUtil; import org.apache.tajo.util.NetUtils; -import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.worker.TajoWorker; import java.util.ArrayList; @@ -57,7 +54,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat; @@ -404,6 +400,8 @@ public EventHandler 
getEventHandler() { public void stopQuery(QueryId queryId) { QueryMasterTask queryMasterTask; queryMasterTask = queryMasterTasks.remove(queryId); + if(queryMasterTask == null) return; + finishedQueryMasterTasks.put(queryId, queryMasterTask); if(queryMasterTask != null) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 9aec9bac68..850ffe841f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -43,7 +43,6 @@ import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.engine.plan.proto.PlanProto; import org.apache.tajo.engine.planner.PlannerUtil; import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; @@ -138,7 +137,7 @@ public class SubQuery implements EventHandler { SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_KILL) + SubQueryEventType.SQ_KILL, new KillTasksTransition()) .addTransition(SubQueryState.INITED, SubQueryState.ERROR, SubQueryEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) @@ -175,7 +174,7 @@ public class SubQuery implements EventHandler { SubQueryEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, - EnumSet.of(SubQueryEventType.SQ_KILL)) + EnumSet.of(SubQueryEventType.SQ_KILL), new KillTasksTransition()) .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, SubQueryEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) @@ -668,13 +667,14 @@ public void run() { LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled"); if 
(subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks
-              subQuery.stopScheduler();
-              subQuery.finalizeStats();
-              subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.SUCCEEDED));
+              subQuery.complete();
             } else {
-              subQuery.taskScheduler.start();
-              allocateContainers(subQuery);
-
+              if(subQuery.getSynchronizedState() == SubQueryState.INITED) {
+                subQuery.taskScheduler.start();
+                allocateContainers(subQuery);
+              } else {
+                subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL));
+              }
             }
           } catch (Throwable e) {
             LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e);
@@ -1116,7 +1116,10 @@ private static class KillTasksTransition implements SingleArcTransition 100) {
+      if (++i > 200) {
         throw new IOException("Timed out waiting for query to start");
       }
     }
   }
 
+  /**
+   * Polls {@code query} every {@code delay} milliseconds until it reports the {@code expected} state.
+   *
+   * @param query    query to observe; a {@code null} query never matches, so the call ends in a timeout
+   * @param expected state to wait for
+   * @param delay    pause between two checks, in milliseconds
+   * @throws IOException          if the state has not been reached after more than 200 checks
+   * @throws InterruptedException if the waiting thread is interrupted while sleeping
+   */
+  public void waitForQueryState(Query query, TajoProtos.QueryState expected, int delay) throws Exception {
+    int i = 0;
+    while (query == null || query.getSynchronizedState() != expected) {
+      // Deliberately not caught: swallowing the interrupt would keep the caller polling until the timeout.
+      Thread.sleep(delay);
+      if (++i > 200) {
+        throw new IOException("Timed out waiting for query state " + expected
+            + ", last seen " + (query == null ? null : query.getSynchronizedState()));
+      }
+    }
+  }
+
+  /**
+   * Polls {@code subQuery} every {@code delay} milliseconds until it reports the {@code expected} state.
+   *
+   * @param subQuery sub-query to observe; a {@code null} sub-query never matches, so the call ends in a timeout
+   * @param expected state to wait for
+   * @param delay    pause between two checks, in milliseconds
+   * @throws IOException          if the state has not been reached after more than 200 checks
+   * @throws InterruptedException if the waiting thread is interrupted while sleeping
+   */
+  public void waitForSubQueryState(SubQuery subQuery, SubQueryState expected, int delay) throws Exception {
+    int i = 0;
+    while (subQuery == null || subQuery.getSynchronizedState() != expected) {
+      // Deliberately not caught: swallowing the interrupt would keep the caller polling until the timeout.
+      Thread.sleep(delay);
+      if (++i > 200) {
+        throw new IOException("Timed out waiting for sub-query state " + expected
+            + ", last seen " + (subQuery == null ? null : subQuery.getSynchronizedState()));
+      }
+    }
+  }
+
   public QueryMasterTask getQueryMasterTask(QueryId queryId) {
     QueryMasterTask qmt = null;
     for (TajoWorker worker : getTajoWorkers()) {
diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
new file mode 100644
index 0000000000..9eebfcdf68
--- /dev/null
+++ 
b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.master.querymaster;
+
+import org.apache.tajo.*;
+import org.apache.tajo.algebra.Expr;
+import org.apache.tajo.catalog.CatalogService;
+import org.apache.tajo.conf.TajoConf;
+import org.apache.tajo.engine.parser.SQLAnalyzer;
+import org.apache.tajo.engine.planner.LogicalOptimizer;
+import org.apache.tajo.engine.planner.LogicalPlan;
+import org.apache.tajo.engine.planner.LogicalPlanner;
+import org.apache.tajo.engine.planner.global.GlobalPlanner;
+import org.apache.tajo.engine.planner.global.MasterPlan;
+import org.apache.tajo.engine.query.QueryContext;
+import org.apache.tajo.master.event.QueryEvent;
+import org.apache.tajo.master.event.QueryEventType;
+import org.apache.tajo.master.session.Session;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+/**
+ * Integration test for TAJO-1037: killing a query whose first sub-query is still in the
+ * {@code INITED} state must drive the query to {@code QUERY_KILLED} rather than leave it hanging.
+ */
+@Category(IntegrationTest.class)
+public class TestKillQuery {
+  private static TajoTestingCluster cluster;
+  private static TajoConf conf;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    cluster = TpchTestBase.getInstance().getTestingCluster();
+    conf = cluster.getConfiguration();
+  }
+
+  /**
+   * Starts a query master task by hand, waits for its first sub-query to reach
+   * {@code INITED}, fires a KILL event and expects the query to end up {@code QUERY_KILLED}.
+   */
+  @Test
+  public final void testKillQueryFromInitState() throws Exception {
+    SQLAnalyzer analyzer = new SQLAnalyzer();
+    QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf);
+    Session session = LocalTajoTestingUtility.createDummySession();
+    CatalogService catalog = cluster.getMaster().getCatalog();
+    String query = "select l_orderkey from lineitem group by l_orderkey";
+
+    LogicalPlanner planner = new LogicalPlanner(catalog);
+    LogicalOptimizer optimizer = new LogicalOptimizer(conf);
+    Expr expr = analyzer.parse(query);
+    LogicalPlan plan = planner.createPlan(defaultContext, expr);
+
+    optimizer.optimize(plan);
+
+    QueryId queryId = QueryIdFactory.newQueryId(System.currentTimeMillis(), 0);
+    QueryContext queryContext = new QueryContext(conf);
+    MasterPlan masterPlan = new MasterPlan(queryId, queryContext, plan);
+    GlobalPlanner globalPlanner = new GlobalPlanner(conf, catalog);
+    globalPlanner.build(masterPlan);
+
+    QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster();
+    QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(),
+        queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson());
+
+    queryMasterTask.init(conf);
+    try {
+      queryMasterTask.getQueryTaskContext().getDispatcher().start();
+      queryMasterTask.startQuery();
+
+      // Each wait is paired with an assertion in finally, so a timeout surfaces as an
+      // expected-vs-actual state mismatch rather than as the helper's timeout exception.
+      try {
+        cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_RUNNING, 2);
+      } finally {
+        assertEquals(TajoProtos.QueryState.QUERY_RUNNING, queryMasterTask.getQuery().getSynchronizedState());
+      }
+
+      SubQuery subQuery = queryMasterTask.getQuery().getSubQueries().iterator().next();
+      assertNotNull(subQuery);
+
+      try {
+        cluster.waitForSubQueryState(subQuery, SubQueryState.INITED, 2);
+      } finally {
+        assertEquals(SubQueryState.INITED, subQuery.getSynchronizedState());
+      }
+
+      // fire kill event
+      Query q = queryMasterTask.getQuery();
+      q.handle(new QueryEvent(queryId, QueryEventType.KILL));
+
+      try {
+        cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 10);
+      } finally {
+        assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState());
+      }
+    } finally {
+      // Stop the task even when a wait or assertion above fails; otherwise the started
+      // task and its dispatcher are left behind on the cluster obtained from TpchTestBase.
+      queryMasterTask.stop();
+    }
+  }
+}