From b90387d6894493ebbfdfd949e1cd28924cfccdfa Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 03:08:37 +0900 Subject: [PATCH 1/4] TAJO-1282: Cleanup the relationship of QueryInProgress and QueryJobManager. --- .../apache/tajo/master/QueryJobManager.java | 59 +++++++++--------- .../master/scheduler/SimpleFifoScheduler.java | 3 +- .../tajo/querymaster/QueryInProgress.java | 60 +++---------------- 3 files changed, 41 insertions(+), 81 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java index c9b8711272..6a8da27da3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java @@ -28,22 +28,25 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.master.scheduler.SimpleFifoScheduler; +import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; -import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.master.scheduler.SimpleFifoScheduler; import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; +/** + * QueryJobManager manages all scheduled and running queries. + * It receives all Query related events and routes them to each QueryInProgress. + */ public class QueryJobManager extends CompositeService { private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); @@ -69,7 +72,7 @@ public QueryJobManager(final TajoMaster.MasterContext masterContext) { } @Override - public void init(Configuration conf) { + public void serviceInit(Configuration conf) throws Exception { try { this.dispatcher = new AsyncDispatcher(); addService(this.dispatcher); @@ -81,24 +84,24 @@ public void init(Configuration conf) { catchException(null, e); } - super.init(conf); + super.serviceInit(conf); } @Override - public void stop() { + public void serviceStop() throws Exception { synchronized(runningQueries) { for(QueryInProgress eachQueryInProgress: runningQueries.values()) { - eachQueryInProgress.stop(); + eachQueryInProgress.stopProgress(); } } this.scheduler.stop(); - super.stop(); + super.serviceStop(); } @Override - public void start() { + public void serviceStart() throws Exception { this.scheduler.start(); - super.start(); + super.serviceStart(); } public EventHandler getEventHandler() { @@ -164,39 +167,42 @@ public QueryInfo startQueryJob(QueryId queryId) throws Exception { runningQueries.put(queryInProgress.getQueryId(), queryInProgress); } - addService(queryInProgress); - queryInProgress.init(getConfig()); - queryInProgress.start(); - - if (!queryInProgress.startQueryMaster()) { + if (queryInProgress.startQueryMaster()) { + dispatcher.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, + queryInProgress.getQueryInfo())); + } else { stopQuery(queryId); } return queryInProgress.getQueryInfo(); } - public TajoMaster.MasterContext getMasterContext() { - return masterContext; - } - class QueryJobManagerEventHandler implements EventHandler { + @Override public void handle(QueryJobEvent event) { QueryInProgress queryInProgress = getQueryInProgress(event.getQueryInfo().getQueryId()); - if(queryInProgress == null) { + + + if (queryInProgress == null) { LOG.warn("No query info in running queries.[" + event.getQueryInfo().getQueryId() + "]"); return; } - if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { + + if (event.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { + queryInProgress.submmitQueryToMaster(); + + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_STOP) { stopQuery(event.getQueryInfo().getQueryId()); - } else if (queryInProgress.isStarted()) { - queryInProgress.getEventHandler().handle(event); + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { scheduler.removeQuery(queryInProgress.getQueryId()); - queryInProgress.getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); - + queryInProgress.kill(); stopQuery(queryInProgress.getQueryId()); + + } else if (event.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { + queryInProgress.heartbeat(event.getQueryInfo()); } } } @@ -219,7 +225,7 @@ public void stopQuery(QueryId queryId) { LOG.info("Stop QueryInProgress:" + queryId); QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { - queryInProgress.stop(); + queryInProgress.stopProgress(); synchronized(submittedQueries) { submittedQueries.remove(queryId); } @@ -245,7 +251,6 @@ public void stopQuery(QueryId queryId) { avgExecutionTime.set(executionTime); } executedQuerySize.incrementAndGet(); - removeService(queryInProgress); } else { LOG.warn("No QueryInProgress while query stopping: " + queryId); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java index bd8ca2829b..a091ed5b3c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java @@ -58,7 +58,8 @@ public boolean addQuery(QueryInProgress queryInProgress) { LOG.info("Size of Fifo queue is " + qSize); } - QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, queryInProgress.getStartTime()); + QuerySchedulingInfo querySchedulingInfo = new QuerySchedulingInfo(queryInProgress.getQueryId(), 1, + queryInProgress.getQueryInfo().getStartTime()); boolean result = pool.add(querySchedulingInfo); if (getRunningQueries().size() == 0) wakeupProcessor(); return result; diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java index bda2ec1438..a05748b019 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java @@ -20,11 +20,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.yarn.event.AsyncDispatcher; -import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.query.QueryContext; @@ -35,12 +31,12 @@ import org.apache.tajo.master.QueryInfo; import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; +import org.apache.tajo.session.Session; import org.apache.tajo.util.NetUtils; import java.net.InetSocketAddress; @@ -48,15 +44,13 @@ import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; -public class QueryInProgress extends CompositeService { +public class QueryInProgress { private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); private QueryId queryId; private Session session; - private AsyncDispatcher dispatcher; - private LogicalRootNode plan; private AtomicBoolean querySubmitted = new AtomicBoolean(false); @@ -76,7 +70,7 @@ public QueryInProgress( Session session, QueryContext queryContext, QueryId queryId, String sql, String jsonExpr, LogicalRootNode plan) { - super(QueryInProgress.class.getName()); + this.masterContext = masterContext; this.session = session; this.queryId = queryId; @@ -86,23 +80,14 @@ public QueryInProgress( queryInfo.setStartTime(System.currentTimeMillis()); } - @Override - public void init(Configuration conf) { - dispatcher = new AsyncDispatcher(); - this.addService(dispatcher); - - dispatcher.register(QueryJobEvent.Type.class, new QueryInProgressEventHandler()); - super.init(conf); - } - public synchronized void kill() { + getQueryInfo().setQueryState(TajoProtos.QueryState.QUERY_KILLED); if(queryMasterRpcClient != null){ queryMasterRpcClient.killQuery(null, queryId.getProto(), NullCallback.get()); } } - @Override - public void stop() { + public void stopProgress() { if(stopped.getAndSet(true)) { return; } @@ -142,20 +127,8 @@ public void stop() { } masterContext.getHistoryWriter().appendHistory(queryInfo); - super.stop(); - } - - @Override - public void start() { - super.start(); } - public EventHandler getEventHandler() { - return dispatcher.getEventHandler(); - } - - - public boolean startQueryMaster() { try { LOG.info("Initializing QueryInProgress for QueryID=" + queryId); @@ -173,8 +146,6 @@ public boolean startQueryMaster() { queryInfo.setQueryMasterclientPort(resource.getConnectionInfo().getClientPort()); queryInfo.setQueryMasterInfoPort(resource.getConnectionInfo().getHttpInfoPort()); - getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_MASTER_START, queryInfo)); - return true; } catch (Exception e) { catchException(e); @@ -182,23 +153,6 @@ public boolean startQueryMaster() { } } - class QueryInProgressEventHandler implements EventHandler { - @Override - public void handle(QueryJobEvent queryJobEvent) { - if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_HEARTBEAT) { - heartbeat(queryJobEvent.getQueryInfo()); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_MASTER_START) { - QueryInProgress queryInProgress = masterContext.getQueryJobManager().getQueryInProgress(queryId); - queryInProgress.getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, queryInProgress.getQueryInfo())); - } else if(queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_START) { - submmitQueryToMaster(); - } else if (queryJobEvent.getType() == QueryJobEvent.Type.QUERY_JOB_KILL) { - kill(); - } - } - } - private void connectQueryMaster() throws Exception { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); @@ -207,7 +161,7 @@ private void connectQueryMaster() throws Exception { queryMasterRpcClient = queryMasterRpc.getStub(); } - private synchronized void submmitQueryToMaster() { + public synchronized void submmitQueryToMaster() { if(querySubmitted.get()) { return; } @@ -256,7 +210,7 @@ public boolean isStarted() { return !stopped.get() && this.querySubmitted.get(); } - private void heartbeat(QueryInfo queryInfo) { + public void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); // to avoid partial update by different heartbeats From 3b5901c4dfbe28641464e68eed97961f49d0d012 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 03:15:03 +0900 Subject: [PATCH 2/4] Simplify events in QueryJobEvent. - Remove the following event types in QueryJobEvent: * QUERY_JOB_START * QUERY_JOB_FINISH * QUERY_MASTER_START * QUERY_MASTER_STOP --- .../main/java/org/apache/tajo/querymaster/QueryJobEvent.java | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java index 1a1f2ff2ee..27eb2b6c49 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryJobEvent.java @@ -35,12 +35,9 @@ public QueryInfo getQueryInfo() { } public enum Type { - QUERY_JOB_START, + QUERY_MASTER_START, QUERY_JOB_HEARTBEAT, - QUERY_JOB_FINISH, QUERY_JOB_STOP, - QUERY_MASTER_START, - QUERY_MASTER_STOP, QUERY_JOB_KILL } } From 21d9ea1c39ab014559247240b6609bdd5db161bb Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 16:39:01 +0900 Subject: [PATCH 3/4] TAJO-1291: Rename TajoMasterProtocol to QueryCoordinatorProtocol. --- tajo-core/pom.xml | 2 +- .../QueryInProgress.java | 8 +- ...QueryJobManager.java => QueryManager.java} | 23 +++-- .../tajo/master/TajoContainerProxy.java | 12 +-- .../org/apache/tajo/master/TajoMaster.java | 16 ++-- .../tajo/master/TajoMasterClientService.java | 9 +- ...e.java => TajoMasterUmbilicalService.java} | 52 +++++----- .../tajo/master/exec/QueryExecutor.java | 4 +- .../tajo/master/rm/TajoResourceTracker.java | 12 +-- .../master/rm/TajoWorkerResourceManager.java | 14 ++- .../tajo/master/rm/WorkerResourceManager.java | 17 ++-- .../tajo/master/scheduler/Scheduler.java | 2 +- .../master/scheduler/SimpleFifoScheduler.java | 8 +- .../apache/tajo/querymaster/QueryMaster.java | 95 +++++-------------- .../tajo/querymaster/QueryMasterTask.java | 10 +- .../org/apache/tajo/querymaster/Stage.java | 31 ++---- .../java/org/apache/tajo/util/JSPUtil.java | 2 +- .../tajo/worker/TajoResourceAllocator.java | 37 ++++---- .../org/apache/tajo/worker/TajoWorker.java | 20 ++-- .../tajo/worker/WorkerHeartbeatService.java | 27 +++--- .../ConnectivityCheckerRuleForTajoWorker.java | 14 +-- ...l.proto => QueryCoordinatorProtocol.proto} | 4 +- .../main/proto/ResourceTrackerProtocol.proto | 2 +- .../main/resources/webapps/admin/index.jsp | 2 +- .../main/resources/webapps/admin/query.jsp | 2 +- .../org/apache/tajo/TajoTestingCluster.java | 2 +- .../master/rm/TestTajoResourceManager.java | 3 +- 27 files changed, 183 insertions(+), 247 deletions(-) rename tajo-core/src/main/java/org/apache/tajo/{querymaster => master}/QueryInProgress.java (97%) rename tajo-core/src/main/java/org/apache/tajo/master/{QueryJobManager.java => QueryManager.java} (92%) rename tajo-core/src/main/java/org/apache/tajo/master/{TajoMasterService.java => TajoMasterUmbilicalService.java} (71%) rename tajo-core/src/main/proto/{TajoMasterProtocol.proto => QueryCoordinatorProtocol.proto} (97%) diff --git a/tajo-core/pom.xml b/tajo-core/pom.xml index a7205dd5b2..05ccf078db 100644 --- a/tajo-core/pom.xml +++ b/tajo-core/pom.xml @@ -166,7 +166,7 @@ src/main/proto/ContainerProtocol.proto src/main/proto/ResourceTrackerProtocol.proto src/main/proto/QueryMasterProtocol.proto - src/main/proto/TajoMasterProtocol.proto + src/main/proto/QueryCoordinatorProtocol.proto src/main/proto/TajoWorkerProtocol.proto src/main/proto/InternalTypes.proto diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java similarity index 97% rename from tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java rename to tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java index a05748b019..a4192b1450 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInProgress.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.tajo.querymaster; +package org.apache.tajo.master; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -24,14 +24,14 @@ import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerAllocatedResource; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.QueryExecutionRequestProto; -import org.apache.tajo.master.QueryInfo; -import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; @@ -42,8 +42,6 @@ import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; - public class QueryInProgress { private static final Log LOG = LogFactory.getLog(QueryInProgress.class.getName()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java similarity index 92% rename from tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java rename to tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 6a8da27da3..296be04cb1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -30,11 +30,10 @@ import org.apache.tajo.QueryIdFactory; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.scheduler.SimpleFifoScheduler; import org.apache.tajo.plan.logical.LogicalRootNode; -import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.session.Session; @@ -44,11 +43,11 @@ import java.util.concurrent.atomic.AtomicLong; /** - * QueryJobManager manages all scheduled and running queries. + * QueryManager manages all scheduled and running queries. * It receives all Query related events and routes them to each QueryInProgress. */ -public class QueryJobManager extends CompositeService { - private static final Log LOG = LogFactory.getLog(QueryJobManager.class.getName()); +public class QueryManager extends CompositeService { + private static final Log LOG = LogFactory.getLog(QueryManager.class.getName()); // TajoMaster Context private final TajoMaster.MasterContext masterContext; @@ -66,8 +65,8 @@ public class QueryJobManager extends CompositeService { private AtomicLong avgExecutionTime = new AtomicLong(); private AtomicLong executedQuerySize = new AtomicLong(); - public QueryJobManager(final TajoMaster.MasterContext masterContext) { - super(QueryJobManager.class.getName()); + public QueryManager(final TajoMaster.MasterContext masterContext) { + super(QueryManager.class.getName()); this.masterContext = masterContext; } @@ -139,8 +138,8 @@ public synchronized QueryInfo getFinishedQuery(QueryId queryId) { } } - public QueryInfo createNewQueryJob(Session session, QueryContext queryContext, String sql, - String jsonExpr, LogicalRootNode plan) + public QueryInfo scheduleQuery(Session session, QueryContext queryContext, String sql, + String jsonExpr, LogicalRootNode plan) throws Exception { QueryId queryId = QueryIdFactory.newQueryId(masterContext.getResourceManager().getSeedQueryId()); QueryInProgress queryInProgress = new QueryInProgress(masterContext, session, queryContext, queryId, sql, @@ -279,8 +278,8 @@ private void catchException(QueryId queryId, Exception e) { queryInProgress.catchException(e); } - public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat( - TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { + public synchronized QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand queryHeartbeat( + QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) { QueryInProgress queryInProgress = getQueryInProgress(new QueryId(queryHeartbeat.getQueryId())); if(queryInProgress == null) { return null; @@ -292,7 +291,7 @@ public synchronized TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand que return null; } - private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat queryHeartbeat) { + private QueryInfo makeQueryInfoFromHeartbeat(QueryCoordinatorProtocol.TajoHeartbeat queryHeartbeat) { QueryInfo queryInfo = new QueryInfo(new QueryId(queryHeartbeat.getQueryId())); WorkerConnectionInfo connectionInfo = new WorkerConnectionInfo(queryHeartbeat.getConnectionInfo()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 720908021f..2ffd7cabfb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -26,7 +26,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.master.container.TajoContainer; @@ -177,23 +177,23 @@ public static void releaseWorkerResource(QueryMasterTask.QueryMasterTaskContext if (conf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { try { tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { context.getQueryMasterContext().getWorkerContext().setWorkerResourceTrackerAddr( HAServiceUtil.getResourceTrackerAddress(conf)); context.getQueryMasterContext().getWorkerContext().setTajoMasterAddress( HAServiceUtil.getMasterUmbilicalAddress(conf)); tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { tmClient = connPool.getConnection(context.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + QueryCoordinatorProtocol.QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.releaseWorkerResource(null, - TajoMasterProtocol.WorkerResourceReleaseRequest.newBuilder() + QueryCoordinatorProtocol.WorkerResourceReleaseRequest.newBuilder() .setExecutionBlockId(executionBlockId.getProto()) .addAllContainerIds(containerIdProtos) .build(), diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index c054599212..f19ca9d2bf 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -114,14 +114,14 @@ public class TajoMaster extends CompositeService { private GlobalEngine globalEngine; private AsyncDispatcher dispatcher; private TajoMasterClientService tajoMasterClientService; - private TajoMasterService tajoMasterService; + private TajoMasterUmbilicalService tajoMasterService; private SessionManager sessionManager; private WorkerResourceManager resourceManager; //Web Server private StaticHttpServer webServer; - private QueryJobManager queryJobManager; + private QueryManager queryManager; private ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); @@ -183,13 +183,13 @@ public void serviceInit(Configuration _conf) throws Exception { globalEngine = new GlobalEngine(context); addIfService(globalEngine); - queryJobManager = new QueryJobManager(context); - addIfService(queryJobManager); + queryManager = new QueryManager(context); + addIfService(queryManager); tajoMasterClientService = new TajoMasterClientService(context); addIfService(tajoMasterClientService); - tajoMasterService = new TajoMasterService(context); + tajoMasterService = new TajoMasterUmbilicalService(context); addIfService(tajoMasterService); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -441,8 +441,8 @@ public Clock getClock() { return clock; } - public QueryJobManager getQueryJobManager() { - return queryJobManager; + public QueryManager getQueryJobManager() { + return queryManager; } public WorkerResourceManager getResourceManager() { @@ -469,7 +469,7 @@ public StorageManager getStorageManager() { return storeManager; } - public TajoMasterService getTajoMasterService() { + public TajoMasterUmbilicalService getTajoMasterService() { return tajoMasterService; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 93326be72b..fcc016ad7c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -45,7 +45,6 @@ import org.apache.tajo.ipc.TajoMasterClientProtocol.TajoMasterClientProtocolService; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.master.exec.NonForwardQueryResultScanner; -import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.querymaster.QueryJobEvent; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; @@ -567,8 +566,8 @@ public GetQueryInfoResponse getQueryInfo(RpcController controller, QueryIdReques context.getSessionManager().touch(request.getSessionId().getId()); QueryId queryId = new QueryId(request.getQueryId()); - QueryJobManager queryJobManager = context.getQueryJobManager(); - QueryInProgress queryInProgress = queryJobManager.getQueryInProgress(queryId); + QueryManager queryManager = context.getQueryJobManager(); + QueryInProgress queryInProgress = queryManager.getQueryInProgress(queryId); QueryInfo queryInfo = null; if (queryInProgress == null) { @@ -598,8 +597,8 @@ public BoolProto killQuery(RpcController controller, QueryIdRequest request) thr try { context.getSessionManager().touch(request.getSessionId().getId()); QueryId queryId = new QueryId(request.getQueryId()); - QueryJobManager queryJobManager = context.getQueryJobManager(); - queryJobManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL, + QueryManager queryManager = context.getQueryJobManager(); + queryManager.getEventHandler().handle(new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_KILL, new QueryInfo(queryId))); return BOOL_TRUE; } catch (Throwable t) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterUmbilicalService.java similarity index 71% rename from tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java rename to tajo-core/src/main/java/org/apache/tajo/master/TajoMasterUmbilicalService.java index a7df206ba8..abe9f725fb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterUmbilicalService.java @@ -27,7 +27,11 @@ import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeat; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceProto; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceReleaseRequest; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourcesRequest; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; @@ -40,23 +44,23 @@ import java.util.Collection; import java.util.List; -public class TajoMasterService extends AbstractService { - private final static Log LOG = LogFactory.getLog(TajoMasterService.class); +public class TajoMasterUmbilicalService extends AbstractService { + private final static Log LOG = LogFactory.getLog(TajoMasterUmbilicalService.class); private final TajoMaster.MasterContext context; private final TajoConf conf; - private final TajoMasterServiceHandler masterHandler; + private final ProtocolServiceHandler masterHandler; private AsyncRpcServer server; private InetSocketAddress bindAddress; private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build(); private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build(); - public TajoMasterService(TajoMaster.MasterContext context) { - super(TajoMasterService.class.getName()); + public TajoMasterUmbilicalService(TajoMaster.MasterContext context) { + super(TajoMasterUmbilicalService.class.getName()); this.context = context; this.conf = context.getConf(); - this.masterHandler = new TajoMasterServiceHandler(); + this.masterHandler = new ProtocolServiceHandler(); } @Override @@ -65,7 +69,7 @@ public void start() { InetSocketAddress initIsa = NetUtils.createSocketAddr(confMasterServiceAddr); int workerNum = conf.getIntVar(TajoConf.ConfVars.MASTER_RPC_SERVER_WORKER_THREAD_NUM); try { - server = new AsyncRpcServer(TajoMasterProtocol.class, masterHandler, initIsa, workerNum); + server = new AsyncRpcServer(QueryCoordinatorProtocol.class, masterHandler, initIsa, workerNum); } catch (Exception e) { LOG.error(e); } @@ -90,22 +94,25 @@ public InetSocketAddress getBindAddress() { return bindAddress; } - public class TajoMasterServiceHandler - implements TajoMasterProtocol.TajoMasterProtocolService.Interface { + /** + * Actual protocol service handler + */ + private class ProtocolServiceHandler implements QueryCoordinatorProtocol.QueryCoordinatorProtocolService.Interface { + @Override public void heartbeat( RpcController controller, - TajoMasterProtocol.TajoHeartbeat request, RpcCallback done) { + TajoHeartbeat request, RpcCallback done) { if(LOG.isDebugEnabled()) { LOG.debug("Received QueryHeartbeat:" + new WorkerConnectionInfo(request.getConnectionInfo())); } - TajoMasterProtocol.TajoHeartbeatResponse.ResponseCommand command = null; + QueryCoordinatorProtocol.TajoHeartbeatResponse.ResponseCommand command = null; - QueryJobManager queryJobManager = context.getQueryJobManager(); - command = queryJobManager.queryHeartbeat(request); + QueryManager queryManager = context.getQueryJobManager(); + command = queryManager.queryHeartbeat(request); - TajoMasterProtocol.TajoHeartbeatResponse.Builder builder = TajoMasterProtocol.TajoHeartbeatResponse.newBuilder(); + QueryCoordinatorProtocol.TajoHeartbeatResponse.Builder builder = QueryCoordinatorProtocol.TajoHeartbeatResponse.newBuilder(); builder.setHeartbeatResult(BOOL_TRUE); if(command != null) { builder.setResponseCommand(command); @@ -118,14 +125,13 @@ public void heartbeat( @Override public void allocateWorkerResources( RpcController controller, - TajoMasterProtocol.WorkerResourceAllocationRequest request, - RpcCallback done) { + QueryCoordinatorProtocol.WorkerResourceAllocationRequest request, + RpcCallback done) { context.getResourceManager().allocateWorkerResources(request, done); } @Override - public void releaseWorkerResource(RpcController controller, - TajoMasterProtocol.WorkerResourceReleaseRequest request, + public void releaseWorkerResource(RpcController controller, WorkerResourceReleaseRequest request, RpcCallback done) { List containerIds = request.getContainerIdsList(); @@ -144,17 +150,15 @@ public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto @Override public void getAllWorkerResource(RpcController controller, PrimitiveProtos.NullProto request, - RpcCallback done) { + RpcCallback done) { - TajoMasterProtocol.WorkerResourcesRequest.Builder builder = - TajoMasterProtocol.WorkerResourcesRequest.newBuilder(); + WorkerResourcesRequest.Builder builder = WorkerResourcesRequest.newBuilder(); Collection workers = context.getResourceManager().getWorkers().values(); for(Worker worker: workers) { WorkerResource resource = worker.getResource(); - TajoMasterProtocol.WorkerResourceProto.Builder workerResource = - TajoMasterProtocol.WorkerResourceProto.newBuilder(); + WorkerResourceProto.Builder workerResource = WorkerResourceProto.newBuilder(); workerResource.setConnectionInfo(worker.getConnectionInfo().getProto()); workerResource.setMemoryMB(resource.getMemoryMB()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java index 2fbebc1213..0860d63144 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/QueryExecutor.java @@ -388,10 +388,10 @@ public void executeDistributedQuery(QueryContext queryContext, Session session, context.getSystemMetrics().counter("Query", "numDMLQuery").inc(); hookManager.doHooks(queryContext, plan); - QueryJobManager queryJobManager = this.context.getQueryJobManager(); + QueryManager queryManager = this.context.getQueryJobManager(); QueryInfo queryInfo; - queryInfo = queryJobManager.createNewQueryJob(session, queryContext, sql, jsonExpr, rootNode); + queryInfo = queryManager.scheduleQuery(session, queryContext, sql, jsonExpr, rootNode); if(queryInfo == null) { responseBuilder.setQueryId(QueryIdFactory.NULL_QUERY_ID.getProto()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java index 831ce43f46..519aa9d336 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoResourceTracker.java @@ -26,7 +26,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.AsyncRpcServer; @@ -36,8 +37,6 @@ import java.io.IOError; import java.net.InetSocketAddress; -import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse; -import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse.Builder; import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.NodeHeartbeat; import static org.apache.tajo.ipc.TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService; @@ -110,7 +109,8 @@ public void serviceStop() { } /** The response builder */ - private static final Builder builder = TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE); + private static final TajoHeartbeatResponse.Builder builder = + TajoHeartbeatResponse.newBuilder().setHeartbeatResult(ProtoUtil.TRUE); private static WorkerStatusEvent createStatusEvent(int workerId, NodeHeartbeat heartbeat) { return new WorkerStatusEvent( @@ -204,7 +204,7 @@ private Worker createWorkerResource(NodeHeartbeat request) { return new Worker(rmContext, workerResource, new WorkerConnectionInfo(request.getConnectionInfo())); } - public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() { + public ClusterResourceSummary getClusterResourceSummary() { int totalDiskSlots = 0; int totalCpuCoreSlots = 0; int totalMemoryMB = 0; @@ -230,7 +230,7 @@ public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() { } } - return TajoMasterProtocol.ClusterResourceSummary.newBuilder() + return ClusterResourceSummary.newBuilder() .setNumWorkers(rmContext.getWorkers().size()) .setTotalCpuCoreSlots(totalCpuCoreSlots) .setTotalDiskSlots(totalDiskSlots) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java index c4200d5a4d..9c45c8aa2c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/TajoWorkerResourceManager.java @@ -34,9 +34,9 @@ import org.apache.tajo.QueryId; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; +import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.querymaster.QueryInProgress; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.util.ApplicationIdUtils; @@ -49,8 +49,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static org.apache.tajo.ipc.TajoMasterProtocol.*; - /** * It manages all resources of tajo workers. @@ -162,7 +160,7 @@ public Collection getQueryMasters() { } @Override - public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary() { + public ClusterResourceSummary getClusterResourceSummary() { return resourceTracker.getClusterResourceSummary(); } @@ -204,7 +202,7 @@ private WorkerResourceAllocationRequest createQMResourceRequest(QueryId queryId) builder.setMinMemoryMBPerContainer(queryMasterDefaultMemoryMB); builder.setMaxDiskSlotPerContainer(queryMasterDefaultDiskSlot); builder.setMinDiskSlotPerContainer(queryMasterDefaultDiskSlot); - builder.setResourceRequestPriority(TajoMasterProtocol.ResourceRequestPriority.MEMORY); + builder.setResourceRequestPriority(ResourceRequestPriority.MEMORY); builder.setNumContainers(1); return builder.build(); } @@ -358,10 +356,10 @@ private List chooseWorkers(WorkerResourceRequest resour int allocatedResources = 0; - TajoMasterProtocol.ResourceRequestPriority resourceRequestPriority + ResourceRequestPriority resourceRequestPriority = resourceRequest.request.getResourceRequestPriority(); - if(resourceRequestPriority == TajoMasterProtocol.ResourceRequestPriority.MEMORY) { + if(resourceRequestPriority == ResourceRequestPriority.MEMORY) { synchronized(rmContext) { List randomWorkers = new ArrayList(rmContext.getWorkers().keySet()); Collections.shuffle(randomWorkers); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java index b237cc5628..f551b1e0b4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/rm/WorkerResourceManager.java @@ -22,16 +22,15 @@ import org.apache.hadoop.service.Service; import org.apache.tajo.QueryId; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceAllocationRequest; +import org.apache.tajo.master.QueryInProgress; import java.io.IOException; import java.util.Collection; import java.util.Map; -import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerAllocatedResource; -import static org.apache.tajo.ipc.TajoMasterProtocol.WorkerResourceAllocationResponse; - /** * An interface of WorkerResourceManager which allows TajoMaster to request allocation for containers * and release the allocated containers. @@ -45,7 +44,7 @@ public interface WorkerResourceManager extends Service { * @return A allocated container resource */ @Deprecated - public WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress); + public QueryCoordinatorProtocol.WorkerAllocatedResource allocateQueryMaster(QueryInProgress queryInProgress); /** * Request one or more resource containers. You can set the number of containers and resource capabilities, such as @@ -55,8 +54,8 @@ public interface WorkerResourceManager extends Service { * @param request Request description * @param rpcCallBack Callback function */ - public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationRequest request, - RpcCallback rpcCallBack); + public void allocateWorkerResources(WorkerResourceAllocationRequest request, + RpcCallback rpcCallBack); /** * Release a container @@ -100,7 +99,7 @@ public void allocateWorkerResources(TajoMasterProtocol.WorkerResourceAllocationR * * @return The overall summary of cluster resources */ - public TajoMasterProtocol.ClusterResourceSummary getClusterResourceSummary(); + public ClusterResourceSummary getClusterResourceSummary(); /** * diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java index 02203a977c..19493d70d2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/Scheduler.java @@ -19,7 +19,7 @@ package org.apache.tajo.master.scheduler; import org.apache.tajo.QueryId; -import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.master.QueryInProgress; import java.util.List; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java index a091ed5b3c..6cb98ebc79 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/scheduler/SimpleFifoScheduler.java @@ -21,8 +21,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; -import org.apache.tajo.querymaster.QueryInProgress; -import org.apache.tajo.master.QueryJobManager; +import org.apache.tajo.master.QueryInProgress; +import org.apache.tajo.master.QueryManager; import java.util.*; import java.util.concurrent.atomic.AtomicBoolean; @@ -32,10 +32,10 @@ public class SimpleFifoScheduler implements Scheduler { private LinkedList pool = new LinkedList(); private final Thread queryProcessor; private AtomicBoolean stopped = new AtomicBoolean(); - private QueryJobManager manager; + private QueryManager manager; private Comparator COMPARATOR = new SchedulingAlgorithms.FifoComparator(); - public SimpleFifoScheduler(QueryJobManager manager) { + public SimpleFifoScheduler(QueryManager manager) { this.manager = manager; this.queryProcessor = new Thread(new QueryProcessor()); this.queryProcessor.setName("Query Processor"); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 76df3977a4..596778f865 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -34,7 +34,8 @@ import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.event.QueryStartEvent; import org.apache.tajo.master.event.QueryStopEvent; @@ -56,10 +57,6 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeat; -import static org.apache.tajo.ipc.TajoMasterProtocol.TajoHeartbeatResponse; - -// TODO - when exception, send error status to QueryJobManager public class QueryMaster extends CompositeService implements EventHandler { private static final Log LOG = LogFactory.getLog(QueryMaster.class.getName()); @@ -182,12 +179,12 @@ protected void cleanupExecutionBlock(List ex } LOG.info("cleanup executionBlocks: " + cleanupMessage); NettyClientBase rpc = null; - List workers = getAllWorker(); + List workers = getAllWorker(); TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + for (WorkerResourceProto worker : workers) { try { TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), @@ -206,9 +203,9 @@ protected void cleanupExecutionBlock(List ex private void cleanup(QueryId queryId) { LOG.info("cleanup query resources : " + queryId); NettyClientBase rpc = null; - List workers = getAllWorker(); + List workers = getAllWorker(); - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + for (WorkerResourceProto worker : workers) { try { TajoProtos.WorkerConnectionInfoProto connectionInfo = worker.getConnectionInfo(); rpc = connPool.getConnection(NetUtils.createSocketAddr(connectionInfo.getHost(), connectionInfo.getPeerRpcPort()), @@ -224,7 +221,7 @@ private void cleanup(QueryId queryId) { } } - public List getAllWorker() { + public List getAllWorker() { NettyClientBase rpc = null; try { @@ -235,78 +232,34 @@ public List getAllWorker() { if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { try { rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( HAServiceUtil.getResourceTrackerAddress(systemConf)); queryMasterContext.getWorkerContext().setTajoMasterAddress( HAServiceUtil.getMasterUmbilicalAddress(systemConf)); rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { rpc = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } - TajoMasterProtocol.TajoMasterProtocolService masterService = rpc.getStub(); + QueryCoordinatorProtocolService masterService = rpc.getStub(); - CallFuture callBack = - new CallFuture(); + CallFuture callBack = new CallFuture(); masterService.getAllWorkerResource(callBack.getController(), PrimitiveProtos.NullProto.getDefaultInstance(), callBack); - TajoMasterProtocol.WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS); + WorkerResourcesRequest workerResourcesRequest = callBack.get(2, TimeUnit.SECONDS); return workerResourcesRequest.getWorkerResourcesList(); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { connPool.releaseConnection(rpc); } - return new ArrayList(); - } - - public void reportQueryStatusToQueryMaster(QueryId queryId, TajoProtos.QueryState state) { - LOG.info("Send QueryMaster Ready to QueryJobManager:" + queryId); - NettyClientBase tmClient = null; - try { - // In TajoMaster HA mode, if backup master be active status, - // worker may fail to connect existing active master. Thus, - // if worker can't connect the master, worker should try to connect another master and - // update master address in worker context. - if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { - try { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } catch (Exception e) { - queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( - HAServiceUtil.getResourceTrackerAddress(systemConf)); - queryMasterContext.getWorkerContext().setTajoMasterAddress( - HAServiceUtil.getMasterUmbilicalAddress(systemConf)); - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - } else { - tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); - } - - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - - TajoHeartbeat.Builder queryHeartbeatBuilder = TajoHeartbeat.newBuilder() - .setConnectionInfo(workerContext.getConnectionInfo().getProto()) - .setState(state) - .setQueryId(queryId.getProto()); - - CallFuture callBack = - new CallFuture(); - - masterClientService.heartbeat(callBack.getController(), queryHeartbeatBuilder.build(), callBack); - } catch (Exception e) { - LOG.error(e.getMessage(), e); - } finally { - connPool.releaseConnection(tmClient); - } + return new ArrayList(); } @Override @@ -407,19 +360,19 @@ public void stopQuery(QueryId queryId) { if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { try { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(systemConf)); queryMasterContext.getWorkerContext().setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(systemConf)); tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.heartbeat(future.getController(), queryHeartbeat, future); } catch (Exception e) { //this function will be closed in new thread. @@ -524,24 +477,24 @@ public void run() { if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { try { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( HAServiceUtil.getResourceTrackerAddress(systemConf)); queryMasterContext.getWorkerContext().setTajoMasterAddress( HAServiceUtil.getMasterUmbilicalAddress(systemConf)); tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); - CallFuture callBack = - new CallFuture(); + CallFuture callBack = + new CallFuture(); TajoHeartbeat queryHeartbeat = buildTajoHeartBeat(eachTask); masterClientService.heartbeat(callBack.getController(), queryHeartbeat, callBack); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index bab5903e91..9900a9f3b5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -42,12 +42,11 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; -import org.apache.tajo.session.Session; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -60,6 +59,7 @@ import org.apache.tajo.plan.verifier.VerifyException; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.session.Session; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty; import org.apache.tajo.storage.StorageUtil; @@ -208,18 +208,18 @@ public void stop() { if (systemConf.getBoolVar(TajoConf.ConfVars.TAJO_MASTER_HA_ENABLE)) { try { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { queryMasterContext.getWorkerContext().setWorkerResourceTrackerAddr( HAServiceUtil.getResourceTrackerAddress(systemConf)); queryMasterContext.getWorkerContext().setTajoMasterAddress( HAServiceUtil.getMasterUmbilicalAddress(systemConf)); tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { tmClient = connPool.getConnection(queryMasterContext.getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } catch (Exception e) { LOG.error(e.getMessage(), e); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 34c58d4cd0..c8e285093f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -45,27 +45,28 @@ import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; -import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; -import org.apache.tajo.master.*; +import org.apache.tajo.master.LaunchTaskRunnersEvent; +import org.apache.tajo.master.TaskRunnerGroupEvent; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; +import org.apache.tajo.master.TaskState; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.querymaster.Task.IntermediateEntry; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.FileStorageManager; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.util.history.StageHistory; +import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.worker.FetchImpl; import java.io.IOException; @@ -802,22 +803,6 @@ private static void setShuffleIfNecessary(Stage stage, DataChannel channel) { } } - /** - * Getting the total memory of cluster - * - * @param stage - * @return mega bytes - */ - private static int getClusterTotalMemory(Stage stage) { - List workers = - stage.context.getQueryMasterContext().getQueryMaster().getAllWorker(); - - int totalMem = 0; - for (TajoMasterProtocol.WorkerResourceProto worker : workers) { - totalMem += worker.getMemoryMB(); - } - return totalMem; - } /** * Getting the desire number of partitions according to the volume of input data. * This method is only used to determine the partition key number of hash join or aggregation. diff --git a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java index d711258490..82fb37f9a0 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/JSPUtil.java @@ -25,7 +25,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.master.TajoMaster.MasterContext; import org.apache.tajo.ha.HAService; -import org.apache.tajo.querymaster.QueryInProgress; +import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Task; import org.apache.tajo.querymaster.Stage; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index 824147836b..04b65d260b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -28,8 +28,10 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; @@ -38,16 +40,18 @@ import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; import org.apache.tajo.master.event.StageContainerAllocationEvent; +import org.apache.tajo.master.rm.TajoWorkerContainer; +import org.apache.tajo.master.rm.TajoWorkerContainerId; +import org.apache.tajo.master.rm.Worker; +import org.apache.tajo.master.rm.WorkerResource; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; -import org.apache.tajo.master.rm.*; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.util.ApplicationIdUtils; -import org.apache.tajo.ha.HAServiceUtil; import java.net.InetSocketAddress; import java.util.*; @@ -91,7 +95,7 @@ public int calculateNumRequestContainers(TajoWorker.WorkerContext workerContext, int memoryMBPerTask) { //TODO consider disk slot - TajoMasterProtocol.ClusterResourceSummary clusterResource = workerContext.getClusterResource(); + ClusterResourceSummary clusterResource = workerContext.getClusterResource(); int clusterSlots = clusterResource == null ? 0 : clusterResource.getTotalMemoryMB() / memoryMBPerTask; clusterSlots = Math.max(1, clusterSlots - 1); // reserve query master slot LOG.info("CalculateNumberRequestContainer - Number of Tasks=" + numTasks + @@ -249,20 +253,19 @@ class TajoWorkerAllocationThread extends Thread { @Override public void run() { LOG.info("Start TajoWorkerAllocationThread"); - CallFuture callBack = - new CallFuture(); + CallFuture callBack = + new CallFuture(); //TODO consider task's resource usage pattern int requiredMemoryMB = tajoConf.getIntVar(TajoConf.ConfVars.TASK_DEFAULT_MEMORY); float requiredDiskSlots = tajoConf.getFloatVar(TajoConf.ConfVars.TASK_DEFAULT_DISK); - TajoMasterProtocol.WorkerResourceAllocationRequest request = - TajoMasterProtocol.WorkerResourceAllocationRequest.newBuilder() + WorkerResourceAllocationRequest request = WorkerResourceAllocationRequest.newBuilder() .setMinMemoryMBPerContainer(requiredMemoryMB) .setMaxMemoryMBPerContainer(requiredMemoryMB) .setNumContainers(event.getRequiredNum()) - .setResourceRequestPriority(!event.isLeafQuery() ? TajoMasterProtocol.ResourceRequestPriority.MEMORY - : TajoMasterProtocol.ResourceRequestPriority.DISK) + .setResourceRequestPriority(!event.isLeafQuery() ? + ResourceRequestPriority.MEMORY : ResourceRequestPriority.DISK) .setMinDiskSlotPerContainer(requiredDiskSlots) .setMaxDiskSlotPerContainer(requiredDiskSlots) .setQueryId(event.getExecutionBlockId().getQueryId().getProto()) @@ -280,7 +283,7 @@ public void run() { try { tmClient = connPool.getConnection( queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } catch (Exception e) { queryTaskContext.getQueryMasterContext().getWorkerContext(). setWorkerResourceTrackerAddr(HAServiceUtil.getResourceTrackerAddress(tajoConf)); @@ -288,15 +291,15 @@ public void run() { setTajoMasterAddress(HAServiceUtil.getMasterUmbilicalAddress(tajoConf)); tmClient = connPool.getConnection( queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } } else { tmClient = connPool.getConnection( queryTaskContext.getQueryMasterContext().getWorkerContext().getTajoMasterAddress(), - TajoMasterProtocol.class, true); + QueryCoordinatorProtocol.class, true); } - TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); + QueryCoordinatorProtocolService masterClientService = tmClient.getStub(); masterClientService.allocateWorkerResources(null, request, callBack); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -304,7 +307,7 @@ public void run() { connPool.releaseConnection(tmClient); } - TajoMasterProtocol.WorkerResourceAllocationResponse response = null; + WorkerResourceAllocationResponse response = null; while(!stopped.get()) { try { response = callBack.get(3, TimeUnit.SECONDS); @@ -321,11 +324,11 @@ public void run() { int numAllocatedContainers = 0; if(response != null) { - List allocatedResources = response.getWorkerAllocatedResourceList(); + List allocatedResources = response.getWorkerAllocatedResourceList(); ExecutionBlockId executionBlockId = event.getExecutionBlockId(); List containers = new ArrayList(); - for(TajoMasterProtocol.WorkerAllocatedResource eachAllocatedResource: allocatedResources) { + for(WorkerAllocatedResource eachAllocatedResource: allocatedResources) { TajoWorkerContainer container = new TajoWorkerContainer(); NodeId nodeId = NodeId.newInstance(eachAllocatedResource.getConnectionInfo().getHost(), eachAllocatedResource.getConnectionInfo().getPeerRpcPort()); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java index 09a87e0c6a..4003014983 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorker.java @@ -20,7 +20,6 @@ import com.codahale.metrics.Gauge; import com.google.common.annotations.VisibleForTesting; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -36,13 +35,13 @@ import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; -import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.ha.TajoMasterInfo; -import org.apache.tajo.querymaster.QueryMaster; -import org.apache.tajo.querymaster.QueryMasterManagerService; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.pullserver.TajoPullServerService; +import org.apache.tajo.querymaster.QueryMaster; +import org.apache.tajo.querymaster.QueryMasterManagerService; import org.apache.tajo.rpc.RpcChannelFactory; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.rule.EvaluationContext; @@ -50,7 +49,10 @@ import org.apache.tajo.rule.SelfDiagnosisRuleEngine; import org.apache.tajo.rule.SelfDiagnosisRuleSession; import org.apache.tajo.storage.HashShuffleAppenderManager; -import org.apache.tajo.util.*; +import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.JvmPauseMonitor; +import org.apache.tajo.util.NetUtils; +import org.apache.tajo.util.StringUtils; import org.apache.tajo.util.history.HistoryReader; import org.apache.tajo.util.history.HistoryWriter; import org.apache.tajo.util.metrics.TajoSystemMetrics; @@ -115,7 +117,7 @@ public class TajoWorker extends CompositeService { private AtomicInteger numClusterNodes = new AtomicInteger(); - private TajoMasterProtocol.ClusterResourceSummary clusterResource; + private ClusterResourceSummary clusterResource; private WorkerConnectionInfo connectionInfo; @@ -516,13 +518,13 @@ public int getNumClusterNodes() { return TajoWorker.this.numClusterNodes.get(); } - public void setClusterResource(TajoMasterProtocol.ClusterResourceSummary clusterResource) { + public void setClusterResource(ClusterResourceSummary clusterResource) { synchronized (numClusterNodes) { TajoWorker.this.clusterResource = clusterResource; } } - public TajoMasterProtocol.ClusterResourceSummary getClusterResource() { + public ClusterResourceSummary getClusterResource() { synchronized (numClusterNodes) { return TajoWorker.this.clusterResource; } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java index c8099212e4..b92c4cd543 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/WorkerHeartbeatService.java @@ -26,7 +26,10 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.AbstractService; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ha.HAServiceUtil; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ClusterResourceSummary; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.ServerStatusProto; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeatResponse; import org.apache.tajo.ipc.TajoResourceTrackerProtocol; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; @@ -35,7 +38,6 @@ import org.apache.tajo.storage.DiskDeviceInfo; import org.apache.tajo.storage.DiskMountInfo; import org.apache.tajo.storage.DiskUtil; -import org.apache.tajo.ha.HAServiceUtil; import java.io.File; import java.util.List; @@ -98,8 +100,8 @@ public void serviceStop() throws Exception { class WorkerHeartbeatThread extends Thread { private volatile AtomicBoolean stopped = new AtomicBoolean(false); - TajoMasterProtocol.ServerStatusProto.System systemInfo; - List diskInfos = Lists.newArrayList(); + ServerStatusProto.System systemInfo; + List diskInfos = Lists.newArrayList(); float workerDiskSlots; int workerMemoryMB; List diskDeviceInfos; @@ -137,7 +139,7 @@ public WorkerHeartbeatThread() { } } - systemInfo = TajoMasterProtocol.ServerStatusProto.System.newBuilder() + systemInfo = ServerStatusProto.System.newBuilder() .setAvailableProcessors(workerCpuCoreNum) .setFreeMemoryMB(0) .setMaxMemoryMB(0) @@ -153,14 +155,14 @@ public void run() { if(sendDiskInfoCount == 0 && diskDeviceInfos != null) { getDiskUsageInfos(); } - TajoMasterProtocol.ServerStatusProto.JvmHeap jvmHeap = - TajoMasterProtocol.ServerStatusProto.JvmHeap.newBuilder() + ServerStatusProto.JvmHeap jvmHeap = + ServerStatusProto.JvmHeap.newBuilder() .setMaxHeap(Runtime.getRuntime().maxMemory()) .setFreeHeap(Runtime.getRuntime().freeMemory()) .setTotalHeap(Runtime.getRuntime().totalMemory()) .build(); - TajoMasterProtocol.ServerStatusProto serverStatus = TajoMasterProtocol.ServerStatusProto.newBuilder() + ServerStatusProto serverStatus = ServerStatusProto.newBuilder() .addAllDisk(diskInfos) .setRunningTaskNum( context.getTaskRunnerManager() == null ? 1 : context.getTaskRunnerManager().getNumTasks()) @@ -179,8 +181,7 @@ public void run() { NettyClientBase rmClient = null; try { - CallFuture callBack = - new CallFuture(); + CallFuture callBack = new CallFuture(); // In TajoMaster HA mode, if backup master be active status, // worker may fail to connect existing active master. Thus, @@ -201,9 +202,9 @@ public void run() { TajoResourceTrackerProtocol.TajoResourceTrackerProtocolService resourceTracker = rmClient.getStub(); resourceTracker.heartbeat(callBack.getController(), heartbeatProto, callBack); - TajoMasterProtocol.TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS); + TajoHeartbeatResponse response = callBack.get(2, TimeUnit.SECONDS); if(response != null) { - TajoMasterProtocol.ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); + ClusterResourceSummary clusterResourceSummary = response.getClusterResourceSummary(); if(clusterResourceSummary.getNumWorkers() > 0) { context.setNumClusterNodes(clusterResourceSummary.getNumWorkers()); } @@ -249,7 +250,7 @@ private void getDiskUsageInfos() { if(mountInfos != null) { for(DiskMountInfo eachMount: mountInfos) { File eachFile = new File(eachMount.getMountPath()); - diskInfos.add(TajoMasterProtocol.ServerStatusProto.Disk.newBuilder() + diskInfos.add(ServerStatusProto.Disk.newBuilder() .setAbsolutePath(eachFile.getAbsolutePath()) .setTotalSpace(eachFile.getTotalSpace()) .setFreeSpace(eachFile.getFreeSpace()) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java index 6eb710a471..68890e359c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/rule/ConnectivityCheckerRuleForTajoWorker.java @@ -18,23 +18,19 @@ package org.apache.tajo.worker.rule; -import java.net.InetSocketAddress; - import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.TajoMasterProtocol; +import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.rule.EvaluationContext; -import org.apache.tajo.rule.EvaluationResult; -import org.apache.tajo.rule.SelfDiagnosisRuleDefinition; -import org.apache.tajo.rule.SelfDiagnosisRuleVisibility; +import org.apache.tajo.rule.*; import org.apache.tajo.rule.EvaluationResult.EvaluationResultCode; -import org.apache.tajo.rule.SelfDiagnosisRule; import org.apache.tajo.util.NetUtils; import org.apache.tajo.worker.TajoWorker; +import java.net.InetSocketAddress; + /** * With this rule, Tajo worker will check the connectivity to tajo master server. */ @@ -54,7 +50,7 @@ private void checkTajoMasterConnectivity(TajoConf tajoConf) throws Exception { } else { masterAddress = NetUtils.createSocketAddr(tajoConf.getVar(ConfVars.TAJO_MASTER_UMBILICAL_RPC_ADDRESS)); } - masterClient = pool.getConnection(masterAddress, TajoMasterProtocol.class, true); + masterClient = pool.getConnection(masterAddress, QueryCoordinatorProtocol.class, true); masterClient.getStub(); } finally { diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto similarity index 97% rename from tajo-core/src/main/proto/TajoMasterProtocol.proto rename to tajo-core/src/main/proto/QueryCoordinatorProtocol.proto index cc83e47267..1776fc1659 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/QueryCoordinatorProtocol.proto @@ -19,7 +19,7 @@ //TajoWorker -> TajoMaster protocol option java_package = "org.apache.tajo.ipc"; -option java_outer_classname = "TajoMasterProtocol"; +option java_outer_classname = "QueryCoordinatorProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; @@ -139,7 +139,7 @@ message WorkerResourceAllocationResponse { repeated WorkerAllocatedResource workerAllocatedResource = 2; } -service TajoMasterProtocolService { +service QueryCoordinatorProtocolService { rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); diff --git a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto index b2db46aed4..40aeab7360 100644 --- a/tajo-core/src/main/proto/ResourceTrackerProtocol.proto +++ b/tajo-core/src/main/proto/ResourceTrackerProtocol.proto @@ -22,7 +22,7 @@ option java_outer_classname = "TajoResourceTrackerProtocol"; option java_generic_services = true; option java_generate_equals_and_hash = true; -import "TajoMasterProtocol.proto"; +import "QueryCoordinatorProtocol.proto"; import "ContainerProtocol.proto"; import "tajo_protos.proto"; diff --git a/tajo-core/src/main/resources/webapps/admin/index.jsp b/tajo-core/src/main/resources/webapps/admin/index.jsp index 00186d71f0..0defb3c6f2 100644 --- a/tajo-core/src/main/resources/webapps/admin/index.jsp +++ b/tajo-core/src/main/resources/webapps/admin/index.jsp @@ -25,7 +25,7 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.ha.HAService" %> <%@ page import="org.apache.tajo.ha.TajoMasterInfo" %> -<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %> +<%@ page import="org.apache.tajo.master.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.master.rm.WorkerState" %> <%@ page import="org.apache.tajo.util.NetUtils" %> diff --git a/tajo-core/src/main/resources/webapps/admin/query.jsp b/tajo-core/src/main/resources/webapps/admin/query.jsp index 4d8e5e6e75..85f71763e2 100644 --- a/tajo-core/src/main/resources/webapps/admin/query.jsp +++ b/tajo-core/src/main/resources/webapps/admin/query.jsp @@ -20,7 +20,7 @@ <%@ page language="java" contentType="text/html; charset=UTF-8" pageEncoding="UTF-8"%> <%@ page import="org.apache.tajo.master.TajoMaster" %> -<%@ page import="org.apache.tajo.querymaster.QueryInProgress" %> +<%@ page import="org.apache.tajo.master.QueryInProgress" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="org.apache.tajo.util.JSPUtil" %> <%@ page import="org.apache.tajo.util.StringUtils" %> diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 0786912c3b..e548b81f41 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -43,8 +43,8 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.planner.global.rewriter.GlobalPlanTestRuleProvider; +import org.apache.tajo.master.QueryInProgress; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.querymaster.*; import org.apache.tajo.querymaster.Query; import org.apache.tajo.querymaster.Stage; import org.apache.tajo.querymaster.StageState; diff --git a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java index b8fbd673f1..a013d0b95e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java +++ b/tajo-core/src/test/java/org/apache/tajo/master/rm/TestTajoResourceManager.java @@ -19,12 +19,11 @@ package org.apache.tajo.master.rm; import com.google.protobuf.RpcCallback; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.ipc.TajoMasterProtocol.*; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; From b40eb401f20e7ecb7044dc9d7980d60fe3661f11 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Fri, 9 Jan 2015 17:01:43 +0900 Subject: [PATCH 4/4] Rename TajoMasterUmbilicalService to QueryCoordinatorService. --- ...ervice.java => QueryCoordinatorService.java} | 17 ++++++----------- .../java/org/apache/tajo/master/TajoMaster.java | 6 +++--- .../tajo/querymaster/QueryMasterTask.java | 5 ----- 3 files changed, 9 insertions(+), 19 deletions(-) rename tajo-core/src/main/java/org/apache/tajo/master/{TajoMasterUmbilicalService.java => QueryCoordinatorService.java} (89%) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterUmbilicalService.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java similarity index 89% rename from tajo-core/src/main/java/org/apache/tajo/master/TajoMasterUmbilicalService.java rename to tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java index 1d8a17b2e9..1cb38422a3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterUmbilicalService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryCoordinatorService.java @@ -23,15 +23,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.service.AbstractService; -import org.apache.tajo.QueryId; -import org.apache.tajo.TajoIdProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.TajoHeartbeat; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceProto; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourceReleaseRequest; -import org.apache.tajo.ipc.QueryCoordinatorProtocol.WorkerResourcesRequest; +import org.apache.tajo.ipc.QueryCoordinatorProtocol.*; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.rm.Worker; import org.apache.tajo.master.rm.WorkerResource; @@ -44,8 +39,8 @@ import java.util.Collection; import java.util.List; -public class TajoMasterUmbilicalService extends AbstractService { - private final static Log LOG = LogFactory.getLog(TajoMasterUmbilicalService.class); +public class QueryCoordinatorService extends AbstractService { + private final static Log LOG = LogFactory.getLog(QueryCoordinatorService.class); private final TajoMaster.MasterContext context; private final TajoConf conf; @@ -56,8 +51,8 @@ public class TajoMasterUmbilicalService extends AbstractService { private final BoolProto BOOL_TRUE = BoolProto.newBuilder().setValue(true).build(); private final BoolProto BOOL_FALSE = BoolProto.newBuilder().setValue(false).build(); - public TajoMasterUmbilicalService(TajoMaster.MasterContext context) { - super(TajoMasterUmbilicalService.class.getName()); + public QueryCoordinatorService(TajoMaster.MasterContext context) { + super(QueryCoordinatorService.class.getName()); this.context = context; this.conf = context.getConf(); this.masterHandler = new ProtocolServiceHandler(); @@ -97,7 +92,7 @@ public InetSocketAddress getBindAddress() { /** * Actual protocol service handler */ - private class ProtocolServiceHandler implements QueryCoordinatorProtocol.QueryCoordinatorProtocolService.Interface { + private class ProtocolServiceHandler implements QueryCoordinatorProtocolService.Interface { @Override public void heartbeat( diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java index f19ca9d2bf..786025a270 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMaster.java @@ -114,7 +114,7 @@ public class TajoMaster extends CompositeService { private GlobalEngine globalEngine; private AsyncDispatcher dispatcher; private TajoMasterClientService tajoMasterClientService; - private TajoMasterUmbilicalService tajoMasterService; + private QueryCoordinatorService tajoMasterService; private SessionManager sessionManager; private WorkerResourceManager resourceManager; @@ -189,7 +189,7 @@ public void serviceInit(Configuration _conf) throws Exception { tajoMasterClientService = new TajoMasterClientService(context); addIfService(tajoMasterClientService); - tajoMasterService = new TajoMasterUmbilicalService(context); + tajoMasterService = new QueryCoordinatorService(context); addIfService(tajoMasterService); } catch (Exception e) { LOG.error(e.getMessage(), e); @@ -469,7 +469,7 @@ public StorageManager getStorageManager() { return storeManager; } - public TajoMasterUmbilicalService getTajoMasterService() { + public QueryCoordinatorService getTajoMasterService() { return tajoMasterService; } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index bcc487050c..fd52488a43 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -41,8 +41,6 @@ import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.exception.UnimplementedException; -import org.apache.tajo.ha.HAServiceUtil; -import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TajoContainerProxy; import org.apache.tajo.master.event.*; @@ -57,9 +55,6 @@ import org.apache.tajo.plan.rewrite.LogicalPlanRewriteRule; import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.plan.verifier.VerifyException; -import org.apache.tajo.rpc.NettyClientBase; -import org.apache.tajo.rpc.RpcConnectionPool; -import org.apache.tajo.session.Session; import org.apache.tajo.session.Session; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.StorageProperty;