From ceea28ceb797c6707f8b911d521da070fc820316 Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 26 Jan 2015 11:05:17 +0900 Subject: [PATCH 1/5] TAJO-1312: Stage causes Invalid event error: SQ_SHUFFLE_REPORT at KILLED. --- .../org/apache/tajo/querymaster/Stage.java | 2 + .../tajo/worker/TajoWorkerClientService.java | 4 +- .../tajo/querymaster/TestKillQuery.java | 51 ++++++++++++++++++- 3 files changed, 54 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 13394f8db1..208d4a6240 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -257,6 +257,8 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_START, StageEventType.SQ_KILL, StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_SHUFFLE_REPORT, + StageEventType.SQ_STAGE_COMPLETED, StageEventType.SQ_FAILED)) // Transitions from FAILED state diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 2ae4beda16..0b815d897b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -118,8 +118,8 @@ public GetQueryHistoryResponse getQueryHistory(RpcController controller, QueryId try { QueryId queryId = new QueryId(request.getQueryId()); - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId); - QueryHistory queryHistory = null; + QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); + QueryHistory queryHistory; if (queryMasterTask == null) { queryHistory = workerContext.getHistoryReader().getQueryHistory(queryId.toString()); } else { diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index bd899cd847..cd36a88393 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -18,6 +18,7 @@ package org.apache.tajo.querymaster; +import com.google.common.collect.Lists; import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; import org.apache.tajo.benchmark.TPCH; @@ -29,18 +30,23 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.master.event.QueryEvent; import org.apache.tajo.master.event.QueryEventType; -import org.apache.tajo.session.Session; +import org.apache.tajo.master.event.StageEvent; +import org.apache.tajo.master.event.StageEventType; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.session.Session; +import org.apache.tajo.worker.TajoWorker; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; import java.io.File; import java.io.IOException; +import java.util.List; import static org.junit.Assert.*; @@ -122,4 +128,47 @@ public final void testKillQueryFromInitState() throws Exception { } queryMasterTask.stop(); } + + @Test + public final void testStageIgnoreState() throws Exception { + String queryStr = "select count(*) from lineitem"; + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); + QueryId queryId = new QueryId(res.getQueryId()); + cluster.waitForQueryRunning(queryId); + + QueryMasterTask qmt = null; + for (TajoWorker worker : cluster.getTajoWorkers()) { + qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId); + if (qmt != null) { + break; + } + } + Query query = qmt.getQuery(); + client.killQuery(queryId); + + List stages = Lists.newArrayList(query.getStages()); + Stage lastStage = stages.get(stages.size() - 1); + + cluster.waitForQueryState(qmt.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); + + assertEquals(StageState.KILLED, lastStage.getSynchronizedState()); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_START, + new StageEvent(lastStage.getId(), StageEventType.SQ_START)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_KILL, + new StageEvent(lastStage.getId(), StageEventType.SQ_KILL)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_CONTAINER_ALLOCATED, + new StageEvent(lastStage.getId(), StageEventType.SQ_CONTAINER_ALLOCATED)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_SHUFFLE_REPORT, + new StageEvent(lastStage.getId(), StageEventType.SQ_SHUFFLE_REPORT)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_STAGE_COMPLETED, + new StageEvent(lastStage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + + lastStage.getStateMachine().doTransition(StageEventType.SQ_FAILED, + new StageEvent(lastStage.getId(), StageEventType.SQ_FAILED)); + } } From 2d93b82d41132131bae8a7fd7968799dfdffbeaa Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 26 Jan 2015 11:56:43 +0900 Subject: [PATCH 2/5] TAJO-1312: fix faillure test case --- .../org/apache/tajo/querymaster/TestKillQuery.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index cd36a88393..05eacb2c79 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -131,21 +131,21 @@ public final void testKillQueryFromInitState() throws Exception { @Test public final void testStageIgnoreState() throws Exception { - String queryStr = "select count(*) from lineitem"; + String queryStr = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); QueryId queryId = new QueryId(res.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQueryRunning(queryId, 10); + client.killQuery(queryId); QueryMasterTask qmt = null; for (TajoWorker worker : cluster.getTajoWorkers()) { - qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId); + qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true); if (qmt != null) { break; } } - Query query = qmt.getQuery(); - client.killQuery(queryId); + Query query = qmt.getQuery(); List stages = Lists.newArrayList(query.getStages()); Stage lastStage = stages.get(stages.size() - 1); From 423c2fcb8de22887997114cab2662819798f7090 Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 26 Jan 2015 16:17:41 +0900 Subject: [PATCH 3/5] fix status updating bug from heartbeat in unit test case --- .../apache/tajo/master/QueryInProgress.java | 1 + .../org/apache/tajo/TajoTestingCluster.java | 32 ++++++++++------ .../master/scheduler/TestFifoScheduler.java | 8 ++-- .../tajo/querymaster/TestKillQuery.java | 37 +++++++++++-------- 4 files changed, 47 insertions(+), 31 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index 7e2c05f0e6..e7371ddd49 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -168,6 +168,7 @@ public synchronized void submmitQueryToMaster() { queryMasterRpcClient.executeQuery(null, builder.build(), NullCallback.get()); querySubmitted.set(true); + getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_MASTER_LAUNCHED); } catch (Exception e) { LOG.error(e.getMessage(), e); } diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index e548b81f41..04dc5abab8 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -43,13 +43,13 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; -import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; +import org.apache.tajo.master.rm.TajoWorkerResourceManager; +import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider; import org.apache.tajo.querymaster.Query; +import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; -import org.apache.tajo.master.rm.TajoWorkerResourceManager; -import org.apache.tajo.plan.rewrite.LogicalPlanTestRuleProvider; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; @@ -776,21 +776,20 @@ public void setAllWorkersConfValue(String key, String value) { } } - public void waitForQueryRunning(QueryId queryId) throws Exception { - waitForQueryRunning(queryId, 50); + public void waitForQuerySubmitted(QueryId queryId) throws Exception { + waitForQuerySubmitted(queryId, 50); } - public void waitForQueryRunning(QueryId queryId, int delay) throws Exception { - QueryInProgress qip = null; + public void waitForQuerySubmitted(QueryId queryId, int delay) throws Exception { + QueryMasterTask qmt = null; int i = 0; - while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) { + while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) { try { Thread.sleep(delay); - if(qip == null){ - TajoMaster master = getMaster(); - qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId); + if (qmt == null) { + qmt = getQueryMasterTask(queryId); } } catch (InterruptedException e) { } @@ -826,4 +825,15 @@ public void waitForStageState(Stage stage, StageState expected, int delay) throw } } } + + public QueryMasterTask getQueryMasterTask(QueryId queryId) { + QueryMasterTask qmt = null; + for (TajoWorker worker : getTajoWorkers()) { + qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true); + if (qmt != null && queryId.equals(qmt.getQueryId())) { + break; + } + } + return qmt; + } } diff --git a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java index e0c30a8bd9..0a8a51c4d5 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/scheduler/TestFifoScheduler.java @@ -68,7 +68,7 @@ public final void testKillScheduledQuery() throws Exception { QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQuerySubmitted(queryId); client.killQuery(queryId2); assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); } @@ -82,7 +82,7 @@ public final void testForwardedQuery() throws Exception { QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQuerySubmitted(queryId); assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); @@ -101,9 +101,9 @@ public final void testScheduledQuery() throws Exception { QueryId queryId3 = new QueryId(res3.getQueryId()); QueryId queryId4 = new QueryId(res4.getQueryId()); - cluster.waitForQueryRunning(queryId); + cluster.waitForQuerySubmitted(queryId); - assertTrue(TajoClientUtil.isQueryRunning(client.getQueryStatus(queryId).getState())); + assertFalse(TajoClientUtil.isQueryComplete(client.getQueryStatus(queryId).getState())); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId2).getState()); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 05eacb2c79..28a2811b6c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -39,7 +39,6 @@ import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.session.Session; -import org.apache.tajo.worker.TajoWorker; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -54,6 +53,9 @@ public class TestKillQuery { private static TajoTestingCluster cluster; private static TajoConf conf; private static TajoClient client; + private static String queryStr = "select t1.l_orderkey, t1.l_partkey, t2.c_custkey " + + "from lineitem t1 join customer t2 " + + "on t1.l_orderkey = t2.c_custkey order by t1.l_orderkey"; @BeforeClass public static void setUp() throws Exception { @@ -65,6 +67,11 @@ public static void setUp() throws Exception { client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + "using text location 'file://" + file.getAbsolutePath() + "'"); assertTrue(client.existTable("default.lineitem")); + + file = TPCH.getDataFile("customer"); + client.executeQueryAndGetResult("create external table default.customer (c_custkey int, c_name text) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.customer")); } @AfterClass @@ -79,11 +86,10 @@ public final void testKillQueryFromInitState() throws Exception { QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); Session session = LocalTajoTestingUtility.createDummySession(); CatalogService catalog = cluster.getMaster().getCatalog(); - String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(conf); - Expr expr = analyzer.parse(query); + Expr expr = analyzer.parse(queryStr); LogicalPlan plan = planner.createPlan(defaultContext, expr); optimizer.optimize(plan); @@ -131,26 +137,25 @@ public final void testKillQueryFromInitState() throws Exception { @Test public final void testStageIgnoreState() throws Exception { - String queryStr = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; + ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); QueryId queryId = new QueryId(res.getQueryId()); - cluster.waitForQueryRunning(queryId, 10); - client.killQuery(queryId); - - QueryMasterTask qmt = null; - for (TajoWorker worker : cluster.getTajoWorkers()) { - qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true); - if (qmt != null) { - break; - } - } + cluster.waitForQuerySubmitted(queryId); + QueryMasterTask qmt = cluster.getQueryMasterTask(queryId); Query query = qmt.getQuery(); + + query.handle(new QueryEvent(queryId, QueryEventType.KILL)); + + try{ + cluster.waitForQueryState(query, TajoProtos.QueryState.QUERY_KILLED, 50); + } finally { + assertEquals(TajoProtos.QueryState.QUERY_KILLED, query.getSynchronizedState()); + } + List stages = Lists.newArrayList(query.getStages()); Stage lastStage = stages.get(stages.size() - 1); - cluster.waitForQueryState(qmt.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); - assertEquals(StageState.KILLED, lastStage.getSynchronizedState()); lastStage.getStateMachine().doTransition(StageEventType.SQ_START, From 28048fb61125bc1e2d67a0a2bdf479a85db24758 Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 26 Jan 2015 18:08:14 +0900 Subject: [PATCH 4/5] rename to testIgnoreStageStateFromKilled() --- .../test/java/org/apache/tajo/querymaster/TestKillQuery.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 28a2811b6c..42ad8dab37 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -136,7 +136,7 @@ public final void testKillQueryFromInitState() throws Exception { } @Test - public final void testStageIgnoreState() throws Exception { + public final void testIgnoreStageStateFromKilled() throws Exception { ClientProtos.SubmitQueryResponse res = client.executeQuery(queryStr); QueryId queryId = new QueryId(res.getQueryId()); From 9714811b0abebf0348857537fc704937350c066a Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 26 Jan 2015 19:43:34 +0900 Subject: [PATCH 5/5] add ignoring task event for KILLED -> KILLED --- .../src/main/java/org/apache/tajo/querymaster/Task.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 1c6a9a3635..ad01b622b0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -172,7 +172,10 @@ public class Task implements EventHandler { // Ignore-able transitions .addTransition(TaskState.KILLED, TaskState.KILLED, EnumSet.of( - TaskEventType.T_KILL, TaskEventType.T_ATTEMPT_SUCCEEDED, TaskEventType.T_ATTEMPT_FAILED)) + TaskEventType.T_KILL, + TaskEventType.T_SCHEDULE, + TaskEventType.T_ATTEMPT_SUCCEEDED, + TaskEventType.T_ATTEMPT_FAILED)) .installTopology();