From b90387d6894493ebbfdfd949e1cd28924cfccdfa Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 03:08:37 +0900 Subject: [PATCH 1/3] TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager. --- .../apache/tajo/master/QueryJobManager.java | 59 +++++++++--------- .../master/scheduler/SimpleFifoScheduler.java | 3 +- .../tajo/querymaster/QueryInProgress.java | 60 +++---------------- 3 files changed, 41 insertions(+), 81 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java index c9b8711272..6a8da27da3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java @@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.scheduler.SimpleFifoScheduler; +import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.master.scheduler.SimpleFifoScheduler; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +/** + * QueryJobManager manages all scheduled and running queries. + * It receives all Query related events and routes them to each QueryInProgress. + */ public class QueryJobManager extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); @@ -69,7 +72,7 @@ public QueryJobManager(final TajoMaster.MasterContext masterContext) { } @Override - public void init(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { try { this.dispatcher = new AsyncDispatcher(); addService(this.dispatcher); @@ -81,24 +84,24 @@ public void init(Configuration conf) { catchException(null, e); } - super.init(conf); + super.serviceInit(conf); } @Override - public void stop() { + public void serviceStop() throws Exception { synchronized(runningQueries) { for(QueryInProgress eachQueryInProgress: runningQueries.values()) { - eachQueryInProgress.stop(); + eachQueryInProgress.stopProgress(); } } this.scheduler.stop(); - super.stop(); + super.serviceStop(); } @Override - public void start() { + public void serviceStart() throws Exception { this.scheduler.start(); - super.start(); + super.serviceStart(); } public EventHandler getEventHandler() { @@ -164,39 +167,42 @@ public QueryInfo startQueryJob(QueryId queryId) throws Exception { runningQueries.put(queryInProgress.getQueryId(), queryInProgress); } - addService(queryInProgress); - queryInProgress.init(getConfig()); - queryInProgress.start(); - - if (!queryInProgress.startQueryMaster()) { + if (queryInProgress.startQueryMaster()) { + dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, + queryInProgress.getQueryInfo())); + } else { stopQuery(queryId); } return queryInProgress.getQueryInfo(); } - public TajoMaster.MasterContext getMasterContext() { - return masterContext; - } - class QueryJobManagerEventHandler implements EventHandler { + @Override public void handle(QueryJobEvent event) { QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); - if(queryInProgress == null) { + + + if (queryInProgress == null) { LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); return; } - if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { + + if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { + queryInProgress.submmitQueryToMaster(); + + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { stopQuery(event.getQueryInfo().getQueryId()); - } else if (queryInProgress.isStarted()) { - queryInProgress.getEventHandler().handle(event); + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { scheduler.removeQuery(queryInProgress.getQueryId()); - queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - + queryInProgress.kill(); stopQuery(queryInProgress.getQueryId()); + + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { + queryInProgress.heartbeat(event.getQueryInfo()); } } } @@ -219,7 +225,7 @@ public void stopQuery(QueryId queryId) { LOG.info("Stop QueryInProgress:" + queryId); QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { - queryInProgress.stop(); + queryInProgress.stopProgress(); synchronized(submittedQueries) { submittedQueries.remove(queryId); } @@ -245,7 +251,6 @@ public void stopQuery(QueryId queryId) { avgExecutionTime.set(executionTime); } executedQuerySize.incrementAndGet(); - removeService(queryInProgress); } else { LOG.warn("No QueryInProgress while query stopping: " + queryId); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java index bd8ca2829b..a091ed5b3c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java @@ -58,7 +58,8 @@ public boolean addQuery(QueryInProgress queryInProgress) { LOG.info("Size of Fifo queue is " + qSize); } - QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime()); + QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, + queryInProgress.getQueryInfo().getStartTime()); boolean result = pool.add(querySchedulingInfo); if (getRunningQueries().size() == 0) wakeupProcessor(); return result; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java index bda2ec1438..a05748b019 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java @@ -20,11 +20,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.query.QueryContext; @@ -35,12 +31,12 @@ import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; @@ -48,15 +44,13 @@ import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; -public class QueryInProgress extends CompositeService { +public class QueryInProgress { private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); private QueryId queryId; private Session session; - private AsyncDispatcher dispatcher; - private LogicalRootNode plan; private AtomicBoolean querySubmitted = new AtomicBoolean(false); @@ -76,7 +70,7 @@ public QueryInProgress( Session session, QueryContext queryContext, QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) { - super(QueryInProgress.class.getName()); + this.masterContext = masterContext; this.session = session; this.queryId = queryId; @@ -86,23 +80,14 @@ public QueryInProgress( queryInfo.setStartTime(System.currentTimeMillis()); } - @Override - public void init(Configuration conf) { - dispatcher = new AsyncDispatcher(); - this.addService(dispatcher); - - dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); - super.init(conf); - } - public synchronized void kill() { + getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); if(queryMasterRpcClient != null){ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); } } - @Override - public void stop() { + public void stopProgress() { if(stopped.getAndSet(true)) { return; } @@ -142,20 +127,8 @@ public void stop() { } masterContext.getHistoryWriter().appendHistory(queryInfo); - super.stop(); - } - - @Override - public void start() { - super.start(); } - public EventHandler getEventHandler() { - return dispatcher.getEventHandler(); - } - - - public boolean startQueryMaster() { try { LOG.info("Initializing QueryInProgress for QueryID=" + queryId); @@ -173,8 +146,6 @@ public boolean startQueryMaster() { queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); - getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); - return true; } catch (Exception e) { catchException(e); @@ -182,23 +153,6 @@ public boolean startQueryMaster() { } } - class QueryInProgressEventHandler implements EventHandler { - @Override - public void handle(QueryJobEvent queryJobEvent) { - if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { - heartbeat(queryJobEvent.getQueryInfo()); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { - QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId); - queryInProgress.getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { - submmitQueryToMaster(); - } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { - kill(); - } - } - } - private void connectQueryMaster() throws Exception { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); @@ -207,7 +161,7 @@ private void connectQueryMaster() throws Exception { queryMasterRpcClient = queryMasterRpc.getStub(); } - private synchronized void submmitQueryToMaster() { + public synchronized void submmitQueryToMaster() { if(querySubmitted.get()) { return; } @@ -256,7 +210,7 @@ public boolean isStarted() { return !stopped.get() && this.querySubmitted.get(); } - private void heartbeat(QueryInfo queryInfo) { + public void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); // to avoid partial update by different heartbeats From 3b5901c4dfbe28641464e68eed97961f49d0d012 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 03:15:03 +0900 Subject: [PATCH 2/3] Simplify events in QueryJobEvent. - Remove the following event types in QueryJobEvent: * QUERY_JOB_START * QUERY_JOB_FINISH * QUERY_MASTER_START * QUERY_MASTER_STOP --- .../main/java/org/apache/tajo/querymaster/QueryJobEvent.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java index 1a1f2ff2ee..27eb2b6c49 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java @@ -35,12 +35,9 @@ public QueryInfo getQueryInfo() { } public enum Type { - QUERY_JOB_START, + QUERY_MASTER_START, QUERY_JOB_HEARTBEAT, - QUERY_JOB_FINISH, QUERY_JOB_STOP, - QUERY_MASTER_START, - QUERY_MASTER_STOP, QUERY_JOB_KILL } } From e9617ec8406422fb493c8cfa37c514cee29f4240 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 12:36:51 +0900 Subject: [PATCH 3/3] Remove unnecessary methods and rename ResourceManager::stopQueryMaster to releaseQueryMaster. --- .../apache/tajo/master/TajoMasterService.java | 7 --- .../master/rm/TajoWorkerResourceManager.java | 2 +- .../tajo/master/rm/WorkerResourceManager.java | 2 +- .../tajo/querymaster/QueryInProgress.java | 27 +---------- .../apache/tajo/querymaster/QueryMaster.java | 2 +- .../tajo/querymaster/QueryMasterTask.java | 47 ++----------------- .../src/main/proto/TajoMasterProtocol.proto | 1 - .../tajo/querymaster/TestKillQuery.java | 2 +- 8 files changed, 8 insertions(+), 82 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index a7df206ba8..02bdfa192c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -135,13 +135,6 @@ public void releaseWorkerResource(RpcController controller, done.run(BOOL_TRUE); } - @Override - public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request, - RpcCallback done) { - context.getQueryJobManager().stopQuery(new QueryId(request)); - done.run(BOOL_TRUE); - } - @Override public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request, RpcCallback done) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index c4200d5a4d..9f2a3d5862 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -526,7 +526,7 @@ public boolean isQueryMasterStopped(QueryId queryId) { } @Override - public void stopQueryMaster(QueryId queryId) { + public void releaseQueryMaster(QueryId queryId) { if(!rmContext.getQueryMasterContainer().containsKey(queryId)) { LOG.warn("No QueryMaster resource info for " + queryId); return; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index b237cc5628..79ec0ac133 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -80,7 +80,7 @@ public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationR * * @param queryId QueryId to be stopped */ - public void stopQueryMaster(QueryId queryId); + public void releaseQueryMaster(QueryId queryId); /** * diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java index a05748b019..f83f2449b0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java @@ -95,32 +95,7 @@ public void stopProgress() { LOG.info("========================================================="); LOG.info("Stop query:" + queryId); - masterContext.getResourceManager().stopQueryMaster(queryId); - - long startTime = System.currentTimeMillis(); - while(true) { - try { - if(masterContext.getResourceManager().isQueryMasterStopped(queryId)) { - LOG.info(queryId + " QueryMaster stopped"); - break; - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - break; - } - - try { - synchronized (this){ - wait(100); - } - } catch (InterruptedException e) { - break; - } - if(System.currentTimeMillis() - startTime > 60 * 1000) { - LOG.warn("Failed to stop QueryMaster:" + queryId); - break; - } - } + masterContext.getResourceManager().releaseQueryMaster(queryId); if(queryMasterRpc != null) { RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 76df3977a4..02760a3e5e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -473,7 +473,7 @@ private class QueryStartEventHandler implements EventHandler { public void handle(QueryStartEvent event) { LOG.info("Start QueryStartEventHandler:" + event.getQueryId()); QueryMasterTask queryMasterTask = new QueryMasterTask(queryMasterContext, - event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr(), event.getLogicalPlanJson()); + event.getQueryId(), event.getSession(), event.getQueryContext(), event.getJsonExpr()); synchronized(queryMasterTasks) { queryMasterTasks.put(event.getQueryId(), queryMasterTask); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index bab5903e91..fd52488a43 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -41,13 +41,10 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -58,8 +55,7 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.session.Session; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; @@ -96,12 +92,8 @@ public class QueryMasterTask extends CompositeService { private Query query; - private MasterPlan masterPlan; - private String jsonExpr; - private String logicalPlanJson; - private AsyncDispatcher dispatcher; private final long querySubmitTime; @@ -124,8 +116,7 @@ public class QueryMasterTask extends CompositeService { new ArrayList(); public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, - QueryId queryId, Session session, QueryContext queryContext, String jsonExpr, - String logicalPlanJson) { + QueryId queryId, Session session, QueryContext queryContext, String jsonExpr) { super(QueryMasterTask.class.getName()); this.queryMasterContext = queryMasterContext; @@ -133,7 +124,6 @@ public QueryMasterTask(QueryMaster.QueryMasterContext queryMasterContext, this.session = session; this.queryContext = queryContext; this.jsonExpr = jsonExpr; - this.logicalPlanJson = logicalPlanJson; this.querySubmitTime = System.currentTimeMillis(); } @@ -198,42 +188,11 @@ public void stop() { LOG.fatal(t.getMessage(), t); } - RpcConnectionPool connPool = RpcConnectionPool.getPool(queryMasterContext.getConf()); - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } - - super.stop(); - - //TODO change report to tajo master if (queryMetrics != null) { queryMetrics.report(new MetricsConsoleReporter()); } + super.stop(); LOG.info("Stopped QueryMasterTask:" + queryId); } diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index cc83e47267..bc73596e28 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -143,6 +143,5 @@ service TajoMasterProtocolService { rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); - rpc stopQueryMaster(QueryIdProto) returns (BoolProto); rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); } \ No newline at end of file diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index a125196d14..bd899cd847 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -90,7 +90,7 @@ public final void testKillQueryFromInitState() throws Exception { QueryMaster qm = cluster.getTajoWorkers().get(0).getWorkerContext().getQueryMaster(); QueryMasterTask queryMasterTask = new QueryMasterTask(qm.getContext(), - queryId, session, defaultContext, expr.toJson(), plan.getRootBlock().getRoot().toJson()); + queryId, session, defaultContext, expr.toJson()); queryMasterTask.init(conf); queryMasterTask.getQueryTaskContext().getDispatcher().start();