From 565744dea7ebcd80de100741465cab606fdad068 Mon Sep 17 00:00:00 2001 From: jinossy Date: Tue, 29 Jul 2014 17:54:00 +0900 Subject: [PATCH 1/6] TAJO-985: Client API should be async --- .../java/org/apache/tajo/cli/TajoCli.java | 2 +- .../apache/tajo/master/querymaster/Query.java | 30 +++++++++++----- .../master/querymaster/QueryMasterTask.java | 2 +- .../tajo/master/querymaster/SubQuery.java | 34 +++++++++++++------ .../tajo/worker/TajoWorkerClientService.java | 12 +++---- 5 files changed, 53 insertions(+), 27 deletions(-) diff --git a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java index 98a5bf456a..785921fb78 100644 --- a/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java +++ b/tajo-client/src/main/java/org/apache/tajo/cli/TajoCli.java @@ -487,7 +487,7 @@ private void waitForQueryCompleted(QueryId queryId) { while (true) { // TODO - configurable status = client.getQueryStatus(queryId); - if(status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) { + if(status.getState() == QueryState.QUERY_NEW || status.getState() == QueryState.QUERY_MASTER_INIT || status.getState() == QueryState.QUERY_MASTER_LAUNCHED) { Thread.sleep(Math.min(20 * initRetries, 1000)); initRetries++; continue; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 8bb3dde755..bc89cdec70 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -94,6 +94,7 @@ public class Query implements EventHandler { // State Machine private final StateMachine stateMachine; + private QueryState queryState; // Transition Handler private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); @@ -231,10 +232,11 @@ public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); + queryState = stateMachine.getCurrentState(); } public float getProgress() { - QueryState state = getStateMachine().getCurrentState(); + QueryState state = getState(true); if (state == QueryState.QUERY_SUCCEEDED) { return 1.0f; } else { @@ -246,9 +248,9 @@ public float getProgress() { float [] subProgresses = new float[tempSubQueries.size()]; boolean finished = true; for (SubQuery subquery: tempSubQueries) { - if (subquery.getState() != SubQueryState.NEW) { + if (subquery.getState(true) != SubQueryState.NEW) { subProgresses[idx] = subquery.getProgress(); - if (finished && subquery.getState() != SubQueryState.SUCCEEDED) { + if (finished && subquery.getState(true) != SubQueryState.SUCCEEDED) { finished = false; } } else { @@ -337,15 +339,24 @@ public Collection getSubQueries() { return this.subqueries.values(); } - public QueryState getState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); + protected QueryState getState(boolean async) { + if(async){ + /* non-blocking call for client API */ + return queryState; + } else { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } } } + protected QueryState getState() { + return getState(false); + } + public ExecutionBlockCursor getExecutionBlockCursor() { return cursor; } @@ -845,6 +856,7 @@ public void handle(QueryEvent event) { QueryState oldState = getState(); try { getStateMachine().doTransition(event.getType(), event); + queryState = getState(); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state" + ", type:" + event diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 071e5d4f5d..635b8d567d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -478,7 +478,7 @@ public QueryState getState() { return QueryState.QUERY_NOT_ASSIGNED; } } else { - return query.getState(); + return query.getState(true); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index f2e9dd56ad..9cb88c515c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -89,6 +89,7 @@ public class SubQuery implements EventHandler { private AbstractTaskScheduler taskScheduler; private QueryMasterTask.QueryMasterTaskContext context; private final List diagnostics = new ArrayList(); + private SubQueryState subQueryState; private long startTime; private long finishTime; @@ -263,6 +264,7 @@ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan maste this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); + subQueryState = stateMachine.getCurrentState(); } public static boolean isRunningState(SubQueryState state) { @@ -310,7 +312,7 @@ public long getFinishTime() { public float getTaskProgress() { readLock.lock(); try { - if (getState() == SubQueryState.NEW) { + if (getState(true) == SubQueryState.NEW) { return 0; } else { return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount; @@ -324,7 +326,7 @@ public float getProgress() { List tempTasks = null; readLock.lock(); try { - if (getState() == SubQueryState.NEW) { + if (getState(true) == SubQueryState.NEW) { return 0; } else { tempTasks = new ArrayList(tasks.values()); @@ -340,7 +342,7 @@ public float getProgress() { } } - return totalProgress/(float)tempTasks.size(); + return totalProgress / (float) tempTasks.size(); } public int getSucceededObjectCount() { @@ -468,15 +470,24 @@ public int compareTo(SubQuery other) { return getId().compareTo(other.getId()); } - public SubQueryState getState() { - readLock.lock(); - try { - return stateMachine.getCurrentState(); - } finally { - readLock.unlock(); + public SubQueryState getState(boolean async) { + if(async){ + /* non-blocking call for client API */ + return subQueryState; + } else { + readLock.lock(); + try { + return stateMachine.getCurrentState(); + } finally { + readLock.unlock(); + } } } + public SubQueryState getState() { + return getState(false); + } + public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) { TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()}; long[] avgRows = new long[]{0, 0}; @@ -594,6 +605,7 @@ public void handle(SubQueryEvent event) { SubQueryState oldState = getState(); try { getStateMachine().doTransition(event.getType(), event); + subQueryState = getState(); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state" + ", eventType:" + event.getType().name() @@ -635,6 +647,7 @@ public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) subQuery.finalizeStats(); state = SubQueryState.SUCCEEDED; } else { + //TODO change to async execution ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock()); DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId()); setShuffleIfNecessary(subQuery, channel); @@ -1004,7 +1017,8 @@ public void transition(SubQuery subQuery, SubQueryEvent event) { LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!"); subQuery.eventHandler.handle( new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH, - subQuery.getId(), allocationEvent.getAllocatedContainer())); + subQuery.getId(), allocationEvent.getAllocatedContainer()) + ); subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START)); } catch (Throwable t) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java index abd4e98ec8..d25013c8ed 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerClientService.java @@ -133,7 +133,7 @@ public ClientProtos.GetQueryResultResponse getQueryResult( RpcController controller, ClientProtos.GetQueryResultRequest request) throws ServiceException { QueryId queryId = new QueryId(request.getQueryId()); - Query query = workerContext.getQueryMaster().getQueryMasterTask(queryId, true).getQuery(); + QueryMasterTask queryMasterTask = workerContext.getQueryMaster().getQueryMasterTask(queryId, true); ClientProtos.GetQueryResultResponse.Builder builder = ClientProtos.GetQueryResultResponse.newBuilder(); try { @@ -142,12 +142,12 @@ public ClientProtos.GetQueryResultResponse getQueryResult( LOG.warn("Can't get current user name"); } - if(query == null) { + if(queryMasterTask == null || queryMasterTask.getQuery() == null) { builder.setErrorMessage("No Query for " + queryId); } else { - switch (query.getState()) { + switch (queryMasterTask.getState()) { case QUERY_SUCCEEDED: - builder.setTableDesc(query.getResultDesc().getProto()); + builder.setTableDesc(queryMasterTask.getQuery().getResultDesc().getProto()); break; case QUERY_FAILED: case QUERY_ERROR: @@ -196,10 +196,10 @@ public ClientProtos.GetQueryStatusResponse getQueryStatus( Query query = queryMasterTask.getQuery(); if (query != null) { - builder.setState(query.getState()); + builder.setState(queryMasterTask.getState()); builder.setProgress(query.getProgress()); builder.setSubmitTime(query.getAppSubmitTime()); - if (query.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { + if (queryMasterTask.getState() == TajoProtos.QueryState.QUERY_SUCCEEDED) { builder.setFinishTime(query.getFinishTime()); } else { builder.setFinishTime(System.currentTimeMillis()); From e4eb0fe704a7499858ba4821910e75f1577c79a8 Mon Sep 17 00:00:00 2001 From: jinossy Date: Wed, 30 Jul 2014 11:50:58 +0900 Subject: [PATCH 2/6] fixed wrong progress --- .../main/java/org/apache/tajo/master/querymaster/Query.java | 1 + .../java/org/apache/tajo/master/querymaster/SubQuery.java | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index bc89cdec70..c3639b513b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -255,6 +255,7 @@ public float getProgress() { } } else { subProgresses[idx] = 0.0f; + finished = false; } idx++; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 9cb88c515c..c54c9d3d9d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -327,7 +327,7 @@ public float getProgress() { readLock.lock(); try { if (getState(true) == SubQueryState.NEW) { - return 0; + return 0.0f; } else { tempTasks = new ArrayList(tasks.values()); } @@ -342,7 +342,7 @@ public float getProgress() { } } - return totalProgress / (float) tempTasks.size(); + return (float) Math.floor((totalProgress / (float) tempTasks.size()) * 100.0f) / 100.0f; } public int getSucceededObjectCount() { From e6326d8b6dd26742de026dba1d0a4d34f2bc99cd Mon Sep 17 00:00:00 2001 From: jinossy Date: Wed, 30 Jul 2014 11:57:20 +0900 Subject: [PATCH 3/6] applied to web ui --- tajo-core/src/main/resources/webapps/worker/querydetail.jsp | 2 +- tajo-core/src/main/resources/webapps/worker/queryplan.jsp | 2 +- tajo-core/src/main/resources/webapps/worker/querytasks.jsp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp index c0bee9b64b..bad6cad0fd 100644 --- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp @@ -81,7 +81,7 @@ for(SubQuery eachSubQuery: subQueries) { %> <%=eachSubQuery.getId()%> - <%=eachSubQuery.getState()%> + <%=eachSubQuery.getState(true)%> <%=df.format(eachSubQuery.getStartTime())%> <%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%> <%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%> diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp index ec860b999a..275decdb4d 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp @@ -187,7 +187,7 @@