From 64fb64ea9b3de3681a22c9f64e90beff06d001e1 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sun, 21 Dec 2014 20:23:17 +0900 Subject: [PATCH 1/8] Initial work --- tajo-client/src/main/proto/ClientProtos.proto | 19 +++++------ .../org/apache/tajo/master/GlobalEngine.java | 1 + .../apache/tajo/master/TajoMasterService.java | 14 ++++++-- .../master/querymaster/QueryInProgress.java | 32 ++++++++++++------- .../tajo/master/querymaster/QueryInfo.java | 30 +++++++++++++++-- .../master/querymaster/QueryJobManager.java | 20 +++++++++++- .../master/querymaster/QueryMasterTask.java | 10 +++++- .../tajo/worker/TajoWorkerClientService.java | 7 ++-- .../src/main/proto/TajoMasterProtocol.proto | 8 ++++- 9 files changed, 109 insertions(+), 32 deletions(-) diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 51db7630b4..b31c8bd742 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -242,15 +242,16 @@ message FunctionResponse { message QueryInfoProto { required string queryId = 1; optional string sql = 2; - optional QueryState queryState = 3; - optional float progress = 4; - optional int64 startTime = 5; - optional int64 finishTime = 6; - optional string lastMessage = 7; - optional string hostNameOfQM = 8; - optional int32 queryMasterPort = 9; - optional int32 queryMasterClientPort = 10; - optional int32 queryMasterInfoPort = 11; + optional KeyValueSetProto contextVars= 3; + optional QueryState queryState = 4; + optional float progress = 5; + optional int64 startTime = 6; + optional int64 finishTime = 7; + optional string lastMessage = 8; + optional string hostNameOfQM = 9; + optional int32 queryMasterPort = 10; + optional int32 queryMasterClientPort = 11; + optional int32 queryMasterInfoPort = 12; } message SubQueryHistoryProto { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java index 71b1f9b588..e81f61d48b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/GlobalEngine.java @@ -365,6 +365,7 @@ private SubmitQueryResponse executeQueryInternal(QueryContext queryContext, responseBuilder.setResultCode(ClientProtos.ResultCode.OK); if(queryInfo.getQueryMasterHost() != null) { responseBuilder.setQueryMasterHost(queryInfo.getQueryMasterHost()); + responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterPort()); } responseBuilder.setQueryMasterPort(queryInfo.getQueryMasterClientPort()); LOG.info("Query " + queryInfo.getQueryId().toString() + "," + queryInfo.getSql() + "," + diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index 1e3501c80d..5a9dd29078 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -26,6 +26,9 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; +import org.apache.tajo.TajoProtos; +import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; @@ -138,9 +141,16 @@ public void releaseWorkerResource(RpcController controller, } @Override - public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request, + public void stopQueryMaster(RpcController controller, TajoMasterProtocol.QueryCompleteReport request, RpcCallback done) { - context.getQueryJobManager().stopQuery(new QueryId(request)); + QueryId queryId = new QueryId(request.getQueryId()); + if (request.getFinalState() == QueryState.QUERY_SUCCEEDED && request.hasTableDesc()) { + TableDesc resultTableDesc = new TableDesc(request.getTableDesc()); + context.getQueryJobManager().stopQuery(queryId, request.getFinalState(), resultTableDesc); + } else { + context.getQueryJobManager().stopQuery(queryId, request.getFinalState()); + } + done.run(BOOL_TRUE); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index e361c7f572..3e5350b2a4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -24,13 +24,11 @@ import org.apache.hadoop.service.CompositeService; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; -import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.ContainerProtocol; -import org.apache.tajo.plan.logical.LogicalRootNode; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.ipc.QueryMasterProtocol.QueryMasterProtocolService; import org.apache.tajo.ipc.TajoWorkerProtocol; @@ -39,6 +37,7 @@ import org.apache.tajo.master.TajoMaster; import org.apache.tajo.master.rm.WorkerResourceManager; import org.apache.tajo.master.session.Session; +import org.apache.tajo.plan.logical.LogicalRootNode; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; @@ -57,8 +56,6 @@ public class QueryInProgress extends CompositeService { private Session session; - private QueryContext queryContext; - private TajoAsyncDispatcher dispatcher; private LogicalRootNode plan; @@ -77,6 +74,8 @@ public class QueryInProgress extends CompositeService { private ContainerProtocol.TajoContainerIdProto qmContainerId; + private TableDesc resultDesc; + public QueryInProgress( TajoMaster.MasterContext masterContext, Session session, @@ -85,11 +84,10 @@ public QueryInProgress( super(QueryInProgress.class.getName()); this.masterContext = masterContext; this.session = session; - this.queryContext = queryContext; this.queryId = queryId; this.plan = plan; - queryInfo = new QueryInfo(queryId, sql, jsonExpr); + queryInfo = new QueryInfo(queryId, queryContext, sql, jsonExpr); queryInfo.setStartTime(System.currentTimeMillis()); } @@ -108,6 +106,18 @@ public synchronized void kill() { } } + public void setResultDesc(TableDesc resultDesc) { + synchronized (this) { + this.resultDesc = resultDesc; + } + } + + public TableDesc getResultDesc() { + synchronized (this) { + return resultDesc; + } + } + @Override public void stop() { if(stopped.getAndSet(true)) { @@ -145,7 +155,7 @@ public void stop() { } if(queryMasterRpc != null) { - RpcConnectionPool.getPool((TajoConf)getConfig()).closeConnection(queryMasterRpc); + RpcConnectionPool.getPool(masterContext.getConf()).closeConnection(queryMasterRpc); } masterContext.getHistoryWriter().appendHistory(queryInfo); @@ -212,7 +222,7 @@ private void connectQueryMaster() throws Exception { InetSocketAddress addr = NetUtils.createSocketAddr(queryInfo.getQueryMasterHost(), queryInfo.getQueryMasterPort()); LOG.info("Connect to QueryMaster:" + addr); queryMasterRpc = - RpcConnectionPool.getPool((TajoConf) getConfig()).getConnection(addr, QueryMasterProtocol.class, true); + RpcConnectionPool.getPool(masterContext.getConf()).getConnection(addr, QueryMasterProtocol.class, true); queryMasterRpcClient = queryMasterRpc.getStub(); } @@ -235,8 +245,8 @@ private synchronized void submmitQueryToMaster() { QueryExecutionRequestProto.Builder builder = TajoWorkerProtocol.QueryExecutionRequestProto.newBuilder(); builder.setQueryId(queryId.getProto()) + .setQueryContext(queryInfo.getQueryContext().getProto()) .setSession(session.getProto()) - .setQueryContext(queryContext.getProto()) .setExprInJson(PrimitiveProtos.StringProto.newBuilder().setValue(queryInfo.getJsonExpr())) .setLogicalPlanJson(PrimitiveProtos.StringProto.newBuilder().setValue(plan.toJson()).build()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java index 00b95acc94..2bc8649d3f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java @@ -22,7 +22,9 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ClientProtos.QueryInfoProto; import org.apache.tajo.json.GsonObject; import org.apache.tajo.util.TajoIdUtils; @@ -31,6 +33,8 @@ public class QueryInfo implements GsonObject, History { private QueryId queryId; @Expose + private QueryContext context; + @Expose private String sql; @Expose private TajoProtos.QueryState queryState; @@ -53,17 +57,22 @@ public class QueryInfo implements GsonObject, History { @Expose private String queryIdStr; + @Expose + private TableDesc resultDesc; + private String jsonExpr; public QueryInfo(QueryId queryId) { - this(queryId, null, null); + this(queryId, null, null, null); } - public QueryInfo(QueryId queryId, String sql, String jsonExpr) { + public QueryInfo(QueryId queryId, QueryContext queryContext, String sql, String jsonExpr) { this.queryId = queryId; this.queryIdStr = queryId.toString(); + this.context = queryContext; this.sql = sql; this.jsonExpr = jsonExpr; + this.queryState = TajoProtos.QueryState.QUERY_MASTER_INIT; } @@ -71,6 +80,10 @@ public QueryId getQueryId() { return queryId; } + public QueryContext getQueryContext() { + return context; + } + public String getSql() { return sql; } @@ -147,6 +160,18 @@ public void setProgress(float progress) { this.progress = progress; } + public void setResultDesc(TableDesc result) { + this.resultDesc = result; + } + + public boolean hasResultdesc() { + return resultDesc != null; + } + + public TableDesc getResultDesc() { + return resultDesc; + } + @Override public String toString() { return queryId.toString() + ",state=" + queryState +",progress=" + progress + ", queryMaster=" @@ -182,6 +207,7 @@ public QueryInfoProto getProto() { builder.setQueryId(queryId.toString()) .setQueryState(queryState) + .setContextVars(context.getProto()) .setProgress(progress) .setStartTime(startTime) .setFinishTime(finishTime) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index ddbd3e1a2c..5e4df61534 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -29,6 +29,8 @@ import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; +import org.apache.tajo.annotation.Nullable; +import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; import org.apache.tajo.master.TajoMaster; @@ -39,7 +41,6 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -214,6 +215,14 @@ public QueryInProgress getQueryInProgress(QueryId queryId) { } public void stopQuery(QueryId queryId) { + stopQuery(queryId, null, null); + } + + public void stopQuery(QueryId queryId, TajoProtos.QueryState finalState) { + stopQuery(queryId, finalState, null); + } + + public void stopQuery(QueryId queryId, @Nullable TajoProtos.QueryState finalState, @Nullable TableDesc resultDesc) { LOG.info("Stop QueryInProgress:" + queryId); QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { @@ -227,6 +236,15 @@ public void stopQuery(QueryId queryId) { } QueryInfo queryInfo = queryInProgress.getQueryInfo(); + + if (finalState != null) { + queryInfo.setQueryState(finalState); + } + + if (resultDesc != null) { + queryInfo.setResultDesc(resultDesc); + } + long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); if (executionTime < minExecutionTime.get()) { minExecutionTime.set(executionTime); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 9ab4f0a90c..691e8fcd76 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -37,6 +37,7 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ipc.TajoMasterProtocol.QueryCompleteReport; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -228,7 +229,14 @@ public void stop() { } TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - masterClientService.stopQueryMaster(null, queryId.getProto(), future); + QueryCompleteReport.Builder completionReport = QueryCompleteReport.newBuilder(); + completionReport.setQueryId(queryId.getProto()); + completionReport.setFinalState(getState()); + Query query = queryMasterContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery(); + if (query != null && query.getSynchronizedState() == QueryState.QUERY_SUCCEEDED) { + completionReport.setTableDesc(query.getResultDesc().getProto()); + } + masterClientService.stopQueryMaster(null, completionReport.build(), future); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 0f4a60cb7e..4cab9f339f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -230,11 +230,8 @@ public ClientProtos.GetQueryStatusResponse getQueryStatus( } @Override - public PrimitiveProtos.BoolProto closeQuery ( - RpcController controller, - TajoIdProtos.QueryIdProto request) throws ServiceException { - final QueryId queryId = new QueryId(request); - LOG.info("Stop Query:" + queryId); + public PrimitiveProtos.BoolProto closeQuery(RpcController controller, + TajoIdProtos.QueryIdProto request) throws ServiceException { return BOOL_TRUE; } diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index e5eab4f690..a475c64a10 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -138,10 +138,16 @@ message WorkerResourceAllocationResponse { repeated WorkerAllocatedResource workerAllocatedResource = 2; } +message QueryCompleteReport { + required QueryIdProto queryId = 1; + optional QueryState finalState = 2; + optional TableDescProto tableDesc = 3; +} + service TajoMasterProtocolService { rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); - rpc stopQueryMaster(QueryIdProto) returns (BoolProto); + rpc stopQueryMaster(QueryCompleteReport) returns (BoolProto); rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); } \ No newline at end of file From 0241bbcd86517604cc997471860b1f5af4a2f4b5 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sun, 21 Dec 2014 21:19:24 +0900 Subject: [PATCH 2/8] All test passed after getStatus from TajoMaster. --- .../apache/tajo/client/QueryClientImpl.java | 26 ++++++++++++++++--- .../tajo/master/TajoMasterClientService.java | 10 +++++-- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index dc35968789..36a5a101af 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -318,7 +318,8 @@ public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { ClientProtos.GetQueryStatusRequest.Builder builder = ClientProtos.GetQueryStatusRequest.newBuilder(); builder.setQueryId(queryId.getProto()); - ClientProtos.GetQueryStatusResponse res = null; + GetQueryStatusResponse res = null; + GetQueryStatusResponse res2 = null; if(connection.queryMasterMap.containsKey(queryId)) { NettyClientBase qmClient = null; @@ -330,11 +331,31 @@ public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); res = queryMasterService.getQueryStatus(null, builder.build()); + NettyClientBase tmClient = connection.getTajoMasterConnection(false); + connection.checkSessionAndGet(tmClient); + builder.setSessionId(connection.sessionId); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); + res2 = tajoMasterService.getQueryStatus(null, builder.build()); + + if (res.getResultCode() != res2.getResultCode()) { + LOG.fatal("resultCode is different: " + res.getResultCode() + " <> " + res2.getResultCode()); + } + + if (!new QueryId(res.getQueryId()).equals(new QueryId(res2.getQueryId()))) { + LOG.fatal("queryId is different: " + res.getQueryId() + " <> " + res2.getQueryId()); + } + + if (!res.getState().equals(res2.getState())) { + LOG.fatal("getState is different: " + res.getState() + " <> " + res2.getState()); + } + } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } finally { connection.connPool.releaseConnection(qmClient); } + return new QueryStatus(res2); + } else { @@ -356,8 +377,7 @@ public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { try { InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); - qmClient = connection.connPool.getConnection( - qmAddr, QueryMasterClientProtocol.class, false); + qmClient = connection.connPool.getConnection(qmAddr, QueryMasterClientProtocol.class, false); QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); res = queryMasterService.getQueryStatus(null, builder.build()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 701403495b..66b50a5483 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -362,8 +362,9 @@ public GetQueryResultResponse getQueryResult(RpcController controller, } switch (queryInfo.getQueryState()) { case QUERY_SUCCEEDED: - // TODO check this logic needed - //builder.setTableDesc((TableDescProto) queryJobManager.getResultDesc().getProto()); + if (queryInfo.hasResultdesc()) { + builder.setTableDesc(queryInfo.getResultDesc().getProto()); + } break; case QUERY_FAILED: case QUERY_ERROR: @@ -479,6 +480,11 @@ public GetQueryStatusResponse getQueryStatus(RpcController controller, if (queryInfo != null) { builder.setResultCode(ResultCode.OK); builder.setState(queryInfo.getQueryState()); + + boolean isCreateTable = queryInfo.getQueryContext().isCreateTable(); + boolean isInsert = queryInfo.getQueryContext().isInsert(); + builder.setHasResult(!(isCreateTable || isInsert)); + builder.setProgress(queryInfo.getProgress()); builder.setSubmitTime(queryInfo.getStartTime()); if(queryInfo.getQueryMasterHost() != null) { From 01a1aed280b0007eb2d07b6548660e7eba18d12a Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Mon, 22 Dec 2014 00:37:37 +0900 Subject: [PATCH 3/8] Completely remove connection of query master of query status. --- .../apache/tajo/client/QueryClientImpl.java | 104 +++++------------- 1 file changed, 25 insertions(+), 79 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 36a5a101af..4b55b8fac3 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -95,19 +95,6 @@ public UserGroupInformation getUserInfo() { @Override public void closeQuery(QueryId queryId) { - if(connection.queryMasterMap.containsKey(queryId)) { - NettyClientBase qmClient = null; - try { - qmClient = connection.getConnection(queryId, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMaster = qmClient.getStub(); - queryMaster.closeQuery(null, queryId.getProto()); - } catch (Exception e) { - LOG.warn("Fail to close a QueryMaster connection (qid=" + queryId + ", msg=" + e.getMessage() + ")", e); - } finally { - connection.connPool.closeConnection(qmClient); - connection.queryMasterMap.remove(queryId); - } - } } @Override @@ -319,82 +306,41 @@ public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { builder.setQueryId(queryId.getProto()); GetQueryStatusResponse res = null; - GetQueryStatusResponse res2 = null; - - if(connection.queryMasterMap.containsKey(queryId)) { - NettyClientBase qmClient = null; - - try { - - qmClient = connection.connPool.getConnection(connection.queryMasterMap.get(queryId), - QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); - NettyClientBase tmClient = connection.getTajoMasterConnection(false); - connection.checkSessionAndGet(tmClient); - builder.setSessionId(connection.sessionId); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - res2 = tajoMasterService.getQueryStatus(null, builder.build()); - - if (res.getResultCode() != res2.getResultCode()) { - LOG.fatal("resultCode is different: " + res.getResultCode() + " <> " + res2.getResultCode()); - } - - if (!new QueryId(res.getQueryId()).equals(new QueryId(res2.getQueryId()))) { - LOG.fatal("queryId is different: " + res.getQueryId() + " <> " + res2.getQueryId()); - } - - if (!res.getState().equals(res2.getState())) { - LOG.fatal("getState is different: " + res.getState() + " <> " + res2.getState()); - } - - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(qmClient); - } - return new QueryStatus(res2); - - - } else { - - NettyClientBase tmClient = null; - - try { - tmClient = connection.getTajoMasterConnection(false); - connection.checkSessionAndGet(tmClient); - builder.setSessionId(connection.sessionId); - TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); + NettyClientBase tmClient = null; + try { + tmClient = connection.getTajoMasterConnection(false); + connection.checkSessionAndGet(tmClient); + builder.setSessionId(connection.sessionId); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); - res = tajoMasterService.getQueryStatus(null, builder.build()); + res = tajoMasterService.getQueryStatus(null, builder.build()); - String queryMasterHost = res.getQueryMasterHost(); + String queryMasterHost = res.getQueryMasterHost(); - if(queryMasterHost != null && !queryMasterHost.isEmpty()) { - NettyClientBase qmClient = null; + if(queryMasterHost != null && !queryMasterHost.isEmpty()) { + NettyClientBase qmClient = null; - try { + try { - InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); - qmClient = connection.connPool.getConnection(qmAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); + InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); + qmClient = connection.connPool.getConnection(qmAddr, QueryMasterClientProtocol.class, false); + QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); + res = queryMasterService.getQueryStatus(null, builder.build()); - connection.queryMasterMap.put(queryId, qmAddr); + connection.queryMasterMap.put(queryId, qmAddr); - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(qmClient); - } + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(qmClient); } - - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(tmClient); } + + } catch (Exception e) { + throw new ServiceException(e.getMessage(), e); + } finally { + connection.connPool.releaseConnection(tmClient); } return new QueryStatus(res); } From a8983b73911f8c25cac270f04137101d2cb2930d Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Wed, 24 Dec 2014 11:16:25 +0900 Subject: [PATCH 4/8] Passed all unit tests and removed worker client service. --- .../apache/tajo/client/QueryClientImpl.java | 39 ++---- tajo-client/src/main/proto/ClientProtos.proto | 3 +- .../proto/QueryMasterClientProtocol.proto | 4 - .../tajo/master/TajoMasterClientService.java | 15 ++- .../apache/tajo/master/TajoMasterService.java | 14 +-- .../master/querymaster/QueryInProgress.java | 18 +-- .../tajo/master/querymaster/QueryInfo.java | 15 ++- .../master/querymaster/QueryJobManager.java | 24 +--- .../tajo/master/querymaster/QueryMaster.java | 6 +- .../master/querymaster/QueryMasterTask.java | 10 +- .../tajo/worker/TajoWorkerClientService.java | 113 ------------------ .../src/main/proto/TajoMasterProtocol.proto | 15 +-- .../org/apache/tajo/TajoTestingCluster.java | 27 ++--- 13 files changed, 57 insertions(+), 246 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 4b55b8fac3..1a48c4821f 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -316,27 +316,6 @@ public QueryStatus getQueryStatus(QueryId queryId) throws ServiceException { res = tajoMasterService.getQueryStatus(null, builder.build()); - String queryMasterHost = res.getQueryMasterHost(); - - if(queryMasterHost != null && !queryMasterHost.isEmpty()) { - NettyClientBase qmClient = null; - - try { - - InetSocketAddress qmAddr = NetUtils.createSocketAddr(queryMasterHost, res.getQueryMasterPort()); - qmClient = connection.connPool.getConnection(qmAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = qmClient.getStub(); - res = queryMasterService.getQueryStatus(null, builder.build()); - - connection.queryMasterMap.put(queryId, qmAddr); - - } catch (Exception e) { - throw new ServiceException(e.getMessage(), e); - } finally { - connection.connPool.releaseConnection(qmClient); - } - } - } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } finally { @@ -370,29 +349,25 @@ public GetQueryResultResponse getResultResponse(QueryId queryId) throws ServiceE return null; } - NettyClientBase client = null; + NettyClientBase tmClient = null; try { - InetSocketAddress queryMasterAddr = connection.queryMasterMap.get(queryId); - if(queryMasterAddr == null) { - LOG.warn("No Connection to QueryMaster for " + queryId); - return null; - } - - client = connection.getConnection(queryMasterAddr, QueryMasterClientProtocol.class, false); - QueryMasterClientProtocolService.BlockingInterface queryMasterService = client.getStub(); + tmClient = connection.getTajoMasterConnection(false); + connection.checkSessionAndGet(tmClient); + TajoMasterClientProtocolService.BlockingInterface tajoMasterService = tmClient.getStub(); GetQueryResultRequest.Builder builder = GetQueryResultRequest.newBuilder(); builder.setQueryId(queryId.getProto()); - GetQueryResultResponse response = queryMasterService.getQueryResult(null,builder.build()); + builder.setSessionId(connection.sessionId); + GetQueryResultResponse response = tajoMasterService.getQueryResult(null,builder.build()); return response; } catch (Exception e) { throw new ServiceException(e.getMessage(), e); } finally { - connection.connPool.releaseConnection(client); + connection.connPool.releaseConnection(tmClient); } } diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index b31c8bd742..771b1fa1d9 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -87,7 +87,7 @@ message GetQueryResultRequest { message GetQueryResultResponse { optional TableDescProto tableDesc = 1; optional string errorMessage = 2; - required string tajoUserName = 3; + optional string tajoUserName = 3; } message QueryIdRequest { @@ -252,6 +252,7 @@ message QueryInfoProto { optional int32 queryMasterPort = 10; optional int32 queryMasterClientPort = 11; optional int32 queryMasterInfoPort = 12; + optional TableDescProto resultDesc = 13; } message SubQueryHistoryProto { diff --git a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto index 3d8d70b14b..0b115667b0 100644 --- a/tajo-client/src/main/proto/QueryMasterClientProtocol.proto +++ b/tajo-client/src/main/proto/QueryMasterClientProtocol.proto @@ -28,9 +28,5 @@ import "PrimitiveProtos.proto"; import "ClientProtos.proto"; service QueryMasterClientProtocolService { - rpc updateSessionVariables(UpdateSessionVariableRequest) returns (BoolProto); - rpc getQueryResult(GetQueryResultRequest) returns (GetQueryResultResponse); - rpc getQueryStatus(GetQueryStatusRequest) returns (GetQueryStatusResponse); - rpc closeQuery(QueryIdProto) returns (BoolProto); rpc getQueryHistory(QueryIdRequest) returns (GetQueryHistoryResponse); } \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 66b50a5483..901a10e663 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -330,6 +330,7 @@ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest re @Override public GetQueryResultResponse getQueryResult(RpcController controller, GetQueryResultRequest request) throws ServiceException { + LOG.info(">>>>> Enter TajoMasterClientService::getQueryResult"); try { context.getSessionManager().touch(request.getSessionId().getId()); QueryId queryId = new QueryId(request.getQueryId()); @@ -344,31 +345,33 @@ public GetQueryResultResponse getQueryResult(RpcController controller, queryInfo = queryInProgress.getQueryInfo(); } + LOG.info(">>>>> Get QueryInfo: " + queryInfo); + GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder(); + builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); // If we cannot the QueryInfo instance from the finished list, // the query result was expired due to timeout. // In this case, we will result in error. if (queryInfo == null) { builder.setErrorMessage("No such query: " + queryId.toString()); + LOG.info(">>>>> no QueryInfo"); return builder.build(); } - try { - //TODO After implementation Tajo's user security feature, Should be modified. - builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); - } catch (IOException e) { - LOG.warn("Can't get current user name"); - } switch (queryInfo.getQueryState()) { case QUERY_SUCCEEDED: if (queryInfo.hasResultdesc()) { + LOG.info(">>>>> hasResultDesc: " + queryInfo.getResultDesc().getPath()); builder.setTableDesc(queryInfo.getResultDesc().getProto()); + } else { + LOG.info(">>>>> no ResultDesc"); } break; case QUERY_FAILED: case QUERY_ERROR: builder.setErrorMessage("Query " + queryId + " is failed"); + break; default: builder.setErrorMessage("Query " + queryId + " is still running"); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java index 5a9dd29078..1e3501c80d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterService.java @@ -26,9 +26,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos; import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.TajoProtos; -import org.apache.tajo.TajoProtos.QueryState; -import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.TajoMasterProtocol; @@ -141,16 +138,9 @@ public void releaseWorkerResource(RpcController controller, } @Override - public void stopQueryMaster(RpcController controller, TajoMasterProtocol.QueryCompleteReport request, + public void stopQueryMaster(RpcController controller, TajoIdProtos.QueryIdProto request, RpcCallback done) { - QueryId queryId = new QueryId(request.getQueryId()); - if (request.getFinalState() == QueryState.QUERY_SUCCEEDED && request.hasTableDesc()) { - TableDesc resultTableDesc = new TableDesc(request.getTableDesc()); - context.getQueryJobManager().stopQuery(queryId, request.getFinalState(), resultTableDesc); - } else { - context.getQueryJobManager().stopQuery(queryId, request.getFinalState()); - } - + context.getQueryJobManager().stopQuery(new QueryId(request)); done.run(BOOL_TRUE); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index 3e5350b2a4..0f797c7974 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -18,6 +18,7 @@ package org.apache.tajo.master.querymaster; +import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -74,8 +75,6 @@ public class QueryInProgress extends CompositeService { private ContainerProtocol.TajoContainerIdProto qmContainerId; - private TableDesc resultDesc; - public QueryInProgress( TajoMaster.MasterContext masterContext, Session session, @@ -106,18 +105,6 @@ public synchronized void kill() { } } - public void setResultDesc(TableDesc resultDesc) { - synchronized (this) { - this.resultDesc = resultDesc; - } - } - - public TableDesc getResultDesc() { - synchronized (this) { - return resultDesc; - } - } - @Override public void stop() { if(stopped.getAndSet(true)) { @@ -297,6 +284,9 @@ private void heartbeat(QueryInfo queryInfo) { } if(isFinishState(this.queryInfo.getQueryState())) { + LOG.info(">>>>> " + queryId + ", hasResultDesc" + queryInfo.hasResultdesc()); + this.queryInfo.setResultDesc(queryInfo.getResultDesc()); + getEventHandler().handle( new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo)); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java index 2bc8649d3f..197998f477 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java @@ -20,6 +20,8 @@ import com.google.gson.annotations.Expose; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; @@ -56,9 +58,8 @@ public class QueryInfo implements GsonObject, History { private int queryMasterInfoPort; @Expose private String queryIdStr; - @Expose - private TableDesc resultDesc; + private volatile TableDesc resultDesc; private String jsonExpr; @@ -160,15 +161,19 @@ public void setProgress(float progress) { this.progress = progress; } - public void setResultDesc(TableDesc result) { + Log LOG = LogFactory.getLog(QueryInfo.class); + + public synchronized void setResultDesc(TableDesc result) { + LOG.info(">>>>> " + queryId); this.resultDesc = result; + LOG.info(">>>>> hasResultDesc" + hasResultdesc()); } - public boolean hasResultdesc() { + public synchronized boolean hasResultdesc() { return resultDesc != null; } - public TableDesc getResultDesc() { + public synchronized TableDesc getResultDesc() { return resultDesc; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index 5e4df61534..8559145230 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -29,7 +29,6 @@ import org.apache.tajo.QueryId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.TajoProtos; -import org.apache.tajo.annotation.Nullable; import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.TajoMasterProtocol; @@ -41,6 +40,7 @@ import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; @@ -215,14 +215,6 @@ public QueryInProgress getQueryInProgress(QueryId queryId) { } public void stopQuery(QueryId queryId) { - stopQuery(queryId, null, null); - } - - public void stopQuery(QueryId queryId, TajoProtos.QueryState finalState) { - stopQuery(queryId, finalState, null); - } - - public void stopQuery(QueryId queryId, @Nullable TajoProtos.QueryState finalState, @Nullable TableDesc resultDesc) { LOG.info("Stop QueryInProgress:" + queryId); QueryInProgress queryInProgress = getQueryInProgress(queryId); if(queryInProgress != null) { @@ -236,15 +228,6 @@ public void stopQuery(QueryId queryId, @Nullable TajoProtos.QueryState finalStat } QueryInfo queryInfo = queryInProgress.getQueryInfo(); - - if (finalState != null) { - queryInfo.setQueryState(finalState); - } - - if (resultDesc != null) { - queryInfo.setResultDesc(resultDesc); - } - long executionTime = queryInfo.getFinishTime() - queryInfo.getStartTime(); if (executionTime < minExecutionTime.get()) { minExecutionTime.set(executionTime); @@ -318,6 +301,11 @@ private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat qu queryInfo.setFinishTime(queryHeartbeat.getQueryFinishTime()); } + if (queryHeartbeat.hasResultDesc()) { + LOG.info(">>>>> " + new QueryId(queryHeartbeat.getQueryId()) + ", hasResultDesc" + queryHeartbeat.hasResultDesc()); + queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc())); + } + return queryInfo; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index 7ddd7875dd..7623026cdb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -454,10 +454,12 @@ private TajoHeartbeat buildTajoHeartBeat(QueryMasterTask queryMasterTask) { TajoHeartbeat.Builder builder = TajoHeartbeat.newBuilder(); builder.setConnectionInfo(workerContext.getConnectionInfo().getProto()); - builder.setState(queryMasterTask.getState()); builder.setQueryId(queryMasterTask.getQueryId().getProto()); - + builder.setState(queryMasterTask.getState()); if (queryMasterTask.getQuery() != null) { + if (queryMasterTask.getQuery().getResultDesc() != null) { + builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); + } builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); builder.setQueryFinishTime(queryMasterTask.getQuery().getFinishTime()); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 691e8fcd76..9ab4f0a90c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -37,7 +37,6 @@ import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.catalog.proto.CatalogProtos.StoreType; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.ipc.TajoMasterProtocol.QueryCompleteReport; import org.apache.tajo.plan.LogicalOptimizer; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.LogicalPlanner; @@ -229,14 +228,7 @@ public void stop() { } TajoMasterProtocol.TajoMasterProtocolService masterClientService = tmClient.getStub(); - QueryCompleteReport.Builder completionReport = QueryCompleteReport.newBuilder(); - completionReport.setQueryId(queryId.getProto()); - completionReport.setFinalState(getState()); - Query query = queryMasterContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery(); - if (query != null && query.getSynchronizedState() == QueryState.QUERY_SUCCEEDED) { - completionReport.setTableDesc(query.getResultDesc().getProto()); - } - masterClientService.stopQueryMaster(null, completionReport.build(), future); + masterClientService.stopQueryMaster(null, queryId.getProto(), future); } catch (Exception e) { LOG.error(e.getMessage(), e); } finally { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 4cab9f339f..73379023fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -79,15 +79,12 @@ public void init(Configuration conf) { this.serviceHandler = new TajoWorkerClientProtocolServiceHandler(); // init RPC Server in constructor cause Heartbeat Thread use bindAddr - // Setup RPC server try { - // TODO initial port num is value of config and find unused port with sequence InetSocketAddress initIsa = new InetSocketAddress("0.0.0.0", port); if (initIsa.getAddress() == null) { throw new IllegalArgumentException("Failed resolve of " + initIsa); } - // TODO blocking/non-blocking?? int workerNum = this.conf.getIntVar(TajoConf.ConfVars.WORKER_SERVICE_RPC_SERVER_WORKER_THREAD_NUM); this.rpcServer = new BlockingRpcServer(QueryMasterClientProtocol.class, serviceHandler, initIsa, workerNum); this.rpcServer.start(); @@ -124,116 +121,6 @@ public InetSocketAddress getBindAddr() { public class TajoWorkerClientProtocolServiceHandler implements QueryMasterClientProtocol.QueryMasterClientProtocolService.BlockingInterface { - @Override - public PrimitiveProtos.BoolProto updateSessionVariables( - RpcController controller, - ClientProtos.UpdateSessionVariableRequest request) throws ServiceException { - return null; - } - - private boolean hasResultTableDesc(QueryContext queryContext) { - return !(queryContext.isCreateTable() || queryContext.isInsert()); - } - - @Override - public ClientProtos.GetQueryResultResponse getQueryResult( - RpcController controller, - ClientProtos.GetQueryResultRequest request) throws ServiceException { - QueryId queryId = new QueryId(request.getQueryId()); - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); - - ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder(); - try { - builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); - } catch (IOException e) { - LOG.warn("Can't get current user name"); - } - - if(queryMasterTask == null || queryMasterTask.getQuery() == null) { - builder.setErrorMessage("No Query for " + queryId); - } else { - switch (queryMasterTask.getState()) { - case QUERY_SUCCEEDED: -// if (hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())) { - builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto()); - //} - break; - case QUERY_FAILED: - case QUERY_ERROR: - builder.setErrorMessage("Query " + queryId + " is failed"); - default: - builder.setErrorMessage("Query " + queryId + " is still running"); - } - } - return builder.build(); - } - - @Override - public ClientProtos.GetQueryStatusResponse getQueryStatus( - RpcController controller, - ClientProtos.GetQueryStatusRequest request) throws ServiceException { - ClientProtos.GetQueryStatusResponse.Builder builder - = ClientProtos.GetQueryStatusResponse.newBuilder(); - QueryId queryId = new QueryId(request.getQueryId()); - - builder.setQueryId(request.getQueryId()); - - if (queryId.equals(QueryIdFactory.NULL_QUERY_ID)) { - builder.setResultCode(ClientProtos.ResultCode.OK); - builder.setState(TajoProtos.QueryState.QUERY_SUCCEEDED); - } else { - QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId); - - builder.setResultCode(ClientProtos.ResultCode.OK); - builder.setQueryMasterHost(bindAddr.getHostName()); - builder.setQueryMasterPort(bindAddr.getPort()); - - if (queryMasterTask == null) { - queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); - } - if (queryMasterTask == null) { - builder.setState(TajoProtos.QueryState.QUERY_NOT_ASSIGNED); - return builder.build(); - } - - builder.setHasResult(hasResultTableDesc(queryMasterTask.getQueryTaskContext().getQueryContext())); - - queryMasterTask.touchSessionTime(); - Query query = queryMasterTask.getQuery(); - - if (query != null) { - builder.setState(queryMasterTask.getState()); - builder.setProgress(query.getProgress()); - builder.setSubmitTime(query.getAppSubmitTime()); - if (queryMasterTask.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { - builder.setFinishTime(query.getFinishTime()); - } else { - builder.setFinishTime(System.currentTimeMillis()); - } - } - Collection diagnostics = queryMasterTask.getDiagnostics(); - if(!diagnostics.isEmpty()) { - TajoWorkerProtocol.TaskFatalErrorReport firstError = diagnostics.iterator().next(); - builder.setErrorMessage(firstError.getErrorMessage()); - builder.setErrorTrace(firstError.getErrorTrace()); - } - - if (queryMasterTask.isInitError()) { - Throwable initError = queryMasterTask.getInitError(); - builder.setErrorMessage( - initError.getMessage() == null ? initError.getClass().getName() : initError.getMessage()); - builder.setErrorTrace(StringUtils.stringifyException(initError)); - builder.setState(queryMasterTask.getState()); - } - } - return builder.build(); - } - - @Override - public PrimitiveProtos.BoolProto closeQuery(RpcController controller, - TajoIdProtos.QueryIdProto request) throws ServiceException { - return BOOL_TRUE; - } @Override public GetQueryHistoryResponse getQueryHistory(RpcController controller, QueryIdRequest request) throws ServiceException { diff --git a/tajo-core/src/main/proto/TajoMasterProtocol.proto b/tajo-core/src/main/proto/TajoMasterProtocol.proto index a475c64a10..cc83e47267 100644 --- a/tajo-core/src/main/proto/TajoMasterProtocol.proto +++ b/tajo-core/src/main/proto/TajoMasterProtocol.proto @@ -66,9 +66,10 @@ message TajoHeartbeat { required WorkerConnectionInfoProto connectionInfo = 1; optional QueryIdProto queryId = 2; optional QueryState state = 3; - optional string statusMessage = 4; - optional float queryProgress = 5; - optional int64 queryFinishTime = 6; + optional TableDescProto resultDesc = 4; + optional string statusMessage = 5; + optional float queryProgress = 6; + optional int64 queryFinishTime = 7; } message TajoHeartbeatResponse { @@ -138,16 +139,10 @@ message WorkerResourceAllocationResponse { repeated WorkerAllocatedResource workerAllocatedResource = 2; } -message QueryCompleteReport { - required QueryIdProto queryId = 1; - optional QueryState finalState = 2; - optional TableDescProto tableDesc = 3; -} - service TajoMasterProtocolService { rpc heartbeat(TajoHeartbeat) returns (TajoHeartbeatResponse); rpc allocateWorkerResources(WorkerResourceAllocationRequest) returns (WorkerResourceAllocationResponse); rpc releaseWorkerResource(WorkerResourceReleaseRequest) returns (BoolProto); - rpc stopQueryMaster(QueryCompleteReport) returns (BoolProto); + rpc stopQueryMaster(QueryIdProto) returns (BoolProto); rpc getAllWorkerResource(NullProto) returns (WorkerResourcesRequest); } \ No newline at end of file diff --git a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java index 9868297360..a84f03ed7d 100644 --- a/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java +++ b/tajo-core/src/test/java/org/apache/tajo/TajoTestingCluster.java @@ -43,10 +43,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.master.TajoMaster; -import org.apache.tajo.master.querymaster.Query; -import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.SubQuery; -import org.apache.tajo.master.querymaster.SubQueryState; +import org.apache.tajo.master.querymaster.*; import org.apache.tajo.master.rm.TajoWorkerResourceManager; import org.apache.tajo.util.CommonTestingUtil; import org.apache.tajo.util.KeyValueSet; @@ -774,14 +771,16 @@ public void waitForQueryRunning(QueryId queryId) throws Exception { } public void waitForQueryRunning(QueryId queryId, int delay) throws Exception { - QueryMasterTask qmt = null; + QueryInProgress qip = null; int i = 0; - while (qmt == null || TajoClientUtil.isQueryWaitingForSchedule(qmt.getState())) { + while (qip == null || TajoClientUtil.isQueryWaitingForSchedule(qip.getQueryInfo().getQueryState())) { try { Thread.sleep(delay); - if(qmt == null){ - qmt = getQueryMasterTask(queryId); + if(qip == null){ + + TajoMaster master = getMaster(); + qip = master.getContext().getQueryJobManager().getQueryInProgress(queryId); } } catch (InterruptedException e) { } @@ -817,16 +816,4 @@ public void waitForSubQueryState(SubQuery subQuery, SubQueryState expected, int } } } - - public QueryMasterTask getQueryMasterTask(QueryId queryId) { - QueryMasterTask qmt = null; - for (TajoWorker worker : getTajoWorkers()) { - qmt = worker.getWorkerContext().getQueryMaster().getQueryMasterTask(queryId); - if (qmt != null) { - break; - } - } - - return qmt; - } } From 546837ee8176ebecc38f14050fdbc0394ea0fd43 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Wed, 24 Dec 2014 12:15:37 +0900 Subject: [PATCH 5/8] Remove queryMasterMap from SessionConnection. --- .../apache/tajo/client/QueryClientImpl.java | 1 - .../apache/tajo/client/SessionConnection.java | 20 +------------------ .../master/querymaster/QueryInProgress.java | 1 - 3 files changed, 1 insertion(+), 21 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java index 1a48c4821f..e2ed9449dd 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/QueryClientImpl.java @@ -36,7 +36,6 @@ import org.apache.tajo.jdbc.TajoResultSet; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.ServerCallable; -import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; import java.io.IOException; diff --git a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java index db2bd2a1f9..b725bb9428 100644 --- a/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java +++ b/tajo-client/src/main/java/org/apache/tajo/client/SessionConnection.java @@ -22,10 +22,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.tajo.QueryId; import org.apache.tajo.TajoIdProtos; import org.apache.tajo.annotation.Nullable; import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.ClientProtos.SessionUpdateResponse; @@ -33,7 +33,6 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.RpcConnectionPool; import org.apache.tajo.rpc.ServerCallable; -import org.apache.tajo.ha.HAServiceUtil; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; @@ -46,7 +45,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.tajo.ipc.ClientProtos.CreateSessionRequest; @@ -59,8 +57,6 @@ public class SessionConnection implements Closeable { private final TajoConf conf; - final Map queryMasterMap = new ConcurrentHashMap(); - final InetSocketAddress tajoMasterAddr; final RpcConnectionPool connPool; @@ -117,23 +113,11 @@ public Map getClientSideSessionVars() { return Collections.unmodifiableMap(sessionVarsCache); } - public T getStub(QueryId queryId, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, - ConnectTimeoutException, ClassNotFoundException { - InetSocketAddress addr = queryMasterMap.get(queryId); - return connPool.getConnection(addr, protocolClass, asyncMode).getStub(); - } - public NettyClientBase getTajoMasterConnection(boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { return connPool.getConnection(getTajoMasterAddr(), TajoMasterClientProtocol.class, asyncMode); } - public NettyClientBase getConnection(QueryId queryId, Class protocolClass, boolean asyncMode) - throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { - InetSocketAddress addr = queryMasterMap.get(queryId); - return connPool.getConnection(addr, protocolClass, asyncMode); - } - public NettyClientBase getConnection(InetSocketAddress addr, Class protocolClass, boolean asyncMode) throws NoSuchMethodException, ConnectTimeoutException, ClassNotFoundException { return connPool.getConnection(addr, protocolClass, asyncMode); @@ -321,8 +305,6 @@ public void close() { if(connPool != null) { connPool.shutdown(); } - - queryMasterMap.clear(); } protected InetSocketAddress getTajoMasterAddr() { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index 0f797c7974..dd532323d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -284,7 +284,6 @@ private void heartbeat(QueryInfo queryInfo) { } if(isFinishState(this.queryInfo.getQueryState())) { - LOG.info(">>>>> " + queryId + ", hasResultDesc" + queryInfo.hasResultdesc()); this.queryInfo.setResultDesc(queryInfo.getResultDesc()); getEventHandler().handle( From 9de88de5da0a27145ae3626ed2c9db47b0d8092d Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Wed, 24 Dec 2014 13:03:59 +0900 Subject: [PATCH 6/8] Clean up codes. --- .../apache/tajo/master/TajoMasterClientService.java | 7 ------- .../tajo/master/querymaster/QueryInProgress.java | 4 ---- .../org/apache/tajo/master/querymaster/QueryInfo.java | 10 ++++------ .../tajo/master/querymaster/QueryJobManager.java | 1 - .../apache/tajo/worker/TajoWorkerClientService.java | 11 ----------- 5 files changed, 4 insertions(+), 29 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index ea8313629f..02f9ed98e3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -330,7 +330,6 @@ public UpdateQueryResponse updateQuery(RpcController controller, QueryRequest re @Override public GetQueryResultResponse getQueryResult(RpcController controller, GetQueryResultRequest request) throws ServiceException { - LOG.info(">>>>> Enter TajoMasterClientService::getQueryResult"); try { context.getSessionManager().touch(request.getSessionId().getId()); QueryId queryId = new QueryId(request.getQueryId()); @@ -345,8 +344,6 @@ public GetQueryResultResponse getQueryResult(RpcController controller, queryInfo = queryInProgress.getQueryInfo(); } - LOG.info(">>>>> Get QueryInfo: " + queryInfo); - GetQueryResultResponse.Builder builder = GetQueryResultResponse.newBuilder(); builder.setTajoUserName(UserGroupInformation.getCurrentUser().getUserName()); @@ -355,17 +352,13 @@ public GetQueryResultResponse getQueryResult(RpcController controller, // In this case, we will result in error. if (queryInfo == null) { builder.setErrorMessage("No such query: " + queryId.toString()); - LOG.info(">>>>> no QueryInfo"); return builder.build(); } switch (queryInfo.getQueryState()) { case QUERY_SUCCEEDED: if (queryInfo.hasResultdesc()) { - LOG.info(">>>>> hasResultDesc: " + queryInfo.getResultDesc().getPath()); builder.setTableDesc(queryInfo.getResultDesc().getProto()); - } else { - LOG.info(">>>>> no ResultDesc"); } break; case QUERY_FAILED: diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index dd532323d4..485f8d7cb5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -18,7 +18,6 @@ package org.apache.tajo.master.querymaster; -import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -27,7 +26,6 @@ import org.apache.hadoop.yarn.event.EventHandler; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; -import org.apache.tajo.catalog.TableDesc; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.ContainerProtocol; import org.apache.tajo.ipc.QueryMasterProtocol; @@ -73,8 +71,6 @@ public class QueryInProgress extends CompositeService { private QueryMasterProtocolService queryMasterRpcClient; - private ContainerProtocol.TajoContainerIdProto qmContainerId; - public QueryInProgress( TajoMaster.MasterContext masterContext, Session session, diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java index 197998f477..9a162c4ec8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java @@ -20,8 +20,6 @@ import com.google.gson.annotations.Expose; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.tajo.QueryId; import org.apache.tajo.TajoProtos; import org.apache.tajo.catalog.TableDesc; @@ -161,12 +159,8 @@ public void setProgress(float progress) { this.progress = progress; } - Log LOG = LogFactory.getLog(QueryInfo.class); - public synchronized void setResultDesc(TableDesc result) { - LOG.info(">>>>> " + queryId); this.resultDesc = result; - LOG.info(">>>>> hasResultDesc" + hasResultdesc()); } public synchronized boolean hasResultdesc() { @@ -220,6 +214,10 @@ public QueryInfoProto getProto() { .setQueryMasterClientPort(queryMasterClientPort) .setQueryMasterInfoPort(queryMasterInfoPort); + if (resultDesc != null) { + builder.setResultDesc(resultDesc.getProto()); + } + if (sql != null) { builder.setSql(sql); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java index 8559145230..34a0d013d4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryJobManager.java @@ -302,7 +302,6 @@ private QueryInfo makeQueryInfoFromHeartbeat(TajoMasterProtocol.TajoHeartbeat qu } if (queryHeartbeat.hasResultDesc()) { - LOG.info(">>>>> " + new QueryId(queryHeartbeat.getQueryId()) + ", hasResultDesc" + queryHeartbeat.hasResultDesc()); queryInfo.setResultDesc(new TableDesc(queryHeartbeat.getResultDesc())); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index 73379023fd..1c831107a8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -24,31 +24,20 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.util.StringUtils; import org.apache.tajo.QueryId; -import org.apache.tajo.QueryIdFactory; -import org.apache.tajo.TajoIdProtos; -import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.GetQueryHistoryResponse; import org.apache.tajo.ipc.ClientProtos.QueryIdRequest; import org.apache.tajo.ipc.ClientProtos.ResultCode; import org.apache.tajo.ipc.QueryMasterClientProtocol; -import org.apache.tajo.ipc.TajoWorkerProtocol; -import org.apache.tajo.master.querymaster.Query; import org.apache.tajo.master.querymaster.QueryMasterTask; import org.apache.tajo.rpc.BlockingRpcServer; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.history.QueryHistory; -import java.io.IOException; import java.net.InetSocketAddress; -import java.util.Collection; public class TajoWorkerClientService extends AbstractService { private static final Log LOG = LogFactory.getLog(TajoWorkerClientService.class); From 108dedaea42a49ef5bd526caf6067e6464a24f26 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Tue, 30 Dec 2014 11:45:53 +0900 Subject: [PATCH 7/8] Fix synchronization problem of query result. --- .../master/querymaster/QueryInProgress.java | 43 ++++++++++--------- .../tajo/master/querymaster/QueryInfo.java | 14 +++--- 2 files changed, 30 insertions(+), 27 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index 485f8d7cb5..d64a832bc5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -260,30 +260,33 @@ public boolean isStarted() { private void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); - this.queryInfo.setQueryState(queryInfo.getQueryState()); - this.queryInfo.setProgress(queryInfo.getProgress()); - this.queryInfo.setFinishTime(queryInfo.getFinishTime()); - if(queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { - this.queryInfo.setLastMessage(queryInfo.getLastMessage()); - LOG.info(queryId + queryInfo.getLastMessage()); - } - if(this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { - //TODO needed QueryMaster's detail status(failed before or after launching worker) - //queryMasterStopped.set(true); - LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); - } + synchronized (this.queryInfo) { - if(!querySubmitted.get()) { - getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_START, this.queryInfo)); - } + if (isFinishState(queryInfo.getQueryState())) { + if (queryInfo.hasResultdesc()) { + this.queryInfo.setResultDesc(queryInfo.getResultDesc()); + } + } - if(isFinishState(this.queryInfo.getQueryState())) { - this.queryInfo.setResultDesc(queryInfo.getResultDesc()); + this.queryInfo.setQueryState(queryInfo.getQueryState()); + this.queryInfo.setProgress(queryInfo.getProgress()); + this.queryInfo.setFinishTime(queryInfo.getFinishTime()); - getEventHandler().handle( - new QueryJobEvent(QueryJobEvent.Type.QUERY_JOB_FINISH, this.queryInfo)); + // Update diagnosis message + if (queryInfo.getLastMessage() != null && !queryInfo.getLastMessage().isEmpty()) { + this.queryInfo.setLastMessage(queryInfo.getLastMessage()); + LOG.info(queryId + queryInfo.getLastMessage()); + } + + // if any error occurs, print outs the error message + if (this.queryInfo.getQueryState() == TajoProtos.QueryState.QUERY_FAILED) { + LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); + } + + if (isFinishState(this.queryInfo.getQueryState())) { + stop(); + } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java index 9a162c4ec8..559fc14578 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInfo.java @@ -37,13 +37,13 @@ public class QueryInfo implements GsonObject, History { @Expose private String sql; @Expose - private TajoProtos.QueryState queryState; + private volatile TajoProtos.QueryState queryState; @Expose - private float progress; + private volatile float progress; @Expose - private long startTime; + private volatile long startTime; @Expose - private long finishTime; + private volatile long finishTime; @Expose private String lastMessage; @Expose @@ -159,15 +159,15 @@ public void setProgress(float progress) { this.progress = progress; } - public synchronized void setResultDesc(TableDesc result) { + public void setResultDesc(TableDesc result) { this.resultDesc = result; } - public synchronized boolean hasResultdesc() { + public boolean hasResultdesc() { return resultDesc != null; } - public synchronized TableDesc getResultDesc() { + public TableDesc getResultDesc() { return resultDesc; } From 60499789228ad0f7bfaa513f0374911a1c1a90f2 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Tue, 30 Dec 2014 12:50:28 +0900 Subject: [PATCH 8/8] Improve heartbeat status update. --- .../org/apache/tajo/master/querymaster/QueryInProgress.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java index d64a832bc5..ca0bd72c6d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryInProgress.java @@ -261,8 +261,11 @@ public boolean isStarted() { private void heartbeat(QueryInfo queryInfo) { LOG.info("Received QueryMaster heartbeat:" + queryInfo); + // to avoid partial update by different heartbeats synchronized (this.queryInfo) { + // terminal state will let client to retrieve a query result + // So, we must set the query result before changing query state if (isFinishState(queryInfo.getQueryState())) { if (queryInfo.hasResultdesc()) { this.queryInfo.setResultDesc(queryInfo.getResultDesc()); @@ -284,6 +287,7 @@ private void heartbeat(QueryInfo queryInfo) { LOG.warn(queryId + " failed, " + queryInfo.getLastMessage()); } + if (isFinishState(this.queryInfo.getQueryState())) { stop(); }