From 13d3e631763be1e68e35d3436eccf37aac3263db Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Thu, 12 Jun 2014 23:48:49 +0900 Subject: [PATCH 1/2] TAJO-873: Query status is still RUNNING after session expired. --- .../tajo/master/querymaster/QueryMaster.java | 18 ++-- .../master/querymaster/QueryMasterTask.java | 5 +- .../java/org/apache/tajo/worker/Task.java | 24 ++++- .../master/querymaster/TestQueryMaster.java | 100 ++++++++++++++++++ .../apache/tajo/rpc/RpcErrorController.java | 61 +++++++++++ 5 files changed, 194 insertions(+), 14 deletions(-) create mode 100644 tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryMaster.java create mode 100644 tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcErrorController.java diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index f173c249ad..e44e8a5b97 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -34,6 +34,8 @@ import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoAsyncDispatcher; +import org.apache.tajo.master.event.QueryEvent; +import org.apache.tajo.master.event.QueryEventType; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; @@ -59,8 +61,6 @@ public class QueryMaster extends CompositeService implements EventHandler { private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); - private int querySessionTimeout; - private Clock clock; private TajoAsyncDispatcher dispatcher; @@ -100,7 +100,6 @@ public void init(Configuration conf) { this.systemConf = (TajoConf)conf; this.connPool = RpcConnectionPool.getPool(systemConf); - querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); queryMasterContext = new QueryMasterContext(systemConf); clock = new SystemClock(); @@ -311,6 +310,7 @@ public EventHandler getEventHandler() { } public void stopQuery(QueryId queryId) { + LOG.info("Stop Query: " + queryId); QueryMasterTask queryMasterTask; queryMasterTask = queryMasterTasks.remove(queryId); finishedQueryMasterTasks.put(queryId, queryMasterTask); @@ -336,10 +336,7 @@ public void stopQuery(QueryId queryId) { try { queryMasterTask.stop(); - //if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE") - // && !workerContext.isYarnContainerMode()) { - cleanup(queryId); // TODO We will support yarn mode - //} + cleanup(queryId); // TODO We will support yarn mode } catch (Exception e) { LOG.error(e.getMessage(), e); } @@ -448,6 +445,8 @@ public void run() { tempTasks.addAll(queryMasterTasks.values()); } + int querySessionTimeout = systemConf.getIntVar(TajoConf.ConfVars.QUERY_SESSION_TIMEOUT); + for(QueryMasterTask eachTask: tempTasks) { if(!eachTask.isStopped()) { try { @@ -455,7 +454,10 @@ public void run() { long time = System.currentTimeMillis() - lastHeartbeat; if(lastHeartbeat > 0 && time > querySessionTimeout * 1000) { LOG.warn("Query " + eachTask.getQueryId() + " stopped cause query sesstion timeout: " + time + " ms"); - eachTask.expiredSessionTimeout(); + Query query = eachTask.getQuery(); + if (query != null) { + query.handle(new QueryEvent(eachTask.getQueryId(), QueryEventType.KILL)); + } } } catch (Exception e) { LOG.error(eachTask.getQueryId() + ":" + e.getMessage(), e); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index ecf2202c4e..abff321170 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -360,6 +360,7 @@ public synchronized void startQuery() { dispatcher.register(QueryEventType.class, query); queryTaskContext.getEventHandler().handle(new QueryEvent(queryId, QueryEventType.START)); + touchSessionTime(); } catch (Throwable t) { LOG.error(t.getMessage(), t); initError = t; @@ -457,7 +458,9 @@ public Query getQuery() { } public void expiredSessionTimeout() { - stop(); + //stop(); + QueryMasterQueryCompletedEvent event = new QueryMasterQueryCompletedEvent(queryId); + getEventHandler().handle(event); } public QueryMasterTaskContext getQueryTaskContext() { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index f06595129d..476047a799 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -26,7 +26,6 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; -import org.apache.hadoop.util.ReflectionUtils; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; @@ -50,6 +49,7 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.*; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcChannelFactory; +import org.apache.tajo.rpc.RpcErrorController; import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.storage.TupleComparator; import org.apache.tajo.storage.fragment.FileFragment; @@ -685,6 +685,10 @@ Runnable createReporterThread() { int remainingRetries = MAX_RETRIES; @Override public void run() { + RpcErrorController errorControllers[] = new RpcErrorController[2]; + errorControllers[0] = new RpcErrorController(); + errorControllers[1] = new RpcErrorController(); + while (!stop.get() && !stopped) { try { if(executor != null && context.getProgress() < 1.0f) { @@ -697,15 +701,24 @@ public void run() { try { if (context.isPorgressChanged()) { - masterStub.statusUpdate(null, getReport(), NullCallback.get()); + masterStub.statusUpdate(errorControllers[0], getReport(), NullCallback.get()); } else { - masterStub.ping(null, taskId.getProto(), NullCallback.get()); + masterStub.ping(errorControllers[1], taskId.getProto(), NullCallback.get()); + } + for (int i = 0; i < errorControllers.length; i++) { + if (errorControllers[i].failed()) { + String errorText = errorControllers[i].errorText(); + throw new IOException(errorText); + } } } catch (Throwable t) { LOG.error(t.getMessage(), t); - remainingRetries -=1; + if (t.getMessage() != null && t.getMessage().indexOf("java.nio.channels.ClosedChannelException") >= 0) { + remainingRetries = 0; + } else { + remainingRetries -= 1; + } if (remainingRetries == 0) { - ReflectionUtils.logThreadInfo(LOG, "Communication exception", 0); LOG.warn("Last retry, exiting "); throw new RuntimeException(t); } @@ -715,6 +728,7 @@ public void run() { try { pingThread.wait(PROGRESS_INTERVAL); } catch (InterruptedException e) { + break; } } } diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryMaster.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryMaster.java new file mode 100644 index 0000000000..c028a7b9b6 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestQueryMaster.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.querymaster; + +import org.apache.tajo.IntegrationTest; +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.conf.TajoConf.ConfVars; +import org.apache.tajo.ipc.ClientProtos.SubmitQueryResponse; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.worker.TajoWorker; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import static org.junit.Assert.*; + +@Category(IntegrationTest.class) +public class TestQueryMaster extends QueryTestCaseBase { + public TestQueryMaster() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @Test + public void testQuerySessionTimeout() throws Exception { + // If a query's session is expired, that query should be killed. + + TajoClient tajoClient = null; + try { + // 5 secs + testingCluster.setAllTajoDaemonConfValue(ConfVars.QUERY_SESSION_TIMEOUT.varname, "5"); + + conf = testBase.getTestingCluster().getConfiguration(); + tajoClient = new TajoClient(conf); + + // Run test query + // Sleep for 10 secs. + SubmitQueryResponse queryResponse = tajoClient.executeQuery("select sleep(2) from lineitem"); + + //doesn't heartbeat + tajoClient.close(); + tajoClient = null; + + Thread.sleep(10 * 1000); + + QueryId queryId = new QueryId(queryResponse.getQueryId()); + QueryInProgress qip = testingCluster.getMaster().getContext().getQueryJobManager().getFinishedQuery(queryId); + assertNotNull(qip); + + boolean foundQueryMaster = false; + for(TajoWorker eachWorker: testingCluster.getTajoWorkers()) { + QueryMasterTask queryMasterTask = + eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, true); + + if (!foundQueryMaster) { + foundQueryMaster = queryMasterTask != null; + } + + if (foundQueryMaster) { + assertEquals(QueryState.QUERY_KILLED, queryMasterTask.getQuery().getState()); + assertNull("Shuold be removed from running QueryMasterTask.", + eachWorker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId, false)); + } + } + + assertTrue(foundQueryMaster); + + Thread.sleep(10 * 1000); + + for(Worker eachWorker: testingCluster.getMaster().getContext().getResourceManager().getWorkers().values()) { + assertEquals(0, eachWorker.getResource().getNumRunningTasks()); + } + } finally { + testingCluster.setAllTajoDaemonConfValue(ConfVars.QUERY_SESSION_TIMEOUT.varname, + ConfVars.QUERY_SESSION_TIMEOUT.defaultVal); + if (tajoClient != null) { + tajoClient.close(); + } + } + + } +} diff --git a/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcErrorController.java b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcErrorController.java new file mode 100644 index 0000000000..d9c8e709af --- /dev/null +++ b/tajo-rpc/src/main/java/org/apache/tajo/rpc/RpcErrorController.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.rpc; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; + +public class RpcErrorController implements RpcController { + private String errorText; + private boolean failed; + + @Override + public void reset() { + } + + @Override + public boolean failed() { + return failed; + } + + @Override + public String errorText() { + return errorText; + } + + @Override + public void startCancel() { + + } + + @Override + public void setFailed(String reason) { + failed = true; + errorText = reason; + } + + @Override + public boolean isCanceled() { + return false; + } + + @Override + public void notifyOnCancel(RpcCallback callback) { + } +} From 066a496224d1fc00a92b810eb2d07670257830cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=EA=B9=80=ED=98=95=EC=A4=80?= Date: Mon, 23 Jun 2014 22:03:37 +0900 Subject: [PATCH 2/2] TAJO-873: Query status is still RUNNING after session expired. Add some comment and remove unused method expiredSessionTimeout() --- .../org/apache/tajo/master/querymaster/QueryMasterTask.java | 6 ------ tajo-core/src/main/java/org/apache/tajo/worker/Task.java | 2 ++ 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index abff321170..478b6a9c55 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -457,12 +457,6 @@ public Query getQuery() { return query; } - public void expiredSessionTimeout() { - //stop(); - QueryMasterQueryCompletedEvent event = new QueryMasterQueryCompletedEvent(queryId); - getEventHandler().handle(event); - } - public QueryMasterTaskContext getQueryTaskContext() { return queryTaskContext; } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 7bd00b6b82..ff9cf57924 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -691,7 +691,9 @@ Runnable createReporterThread() { @Override public void run() { RpcErrorController errorControllers[] = new RpcErrorController[2]; + // for updating status errorControllers[0] = new RpcErrorController(); + // for ping errorControllers[1] = new RpcErrorController(); while (!stop.get() && !stopped) {