From 747aa7bea774195a1500d805dccb31be7866fc4e Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 12:24:59 +0900 Subject: [PATCH 01/13] TAJO-1279: Cleanup TajoAsyncDispatcher and interrupt stop events. --- .../tajo/master/TajoAsyncDispatcher.java | 232 ------------------ .../tajo/master/event/QueryStopEvent.java | 47 ++++ .../master/querymaster/QueryInProgress.java | 13 +- .../master/querymaster/QueryJobEvent.java | 1 + .../master/querymaster/QueryJobManager.java | 16 +- .../tajo/master/querymaster/QueryMaster.java | 17 +- .../master/querymaster/QueryMasterTask.java | 11 +- .../org/apache/tajo/TajoTestingCluster.java | 2 - 8 files changed, 82 insertions(+), 257 deletions(-) delete mode 100644 tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java deleted file mode 100644 index 751b21bd23..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoAsyncDispatcher.java +++ /dev/null @@ -1,232 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.ShutdownHookManager; -import org.apache.hadoop.yarn.event.Dispatcher; -import org.apache.hadoop.yarn.event.Event; -import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -public class TajoAsyncDispatcher extends AbstractService implements Dispatcher { - - private static final Log LOG = LogFactory.getLog(TajoAsyncDispatcher.class); - - private final BlockingQueue eventQueue; - private volatile boolean stopped = false; - - private Thread eventHandlingThread; - protected final Map, EventHandler> eventDispatchers; - private boolean exitOnDispatchException; - - private String id; - - public TajoAsyncDispatcher(String id) { - this(id, new LinkedBlockingQueue()); - } - - public TajoAsyncDispatcher(String id, BlockingQueue eventQueue) { - super(TajoAsyncDispatcher.class.getName()); - this.id = id; - this.eventQueue = eventQueue; - this.eventDispatchers = new HashMap, EventHandler>(); - } - - Runnable createThread() { - return new Runnable() { - @Override - public void run() { - while (!stopped && !Thread.currentThread().isInterrupted()) { - Event event; - try { - event = eventQueue.take(); - if(LOG.isDebugEnabled()) { - LOG.debug(id + ",event take:" + event.getType() + "," + event); - } - } catch(InterruptedException ie) { - if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted"); - } - return; - } - dispatch(event); - } - } - }; - } - - @Override - public synchronized void init(Configuration conf) { - this.exitOnDispatchException = - conf.getBoolean(Dispatcher.DISPATCHER_EXIT_ON_ERROR_KEY, - Dispatcher.DEFAULT_DISPATCHER_EXIT_ON_ERROR); - super.init(conf); - } - - @Override - public void start() { - //start all the components - super.start(); - eventHandlingThread = new Thread(createThread()); - eventHandlingThread.setName("AsyncDispatcher event handler"); - eventHandlingThread.start(); - - LOG.info("AsyncDispatcher started:" + id); - } - - @Override - public synchronized void stop() { - if(stopped) { - return; - } - stopped = true; - if (eventHandlingThread != null) { - eventHandlingThread.interrupt(); - try { - eventHandlingThread.join(); - } catch (InterruptedException ie) { - LOG.warn("Interrupted Exception while stopping"); - } - } - - // stop all the components - super.stop(); - - LOG.info("AsyncDispatcher stopped:" + id); - } - - @SuppressWarnings("unchecked") - protected void dispatch(Event event) { - //all events go thru this loop - if (LOG.isDebugEnabled()) { - LOG.debug("Dispatching the event " + event.getClass().getName() + "." - + event.toString()); - } - Class type = event.getType().getDeclaringClass(); - - try{ - EventHandler handler = eventDispatchers.get(type); - if(handler != null) { - handler.handle(event); - } else { - throw new Exception("No handler for registered for " + type); - } - } catch (Throwable t) { - //TODO Maybe log the state of the queue - LOG.fatal("Error in dispatcher thread:" + event.getType(), t); - if (exitOnDispatchException && (ShutdownHookManager.get().isShutdownInProgress()) == false) { - LOG.info("Exiting, bye.."); - System.exit(-1); - } - } finally { - } - } - - @SuppressWarnings("unchecked") - @Override - public void register(Class eventType, - EventHandler handler) { - /* check to see if we have a listener registered */ - EventHandler registeredHandler = (EventHandler) - eventDispatchers.get(eventType); - LOG.debug("Registering " + eventType + " for " + handler.getClass()); - if (registeredHandler == null) { - eventDispatchers.put(eventType, handler); - } else if (!(registeredHandler instanceof MultiListenerHandler)){ - /* for multiple listeners of an event add the multiple listener handler */ - MultiListenerHandler multiHandler = new MultiListenerHandler(); - multiHandler.addHandler(registeredHandler); - multiHandler.addHandler(handler); - eventDispatchers.put(eventType, multiHandler); - } else { - /* already a multilistener, just add to it */ - MultiListenerHandler multiHandler - = (MultiListenerHandler) registeredHandler; - multiHandler.addHandler(handler); - } - } - - @Override - public EventHandler getEventHandler() { - return new GenericEventHandler(); - } - - class GenericEventHandler implements EventHandler { - public void handle(Event event) { - /* all this method does is enqueue all the events onto the queue */ - int qSize = eventQueue.size(); - if (qSize !=0 && qSize %1000 == 0) { - LOG.info("Size of event-queue is " + qSize); - } - int remCapacity = eventQueue.remainingCapacity(); - if (remCapacity < 1000) { - LOG.warn("Very low remaining capacity in the event-queue: " - + remCapacity); - } - try { - if(LOG.isDebugEnabled()) { - LOG.debug(id + ",add event:" + - event.getType() + "," + event + "," + - (eventHandlingThread == null ? "null" : eventHandlingThread.isAlive())); - } - eventQueue.put(event); - } catch (InterruptedException e) { - if (!stopped) { - LOG.warn("AsyncDispatcher thread interrupted", e); - } - throw new YarnRuntimeException(e); - } - } - } - - /** - * Multiplexing an event. Sending it to different handlers that - * are interested in the event. - */ - static class MultiListenerHandler implements EventHandler { - List> listofHandlers; - - public MultiListenerHandler() { - listofHandlers = new ArrayList>(); - } - - @Override - public void handle(Event event) { - for (EventHandler handler: listofHandlers) { - handler.handle(event); - } - } - - void addHandler(EventHandler handler) { - listofHandlers.add(handler); - } - - } -} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java new file mode 100644 index 0000000000..6d57d4a92c --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryStopEvent.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.hadoop.yarn.event.AbstractEvent; +import org.apache.tajo.QueryId; + +/** + * This event is conveyed to QueryMaster. + */ +public class QueryStopEvent extends AbstractEvent { + public enum EventType { + QUERY_STOP + } + + private final QueryId queryId; + + public QueryStopEvent(QueryId queryId) { + super(EventType.QUERY_STOP); + this.queryId = queryId; + } + + public QueryId getQueryId() { + return queryId; + } + + @Override + public String toString() { + return getClass().getName() + "," + getType() + "," + queryId; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index ca0bd72c6d..ff785e4700 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -23,16 +23,15 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; -import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.master.session.Session; @@ -55,7 +54,7 @@ public class QueryInProgress extends CompositeService { private Session session; - private TajoAsyncDispatcher dispatcher; + private AsyncDispatcher dispatcher; private LogicalRootNode plan; @@ -88,7 +87,7 @@ public QueryInProgress( @Override public void init(Configuration conf) { - dispatcher = new TajoAsyncDispatcher("QueryInProgress:" + queryId); + dispatcher = new AsyncDispatcher(); this.addService(dispatcher); dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); @@ -194,7 +193,8 @@ public void handle(QueryJobEvent queryJobEvent) { } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { submmitQueryToMaster(); } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) { - stop(); + masterContext.getQueryJobManager().getEventHandler().handle( + new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, queryJobEvent.getQueryInfo())); } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { kill(); } @@ -289,7 +289,8 @@ private void heartbeat(QueryInfo queryInfo) { if (isFinishState(this.queryInfo.getQueryState())) { - stop(); + masterContext.getQueryJobManager().getEventHandler().handle( + new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, this.queryInfo)); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java index 811de1be39..ce30ec7cd3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobEvent.java @@ -37,6 +37,7 @@ public enum Type { QUERY_JOB_START, QUERY_JOB_HEARTBEAT, QUERY_JOB_FINISH, + QUERY_JOB_STOP, QUERY_MASTER_START, QUERY_MASTER_STOP, QUERY_JOB_KILL diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index 34a0d013d4..20357f472a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -40,7 +40,6 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -187,15 +186,16 @@ public void handle(QueryJobEvent event) { LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); return; } - if(queryInProgress.isStarted()){ + + if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { + queryInProgress.stop(); + } else if (queryInProgress.isStarted()) { queryInProgress.getEventHandler().handle(event); - } else { - if(event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL){ - scheduler.removeQuery(queryInProgress.getQueryId()); - queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { + scheduler.removeQuery(queryInProgress.getQueryId()); + queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - stopQuery(queryInProgress.getQueryId()); - } + stopQuery(queryInProgress.getQueryId()); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 7623026cdb..641de78e94 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -24,6 +24,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; @@ -35,8 +36,8 @@ import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.event.QueryStartEvent; +import org.apache.tajo.master.event.QueryStopEvent; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; @@ -66,7 +67,7 @@ public class QueryMaster extends CompositeService implements EventHandler { private Clock clock; - private TajoAsyncDispatcher dispatcher; + private AsyncDispatcher dispatcher; private GlobalPlanner globalPlanner; @@ -110,12 +111,13 @@ public void init(Configuration conf) { clock = new SystemClock(); - this.dispatcher = new TajoAsyncDispatcher("querymaster_" + System.currentTimeMillis()); + this.dispatcher = new AsyncDispatcher(); addIfService(dispatcher); globalPlanner = new GlobalPlanner(systemConf, workerContext); dispatcher.register(QueryStartEvent.EventType.class, new QueryStartEventHandler()); + dispatcher.register(QueryStopEvent.EventType.class, new QueryStopEventHandler()); } catch (Throwable t) { LOG.error(t.getMessage(), t); @@ -360,7 +362,7 @@ public ExecutorService getEventExecutor(){ return eventExecutor; } - public TajoAsyncDispatcher getDispatcher() { + public AsyncDispatcher getDispatcher() { return dispatcher; } @@ -491,6 +493,13 @@ public void handle(QueryStartEvent event) { } } + private class QueryStopEventHandler implements EventHandler { + @Override + public void handle(QueryStopEvent event) { + queryMasterContext.stopQuery(event.getQueryId()); + } + } + class QueryHeartbeatThread extends Thread { public QueryHeartbeatThread() { super("QueryHeartbeatThread"); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 720d60a49e..7c8a1db0b3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -28,6 +28,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.event.AsyncDispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.util.Clock; import org.apache.tajo.*; @@ -51,7 +52,6 @@ import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.TajoAsyncDispatcher; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; @@ -104,7 +104,7 @@ public class QueryMasterTask extends CompositeService { private String logicalPlanJson; - private TajoAsyncDispatcher dispatcher; + private AsyncDispatcher dispatcher; private final long querySubmitTime; @@ -154,7 +154,7 @@ public void init(Configuration conf) { } addService(resourceAllocator); - dispatcher = new TajoAsyncDispatcher(queryId.toString()); + dispatcher = new AsyncDispatcher(); addService(dispatcher); dispatcher.register(StageEventType.class, new StageEventDispatcher()); @@ -339,7 +339,8 @@ public void handle(QueryMasterQueryCompletedEvent event) { } } LOG.info("Query final state: " + query.getSynchronizedState()); - queryMasterContext.stopQuery(queryId); + + queryMasterContext.getEventHandler().handle(new QueryStopEvent(queryId)); } } @@ -620,7 +621,7 @@ public synchronized EventHandler getEventHandler() { return eventHandler; } - public TajoAsyncDispatcher getDispatcher() { + public AsyncDispatcher getDispatcher() { return dispatcher; } diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 841be45077..b461d6a382 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -164,8 +164,6 @@ void initPropertiesAndConfigs() { if (!StringUtils.isEmpty(LOG_LEVEL)) { Level defaultLevel = Logger.getRootLogger().getLevel(); Logger.getLogger("org.apache.tajo").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); - Logger.getLogger("org.apache.tajo.master.TajoAsyncDispatcher").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), - defaultLevel)); Logger.getLogger("org.apache.hadoop").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); Logger.getLogger("org.apache.zookeeper").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); Logger.getLogger("BlockStateChange").setLevel(Level.toLevel(LOG_LEVEL.toUpperCase(), defaultLevel)); From 39b20b6f5a2d13a2d55b44f403b8d7b1e0f78afb Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 15:37:54 +0900 Subject: [PATCH 02/13] stopped checking --- .../org/apache/tajo/master/querymaster/QueryInProgress.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index ff785e4700..7fa3a4003b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -255,7 +255,7 @@ public QueryInfo getQueryInfo() { } public boolean isStarted() { - return this.querySubmitted.get(); + return !stopped.get() && this.querySubmitted.get(); } private void heartbeat(QueryInfo queryInfo) { From 44c8970722da4ff04c6ec4d2ed4ad7d0e79ad79f Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 16:50:01 +0900 Subject: [PATCH 03/13] remove duplicated event --- .../org/apache/tajo/master/querymaster/QueryInProgress.java | 3 --- .../org/apache/tajo/master/querymaster/QueryMasterTask.java | 3 --- 2 files changed, 6 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index 7fa3a4003b..0a87990fb2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -192,9 +192,6 @@ public void handle(QueryJobEvent queryJobEvent) { new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { submmitQueryToMaster(); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_FINISH) { - masterContext.getQueryJobManager().getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_STOP, queryJobEvent.getQueryInfo())); } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { kill(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 7c8a1db0b3..ab7ff934e4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -225,9 +225,6 @@ public void stop() { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), TajoMasterProtocol.class, true); } - - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - masterClientService.stopQueryMaster(null, queryId.getProto(), future); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { From cc37fe1f7ec84bfe8e62b5101ca90ea27de16192 Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 17:22:10 +0900 Subject: [PATCH 04/13] missing codes --- .../master/querymaster/QueryMasterTask.java | 27 +++++++------------ 1 file changed, 10 insertions(+), 17 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index ab7ff934e4..02026095a6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -38,24 +38,25 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; -import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.plan.logical.LogicalNode; -import org.apache.tajo.plan.logical.NodeType; -import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.apache.tajo.plan.logical.LogicalNode; +import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.plan.logical.NodeType; +import org.apache.tajo.plan.logical.ScanNode; +import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; @@ -63,7 +64,6 @@ import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.metrics.TajoMetrics; import org.apache.tajo.util.metrics.reporter.MetricsConsoleReporter; import org.apache.tajo.worker.AbstractResourceAllocator; @@ -71,7 +71,6 @@ import java.io.IOException; import java.util.*; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; @@ -231,12 +230,6 @@ public void stop() { connPool.releaseConnection(tmClient); } - try { - future.get(3, TimeUnit.SECONDS); - } catch (Throwable t) { - LOG.warn(t); - } - super.stop(); //TODO change report to tajo master From c508290e12b70ab90d14a81666c84e43468e913d Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 17:23:48 +0900 Subject: [PATCH 05/13] remove unused code --- .../org/apache/tajo/master/querymaster/QueryMasterTask.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 02026095a6..9c789a544f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -58,7 +58,6 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.storage.StorageManager; @@ -199,8 +198,6 @@ public void stop() { LOG.fatal(t.getMessage(), t); } - CallFuture future = new CallFuture(); - RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); NettyClientBase tmClient = null; try { From af58792a6281a463b456e1d0d5bc54d441e02c5b Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 18:20:10 +0900 Subject: [PATCH 06/13] fix the query hang --- .../org/apache/tajo/master/querymaster/QueryJobManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index 20357f472a..13f64562ab 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -188,7 +188,7 @@ public void handle(QueryJobEvent event) { } if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { - queryInProgress.stop(); + stopQuery(event.getQueryInfo().getQueryId()); } else if (queryInProgress.isStarted()) { queryInProgress.getEventHandler().handle(event); } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { From 47bab38d5e02b66d7c6c550ba5d49ac1d64d290c Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 19:19:35 +0900 Subject: [PATCH 07/13] protect a travis stalled build --- .../src/test/java/org/apache/tajo/QueryTestCaseBase.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 875e4507a4..22f1f7b4ed 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -201,6 +201,12 @@ public static void tearDownClass() throws ServiceException { client.close(); } + @Before + public void printTestName() { + /* protect a travis stalled build */ + System.out.println("Run: " + name); + } + public QueryTestCaseBase() { // hive 0.12 does not support quoted identifier. // So, we use lower case database names when Tajo uses HCatalogStore. From 01df32ec05cbfdf49ab629dc671e102f8cb81e33 Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 22:19:40 +0900 Subject: [PATCH 08/13] cleanup the failure test case --- .../java/org/apache/tajo/benchmark/TPCH.java | 22 +++++++++++ .../tajo/master/DefaultTaskScheduler.java | 6 ++- .../org/apache/tajo/QueryTestCaseBase.java | 2 +- .../org/apache/tajo/TajoTestingCluster.java | 6 ++- .../java/org/apache/tajo/TpchTestBase.java | 8 +--- .../master/querymaster/TestKillQuery.java | 37 ++++++++++++++----- .../tajo/scheduler/TestFifoScheduler.java | 34 +++++++++++------ 7 files changed, 82 insertions(+), 33 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java index f4b4d6ae86..e2ea25c6aa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java +++ b/tajo-core/src/main/java/org/apache/tajo/benchmark/TPCH.java @@ -18,6 +18,7 @@ package org.apache.tajo.benchmark; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.protobuf.ServiceException; import org.apache.commons.logging.Log; @@ -33,8 +34,10 @@ import org.apache.tajo.common.TajoDataTypes.Type; import org.apache.tajo.storage.StorageConstants; +import java.io.File; import java.io.IOException; import java.sql.SQLException; +import java.util.List; import java.util.Map; public class TPCH extends BenchmarkSet { @@ -225,4 +228,23 @@ public void loadTable(String tableName) throws ServiceException { throw new ServiceException(s); } } + + public static List getDataFilePaths(String... tables) { + List tablePaths = Lists.newArrayList(); + File file; + for (String table : tables) { + file = getDataFile(table); + tablePaths.add(file.getAbsolutePath()); + } + return tablePaths; + } + + public static File getDataFile(String table) { + File file = new File("src/test/tpch/" + table + ".tbl"); + if (!file.exists()) { + file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + table + + ".tbl"); + } + return file; + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index 1cd6587c53..d47c93a782 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -145,8 +145,10 @@ public void stop() { } // Return all of request callbacks instantly. - for (TaskRequestEvent req : taskRequests.taskRequestQueue) { - req.getCallback().run(stopTaskRunnerReq); + if(taskRequests != null){ + for (TaskRequestEvent req : taskRequests.taskRequestQueue) { + req.getCallback().run(stopTaskRunnerReq); + } } LOG.info("Task Scheduler stopped"); diff --git a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java index 22f1f7b4ed..1605560ffc 100644 --- a/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/QueryTestCaseBase.java @@ -204,7 +204,7 @@ public static void tearDownClass() throws ServiceException { @Before public void printTestName() { /* protect a travis stalled build */ - System.out.println("Run: " + name); + System.out.println("Run: " + name.getMethodName()); } public QueryTestCaseBase() { diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index b461d6a382..0d2f6fa04e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -628,8 +628,10 @@ public void shutdownMiniCluster() throws IOException { this.clusterTestBuildDir = null; } - hbaseUtil.stopZooKeeperCluster(); - hbaseUtil.stopHBaseCluster(); + if(hbaseUtil != null) { + hbaseUtil.stopZooKeeperCluster(); + hbaseUtil.stopHBaseCluster(); + } LOG.info("Minicluster is down"); } diff --git a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java index 0f713e5d36..055dd02d0c 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java +++ b/tajo-core/src/test/java/org/apache/tajo/TpchTestBase.java @@ -22,10 +22,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.benchmark.TPCH; -import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.catalog.Schema; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; import java.io.File; import java.io.IOException; @@ -73,11 +73,7 @@ private TpchTestBase() throws IOException { tables = new String[names.length][]; File file; for (int i = 0; i < names.length; i++) { - file = new File("src/test/tpch/" + names[i] + ".tbl"); - if(!file.exists()) { - file = new File(System.getProperty("user.dir") + "/tajo-core/src/test/tpch/" + names[i] - + ".tbl"); - } + file = TPCH.getDataFile(names[i]); tables[i] = FileUtil.readTextFile(file).split("\n"); paths[i] = file.getAbsolutePath(); } diff --git a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java index c1f41780b2..8ca4cfff49 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/querymaster/TestKillQuery.java @@ -20,34 +20,51 @@ import org.apache.tajo.*; import org.apache.tajo.algebra.Expr; +import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.catalog.CatalogService; +import org.apache.tajo.client.TajoClient; +import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.parser.SQLAnalyzer; -import org.apache.tajo.plan.LogicalOptimizer; -import org.apache.tajo.plan.LogicalPlan; -import org.apache.tajo.plan.LogicalPlanner; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.event.QueryEvent; import org.apache.tajo.master.event.QueryEventType; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.LogicalOptimizer; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.LogicalPlanner; +import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.experimental.categories.Category; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; +import java.io.File; +import java.io.IOException; + +import static org.junit.Assert.*; -@Category(IntegrationTest.class) public class TestKillQuery { private static TajoTestingCluster cluster; private static TajoConf conf; + private static TajoClient client; @BeforeClass public static void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); conf = cluster.getConfiguration(); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); + } + + @AfterClass + public static void tearDown() throws IOException { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); } @Test @@ -56,7 +73,7 @@ public final void testKillQueryFromInitState() throws Exception { QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(conf); Session session = LocalTajoTestingUtility.createDummySession(); CatalogService catalog = cluster.getMaster().getCatalog(); - String query = "select l_orderkey from lineitem group by l_orderkey"; + String query = "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; LogicalPlanner planner = new LogicalPlanner(catalog); LogicalOptimizer optimizer = new LogicalOptimizer(conf); @@ -99,7 +116,7 @@ public final void testKillQueryFromInitState() throws Exception { q.handle(new QueryEvent(queryId, QueryEventType.KILL)); try{ - cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 10); + cluster.waitForQueryState(queryMasterTask.getQuery(), TajoProtos.QueryState.QUERY_KILLED, 50); } finally { assertEquals(TajoProtos.QueryState.QUERY_KILLED, queryMasterTask.getQuery().getSynchronizedState()); } diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java index 18764c2d5e..e08f47c290 100644 --- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java @@ -18,7 +18,10 @@ package org.apache.tajo.scheduler; -import org.apache.tajo.*; +import org.apache.tajo.QueryId; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.client.TajoClientUtil; @@ -27,34 +30,41 @@ import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import org.junit.experimental.categories.Category; +import java.io.File; import java.sql.ResultSet; import static org.junit.Assert.*; -@Category(IntegrationTest.class) public class TestFifoScheduler { private static TajoTestingCluster cluster; private static TajoConf conf; private static TajoClient client; + private static String query = + "select l_orderkey, l_partkey from lineitem group by l_orderkey, l_partkey order by l_orderkey"; @BeforeClass public static void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); conf = cluster.getConfiguration(); - client = new TajoClientImpl(conf); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); } @AfterClass public static void tearDown() throws Exception { - client.close(); + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); } @Test public final void testKillScheduledQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); @@ -67,7 +77,7 @@ public final void testKillScheduledQuery() throws Exception { @Test public final void testForwardedQuery() throws Exception { - ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res = client.executeQuery(query); ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select * from lineitem limit 1"); assertTrue(res.getIsForwarded()); assertFalse(res2.getIsForwarded()); @@ -86,9 +96,9 @@ public final void testForwardedQuery() throws Exception { @Test public final void testScheduledQuery() throws Exception { ClientProtos.SubmitQueryResponse res = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res2 = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res3 = client.executeQuery("select sleep(1) from lineitem"); - ClientProtos.SubmitQueryResponse res4 = client.executeQuery("select sleep(1) from lineitem"); + ClientProtos.SubmitQueryResponse res2 = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res3 = client.executeQuery(query); + ClientProtos.SubmitQueryResponse res4 = client.executeQuery(query); QueryId queryId = new QueryId(res.getQueryId()); QueryId queryId2 = new QueryId(res2.getQueryId()); From 092f4ac2887c7471c5f57589e71fb2421161908c Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 6 Jan 2015 22:32:15 +0900 Subject: [PATCH 09/13] add closing in some unit test --- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 4 +-- .../org/apache/tajo/jdbc/TestTajoJdbc.java | 29 ++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index b14bfa9519..aff16774c7 100644 --- a/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -321,14 +321,14 @@ public void testShowMasters() throws Exception { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); ByteArrayOutputStream out = new ByteArrayOutputStream(); - tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); + TajoCli tajoCli = new TajoCli(tajoConf, new String[]{}, System.in, out); tajoCli.executeMetaCommand("\\admin -showmasters"); String consoleResult = new String(out.toByteArray()); String masterAddress = tajoCli.getContext().getConf().getVar(TajoConf.ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS); String host = masterAddress.split(":")[0]; - + tajoCli.close(); assertEquals(consoleResult, host + "\n"); } diff --git a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index 99baeba9c5..1c763e2d8f 100644 --- a/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-core/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -26,7 +26,6 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.client.QueryClient; -import org.apache.tajo.conf.TajoConf; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -34,11 +33,13 @@ import java.net.InetSocketAddress; import java.sql.*; -import java.util.*; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import static org.apache.tajo.TajoConstants.DEFAULT_DATABASE_NAME; import static org.junit.Assert.*; -import static org.junit.Assert.assertEquals; @Category(IntegrationTest.class) public class TestTajoJdbc extends QueryTestCaseBase { @@ -113,6 +114,9 @@ public void testStatement() throws Exception { if (stmt != null) { stmt.close(); } + if (conn != null) { + conn.close(); + } } } @@ -194,6 +198,9 @@ public void testPreparedStatement() throws Exception { if (stmt != null) { stmt.close(); } + if (conn != null) { + conn.close(); + } } } @@ -494,11 +501,11 @@ public void testCreateTableWithDateAndTimestamp() throws Exception { int result; Statement stmt = null; ResultSet res = null; - + Connection conn = null; try { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), DEFAULT_DATABASE_NAME); - Connection conn = DriverManager.getConnection(connUri); + conn = DriverManager.getConnection(connUri); assertTrue(conn.isValid(100)); stmt = conn.createStatement(); @@ -532,6 +539,10 @@ public void testCreateTableWithDateAndTimestamp() throws Exception { if (stmt != null) { stmt.close(); } + + if(conn != null) { + conn.close(); + } } } @@ -539,11 +550,11 @@ public void testCreateTableWithDateAndTimestamp() throws Exception { public void testSortWithDateTime() throws Exception { Statement stmt = null; ResultSet res = null; + Connection conn = null; int result; // skip this test if catalog uses HCatalogStore. // It is because HCatalogStore does not support Time data type. - try { if (!testingCluster.isHCatalogStoreRunning()) { executeDDL("create_table_with_date_ddl.sql", "table1"); @@ -551,7 +562,7 @@ public void testSortWithDateTime() throws Exception { String connUri = buildConnectionUri(tajoMasterAddress.getHostName(), tajoMasterAddress.getPort(), "TestTajoJdbc"); - Connection conn = DriverManager.getConnection(connUri); + conn = DriverManager.getConnection(connUri); assertTrue(conn.isValid(100)); stmt = conn.createStatement(); @@ -576,6 +587,10 @@ public void testSortWithDateTime() throws Exception { if (stmt != null) { stmt.close(); } + + if(conn != null) { + conn.close(); + } } } } \ No newline at end of file From b2a5483c29ad7b7bc22cf3c5191adb4f8fdaf540 Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 7 Jan 2015 00:28:43 +0900 Subject: [PATCH 10/13] decrease default hbase handler for testing --- .../src/test/java/org/apache/tajo/HBaseTestClusterUtil.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java index fe2e2818b3..ccf6c0fb37 100644 --- a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java @@ -101,6 +101,12 @@ public void startHBaseCluster() throws Exception { conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); } conf.setBoolean(REPLICATION_ENABLE_KEY, false); + conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5); + conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5); + conf.setInt(HConstants.REGION_SERVER_META_HANDLER_COUNT, 2); + conf.setInt(HConstants.MASTER_HANDLER_COUNT, 5); + conf.setInt("hbase.hconnection.threads.max", 5); + conf.setInt("hbase.hconnection.threads.core", 5); createRootDir(); Configuration c = HBaseConfiguration.create(this.conf); From 8f662edaacdb1aff91cd5f7b6decc216ec40d4aa Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 7 Jan 2015 16:12:27 +0900 Subject: [PATCH 11/13] cleanup test case & fix test failure --- .../apache/tajo/master/querymaster/Stage.java | 8 ++-- .../org/apache/tajo/worker/TestHistory.java | 44 +++++++++++-------- 2 files changed, 29 insertions(+), 23 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java index e421417ad2..0515e721c6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Stage.java @@ -736,16 +736,16 @@ public StageState transition(final Stage stage, StageEvent stageEvent) { stage.finalizeStats(); state = StageState.SUCCEEDED; } else { + ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); + DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); + setShuffleIfNecessary(stage, channel); + initTaskScheduler(stage); // execute pre-processing asyncronously stage.getContext().getQueryMasterContext().getEventExecutor() .submit(new Runnable() { @Override public void run() { try { - ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); - DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); - setShuffleIfNecessary(stage, channel); - initTaskScheduler(stage); schedule(stage); stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java index d320077cea..77aa1d41e2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestHistory.java @@ -20,19 +20,20 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.service.Service; -import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TajoProtos; import org.apache.tajo.TajoTestingCluster; -import org.apache.tajo.TpchTestBase; +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.benchmark.TPCH; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.querymaster.QueryInfo; -import org.junit.After; -import org.junit.Before; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; +import java.io.File; import java.io.IOException; import java.util.Collection; import java.util.Map; @@ -40,29 +41,34 @@ import static org.junit.Assert.*; public class TestHistory { - private TajoTestingCluster cluster; - private TajoMaster master; - private TajoConf conf; - private TajoClient client; - - @Before - public void setUp() throws Exception { - cluster = TpchTestBase.getInstance().getTestingCluster(); + private static TajoTestingCluster cluster; + private static TajoMaster master; + private static TajoConf conf; + private static TajoClient client; + + @BeforeClass + public static void setUp() throws Exception { + cluster = new TajoTestingCluster(); + cluster.startMiniClusterInLocal(1); master = cluster.getMaster(); conf = cluster.getConfiguration(); - client = new TajoClientImpl(conf); + client = new TajoClientImpl(cluster.getConfiguration()); + File file = TPCH.getDataFile("lineitem"); + client.executeQueryAndGetResult("create external table default.lineitem (l_orderkey int, l_partkey int) " + + "using text location 'file://" + file.getAbsolutePath() + "'"); + assertTrue(client.existTable("default.lineitem")); } - @After - public void tearDown() { - client.close(); + @AfterClass + public static void tearDown() throws IOException { + if (client != null) client.close(); + if (cluster != null) cluster.shutdownMiniCluster(); } - @Test public final void testTaskRunnerHistory() throws IOException, ServiceException, InterruptedException { int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); - client.executeQueryAndGetResult("select sleep(1) from lineitem"); + client.executeQueryAndGetResult("select count(*) from lineitem"); Collection finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); @@ -89,7 +95,7 @@ public final void testTaskRunnerHistory() throws IOException, ServiceException, @Test public final void testTaskHistory() throws IOException, ServiceException, InterruptedException { int beforeFinishedQueriesCount = master.getContext().getQueryJobManager().getFinishedQueries().size(); - client.executeQueryAndGetResult("select sleep(1) from lineitem"); + client.executeQueryAndGetResult("select count(*) from lineitem"); Collection finishedQueries = master.getContext().getQueryJobManager().getFinishedQueries(); assertTrue(finishedQueries.size() > beforeFinishedQueriesCount); From 7581a1722207f09ddc711c84cf750d7091234f75 Mon Sep 17 00:00:00 2001 From: jhkim Date: Wed, 7 Jan 2015 19:47:21 +0900 Subject: [PATCH 12/13] remove unnecessary cleanup codes --- .../org/apache/tajo/scheduler/TestFifoScheduler.java | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java index e08f47c290..acd6b71740 100644 --- a/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java +++ b/tajo-core/src/test/java/org/apache/tajo/scheduler/TestFifoScheduler.java @@ -71,8 +71,6 @@ public final void testKillScheduledQuery() throws Exception { cluster.waitForQueryRunning(queryId); client.killQuery(queryId2); assertEquals(TajoProtos.QueryState.QUERY_KILLED, client.getQueryStatus(queryId2).getState()); - - client.killQuery(queryId); // cleanup } @Test @@ -89,8 +87,6 @@ public final void testForwardedQuery() throws Exception { assertEquals(TajoProtos.QueryState.QUERY_SUCCEEDED, client.getQueryStatus(queryId2).getState()); ResultSet resSet = TajoClientUtil.createResultSet(conf, client, res2); assertNotNull(resSet); - - client.killQuery(queryId); //cleanup } @Test @@ -113,9 +109,8 @@ public final void testScheduledQuery() throws Exception { assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId3).getState()); assertEquals(TajoProtos.QueryState.QUERY_MASTER_INIT, client.getQueryStatus(queryId4).getState()); - client.killQuery(queryId2); - client.killQuery(queryId3); client.killQuery(queryId4); - client.killQuery(queryId); + client.killQuery(queryId3); + client.killQuery(queryId2); } } From 6565514148ef2d554bb4768d551349c584588357 Mon Sep 17 00:00:00 2001 From: jhkim Date: Thu, 8 Jan 2015 14:39:43 +0900 Subject: [PATCH 13/13] remove changes in HBaseTestClusterUtil --- .../src/test/java/org/apache/tajo/HBaseTestClusterUtil.java | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java index ccf6c0fb37..fe2e2818b3 100644 --- a/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java +++ b/tajo-core/src/test/java/org/apache/tajo/HBaseTestClusterUtil.java @@ -101,12 +101,6 @@ public void startHBaseCluster() throws Exception { conf.setInt(ServerManager.WAIT_ON_REGIONSERVERS_MAXTOSTART, 1); } conf.setBoolean(REPLICATION_ENABLE_KEY, false); - conf.setInt(HConstants.HBASE_CLIENT_IPC_POOL_SIZE, 5); - conf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 5); - conf.setInt(HConstants.REGION_SERVER_META_HANDLER_COUNT, 2); - conf.setInt(HConstants.MASTER_HANDLER_COUNT, 5); - conf.setInt("hbase.hconnection.threads.max", 5); - conf.setInt("hbase.hconnection.threads.core", 5); createRootDir(); Configuration c = HBaseConfiguration.create(this.conf);