From 870c3e109c1c3d1f1e4902de3f0cc60319473235 Mon Sep 17 00:00:00 2001 From: Hyunsik Choi Date: Sat, 20 Dec 2014 13:49:23 +0900 Subject: [PATCH] TAJO-1262: Rename the prefix 'SubQuery' to 'Stage'. --- tajo-client/src/main/proto/ClientProtos.proto | 4 +- .../tajo/master/DefaultTaskScheduler.java | 36 +- .../apache/tajo/master/LazyTaskScheduler.java | 30 +- .../tajo/master/TaskSchedulerFactory.java | 12 +- .../master/event/QueryCompletedEvent.java | 8 +- .../tajo/master/event/QueryEventType.java | 4 +- .../tajo/master/event/QuerySubQueryEvent.java | 35 - ...tedEvent.java => StageCompletedEvent.java} | 14 +- ...ava => StageContainerAllocationEvent.java} | 8 +- ....java => StageDiagnosticsUpdateEvent.java} | 6 +- .../{SubQueryEvent.java => StageEvent.java} | 8 +- ...ueryEventType.java => StageEventType.java} | 6 +- ...ueryTaskEvent.java => StageTaskEvent.java} | 8 +- .../tajo/master/event/TaskEventType.java | 4 +- .../apache/tajo/master/querymaster/Query.java | 164 ++-- .../QueryMasterManagerService.java | 4 +- .../master/querymaster/QueryMasterTask.java | 28 +- .../master/querymaster/Repartitioner.java | 172 ++-- .../querymaster/{SubQuery.java => Stage.java} | 757 +++++++++--------- .../{SubQueryState.java => StageState.java} | 2 +- .../apache/tajo/master/querymaster/Task.java | 10 +- .../tajo/master/querymaster/TaskAttempt.java | 6 +- .../java/org/apache/tajo/util/JSPUtil.java | 36 +- .../tajo/util/history/HistoryReader.java | 2 +- .../tajo/util/history/HistoryWriter.java | 12 +- .../tajo/util/history/QueryHistory.java | 23 +- ...SubQueryHistory.java => StageHistory.java} | 10 +- .../tajo/worker/TajoResourceAllocator.java | 14 +- .../java/org/apache/tajo/worker/Task.java | 20 +- .../tajo/worker/TaskAttemptContext.java | 2 +- .../resources/webapps/admin/querydetail.jsp | 32 +- .../resources/webapps/admin/querytasks.jsp | 36 +- .../resources/webapps/worker/querydetail.jsp | 30 +- .../resources/webapps/worker/queryplan.jsp | 52 +- .../resources/webapps/worker/querytasks.jsp | 18 +- .../main/resources/webapps/worker/task.jsp | 10 +- .../org/apache/tajo/TajoTestingCluster.java | 8 +- .../org/apache/tajo/TestQueryIdFactory.java | 8 +- .../apache/tajo/client/TestTajoClient.java | 12 +- .../tajo/engine/query/TestGroupByQuery.java | 16 +- .../engine/query/TestTablePartitions.java | 2 +- .../tajo/engine/query/TestUnionQuery.java | 6 +- .../master/querymaster/TestKillQuery.java | 8 +- .../querymaster/TestTaskStatusUpdate.java | 18 +- .../util/history/TestHistoryWriterReader.java | 26 +- tajo-dist/pom.xml | 2 +- .../tajo/pullserver/PullServerAuxService.java | 16 +- .../pullserver/TajoPullServerService.java | 16 +- .../tajo/storage/FileStorageManager.java | 6 +- 49 files changed, 865 insertions(+), 902 deletions(-) delete mode 100644 tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java rename tajo-core/src/main/java/org/apache/tajo/master/event/{SubQueryCompletedEvent.java => StageCompletedEvent.java} (73%) rename tajo-core/src/main/java/org/apache/tajo/master/event/{SubQueryContainerAllocationEvent.java => StageContainerAllocationEvent.java} (80%) rename tajo-core/src/main/java/org/apache/tajo/master/event/{SubQueryDiagnosticsUpdateEvent.java => StageDiagnosticsUpdateEvent.java} (82%) rename tajo-core/src/main/java/org/apache/tajo/master/event/{SubQueryEvent.java => StageEvent.java} (81%) rename tajo-core/src/main/java/org/apache/tajo/master/event/{SubQueryEventType.java => StageEventType.java} (92%) rename tajo-core/src/main/java/org/apache/tajo/master/event/{SubQueryTaskEvent.java => StageTaskEvent.java} (83%) rename tajo-core/src/main/java/org/apache/tajo/master/querymaster/{SubQuery.java => Stage.java} (55%) rename tajo-core/src/main/java/org/apache/tajo/master/querymaster/{SubQueryState.java => StageState.java} (97%) rename tajo-core/src/main/java/org/apache/tajo/util/history/{SubQueryHistory.java => StageHistory.java} (95%) diff --git a/tajo-client/src/main/proto/ClientProtos.proto b/tajo-client/src/main/proto/ClientProtos.proto index 51db7630b4..a741268e70 100644 --- a/tajo-client/src/main/proto/ClientProtos.proto +++ b/tajo-client/src/main/proto/ClientProtos.proto @@ -253,7 +253,7 @@ message QueryInfoProto { optional int32 queryMasterInfoPort = 11; } -message SubQueryHistoryProto { +message StageHistoryProto { required string executionBlockId =1; required string state = 2; optional int64 startTime = 3; @@ -283,7 +283,7 @@ message QueryHistoryProto { optional string logicalPlan = 4; optional string distributedPlan = 5; repeated KeyValueProto sessionVariables = 6; - repeated SubQueryHistoryProto subQueryHistories = 7; + repeated StageHistoryProto stageHistories = 7; } message GetQueryHistoryResponse { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java index d9d496ede7..dd6233cf3f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/DefaultTaskScheduler.java @@ -40,7 +40,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.DataLocation; import org.apache.tajo.storage.fragment.FileFragment; @@ -60,7 +60,7 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(DefaultTaskScheduler.class); private final TaskSchedulerContext context; - private SubQuery subQuery; + private Stage stage; private Thread schedulingThread; private AtomicBoolean stopEventHandling = new AtomicBoolean(false); @@ -71,10 +71,10 @@ public class DefaultTaskScheduler extends AbstractTaskScheduler { private int nextTaskId = 0; private int scheduledObjectNum = 0; - public DefaultTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) { + public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { super(DefaultTaskScheduler.class.getName()); this.context = context; - this.subQuery = subQuery; + this.stage = stage; } @Override @@ -117,8 +117,8 @@ public void run() { private static final TaskAttemptId NULL_ATTEMPT_ID; public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; static { - ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0); + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); TajoWorkerProtocol.TaskRequestProto.Builder builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(); @@ -192,13 +192,13 @@ public void handle(TaskSchedulerEvent event) { FragmentScheduleEvent castEvent = (FragmentScheduleEvent) event; if (context.isLeafQuery()) { TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(); - Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); task.addFragment(castEvent.getLeftFragment(), true); scheduledObjectNum++; if (castEvent.hasRightFragments()) { task.addFragments(castEvent.getRightFragments()); } - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } else { fragmentsForNonLeafTask = new FileFragment[2]; fragmentsForNonLeafTask[0] = castEvent.getLeftFragment(); @@ -217,7 +217,7 @@ public void handle(TaskSchedulerEvent event) { FetchScheduleEvent castEvent = (FetchScheduleEvent) event; Map> fetches = castEvent.getFetches(); TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(); - Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); scheduledObjectNum++; for (Entry> eachFetch : fetches.entrySet()) { task.addFetches(eachFetch.getKey(), eachFetch.getValue()); @@ -229,7 +229,7 @@ public void handle(TaskSchedulerEvent event) { if (broadcastFragmentsForNonLeafTask != null && broadcastFragmentsForNonLeafTask.length > 0) { task.addFragments(Arrays.asList(broadcastFragmentsForNonLeafTask)); } - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } else if (event instanceof TaskAttemptToSchedulerEvent) { TaskAttemptToSchedulerEvent castEvent = (TaskAttemptToSchedulerEvent) event; if (context.isLeafQuery()) { @@ -239,7 +239,7 @@ public void handle(TaskSchedulerEvent event) { } } } else if (event.getType() == EventType.T_SCHEDULE_CANCEL) { - // when a subquery is killed, unassigned query unit attmpts are canceled from the scheduler. + // when a stage is killed, unassigned query unit attmpts are canceled from the scheduler. // This event is triggered by TaskAttempt. TaskAttemptToSchedulerEvent castedEvent = (TaskAttemptToSchedulerEvent) event; scheduledRequests.leafTasks.remove(castedEvent.getTaskAttempt().getId()); @@ -832,7 +832,7 @@ public void assignToLeafTasks(LinkedList taskRequests) { } if (attemptId != null) { - Task task = subQuery.getTask(attemptId.getTaskId()); + Task task = stage.getTask(attemptId.getTaskId()); TaskRequest taskAssign = new TaskRequestImpl( attemptId, new ArrayList(task.getAllFragments()), @@ -840,8 +840,8 @@ public void assignToLeafTasks(LinkedList taskRequests) { false, task.getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), - subQuery.getDataChannel(), subQuery.getBlock().getEnforcer()); - if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { + stage.getDataChannel(), stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } @@ -888,7 +888,7 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { LOG.debug("Assigned based on * match"); Task task; - task = subQuery.getTask(attemptId.getTaskId()); + task = stage.getTask(attemptId.getTaskId()); TaskRequest taskAssign = new TaskRequestImpl( attemptId, Lists.newArrayList(task.getAllFragments()), @@ -896,9 +896,9 @@ public void assignToNonLeafTasks(LinkedList taskRequests) { false, task.getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), - subQuery.getDataChannel(), - subQuery.getBlock().getEnforcer()); - if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { + stage.getDataChannel(), + stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } for(Map.Entry> entry: task.getFetchMap().entrySet()) { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java index 0ab19db7ad..32af17b27c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/LazyTaskScheduler.java @@ -37,7 +37,7 @@ import org.apache.tajo.master.event.TaskSchedulerEvent.EventType; import org.apache.tajo.master.querymaster.Task; import org.apache.tajo.master.querymaster.TaskAttempt; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.FileFragment; @@ -57,7 +57,7 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { private static final Log LOG = LogFactory.getLog(LazyTaskScheduler.class); private final TaskSchedulerContext context; - private final SubQuery subQuery; + private final Stage stage; private Thread schedulingThread; private volatile boolean stopEventHandling; @@ -77,10 +77,10 @@ public class LazyTaskScheduler extends AbstractTaskScheduler { private int nextTaskId = 0; private int containerNum; - public LazyTaskScheduler(TaskSchedulerContext context, SubQuery subQuery) { + public LazyTaskScheduler(TaskSchedulerContext context, Stage stage) { super(LazyTaskScheduler.class.getName()); this.context = context; - this.subQuery = subQuery; + this.stage = stage; } @Override @@ -101,8 +101,8 @@ public void init(Configuration conf) { @Override public void start() { - containerNum = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers( - subQuery.getContext().getQueryMasterContext().getWorkerContext(), + containerNum = stage.getContext().getResourceAllocator().calculateNumRequestContainers( + stage.getContext().getQueryMasterContext().getWorkerContext(), context.getEstimatedTaskNum(), 512); LOG.info("Start TaskScheduler"); @@ -129,8 +129,8 @@ public void run() { private static final TaskAttemptId NULL_ATTEMPT_ID; public static final TajoWorkerProtocol.TaskRequestProto stopTaskRunnerReq; static { - ExecutionBlockId nullSubQuery = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); - NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullSubQuery, 0), 0); + ExecutionBlockId nullStage = QueryIdFactory.newExecutionBlockId(QueryIdFactory.NULL_QUERY_ID, 0); + NULL_ATTEMPT_ID = QueryIdFactory.newTaskAttemptId(QueryIdFactory.newTaskId(nullStage, 0), 0); TajoWorkerProtocol.TaskRequestProto.Builder builder = TajoWorkerProtocol.TaskRequestProto.newBuilder(); @@ -362,7 +362,7 @@ private void assignLeafTasks(List taskRequests) { String host = container.getTaskHostName(); TaskAttemptScheduleContext taskContext = new TaskAttemptScheduleContext(container.containerID, host, taskRequest.getCallback()); - Task task = SubQuery.newEmptyTask(context, taskContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskContext, stage, nextTaskId++); FragmentPair fragmentPair; List fragmentPairs = new ArrayList(); @@ -371,7 +371,7 @@ private void assignLeafTasks(List taskRequests) { long taskSize = adjustTaskSize(); LOG.info("Adjusted task size: " + taskSize); - TajoConf conf = subQuery.getContext().getConf(); + TajoConf conf = stage.getContext().getConf(); // host local, disk local String normalized = NetUtils.normalizeHost(host); Integer diskId = hostDiskBalancerMap.get(normalized).getDiskId(container.containerID); @@ -450,7 +450,7 @@ private void assignLeafTasks(List taskRequests) { LOG.info("host: " + host + " disk id: " + diskId + " fragment num: " + fragmentPairs.size()); task.setFragment(fragmentPairs.toArray(new FragmentPair[fragmentPairs.size()])); - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } } @@ -469,9 +469,9 @@ private void assignNonLeafTasks(List taskRequests) { taskRequest.getContainerId()); TaskAttemptScheduleContext taskScheduleContext = new TaskAttemptScheduleContext(container.containerID, container.getTaskHostName(), taskRequest.getCallback()); - Task task = SubQuery.newEmptyTask(context, taskScheduleContext, subQuery, nextTaskId++); + Task task = Stage.newEmptyTask(context, taskScheduleContext, stage, nextTaskId++); task.setFragment(scheduledFragments.getAllFragments()); - subQuery.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); + stage.getEventHandler().handle(new TaskEvent(task.getId(), TaskEventType.T_SCHEDULE)); } } } @@ -485,8 +485,8 @@ private void assignTask(TaskAttemptScheduleContext attemptContext, TaskAttempt t false, taskAttempt.getTask().getLogicalPlan().toJson(), context.getMasterContext().getQueryContext(), - subQuery.getDataChannel(), subQuery.getBlock().getEnforcer()); - if (checkIfInterQuery(subQuery.getMasterPlan(), subQuery.getBlock())) { + stage.getDataChannel(), stage.getBlock().getEnforcer()); + if (checkIfInterQuery(stage.getMasterPlan(), stage.getBlock())) { taskAssign.setInterQuery(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java index 520ecd32d0..e5291e9c52 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TaskSchedulerFactory.java @@ -20,7 +20,7 @@ import com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; -import org.apache.tajo.master.querymaster.SubQuery; +import org.apache.tajo.master.querymaster.Stage; import java.io.IOException; import java.lang.reflect.Constructor; @@ -29,7 +29,7 @@ public class TaskSchedulerFactory { private static Class CACHED_ALGORITHM_CLASS; private static final Map, Constructor> CONSTRUCTOR_CACHE = Maps.newConcurrentMap(); - private static final Class[] DEFAULT_PARAMS = { TaskSchedulerContext.class, SubQuery.class }; + private static final Class[] DEFAULT_PARAMS = { TaskSchedulerContext.class, Stage.class }; public static Class getTaskSchedulerClass(Configuration conf) throws IOException { @@ -46,7 +46,7 @@ public static Class getTaskSchedulerClass(Confi } public static T get(Class clazz, TaskSchedulerContext context, - SubQuery subQuery) { + Stage stage) { T result; try { Constructor constructor = (Constructor) CONSTRUCTOR_CACHE.get(clazz); @@ -55,15 +55,15 @@ public static T get(Class clazz, TaskSchedu constructor.setAccessible(true); CONSTRUCTOR_CACHE.put(clazz, constructor); } - result = constructor.newInstance(new Object[]{context, subQuery}); + result = constructor.newInstance(new Object[]{context, stage}); } catch (Exception e) { throw new RuntimeException(e); } return result; } - public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, SubQuery subQuery) + public static AbstractTaskScheduler get(Configuration conf, TaskSchedulerContext context, Stage stage) throws IOException { - return get(getTaskSchedulerClass(conf), context, subQuery); + return get(getTaskSchedulerClass(conf), context, stage); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java index dc75a1d0f7..e5a9a32dda 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryCompletedEvent.java @@ -19,14 +19,14 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.SubQueryState; +import org.apache.tajo.master.querymaster.StageState; public class QueryCompletedEvent extends QueryEvent { private final ExecutionBlockId executionBlockId; - private final SubQueryState finalState; + private final StageState finalState; public QueryCompletedEvent(final ExecutionBlockId executionBlockId, - SubQueryState finalState) { + StageState finalState) { super(executionBlockId.getQueryId(), QueryEventType.QUERY_COMPLETED); this.executionBlockId = executionBlockId; this.finalState = finalState; @@ -36,7 +36,7 @@ public ExecutionBlockId getExecutionBlockId() { return executionBlockId; } - public SubQueryState getState() { + public StageState getState() { return finalState; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java index edc0cd8b06..e38a3c4ac6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/QueryEventType.java @@ -24,8 +24,8 @@ public enum QueryEventType { START, KILL, - // Producer: SubQuery - SUBQUERY_COMPLETED, + // Producer: Stage + STAGE_COMPLETED, // Producer: Query QUERY_COMPLETED, diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java deleted file mode 100644 index ae36a6938d..0000000000 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/QuerySubQueryEvent.java +++ /dev/null @@ -1,35 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.tajo.master.event; - -import org.apache.tajo.ExecutionBlockId; - -public class QuerySubQueryEvent extends QueryEvent { - private ExecutionBlockId executionBlockId; - - public QuerySubQueryEvent(final ExecutionBlockId id, - final QueryEventType queryEvent) { - super(id.getQueryId(), queryEvent); - this.executionBlockId = id; - } - - public ExecutionBlockId getExecutionBlockId() { - return this.executionBlockId; - } -} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java similarity index 73% rename from tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java index 638979826e..2d16fbe99f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryCompletedEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageCompletedEvent.java @@ -19,15 +19,15 @@ package org.apache.tajo.master.event; import org.apache.tajo.ExecutionBlockId; -import org.apache.tajo.master.querymaster.SubQueryState; +import org.apache.tajo.master.querymaster.StageState; -public class SubQueryCompletedEvent extends QueryEvent { +public class StageCompletedEvent extends QueryEvent { private final ExecutionBlockId executionBlockId; - private final SubQueryState finalState; + private final StageState finalState; - public SubQueryCompletedEvent(final ExecutionBlockId executionBlockId, - SubQueryState finalState) { - super(executionBlockId.getQueryId(), QueryEventType.SUBQUERY_COMPLETED); + public StageCompletedEvent(final ExecutionBlockId executionBlockId, + StageState finalState) { + super(executionBlockId.getQueryId(), QueryEventType.STAGE_COMPLETED); this.executionBlockId = executionBlockId; this.finalState = finalState; } @@ -36,7 +36,7 @@ public ExecutionBlockId getExecutionBlockId() { return executionBlockId; } - public SubQueryState getState() { + public StageState getState() { return finalState; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java similarity index 80% rename from tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java index e617d53139..0d29e4467c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryContainerAllocationEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageContainerAllocationEvent.java @@ -23,12 +23,12 @@ import java.util.List; -public class SubQueryContainerAllocationEvent extends SubQueryEvent { +public class StageContainerAllocationEvent extends StageEvent { private List allocatedContainer; - public SubQueryContainerAllocationEvent(final ExecutionBlockId id, - List allocatedContainer) { - super(id, SubQueryEventType.SQ_CONTAINER_ALLOCATED); + public StageContainerAllocationEvent(final ExecutionBlockId id, + List allocatedContainer) { + super(id, StageEventType.SQ_CONTAINER_ALLOCATED); this.allocatedContainer = allocatedContainer; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java similarity index 82% rename from tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java index 0810e818c0..39afc924fd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryDiagnosticsUpdateEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageDiagnosticsUpdateEvent.java @@ -20,11 +20,11 @@ import org.apache.tajo.ExecutionBlockId; -public class SubQueryDiagnosticsUpdateEvent extends SubQueryEvent { +public class StageDiagnosticsUpdateEvent extends StageEvent { private final String msg; - public SubQueryDiagnosticsUpdateEvent(final ExecutionBlockId id, String diagnostic) { - super(id, SubQueryEventType.SQ_DIAGNOSTIC_UPDATE); + public StageDiagnosticsUpdateEvent(final ExecutionBlockId id, String diagnostic) { + super(id, StageEventType.SQ_DIAGNOSTIC_UPDATE); this.msg = diagnostic; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java similarity index 81% rename from tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java index 2b3d598bfa..6fc47461f7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEvent.java @@ -21,15 +21,15 @@ import org.apache.hadoop.yarn.event.AbstractEvent; import org.apache.tajo.ExecutionBlockId; -public class SubQueryEvent extends AbstractEvent { +public class StageEvent extends AbstractEvent { private final ExecutionBlockId id; - public SubQueryEvent(ExecutionBlockId id, SubQueryEventType subQueryEventType) { - super(subQueryEventType); + public StageEvent(ExecutionBlockId id, StageEventType stageEventType) { + super(stageEventType); this.id = id; } - public ExecutionBlockId getSubQueryId() { + public ExecutionBlockId getStageId() { return id; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java similarity index 92% rename from tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java index 79b6e2ec6c..fa808d4183 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java @@ -19,9 +19,9 @@ package org.apache.tajo.master.event; /** - * Event Types handled by SubQuery + * Event Types handled by Stage */ -public enum SubQueryEventType { +public enum StageEventType { // Producer: Query SQ_INIT, @@ -35,7 +35,7 @@ public enum SubQueryEventType { SQ_FAILED, // Producer: Completed - SQ_SUBQUERY_COMPLETED, + SQ_STAGE_COMPLETED, // Producer: Any component SQ_DIAGNOSTIC_UPDATE, diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java similarity index 83% rename from tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java index 816bc48b61..4377881ad6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/SubQueryTaskEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskEvent.java @@ -22,13 +22,13 @@ import org.apache.tajo.master.TaskState; /** - * Event Class: From Task to SubQuery + * Event Class: From Task to Stage */ -public class SubQueryTaskEvent extends SubQueryEvent { +public class StageTaskEvent extends StageEvent { private TaskId taskId; private TaskState state; - public SubQueryTaskEvent(TaskId taskId, TaskState state) { - super(taskId.getExecutionBlockId(), SubQueryEventType.SQ_TASK_COMPLETED); + public StageTaskEvent(TaskId taskId, TaskState state) { + super(taskId.getExecutionBlockId(), StageEventType.SQ_TASK_COMPLETED); this.taskId = taskId; this.state = state; } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java index 9448863f21..0f26821104 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskEventType.java @@ -23,10 +23,10 @@ */ public enum TaskEventType { - //Producer:Client, SubQuery + //Producer:Client, Stage T_KILL, - //Producer:SubQuery + //Producer:Stage T_SCHEDULE, //Producer:TaskAttempt diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 918cc824fa..a626df1c05 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -50,7 +50,7 @@ import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.QueryHistory; -import org.apache.tajo.util.history.SubQueryHistory; +import org.apache.tajo.util.history.StageHistory; import java.io.IOException; import java.util.*; @@ -65,7 +65,7 @@ public class Query implements EventHandler { private final TajoConf systemConf; private final Clock clock; private String queryStr; - private Map subqueries; + private Map stages; private final EventHandler eventHandler; private final MasterPlan plan; QueryMasterTask.QueryMasterTaskContext context; @@ -77,11 +77,11 @@ public class Query implements EventHandler { private long startTime; private long finishTime; private TableDesc resultDesc; - private int completedSubQueryCount = 0; - private int successedSubQueryCount = 0; - private int killedSubQueryCount = 0; - private int failedSubQueryCount = 0; - private int erroredSubQueryCount = 0; + private int completedStagesCount = 0; + private int successedStagesCount = 0; + private int killedStagesCount = 0; + private int failedStagesCount = 0; + private int erroredStagesCount = 0; private final List diagnostics = new ArrayList(); // Internal Variables @@ -96,7 +96,7 @@ public class Query implements EventHandler { // Transition Handler private static final SingleArcTransition INTERNAL_ERROR_TRANSITION = new InternalErrorTransition(); private static final DiagnosticsUpdateTransition DIAGNOSTIC_UPDATE_TRANSITION = new DiagnosticsUpdateTransition(); - private static final SubQueryCompletedTransition SUBQUERY_COMPLETED_TRANSITION = new SubQueryCompletedTransition(); + private static final StageCompletedTransition STAGE_COMPLETED_TRANSITION = new StageCompletedTransition(); private static final QueryCompletedTransition QUERY_COMPLETED_TRANSITION = new QueryCompletedTransition(); protected static final StateMachineFactory @@ -120,8 +120,8 @@ public class Query implements EventHandler { // Transitions from RUNNING state .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_RUNNING, - QueryEventType.SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(QueryState.QUERY_RUNNING, EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, QueryState.QUERY_ERROR), @@ -132,7 +132,7 @@ public class Query implements EventHandler { DIAGNOSTIC_UPDATE_TRANSITION) .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_KILL_WAIT, QueryEventType.KILL, - new KillSubQueriesTransition()) + new KillAllStagesTransition()) .addTransition(QueryState.QUERY_RUNNING, QueryState.QUERY_ERROR, QueryEventType.INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) @@ -143,8 +143,8 @@ public class Query implements EventHandler { DIAGNOSTIC_UPDATE_TRANSITION) // ignore-able transitions .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, - QueryEventType.SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_SUCCEEDED, QueryEventType.KILL) .addTransition(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_ERROR, @@ -153,8 +153,8 @@ public class Query implements EventHandler { // Transitions from KILL_WAIT state .addTransition(QueryState.QUERY_KILL_WAIT, QueryState.QUERY_KILL_WAIT, - QueryEventType.SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) + QueryEventType.STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) .addTransition(QueryState.QUERY_KILL_WAIT, EnumSet.of(QueryState.QUERY_SUCCEEDED, QueryState.QUERY_FAILED, QueryState.QUERY_KILLED, QueryState.QUERY_ERROR), @@ -191,7 +191,7 @@ public class Query implements EventHandler { INTERNAL_ERROR_TRANSITION) // Ignore-able transitions .addTransition(QueryState.QUERY_ERROR, QueryState.QUERY_ERROR, - EnumSet.of(QueryEventType.KILL, QueryEventType.SUBQUERY_COMPLETED)) + EnumSet.of(QueryEventType.KILL, QueryEventType.STAGE_COMPLETED)) .installTopology(); @@ -206,7 +206,7 @@ public Query(final QueryMasterTask.QueryMasterTaskContext context, final QueryId this.clock = context.getClock(); this.appSubmitTime = appSubmitTime; this.queryStr = queryStr; - this.subqueries = Maps.newConcurrentMap(); + this.stages = Maps.newConcurrentMap(); this.eventHandler = eventHandler; this.plan = plan; this.cursor = new ExecutionBlockCursor(plan, true); @@ -237,15 +237,15 @@ public float getProgress() { return 1.0f; } else { int idx = 0; - List tempSubQueries = new ArrayList(); - synchronized(subqueries) { - tempSubQueries.addAll(subqueries.values()); + List tempStages = new ArrayList(); + synchronized(stages) { + tempStages.addAll(stages.values()); } - float [] subProgresses = new float[tempSubQueries.size()]; - for (SubQuery subquery: tempSubQueries) { - if (subquery.getState() != SubQueryState.NEW) { - subProgresses[idx] = subquery.getProgress(); + float [] subProgresses = new float[tempStages.size()]; + for (Stage stage: tempStages) { + if (stage.getState() != StageState.NEW) { + subProgresses[idx] = stage.getProgress(); } else { subProgresses[idx] = 0.0f; } @@ -285,17 +285,17 @@ public void setFinishTime() { public QueryHistory getQueryHistory() { QueryHistory queryHistory = makeQueryHistory(); - queryHistory.setSubQueryHistories(makeSubQueryHistories()); + queryHistory.setStageHistories(makeStageHistories()); return queryHistory; } - private List makeSubQueryHistories() { - List subQueryHistories = new ArrayList(); - for(SubQuery eachSubQuery: getSubQueries()) { - subQueryHistories.add(eachSubQuery.getSubQueryHistory()); + private List makeStageHistories() { + List stageHistories = new ArrayList(); + for(Stage eachStage : getStages()) { + stageHistories.add(eachStage.getStageHistory()); } - return subQueryHistories; + return stageHistories; } private QueryHistory makeQueryHistory() { @@ -348,20 +348,20 @@ public StateMachine getStateMachine() { return stateMachine; } - public void addSubQuery(SubQuery subquery) { - subqueries.put(subquery.getId(), subquery); + public void addStage(Stage stage) { + stages.put(stage.getId(), stage); } public QueryId getId() { return this.id; } - public SubQuery getSubQuery(ExecutionBlockId id) { - return this.subqueries.get(id); + public Stage getStage(ExecutionBlockId id) { + return this.stages.get(id); } - public Collection getSubQueries() { - return this.subqueries.values(); + public Collection getStages() { + return this.stages.values(); } public QueryState getSynchronizedState() { @@ -389,13 +389,13 @@ public static class StartTransition public void transition(Query query, QueryEvent queryEvent) { query.setStartTime(); - SubQuery subQuery = new SubQuery(query.context, query.getPlan(), + Stage stage = new Stage(query.context, query.getPlan(), query.getExecutionBlockCursor().nextBlock()); - subQuery.setPriority(query.priority--); - query.addSubQuery(subQuery); + stage.setPriority(query.priority--); + query.addStage(stage); - subQuery.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INIT)); - LOG.debug("Schedule unit plan: \n" + subQuery.getBlock().getPlan()); + stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_INIT)); + LOG.debug("Schedule unit plan: \n" + stage.getBlock().getPlan()); } } @@ -403,20 +403,20 @@ public static class QueryCompletedTransition implements MultipleArcTransition { + public static class StageCompletedTransition implements SingleArcTransition { private boolean hasNext(Query query) { ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); @@ -624,43 +624,43 @@ private boolean hasNext(Query query) { private void executeNextBlock(Query query) { ExecutionBlockCursor cursor = query.getExecutionBlockCursor(); ExecutionBlock nextBlock = cursor.nextBlock(); - SubQuery nextSubQuery = new SubQuery(query.context, query.getPlan(), nextBlock); - nextSubQuery.setPriority(query.priority--); - query.addSubQuery(nextSubQuery); - nextSubQuery.handle(new SubQueryEvent(nextSubQuery.getId(), SubQueryEventType.SQ_INIT)); + Stage nextStage = new Stage(query.context, query.getPlan(), nextBlock); + nextStage.setPriority(query.priority--); + query.addStage(nextStage); + nextStage.handle(new StageEvent(nextStage.getId(), StageEventType.SQ_INIT)); - LOG.info("Scheduling SubQuery:" + nextSubQuery.getId()); + LOG.info("Scheduling Stage:" + nextStage.getId()); if(LOG.isDebugEnabled()) { - LOG.debug("Scheduling SubQuery's Priority: " + nextSubQuery.getPriority()); - LOG.debug("Scheduling SubQuery's Plan: \n" + nextSubQuery.getBlock().getPlan()); + LOG.debug("Scheduling Stage's Priority: " + nextStage.getPriority()); + LOG.debug("Scheduling Stage's Plan: \n" + nextStage.getBlock().getPlan()); } } @Override public void transition(Query query, QueryEvent event) { try { - query.completedSubQueryCount++; - SubQueryCompletedEvent castEvent = (SubQueryCompletedEvent) event; - - if (castEvent.getState() == SubQueryState.SUCCEEDED) { - query.successedSubQueryCount++; - } else if (castEvent.getState() == SubQueryState.KILLED) { - query.killedSubQueryCount++; - } else if (castEvent.getState() == SubQueryState.FAILED) { - query.failedSubQueryCount++; - } else if (castEvent.getState() == SubQueryState.ERROR) { - query.erroredSubQueryCount++; + query.completedStagesCount++; + StageCompletedEvent castEvent = (StageCompletedEvent) event; + + if (castEvent.getState() == StageState.SUCCEEDED) { + query.successedStagesCount++; + } else if (castEvent.getState() == StageState.KILLED) { + query.killedStagesCount++; + } else if (castEvent.getState() == StageState.FAILED) { + query.failedStagesCount++; + } else if (castEvent.getState() == StageState.ERROR) { + query.erroredStagesCount++; } else { - LOG.error(String.format("Invalid SubQuery (%s) State %s at %s", + LOG.error(String.format("Invalid Stage (%s) State %s at %s", castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); query.eventHandler.handle(new QueryEvent(event.getQueryId(), QueryEventType.INTERNAL_ERROR)); } - // if a subquery is succeeded and a query is running - if (castEvent.getState() == SubQueryState.SUCCEEDED && // latest subquery succeeded + // if a stage is succeeded and a query is running + if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. - hasNext(query)) { // there remains at least one subquery. - query.getSubQuery(castEvent.getExecutionBlockId()).waitingIntermediateReport(); + hasNext(query)) { // there remains at least one stage. + query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport(); executeNextBlock(query); } else { // if a query is completed due to finished, kill, failure, or error query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); @@ -687,12 +687,12 @@ public void transition(Query query, QueryEvent event) { } } - private static class KillSubQueriesTransition implements SingleArcTransition { + private static class KillAllStagesTransition implements SingleArcTransition { @Override public void transition(Query query, QueryEvent event) { - synchronized (query.subqueries) { - for (SubQuery subquery : query.subqueries.values()) { - query.eventHandler.handle(new SubQueryEvent(subquery.getId(), SubQueryEventType.SQ_KILL)); + synchronized (query.stages) { + for (Stage stage : query.stages.values()) { + query.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java index e7e2bc0e61..c2e1009ba6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterManagerService.java @@ -150,7 +150,7 @@ public void statusUpdate(RpcController controller, TajoWorkerProtocol.TaskStatus if (queryMasterTask == null) { queryMasterTask = queryMaster.getQueryMasterTask(queryId, true); } - SubQuery sq = queryMasterTask.getQuery().getSubQuery(attemptId.getTaskId().getExecutionBlockId()); + Stage sq = queryMasterTask.getQuery().getStage(attemptId.getTaskId().getExecutionBlockId()); Task task = sq.getTask(attemptId.getTaskId()); TaskAttempt attempt = task.getAttempt(attemptId.getId()); @@ -221,7 +221,7 @@ public void doneExecutionBlock( QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); if (queryMasterTask != null) { ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); - queryMasterTask.getQuery().getSubQuery(ebId).receiveExecutionBlockReport(request); + queryMasterTask.getQuery().getStage(ebId).receiveExecutionBlockReport(request); } done.run(TajoWorker.TRUE_PROTO); } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java index 9ab4f0a90c..06d20f948f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMasterTask.java @@ -158,7 +158,7 @@ public void init(Configuration conf) { dispatcher = new TajoAsyncDispatcher(queryId.toString()); addService(dispatcher); - dispatcher.register(SubQueryEventType.class, new SubQueryEventDispatcher()); + dispatcher.register(StageEventType.class, new StageEventDispatcher()); dispatcher.register(TaskEventType.class, new TaskEventDispatcher()); dispatcher.register(TaskAttemptEventType.class, new TaskAttemptEventDispatcher()); dispatcher.register(QueryMasterQueryCompletedEvent.EventType.class, new QueryFinishEventHandler()); @@ -253,7 +253,7 @@ public void stop() { public void handleTaskRequestEvent(TaskRequestEvent event) { ExecutionBlockId id = event.getExecutionBlockId(); - query.getSubQuery(id).handleTaskRequestEvent(event); + query.getStage(id).handleTaskRequestEvent(event); } public void handleTaskFailed(TajoWorkerProtocol.TaskFatalErrorReport report) { @@ -272,13 +272,13 @@ public Collection getDiagnostics() { } } - private class SubQueryEventDispatcher implements EventHandler { - public void handle(SubQueryEvent event) { - ExecutionBlockId id = event.getSubQueryId(); + private class StageEventDispatcher implements EventHandler { + public void handle(StageEvent event) { + ExecutionBlockId id = event.getStageId(); if(LOG.isDebugEnabled()) { - LOG.debug("SubQueryEventDispatcher:" + id + "," + event.getType()); + LOG.debug("StageEventDispatcher:" + id + "," + event.getType()); } - query.getSubQuery(id).handle(event); + query.getStage(id).handle(event); } } @@ -289,7 +289,7 @@ public void handle(TaskEvent event) { if(LOG.isDebugEnabled()) { LOG.debug("TaskEventDispatcher>" + taskId + "," + event.getType()); } - Task task = query.getSubQuery(taskId.getExecutionBlockId()). + Task task = query.getStage(taskId.getExecutionBlockId()). getTask(taskId); task.handle(event); } @@ -299,8 +299,8 @@ private class TaskAttemptEventDispatcher implements EventHandler { public void handle(TaskAttemptEvent event) { TaskAttemptId attemptId = event.getTaskAttemptId(); - SubQuery subQuery = query.getSubQuery(attemptId.getTaskId().getExecutionBlockId()); - Task task = subQuery.getTask(attemptId.getTaskId()); + Stage stage = query.getStage(attemptId.getTaskId().getExecutionBlockId()); + Task task = stage.getTask(attemptId.getTaskId()); TaskAttempt attempt = task.getAttempt(attemptId); attempt.handle(event); } @@ -309,8 +309,8 @@ public void handle(TaskAttemptEvent event) { private class TaskSchedulerDispatcher implements EventHandler { public void handle(TaskSchedulerEvent event) { - SubQuery subQuery = query.getSubQuery(event.getExecutionBlockId()); - subQuery.getTaskScheduler().handle(event); + Stage stage = query.getStage(event.getExecutionBlockId()); + stage.getTaskScheduler().handle(event); } } @@ -629,8 +629,8 @@ public TajoAsyncDispatcher getDispatcher() { return dispatcher; } - public SubQuery getSubQuery(ExecutionBlockId id) { - return query.getSubQuery(id); + public Stage getStage(ExecutionBlockId id) { + return query.getStage(id); } public Map getTableDescMap() { diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java index cf6b91799c..4cf6ce2a4a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Repartitioner.java @@ -81,11 +81,11 @@ public class Repartitioner { private final static int HTTP_REQUEST_MAXIMUM_LENGTH = 1900; private final static String UNKNOWN_HOST = "unknown"; - public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, SubQuery subQuery) + public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerContext, Stage stage) throws IOException { - MasterPlan masterPlan = subQuery.getMasterPlan(); - ExecutionBlock execBlock = subQuery.getBlock(); - QueryMasterTask.QueryMasterTaskContext masterContext = subQuery.getContext(); + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock execBlock = stage.getBlock(); + QueryMasterTask.QueryMasterTaskContext masterContext = stage.getContext(); ScanNode[] scans = execBlock.getScanNodes(); @@ -98,17 +98,17 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC TableDesc tableDesc = masterContext.getTableDescMap().get(scans[i].getCanonicalName()); if (tableDesc == null) { // if it is a real table stored on storage FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); tablePath = storageManager.getTablePath(scans[i].getTableName()); if (execBlock.getUnionScanMap() != null && !execBlock.getUnionScanMap().isEmpty()) { for (Map.Entry unionScanEntry: execBlock.getUnionScanMap().entrySet()) { ExecutionBlockId originScanEbId = unionScanEntry.getKey(); - stats[i] += masterContext.getSubQuery(originScanEbId).getResultStats().getNumBytes(); + stats[i] += masterContext.getStage(originScanEbId).getResultStats().getNumBytes(); } } else { ExecutionBlockId scanEBId = TajoIdUtils.createExecutionBlockId(scans[i].getTableName()); - stats[i] = masterContext.getSubQuery(scanEBId).getResultStats().getNumBytes(); + stats[i] = masterContext.getStage(scanEBId).getResultStats().getNumBytes(); } fragments[i] = new FileFragment(scans[i].getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); } else { @@ -119,7 +119,7 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC } StorageManager storageManager = - StorageManager.getStorageManager(subQuery.getContext().getConf(), tableDesc.getMeta().getStoreType()); + StorageManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType()); // if table has no data, storageManager will return empty FileFragment. // So, we need to handle FileFragment by its size. @@ -223,7 +223,7 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC execBlock.removeBroadcastTable(scans[baseScanIdx].getCanonicalName()); LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join with all tables, base_table=%s, base_volume=%d", scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); - scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments); + scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments); } else if (!execBlock.getBroadcastTables().isEmpty()) { // If some relations of this EB are broadcasted boolean hasNonLeafNode = false; List largeScanIndexList = new ArrayList(); @@ -266,7 +266,7 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC int baseScanIdx = largeScanIndexList.isEmpty() ? maxStatsScanIdx : largeScanIndexList.get(0); LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, base_table=%s, base_volume=%d", scans[baseScanIdx].getCanonicalName(), stats[baseScanIdx])); - scheduleLeafTasksWithBroadcastTable(schedulerContext, subQuery, baseScanIdx, fragments); + scheduleLeafTasksWithBroadcastTable(schedulerContext, stage, baseScanIdx, fragments); } else { if (largeScanIndexList.size() > 2) { throw new IOException("Symmetric Repartition Join should have two scan node, but " + nonLeafScanNames); @@ -292,12 +292,12 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC index++; } LOG.info(String.format("[Distributed Join Strategy] : Broadcast Join, join_node=%s", nonLeafScanNames)); - scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery, + scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, intermediateScans, intermediateScanStats, intermediateFragments, broadcastScans, broadcastFragments); } } else { LOG.info("[Distributed Join Strategy] : Symmetric Repartition Join"); - scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, subQuery, scans, stats, fragments, null, null); + scheduleSymmetricRepartitionJoin(masterContext, schedulerContext, stage, scans, stats, fragments, null, null); } } @@ -305,7 +305,7 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC * Scheduling in tech case of Symmetric Repartition Join * @param masterContext * @param schedulerContext - * @param subQuery + * @param stage * @param scans * @param stats * @param fragments @@ -313,21 +313,21 @@ public static void scheduleFragmentsForJoinQuery(TaskSchedulerContext schedulerC */ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMasterTaskContext masterContext, TaskSchedulerContext schedulerContext, - SubQuery subQuery, + Stage stage, ScanNode[] scans, long[] stats, Fragment[] fragments, ScanNode[] broadcastScans, Fragment[] broadcastFragments) throws IOException { - MasterPlan masterPlan = subQuery.getMasterPlan(); - ExecutionBlock execBlock = subQuery.getBlock(); + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock execBlock = stage.getBlock(); // The hash map is modeling as follows: // >> Map>> hashEntries = new HashMap>>(); // Grouping IntermediateData by a partition key and a table name - List childBlocks = masterPlan.getChilds(subQuery.getId()); + List childBlocks = masterPlan.getChilds(stage.getId()); // In the case of join with union, there is one ScanNode for union. Map unionScanMap = execBlock.getUnionScanMap(); @@ -336,7 +336,7 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster if (scanEbId == null) { scanEbId = childBlock.getId(); } - SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); + Stage childExecSM = stage.getContext().getStage(childBlock.getId()); if (childExecSM.getHashShuffleIntermediateEntries() != null && !childExecSM.getHashShuffleIntermediateEntries().isEmpty()) { @@ -387,7 +387,7 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster // Getting the desire number of join tasks according to the volumn // of a larger table int largerIdx = stats[0] >= stats[1] ? 0 : 1; - int desireJoinTaskVolumn = subQuery.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE); + int desireJoinTaskVolumn = stage.getMasterPlan().getContext().getInt(SessionVars.JOIN_TASK_INPUT_SIZE); // calculate the number of tasks according to the data size int mb = (int) Math.ceil((double) stats[largerIdx] / 1048576); @@ -412,7 +412,7 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster TableDesc tableDesc = masterContext.getTableDescMap().get(eachScan.getCanonicalName()); if (eachScan.getType() == NodeType.PARTITIONS_SCAN) { FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); PartitionedTableScanNode partitionScan = (PartitionedTableScanNode)eachScan; partitionScanPaths = partitionScan.getInputPaths(); @@ -420,7 +420,7 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster getFragmentsFromPartitionedTable(storageManager, eachScan, tableDesc); partitionScan.setInputPaths(partitionScanPaths); } else { - StorageManager storageManager = StorageManager.getStorageManager(subQuery.getContext().getConf(), + StorageManager storageManager = StorageManager.getStorageManager(stage.getContext().getConf(), tableDesc.getMeta().getStoreType()); Collection scanFragments = storageManager.getSplits(eachScan.getCanonicalName(), tableDesc, eachScan); @@ -430,12 +430,12 @@ private static void scheduleSymmetricRepartitionJoin(QueryMasterTask.QueryMaster } } } - SubQuery.scheduleFragment(subQuery, fragments[0], rightFragments); + Stage.scheduleFragment(stage, fragments[0], rightFragments); // Assign partitions to tasks in a round robin manner. for (Entry>> entry : hashEntries.entrySet()) { - addJoinShuffle(subQuery, entry.getKey(), entry.getValue()); + addJoinShuffle(stage, entry.getKey(), entry.getValue()); } schedulerContext.setTaskSize((int) Math.ceil((double) bothFetchSize / joinTaskNum)); @@ -503,9 +503,9 @@ public static List getFragmentsFromPartitionedTable(FileStorageManager return fragments; } - private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, SubQuery subQuery, + private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext schedulerContext, Stage stage, int baseScanId, Fragment[] fragments) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + ExecutionBlock execBlock = stage.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); for (int i = 0; i < scans.length; i++) { @@ -527,7 +527,7 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch List broadcastFragments = new ArrayList(); for (int i = 0; i < scans.length; i++) { ScanNode scan = scans[i]; - TableDesc desc = subQuery.getContext().getTableDescMap().get(scan.getCanonicalName()); + TableDesc desc = stage.getContext().getTableDescMap().get(scan.getCanonicalName()); TableMeta meta = desc.getMeta(); Collection scanFragments; @@ -537,11 +537,11 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch partitionScanPaths = partitionScan.getInputPaths(); // set null to inputPaths in getFragmentsFromPartitionedTable() FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); scanFragments = getFragmentsFromPartitionedTable(storageManager, scan, desc); } else { StorageManager storageManager = - StorageManager.getStorageManager(subQuery.getContext().getConf(), desc.getMeta().getStoreType()); + StorageManager.getStorageManager(stage.getContext().getConf(), desc.getMeta().getStoreType()); scanFragments = storageManager.getSplits(scan.getCanonicalName(), desc, scan); } @@ -565,14 +565,14 @@ private static void scheduleLeafTasksWithBroadcastTable(TaskSchedulerContext sch throw new IOException("No fragments for " + scans[baseScanId].getTableName()); } - SubQuery.scheduleFragments(subQuery, baseFragments, broadcastFragments); + Stage.scheduleFragments(stage, baseFragments, broadcastFragments); schedulerContext.setEstimatedTaskNum(baseFragments.size()); } - private static void addJoinShuffle(SubQuery subQuery, int partitionId, + private static void addJoinShuffle(Stage stage, int partitionId, Map> grouppedPartitions) { Map> fetches = new HashMap>(); - for (ExecutionBlock execBlock : subQuery.getMasterPlan().getChilds(subQuery.getId())) { + for (ExecutionBlock execBlock : stage.getMasterPlan().getChilds(stage.getId())) { if (grouppedPartitions.containsKey(execBlock.getId())) { Collection requests = mergeShuffleRequest(partitionId, HASH_SHUFFLE, grouppedPartitions.get(execBlock.getId())); @@ -581,10 +581,10 @@ private static void addJoinShuffle(SubQuery subQuery, int partitionId, } if (fetches.isEmpty()) { - LOG.info(subQuery.getId() + "'s " + partitionId + " partition has empty result."); + LOG.info(stage.getId() + "'s " + partitionId + " partition has empty result."); return; } - SubQuery.scheduleFetches(subQuery, fetches); + Stage.scheduleFetches(stage, fetches); } /** @@ -616,14 +616,14 @@ private static Collection mergeShuffleRequest(int partitionId, } public static void scheduleFragmentsForNonLeafTasks(TaskSchedulerContext schedulerContext, - MasterPlan masterPlan, SubQuery subQuery, int maxNum) + MasterPlan masterPlan, Stage stage, int maxNum) throws IOException { - DataChannel channel = masterPlan.getIncomingChannels(subQuery.getBlock().getId()).get(0); + DataChannel channel = masterPlan.getIncomingChannels(stage.getBlock().getId()).get(0); if (channel.getShuffleType() == HASH_SHUFFLE || channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { - scheduleHashShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum); + scheduleHashShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); } else if (channel.getShuffleType() == RANGE_SHUFFLE) { - scheduleRangeShuffledFetches(schedulerContext, masterPlan, subQuery, channel, maxNum); + scheduleRangeShuffledFetches(schedulerContext, masterPlan, stage, channel, maxNum); } else { throw new InternalException("Cannot support partition type"); } @@ -634,22 +634,22 @@ private static TableStats computeChildBlocksStats(QueryMasterTask.QueryMasterTas List tableStatses = new ArrayList(); List childBlocks = masterPlan.getChilds(parentBlockId); for (ExecutionBlock childBlock : childBlocks) { - SubQuery childExecSM = context.getSubQuery(childBlock.getId()); - tableStatses.add(childExecSM.getResultStats()); + Stage childStage = context.getStage(childBlock.getId()); + tableStatses.add(childStage.getResultStats()); } return StatisticsUtil.aggregateTableStat(tableStatses); } public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, - SubQuery subQuery, DataChannel channel, int maxNum) + Stage stage, DataChannel channel, int maxNum) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + ExecutionBlock execBlock = stage.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf())) + tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf())) .getTablePath(scan.getTableName()); - ExecutionBlock sampleChildBlock = masterPlan.getChild(subQuery.getId(), 0); + ExecutionBlock sampleChildBlock = masterPlan.getChild(stage.getId(), 0); SortNode sortNode = PlannerUtil.findTopNode(sampleChildBlock.getPlan(), NodeType.SORT); SortSpec [] sortSpecs = sortNode.getSortKeys(); Schema sortSchema = new Schema(channel.getShuffleKeys()); @@ -658,7 +658,7 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo int determinedTaskNum; // calculate the number of maximum query ranges - TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId()); + TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()); // If there is an empty table in inner join, it should return zero rows. if (totalStat.getNumBytes() == 0 && totalStat.getColumnStats().size() == 0) { @@ -668,15 +668,15 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo if (sortNode.getSortPurpose() == SortPurpose.STORAGE_SPECIFIED) { StoreType storeType = PlannerUtil.getStoreType(masterPlan.getLogicalPlan()); - CatalogService catalog = subQuery.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); + CatalogService catalog = stage.getContext().getQueryMasterContext().getWorkerContext().getCatalog(); LogicalRootNode rootNode = masterPlan.getLogicalPlan().getRootBlock().getRoot(); TableDesc tableDesc = PlannerUtil.getTableDesc(catalog, rootNode.getChild()); if (tableDesc == null) { throw new IOException("Can't get table meta data from catalog: " + PlannerUtil.getStoreTableName(masterPlan.getLogicalPlan())); } - ranges = StorageManager.getStorageManager(subQuery.getContext().getConf(), storeType) - .getInsertSortRanges(subQuery.getContext().getQueryContext(), tableDesc, + ranges = StorageManager.getStorageManager(stage.getContext().getConf(), storeType) + .getInsertSortRanges(stage.getContext().getQueryContext(), tableDesc, sortNode.getInSchema(), sortSpecs, mergedRange); determinedTaskNum = ranges.length; @@ -687,36 +687,36 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo // if the number of the range cardinality is less than the desired number of tasks, // we set the the number of tasks to the number of range cardinality. if (card.compareTo(BigInteger.valueOf(maxNum)) < 0) { - LOG.info(subQuery.getId() + ", The range cardinality (" + card + LOG.info(stage.getId() + ", The range cardinality (" + card + ") is less then the desired number of tasks (" + maxNum + ")"); determinedTaskNum = card.intValue(); } else { determinedTaskNum = maxNum; } - LOG.info(subQuery.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + + LOG.info(stage.getId() + ", Try to divide " + mergedRange + " into " + determinedTaskNum + " sub ranges (total units: " + determinedTaskNum + ")"); ranges = partitioner.partition(determinedTaskNum); if (ranges == null || ranges.length == 0) { - LOG.warn(subQuery.getId() + " no range infos."); + LOG.warn(stage.getId() + " no range infos."); } TupleUtil.setMaxRangeIfNull(sortSpecs, sortSchema, totalStat.getColumnStats(), ranges); if (LOG.isDebugEnabled()) { if (ranges != null) { for (TupleRange eachRange : ranges) { - LOG.debug(subQuery.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); + LOG.debug(stage.getId() + " range: " + eachRange.getStart() + " ~ " + eachRange.getEnd()); } } } } FileFragment dummyFragment = new FileFragment(scan.getTableName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); - SubQuery.scheduleFragment(subQuery, dummyFragment); + Stage.scheduleFragment(stage, dummyFragment); List fetches = new ArrayList(); - List childBlocks = masterPlan.getChilds(subQuery.getId()); + List childBlocks = masterPlan.getChilds(stage.getId()); for (ExecutionBlock childBlock : childBlocks) { - SubQuery childExecSM = subQuery.getContext().getSubQuery(childBlock.getId()); + Stage childExecSM = stage.getContext().getStage(childBlock.getId()); for (Task qu : childExecSM.getTasks()) { for (IntermediateEntry p : qu.getIntermediateData()) { FetchImpl fetch = new FetchImpl(p.getPullHost(), RANGE_SHUFFLE, childBlock.getId(), 0); @@ -758,12 +758,12 @@ public static void scheduleRangeShuffledFetches(TaskSchedulerContext schedulerCo LOG.error(e); } - scheduleFetchesByRoundRobin(subQuery, map, scan.getTableName(), determinedTaskNum); + scheduleFetchesByRoundRobin(stage, map, scan.getTableName(), determinedTaskNum); schedulerContext.setEstimatedTaskNum(determinedTaskNum); } - public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map> partitions, + public static void scheduleFetchesByRoundRobin(Stage stage, Map> partitions, String tableName, int num) { int i; Map>[] fetchesArray = new Map[num]; @@ -777,7 +777,7 @@ public static void scheduleFetchesByRoundRobin(SubQuery subQuery, Map> eachFetches : fetchesArray) { - SubQuery.scheduleFetches(subQuery, eachFetches); + Stage.scheduleFetches(stage, eachFetches); } } @@ -807,18 +807,18 @@ public long getVolume() { } public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerContext, MasterPlan masterPlan, - SubQuery subQuery, DataChannel channel, + Stage stage, DataChannel channel, int maxNum) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + ExecutionBlock execBlock = stage.getBlock(); ScanNode scan = execBlock.getScanNodes()[0]; Path tablePath; - tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf())) + tablePath = ((FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf())) .getTablePath(scan.getTableName()); Fragment frag = new FileFragment(scan.getCanonicalName(), tablePath, 0, 0, new String[]{UNKNOWN_HOST}); List fragments = new ArrayList(); fragments.add(frag); - SubQuery.scheduleFragments(subQuery, fragments); + Stage.scheduleFragments(stage, fragments); Map finalFetches = new HashMap(); Map> intermediates = new HashMap partitions = new ArrayList(); - partitions.addAll(subQuery.getContext().getSubQuery(block.getId()).getHashShuffleIntermediateEntries()); + partitions.addAll(stage.getContext().getStage(block.getId()).getHashShuffleIntermediateEntries()); // In scattered hash shuffle, Collecting each IntermediateEntry if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { @@ -861,16 +861,16 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon } int groupingColumns = 0; - LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(subQuery.getBlock().getPlan(), + LogicalNode[] groupbyNodes = PlannerUtil.findAllNodes(stage.getBlock().getPlan(), new NodeType[]{NodeType.GROUP_BY, NodeType.DISTINCT_GROUP_BY}); if (groupbyNodes != null && groupbyNodes.length > 0) { LogicalNode bottomNode = groupbyNodes[0]; if (bottomNode.getType() == NodeType.GROUP_BY) { groupingColumns = ((GroupbyNode)bottomNode).getGroupingColumns().length; } else if (bottomNode.getType() == NodeType.DISTINCT_GROUP_BY) { - DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); if (distinctNode == null) { - LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode"); + LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); distinctNode = (DistinctGroupbyNode)bottomNode; } groupingColumns = distinctNode.getGroupingColumns().length; @@ -879,8 +879,8 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); if (property != null) { if (property.getDistinct().getIsMultipleAggregation()) { - MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); - if (stage != MultipleAggregationStage.THRID_STAGE) { + MultipleAggregationStage mulAggStage = property.getDistinct().getMultipleAggregationStage(); + if (mulAggStage != MultipleAggregationStage.THRID_STAGE) { groupingColumns = distinctNode.getOutSchema().size(); } } @@ -889,13 +889,13 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon } // get a proper number of tasks int determinedTaskNum = Math.min(maxNum, finalFetches.size()); - LOG.info(subQuery.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); + LOG.info(stage.getId() + ", ScheduleHashShuffledFetches - Max num=" + maxNum + ", finalFetchURI=" + finalFetches.size()); if (groupingColumns == 0) { determinedTaskNum = 1; - LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); + LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); } else { - TableStats totalStat = computeChildBlocksStats(subQuery.getContext(), masterPlan, subQuery.getId()); + TableStats totalStat = computeChildBlocksStats(stage.getContext(), masterPlan, stage.getId()); if (totalStat.getNumRows() == 0) { determinedTaskNum = 1; } @@ -903,13 +903,13 @@ public static void scheduleHashShuffledFetches(TaskSchedulerContext schedulerCon // set the proper number of tasks to the estimated task num if (channel.getShuffleType() == SCATTERED_HASH_SHUFFLE) { - scheduleScatteredHashShuffleFetches(schedulerContext, subQuery, intermediates, + scheduleScatteredHashShuffleFetches(schedulerContext, stage, intermediates, scan.getTableName()); } else { schedulerContext.setEstimatedTaskNum(determinedTaskNum); // divide fetch uris into the the proper number of tasks according to volumes - scheduleFetchesByEvenDistributedVolumes(subQuery, finalFetches, scan.getTableName(), determinedTaskNum); - LOG.info(subQuery.getId() + ", DeterminedTaskNum : " + determinedTaskNum); + scheduleFetchesByEvenDistributedVolumes(stage, finalFetches, scan.getTableName(), determinedTaskNum); + LOG.info(stage.getId() + ", DeterminedTaskNum : " + determinedTaskNum); } } @@ -970,12 +970,12 @@ public int compare(FetchGroupMeta o1, FetchGroupMeta o2) { return new Pair>[]>(assignedVolumes, fetchesArray); } - public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Map partitions, + public static void scheduleFetchesByEvenDistributedVolumes(Stage stage, Map partitions, String tableName, int num) { Map>[] fetchsArray = makeEvenDistributedFetchImpl(partitions, tableName, num).getSecond(); // Schedule FetchImpls for (Map> eachFetches : fetchsArray) { - SubQuery.scheduleFetches(subQuery, eachFetches); + Stage.scheduleFetches(stage, eachFetches); } } @@ -987,12 +987,12 @@ public static void scheduleFetchesByEvenDistributedVolumes(SubQuery subQuery, Ma // to $DIST_QUERY_TABLE_PARTITION_VOLUME. Finally, each subgroup is assigned to a query unit. // It is usually used for writing partitioned tables. public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext schedulerContext, - SubQuery subQuery, Map> intermediates, + Stage stage, Map> intermediates, String tableName) { long splitVolume = StorageUnit.MB * - subQuery.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); + stage.getMasterPlan().getContext().getLong(SessionVars.TABLE_PARTITION_PER_SHUFFLE_SIZE); long pageSize = StorageUnit.MB * - subQuery.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes + stage.getContext().getConf().getIntVar(ConfVars.SHUFFLE_HASH_APPENDER_PAGE_VOLUME); // in bytes if (pageSize >= splitVolume) { throw new RuntimeException("tajo.dist-query.table-partition.task-volume-mb should be great than " + "tajo.shuffle.hash.appender.page.volumn-mb"); @@ -1033,11 +1033,11 @@ public static void scheduleScatteredHashShuffleFetches(TaskSchedulerContext sche fetchesArray[i] = new HashMap>(); fetchesArray[i].put(tableName, entry); - SubQuery.scheduleFetches(subQuery, fetchesArray[i]); + Stage.scheduleFetches(stage, fetchesArray[i]); i++; } - LOG.info(subQuery.getId() + LOG.info(stage.getId() + ", ShuffleType:" + SCATTERED_HASH_SHUFFLE.name() + ", Intermediate Size: " + totalIntermediateSize + ", splitSize: " + splitVolume @@ -1207,16 +1207,16 @@ public static Map> hashByHost(List { +public class Stage implements EventHandler { - private static final Log LOG = LogFactory.getLog(SubQuery.class); + private static final Log LOG = LogFactory.getLog(Stage.class); private MasterPlan masterPlan; private ExecutionBlock block; @@ -98,7 +98,7 @@ public class SubQuery implements EventHandler { private AbstractTaskScheduler taskScheduler; private QueryMasterTask.QueryMasterTaskContext context; private final List diagnostics = new ArrayList(); - private SubQueryState subQueryState; + private StageState stageState; private long startTime; private long finishTime; @@ -113,160 +113,159 @@ public class SubQuery implements EventHandler { private static final TaskCompletedTransition TASK_COMPLETED_TRANSITION = new TaskCompletedTransition(); private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION = new AllocatedContainersCancelTransition(); - private static final SubQueryCompleteTransition SUBQUERY_COMPLETED_TRANSITION = - new SubQueryCompleteTransition(); - private StateMachine stateMachine; + private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition(); + private StateMachine stateMachine; - protected static final StateMachineFactory stateMachineFactory = - new StateMachineFactory (SubQueryState.NEW) + protected static final StateMachineFactory stateMachineFactory = + new StateMachineFactory (StageState.NEW) // Transitions from NEW state - .addTransition(SubQueryState.NEW, - EnumSet.of(SubQueryState.INITED, SubQueryState.ERROR, SubQueryState.SUCCEEDED), - SubQueryEventType.SQ_INIT, + .addTransition(StageState.NEW, + EnumSet.of(StageState.INITED, StageState.ERROR, StageState.SUCCEEDED), + StageEventType.SQ_INIT, new InitAndRequestContainer()) - .addTransition(SubQueryState.NEW, SubQueryState.NEW, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.NEW, StageState.NEW, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.NEW, SubQueryState.KILLED, - SubQueryEventType.SQ_KILL) - .addTransition(SubQueryState.NEW, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.NEW, StageState.KILLED, + StageEventType.SQ_KILL) + .addTransition(StageState.NEW, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Transitions from INITED state - .addTransition(SubQueryState.INITED, SubQueryState.RUNNING, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.INITED, StageState.RUNNING, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION) - .addTransition(SubQueryState.INITED, SubQueryState.INITED, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.INITED, StageState.INITED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.INITED, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_KILL, new KillTasksTransition()) - .addTransition(SubQueryState.INITED, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.INITED, StageState.KILL_WAIT, + StageEventType.SQ_KILL, new KillTasksTransition()) + .addTransition(StageState.INITED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Transitions from RUNNING state - .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION) - .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, - SubQueryEventType.SQ_TASK_COMPLETED, + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) - .addTransition(SubQueryState.RUNNING, - EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED), - SubQueryEventType.SQ_SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) - .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, - SubQueryEventType.SQ_FAILED, + .addTransition(StageState.RUNNING, + EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), + StageEventType.SQ_STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_FAILED, TASK_COMPLETED_TRANSITION) - .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.RUNNING, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_KILL, + .addTransition(StageState.RUNNING, StageState.KILL_WAIT, + StageEventType.SQ_KILL, new KillTasksTransition()) - .addTransition(SubQueryState.RUNNING, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.RUNNING, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able Transition - .addTransition(SubQueryState.RUNNING, SubQueryState.RUNNING, - SubQueryEventType.SQ_START) + .addTransition(StageState.RUNNING, StageState.RUNNING, + StageEventType.SQ_START) // Transitions from KILL_WAIT state - .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) - .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, - EnumSet.of(SubQueryEventType.SQ_KILL), new KillTasksTransition()) - .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_TASK_COMPLETED, + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + EnumSet.of(StageEventType.SQ_KILL), new KillTasksTransition()) + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) - .addTransition(SubQueryState.KILL_WAIT, - EnumSet.of(SubQueryState.SUCCEEDED, SubQueryState.FAILED, SubQueryState.KILLED), - SubQueryEventType.SQ_SUBQUERY_COMPLETED, - SUBQUERY_COMPLETED_TRANSITION) - .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.KILL_WAIT, + EnumSet.of(StageState.SUCCEEDED, StageState.FAILED, StageState.KILLED), + StageEventType.SQ_STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.KILL_WAIT, SubQueryState.KILL_WAIT, - SubQueryEventType.SQ_FAILED, + .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, + StageEventType.SQ_FAILED, TASK_COMPLETED_TRANSITION) - .addTransition(SubQueryState.KILL_WAIT, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.KILL_WAIT, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Transitions from SUCCEEDED state - .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) - .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.SUCCEEDED, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.SUCCEEDED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able events - .addTransition(SubQueryState.SUCCEEDED, SubQueryState.SUCCEEDED, + .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, EnumSet.of( - SubQueryEventType.SQ_START, - SubQueryEventType.SQ_KILL, - SubQueryEventType.SQ_CONTAINER_ALLOCATED)) + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_CONTAINER_ALLOCATED)) // Transitions from KILLED state - .addTransition(SubQueryState.KILLED, SubQueryState.KILLED, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.KILLED, StageState.KILLED, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) - .addTransition(SubQueryState.KILLED, SubQueryState.KILLED, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.KILLED, StageState.KILLED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.KILLED, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.KILLED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able transitions - .addTransition(SubQueryState.KILLED, SubQueryState.KILLED, + .addTransition(StageState.KILLED, StageState.KILLED, EnumSet.of( - SubQueryEventType.SQ_START, - SubQueryEventType.SQ_KILL, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, - SubQueryEventType.SQ_FAILED)) + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_FAILED)) // Transitions from FAILED state - .addTransition(SubQueryState.FAILED, SubQueryState.FAILED, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.FAILED, StageState.FAILED, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) - .addTransition(SubQueryState.FAILED, SubQueryState.FAILED, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.FAILED, StageState.FAILED, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(SubQueryState.FAILED, SubQueryState.ERROR, - SubQueryEventType.SQ_INTERNAL_ERROR, + .addTransition(StageState.FAILED, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able transitions - .addTransition(SubQueryState.FAILED, SubQueryState.FAILED, + .addTransition(StageState.FAILED, StageState.FAILED, EnumSet.of( - SubQueryEventType.SQ_START, - SubQueryEventType.SQ_KILL, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, - SubQueryEventType.SQ_FAILED)) + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_FAILED)) // Transitions from ERROR state - .addTransition(SubQueryState.ERROR, SubQueryState.ERROR, - SubQueryEventType.SQ_CONTAINER_ALLOCATED, + .addTransition(StageState.ERROR, StageState.ERROR, + StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) - .addTransition(SubQueryState.ERROR, SubQueryState.ERROR, - SubQueryEventType.SQ_DIAGNOSTIC_UPDATE, + .addTransition(StageState.ERROR, StageState.ERROR, + StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) // Ignore-able transitions - .addTransition(SubQueryState.ERROR, SubQueryState.ERROR, + .addTransition(StageState.ERROR, StageState.ERROR, EnumSet.of( - SubQueryEventType.SQ_START, - SubQueryEventType.SQ_KILL, - SubQueryEventType.SQ_FAILED, - SubQueryEventType.SQ_INTERNAL_ERROR, - SubQueryEventType.SQ_SUBQUERY_COMPLETED)) + StageEventType.SQ_START, + StageEventType.SQ_KILL, + StageEventType.SQ_FAILED, + StageEventType.SQ_INTERNAL_ERROR, + StageEventType.SQ_STAGE_COMPLETED)) .installTopology(); @@ -282,9 +281,9 @@ SubQueryEventType.SQ_KILL, new KillTasksTransition()) private TaskSchedulerContext schedulerContext; private List hashShuffleIntermediateEntries = new ArrayList(); private AtomicInteger completeReportReceived = new AtomicInteger(0); - private SubQueryHistory finalSubQueryHistory; + private StageHistory finalStageHistory; - public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) { + public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) { this.context = context; this.masterPlan = masterPlan; this.block = block; @@ -294,11 +293,11 @@ public SubQuery(QueryMasterTask.QueryMasterTaskContext context, MasterPlan maste this.readLock = readWriteLock.readLock(); this.writeLock = readWriteLock.writeLock(); stateMachine = stateMachineFactory.make(this); - subQueryState = stateMachine.getCurrentState(); + stageState = stateMachine.getCurrentState(); } - public static boolean isRunningState(SubQueryState state) { - return state == SubQueryState.INITED || state == SubQueryState.NEW || state == SubQueryState.RUNNING; + public static boolean isRunningState(StageState state) { + return state == StageState.INITED || state == StageState.NEW || state == StageState.RUNNING; } public QueryMasterTask.QueryMasterTaskContext getContext() { @@ -342,7 +341,7 @@ public long getFinishTime() { public float getTaskProgress() { readLock.lock(); try { - if (getState() == SubQueryState.NEW) { + if (getState() == StageState.NEW) { return 0; } else { return (float)(succeededObjectCount) / (float)totalScheduledObjectsCount; @@ -356,7 +355,7 @@ public float getProgress() { List tempTasks = null; readLock.lock(); try { - if (getState() == SubQueryState.NEW) { + if (getState() == StageState.NEW) { return 0.0f; } else { tempTasks = new ArrayList(tasks.values()); @@ -395,15 +394,15 @@ public void addTask(Task task) { tasks.put(task.getId(), task); } - public SubQueryHistory getSubQueryHistory() { - if (finalSubQueryHistory != null) { - if (finalSubQueryHistory.getFinishTime() == 0) { - finalSubQueryHistory = makeSubQueryHistory(); - finalSubQueryHistory.setTasks(makeTaskHistories()); + public StageHistory getStageHistory() { + if (finalStageHistory != null) { + if (finalStageHistory.getFinishTime() == 0) { + finalStageHistory = makeStageHistory(); + finalStageHistory.setTasks(makeTaskHistories()); } - return finalSubQueryHistory; + return finalStageHistory; } else { - return makeSubQueryHistory(); + return makeStageHistory(); } } @@ -417,20 +416,20 @@ private List makeTaskHistories() { return taskHistories; } - private SubQueryHistory makeSubQueryHistory() { - SubQueryHistory subQueryHistory = new SubQueryHistory(); + private StageHistory makeStageHistory() { + StageHistory stageHistory = new StageHistory(); - subQueryHistory.setExecutionBlockId(getId().toString()); - subQueryHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan())); - subQueryHistory.setState(getState().toString()); - subQueryHistory.setStartTime(startTime); - subQueryHistory.setFinishTime(finishTime); - subQueryHistory.setSucceededObjectCount(succeededObjectCount); - subQueryHistory.setKilledObjectCount(killedObjectCount); - subQueryHistory.setFailedObjectCount(failedObjectCount); - subQueryHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount); - subQueryHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned()); - subQueryHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned()); + stageHistory.setExecutionBlockId(getId().toString()); + stageHistory.setPlan(PlannerUtil.buildExplainString(block.getPlan())); + stageHistory.setState(getState().toString()); + stageHistory.setStartTime(startTime); + stageHistory.setFinishTime(finishTime); + stageHistory.setSucceededObjectCount(succeededObjectCount); + stageHistory.setKilledObjectCount(killedObjectCount); + stageHistory.setFailedObjectCount(failedObjectCount); + stageHistory.setTotalScheduledObjectsCount(totalScheduledObjectsCount); + stageHistory.setHostLocalAssigned(getTaskScheduler().getHostLocalAssigned()); + stageHistory.setRackLocalAssigned(getTaskScheduler().getRackLocalAssigned()); long totalInputBytes = 0; long totalReadBytes = 0; @@ -455,44 +454,44 @@ private SubQueryHistory makeSubQueryHistory() { } } - subQueryHistory.setTotalInputBytes(totalInputBytes); - subQueryHistory.setTotalReadBytes(totalReadBytes); - subQueryHistory.setTotalReadRows(totalReadRows); - subQueryHistory.setTotalWriteBytes(totalWriteBytes); - subQueryHistory.setTotalWriteRows(totalWriteRows); - subQueryHistory.setNumShuffles(numShuffles); - subQueryHistory.setProgress(getProgress()); - return subQueryHistory; + stageHistory.setTotalInputBytes(totalInputBytes); + stageHistory.setTotalReadBytes(totalReadBytes); + stageHistory.setTotalReadRows(totalReadRows); + stageHistory.setTotalWriteBytes(totalWriteBytes); + stageHistory.setTotalWriteRows(totalWriteRows); + stageHistory.setNumShuffles(numShuffles); + stageHistory.setProgress(getProgress()); + return stageHistory; } /** - * It finalizes this subquery. It is only invoked when the subquery is succeeded. + * It finalizes this stage. It is only invoked when the stage is succeeded. */ public void complete() { cleanup(); finalizeStats(); setFinishTime(); - eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED)); + eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); } /** - * It finalizes this subquery. Unlike {@link SubQuery#complete()}, - * it is invoked when a subquery is abnormally finished. + * It finalizes this stage. Unlike {@link Stage#complete()}, + * it is invoked when a stage is abnormally finished. * - * @param finalState The final subquery state + * @param finalState The final stage state */ - public void abort(SubQueryState finalState) { + public void abort(StageState finalState) { // TODO - - // - committer.abortSubQuery(...) - // - record SubQuery Finish Time + // - committer.abortStage(...) + // - record Stage Finish Time // - CleanUp Tasks // - Record History cleanup(); setFinishTime(); - eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState)); + eventHandler.handle(new StageCompletedEvent(getId(), finalState)); } - public StateMachine getStateMachine() { + public StateMachine getStateMachine() { return this.stateMachine; } @@ -554,8 +553,8 @@ public String toString() { @Override public boolean equals(Object o) { - if (o instanceof SubQuery) { - SubQuery other = (SubQuery)o; + if (o instanceof Stage) { + Stage other = (Stage)o; return getId().equals(other.getId()); } return false; @@ -566,11 +565,11 @@ public int hashCode() { return getId().hashCode(); } - public int compareTo(SubQuery other) { + public int compareTo(Stage other) { return getId().compareTo(other.getId()); } - public SubQueryState getSynchronizedState() { + public StageState getSynchronizedState() { readLock.lock(); try { return stateMachine.getCurrentState(); @@ -580,11 +579,11 @@ public SubQueryState getSynchronizedState() { } /* non-blocking call for client API */ - public SubQueryState getState() { - return subQueryState; + public StageState getState() { + return stageState; } - public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) { + public static TableStats[] computeStatFromUnionBlock(Stage stage) { TableStats[] stat = new TableStats[]{new TableStats(), new TableStats()}; long[] avgRows = new long[]{0, 0}; long[] numBytes = new long[]{0, 0}; @@ -595,13 +594,13 @@ public static TableStats[] computeStatFromUnionBlock(SubQuery subQuery) { List columnStatses = Lists.newArrayList(); - MasterPlan masterPlan = subQuery.getMasterPlan(); - Iterator it = masterPlan.getChilds(subQuery.getBlock()).iterator(); + MasterPlan masterPlan = stage.getMasterPlan(); + Iterator it = masterPlan.getChilds(stage.getBlock()).iterator(); while (it.hasNext()) { ExecutionBlock block = it.next(); - SubQuery childSubQuery = subQuery.context.getSubQuery(block.getId()); + Stage childStage = stage.context.getStage(block.getId()); TableStats[] childStatArray = new TableStats[]{ - childSubQuery.getInputStats(), childSubQuery.getResultStats() + childStage.getInputStats(), childStage.getResultStats() }; for (int i = 0; i < 2; i++) { if (childStatArray[i] == null) { @@ -684,31 +683,32 @@ private void finalizeStats() { } @Override - public void handle(SubQueryEvent event) { + public void handle(StageEvent event) { if (LOG.isDebugEnabled()) { - LOG.debug("Processing " + event.getSubQueryId() + " of type " + event.getType() + ", preState=" + getSynchronizedState()); + LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState=" + + getSynchronizedState()); } try { writeLock.lock(); - SubQueryState oldState = getSynchronizedState(); + StageState oldState = getSynchronizedState(); try { getStateMachine().doTransition(event.getType(), event); - subQueryState = getSynchronizedState(); + stageState = getSynchronizedState(); } catch (InvalidStateTransitonException e) { LOG.error("Can't handle this event at current state" + ", eventType:" + event.getType().name() + ", oldState:" + oldState.name() + ", nextState:" + getSynchronizedState().name() , e); - eventHandler.handle(new SubQueryEvent(getId(), - SubQueryEventType.SQ_INTERNAL_ERROR)); + eventHandler.handle(new StageEvent(getId(), + StageEventType.SQ_INTERNAL_ERROR)); } // notify the eventhandler of state change if (LOG.isDebugEnabled()) { if (oldState != getSynchronizedState()) { - LOG.debug(getId() + " SubQuery Transitioned from " + oldState + " to " + LOG.debug(getId() + " Stage Transitioned from " + oldState + " to " + getSynchronizedState()); } } @@ -721,96 +721,96 @@ public void handleTaskRequestEvent(TaskRequestEvent event) { taskScheduler.handleTaskRequestEvent(event); } - private static class InitAndRequestContainer implements MultipleArcTransition { + private static class InitAndRequestContainer implements MultipleArcTransition { @Override - public SubQueryState transition(final SubQuery subQuery, SubQueryEvent subQueryEvent) { - subQuery.setStartTime(); - ExecutionBlock execBlock = subQuery.getBlock(); - SubQueryState state; + public StageState transition(final Stage stage, StageEvent stageEvent) { + stage.setStartTime(); + ExecutionBlock execBlock = stage.getBlock(); + StageState state; try { // Union operator does not require actual query processing. It is performed logically. if (execBlock.hasUnion()) { - subQuery.finalizeStats(); - state = SubQueryState.SUCCEEDED; + stage.finalizeStats(); + state = StageState.SUCCEEDED; } else { // execute pre-processing asyncronously - subQuery.getContext().getQueryMasterContext().getEventExecutor() + stage.getContext().getQueryMasterContext().getEventExecutor() .submit(new Runnable() { @Override public void run() { try { - ExecutionBlock parent = subQuery.getMasterPlan().getParent(subQuery.getBlock()); - DataChannel channel = subQuery.getMasterPlan().getChannel(subQuery.getId(), parent.getId()); - setShuffleIfNecessary(subQuery, channel); - initTaskScheduler(subQuery); - schedule(subQuery); - subQuery.totalScheduledObjectsCount = subQuery.getTaskScheduler().remainingScheduledObjectNum(); - LOG.info(subQuery.totalScheduledObjectsCount + " objects are scheduled"); - - if (subQuery.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks - subQuery.complete(); + ExecutionBlock parent = stage.getMasterPlan().getParent(stage.getBlock()); + DataChannel channel = stage.getMasterPlan().getChannel(stage.getId(), parent.getId()); + setShuffleIfNecessary(stage, channel); + initTaskScheduler(stage); + schedule(stage); + stage.totalScheduledObjectsCount = stage.getTaskScheduler().remainingScheduledObjectNum(); + LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); + + if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks + stage.complete(); } else { - if(subQuery.getSynchronizedState() == SubQueryState.INITED) { - subQuery.taskScheduler.start(); - allocateContainers(subQuery); + if(stage.getSynchronizedState() == StageState.INITED) { + stage.taskScheduler.start(); + allocateContainers(stage); } else { - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } } } catch (Throwable e) { - LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e); - subQuery.setFinishTime(); - subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage())); - subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR)); + LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); + stage.setFinishTime(); + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); + stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); } } } ); - state = SubQueryState.INITED; + state = StageState.INITED; } } catch (Throwable e) { - LOG.error("SubQuery (" + subQuery.getId() + ") ERROR: ", e); - subQuery.setFinishTime(); - subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), e.getMessage())); - subQuery.eventHandler.handle(new SubQueryCompletedEvent(subQuery.getId(), SubQueryState.ERROR)); - return SubQueryState.ERROR; + LOG.error("Stage (" + stage.getId() + ") ERROR: ", e); + stage.setFinishTime(); + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), e.getMessage())); + stage.eventHandler.handle(new StageCompletedEvent(stage.getId(), StageState.ERROR)); + return StageState.ERROR; } return state; } - private void initTaskScheduler(SubQuery subQuery) throws IOException { - TajoConf conf = subQuery.context.getConf(); - subQuery.schedulerContext = new TaskSchedulerContext(subQuery.context, - subQuery.getMasterPlan().isLeaf(subQuery.getId()), subQuery.getId()); - subQuery.taskScheduler = TaskSchedulerFactory.get(conf, subQuery.schedulerContext, subQuery); - subQuery.taskScheduler.init(conf); - LOG.info(subQuery.taskScheduler.getName() + " is chosen for the task scheduling for " + subQuery.getId()); + private void initTaskScheduler(Stage stage) throws IOException { + TajoConf conf = stage.context.getConf(); + stage.schedulerContext = new TaskSchedulerContext(stage.context, + stage.getMasterPlan().isLeaf(stage.getId()), stage.getId()); + stage.taskScheduler = TaskSchedulerFactory.get(conf, stage.schedulerContext, stage); + stage.taskScheduler.init(conf); + LOG.info(stage.taskScheduler.getName() + " is chosen for the task scheduling for " + stage.getId()); } /** * If a parent block requires a repartition operation, the method sets proper repartition - * methods and the number of partitions to a given subquery. + * methods and the number of partitions to a given Stage. */ - private static void setShuffleIfNecessary(SubQuery subQuery, DataChannel channel) { + private static void setShuffleIfNecessary(Stage stage, DataChannel channel) { if (channel.getShuffleType() != ShuffleType.NONE_SHUFFLE) { - int numTasks = calculateShuffleOutputNum(subQuery, channel); - Repartitioner.setShuffleOutputNumForTwoPhase(subQuery, numTasks, channel); + int numTasks = calculateShuffleOutputNum(stage, channel); + Repartitioner.setShuffleOutputNumForTwoPhase(stage, numTasks, channel); } } /** * Getting the total memory of cluster * - * @param subQuery + * @param stage * @return mega bytes */ - private static int getClusterTotalMemory(SubQuery subQuery) { + private static int getClusterTotalMemory(Stage stage) { List workers = - subQuery.context.getQueryMasterContext().getQueryMaster().getAllWorker(); + stage.context.getQueryMasterContext().getQueryMaster().getAllWorker(); int totalMem = 0; for (TajoMasterProtocol.WorkerResourceProto worker : workers) { @@ -822,13 +822,13 @@ private static int getClusterTotalMemory(SubQuery subQuery) { * Getting the desire number of partitions according to the volume of input data. * This method is only used to determine the partition key number of hash join or aggregation. * - * @param subQuery + * @param stage * @return */ - public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel channel) { - TajoConf conf = subQuery.context.getConf(); - MasterPlan masterPlan = subQuery.getMasterPlan(); - ExecutionBlock parent = masterPlan.getParent(subQuery.getBlock()); + public static int calculateShuffleOutputNum(Stage stage, DataChannel channel) { + TajoConf conf = stage.context.getConf(); + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock parent = masterPlan.getParent(stage.getBlock()); LogicalNode grpNode = null; if (parent != null) { @@ -844,18 +844,18 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann // for outer ExecutionBlock outer = childs.get(0); - long outerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, outer); + long outerVolume = getInputVolume(stage.masterPlan, stage.context, outer); // for inner ExecutionBlock inner = childs.get(1); - long innerVolume = getInputVolume(subQuery.masterPlan, subQuery.context, inner); - LOG.info(subQuery.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " + long innerVolume = getInputVolume(stage.masterPlan, stage.context, inner); + LOG.info(stage.getId() + ", Outer volume: " + Math.ceil((double) outerVolume / 1048576) + "MB, " + "Inner volume: " + Math.ceil((double) innerVolume / 1048576) + "MB"); long bigger = Math.max(outerVolume, innerVolume); int mb = (int) Math.ceil((double) bigger / 1048576); - LOG.info(subQuery.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); + LOG.info(stage.getId() + ", Bigger Table's volume is approximately " + mb + " MB"); int taskNum = (int) Math.ceil((double) mb / masterPlan.getContext().getInt(SessionVars.JOIN_PER_SHUFFLE_SIZE)); @@ -877,14 +877,14 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann if (outerShuffleOutputNum != innerShuffleOutputNum && taskNum != outerShuffleOutputNum && taskNum != innerShuffleOutputNum) { - LOG.info(subQuery.getId() + ", Change determined number of join partitions cause difference of outputNum" + + LOG.info(stage.getId() + ", Change determined number of join partitions cause difference of outputNum" + ", originTaskNum=" + taskNum + ", changedTaskNum=" + Math.max(outerShuffleOutputNum, innerShuffleOutputNum) + ", outerShuffleOutptNum=" + outerShuffleOutputNum + ", innerShuffleOutputNum=" + innerShuffleOutputNum); taskNum = Math.max(outerShuffleOutputNum, innerShuffleOutputNum); } - LOG.info(subQuery.getId() + ", The determined number of join partitions is " + taskNum); + LOG.info(stage.getId() + ", The determined number of join partitions is " + taskNum); return taskNum; // Is this subquery the first step of group-by? @@ -894,82 +894,82 @@ public static int calculateShuffleOutputNum(SubQuery subQuery, DataChannel chann hasGroupColumns = ((GroupbyNode)grpNode).getGroupingColumns().length > 0; } else if (grpNode.getType() == NodeType.DISTINCT_GROUP_BY) { // Find current distinct stage node. - DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(subQuery.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); + DistinctGroupbyNode distinctNode = PlannerUtil.findMostBottomNode(stage.getBlock().getPlan(), NodeType.DISTINCT_GROUP_BY); if (distinctNode == null) { - LOG.warn(subQuery.getId() + ", Can't find current DistinctGroupbyNode"); + LOG.warn(stage.getId() + ", Can't find current DistinctGroupbyNode"); distinctNode = (DistinctGroupbyNode)grpNode; } hasGroupColumns = distinctNode.getGroupingColumns().length > 0; - Enforcer enforcer = subQuery.getBlock().getEnforcer(); + Enforcer enforcer = stage.getBlock().getEnforcer(); if (enforcer == null) { - LOG.warn(subQuery.getId() + ", DistinctGroupbyNode's enforcer is null."); + LOG.warn(stage.getId() + ", DistinctGroupbyNode's enforcer is null."); } EnforceProperty property = PhysicalPlannerImpl.getAlgorithmEnforceProperty(enforcer, distinctNode); if (property != null) { if (property.getDistinct().getIsMultipleAggregation()) { - MultipleAggregationStage stage = property.getDistinct().getMultipleAggregationStage(); - if (stage != MultipleAggregationStage.THRID_STAGE) { + MultipleAggregationStage multiAggStage = property.getDistinct().getMultipleAggregationStage(); + if (multiAggStage != MultipleAggregationStage.THRID_STAGE) { hasGroupColumns = true; } } } } if (!hasGroupColumns) { - LOG.info(subQuery.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); + LOG.info(stage.getId() + ", No Grouping Column - determinedTaskNum is set to 1"); return 1; } else { - long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block); + long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); int volumeByMB = (int) Math.ceil((double) volume / StorageUnit.MB); - LOG.info(subQuery.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); + LOG.info(stage.getId() + ", Table's volume is approximately " + volumeByMB + " MB"); // determine the number of task int taskNum = (int) Math.ceil((double) volumeByMB / masterPlan.getContext().getInt(SessionVars.GROUPBY_PER_SHUFFLE_SIZE)); - LOG.info(subQuery.getId() + ", The determined number of aggregation partitions is " + taskNum); + LOG.info(stage.getId() + ", The determined number of aggregation partitions is " + taskNum); return taskNum; } } else { LOG.info("============>>>>> Unexpected Case! <<<<<================"); - long volume = getInputVolume(subQuery.masterPlan, subQuery.context, subQuery.block); + long volume = getInputVolume(stage.masterPlan, stage.context, stage.block); int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB"); + LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); // determine the number of task per 128MB int taskNum = (int) Math.ceil((double)mb / 128); - LOG.info(subQuery.getId() + ", The determined number of partitions is " + taskNum); + LOG.info(stage.getId() + ", The determined number of partitions is " + taskNum); return taskNum; } } - private static void schedule(SubQuery subQuery) throws IOException { - MasterPlan masterPlan = subQuery.getMasterPlan(); - ExecutionBlock execBlock = subQuery.getBlock(); - if (subQuery.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan - scheduleFragmentsForLeafQuery(subQuery); + private static void schedule(Stage stage) throws IOException { + MasterPlan masterPlan = stage.getMasterPlan(); + ExecutionBlock execBlock = stage.getBlock(); + if (stage.getMasterPlan().isLeaf(execBlock.getId()) && execBlock.getScanNodes().length == 1) { // Case 1: Just Scan + scheduleFragmentsForLeafQuery(stage); } else if (execBlock.getScanNodes().length > 1) { // Case 2: Join - Repartitioner.scheduleFragmentsForJoinQuery(subQuery.schedulerContext, subQuery); + Repartitioner.scheduleFragmentsForJoinQuery(stage.schedulerContext, stage); } else { // Case 3: Others (Sort or Aggregation) - int numTasks = getNonLeafTaskNum(subQuery); - Repartitioner.scheduleFragmentsForNonLeafTasks(subQuery.schedulerContext, masterPlan, subQuery, numTasks); + int numTasks = getNonLeafTaskNum(stage); + Repartitioner.scheduleFragmentsForNonLeafTasks(stage.schedulerContext, masterPlan, stage, numTasks); } } /** * Getting the desire number of tasks according to the volume of input data * - * @param subQuery + * @param stage * @return */ - public static int getNonLeafTaskNum(SubQuery subQuery) { + public static int getNonLeafTaskNum(Stage stage) { // Getting intermediate data size - long volume = getInputVolume(subQuery.getMasterPlan(), subQuery.context, subQuery.getBlock()); + long volume = getInputVolume(stage.getMasterPlan(), stage.context, stage.getBlock()); int mb = (int) Math.ceil((double)volume / 1048576); - LOG.info(subQuery.getId() + ", Table's volume is approximately " + mb + " MB"); + LOG.info(stage.getId() + ", Table's volume is approximately " + mb + " MB"); // determine the number of task per 64MB int maxTaskNum = Math.max(1, (int) Math.ceil((double)mb / 64)); - LOG.info(subQuery.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum); + LOG.info(stage.getId() + ", The determined number of non-leaf tasks is " + maxTaskNum); return maxTaskNum; } @@ -989,11 +989,11 @@ public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMa } else { long aggregatedVolume = 0; for (ExecutionBlock childBlock : masterPlan.getChilds(execBlock)) { - SubQuery subquery = context.getSubQuery(childBlock.getId()); - if (subquery == null || subquery.getSynchronizedState() != SubQueryState.SUCCEEDED) { + Stage stage = context.getStage(childBlock.getId()); + if (stage == null || stage.getSynchronizedState() != StageState.SUCCEEDED) { aggregatedVolume += getInputVolume(masterPlan, context, childBlock); } else { - aggregatedVolume += subquery.getResultStats().getNumBytes(); + aggregatedVolume += stage.getResultStats().getNumBytes(); } } @@ -1001,15 +1001,15 @@ public static long getInputVolume(MasterPlan masterPlan, QueryMasterTask.QueryMa } } - public static void allocateContainers(SubQuery subQuery) { - ExecutionBlock execBlock = subQuery.getBlock(); + public static void allocateContainers(Stage stage) { + ExecutionBlock execBlock = stage.getBlock(); //TODO consider disk slot int requiredMemoryMBPerTask = 512; - int numRequest = subQuery.getContext().getResourceAllocator().calculateNumRequestContainers( - subQuery.getContext().getQueryMasterContext().getWorkerContext(), - subQuery.schedulerContext.getEstimatedTaskNum(), + int numRequest = stage.getContext().getResourceAllocator().calculateNumRequestContainers( + stage.getContext().getQueryMasterContext().getWorkerContext(), + stage.schedulerContext.getEstimatedTaskNum(), requiredMemoryMBPerTask ); @@ -1017,23 +1017,23 @@ public static void allocateContainers(SubQuery subQuery) { resource.setMemory(requiredMemoryMBPerTask); - LOG.info("Request Container for " + subQuery.getId() + " containers=" + numRequest); + LOG.info("Request Container for " + stage.getId() + " containers=" + numRequest); Priority priority = Records.newRecord(Priority.class); - priority.setPriority(subQuery.getPriority()); + priority.setPriority(stage.getPriority()); ContainerAllocationEvent event = new ContainerAllocationEvent(ContainerAllocatorEventType.CONTAINER_REQ, - subQuery.getId(), priority, resource, numRequest, - subQuery.masterPlan.isLeaf(execBlock), 0.0f); - subQuery.eventHandler.handle(event); + stage.getId(), priority, resource, numRequest, + stage.masterPlan.isLeaf(execBlock), 0.0f); + stage.eventHandler.handle(event); } - private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOException { - ExecutionBlock execBlock = subQuery.getBlock(); + private static void scheduleFragmentsForLeafQuery(Stage stage) throws IOException { + ExecutionBlock execBlock = stage.getBlock(); ScanNode[] scans = execBlock.getScanNodes(); Preconditions.checkArgument(scans.length == 1, "Must be Scan Query"); ScanNode scan = scans[0]; - TableDesc table = subQuery.context.getTableDescMap().get(scan.getCanonicalName()); + TableDesc table = stage.context.getTableDescMap().get(scan.getCanonicalName()); Collection fragments; TableMeta meta = table.getMeta(); @@ -1045,101 +1045,101 @@ private static void scheduleFragmentsForLeafQuery(SubQuery subQuery) throws IOEx if (scan.getType() == NodeType.PARTITIONS_SCAN) { // After calling this method, partition paths are removed from the physical plan. FileStorageManager storageManager = - (FileStorageManager)StorageManager.getFileStorageManager(subQuery.getContext().getConf()); + (FileStorageManager)StorageManager.getFileStorageManager(stage.getContext().getConf()); fragments = Repartitioner.getFragmentsFromPartitionedTable(storageManager, scan, table); } else { StorageManager storageManager = - StorageManager.getStorageManager(subQuery.getContext().getConf(), meta.getStoreType()); + StorageManager.getStorageManager(stage.getContext().getConf(), meta.getStoreType()); fragments = storageManager.getSplits(scan.getCanonicalName(), table, scan); } - SubQuery.scheduleFragments(subQuery, fragments); - if (subQuery.getTaskScheduler() instanceof DefaultTaskScheduler) { + Stage.scheduleFragments(stage, fragments); + if (stage.getTaskScheduler() instanceof DefaultTaskScheduler) { //Leaf task of DefaultTaskScheduler should be fragment size // EstimatedTaskNum determined number of initial container - subQuery.schedulerContext.setEstimatedTaskNum(fragments.size()); + stage.schedulerContext.setEstimatedTaskNum(fragments.size()); } else { - TajoConf conf = subQuery.context.getConf(); - subQuery.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024); + TajoConf conf = stage.context.getConf(); + stage.schedulerContext.setTaskSize(conf.getIntVar(ConfVars.TASK_DEFAULT_SIZE) * 1024 * 1024); int estimatedTaskNum = (int) Math.ceil((double) table.getStats().getNumBytes() / - (double) subQuery.schedulerContext.getTaskSize()); - subQuery.schedulerContext.setEstimatedTaskNum(estimatedTaskNum); + (double) stage.schedulerContext.getTaskSize()); + stage.schedulerContext.setEstimatedTaskNum(estimatedTaskNum); } } } - public static void scheduleFragment(SubQuery subQuery, Fragment fragment) { - subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, - subQuery.getId(), fragment)); + public static void scheduleFragment(Stage stage, Fragment fragment) { + stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, + stage.getId(), fragment)); } - public static void scheduleFragments(SubQuery subQuery, Collection fragments) { + public static void scheduleFragments(Stage stage, Collection fragments) { for (Fragment eachFragment : fragments) { - scheduleFragment(subQuery, eachFragment); + scheduleFragment(stage, eachFragment); } } - public static void scheduleFragments(SubQuery subQuery, Collection leftFragments, + public static void scheduleFragments(Stage stage, Collection leftFragments, Collection broadcastFragments) { for (Fragment eachLeafFragment : leftFragments) { - scheduleFragment(subQuery, eachLeafFragment, broadcastFragments); + scheduleFragment(stage, eachLeafFragment, broadcastFragments); } } - public static void scheduleFragment(SubQuery subQuery, + public static void scheduleFragment(Stage stage, Fragment leftFragment, Collection rightFragments) { - subQuery.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, - subQuery.getId(), leftFragment, rightFragments)); + stage.taskScheduler.handle(new FragmentScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, + stage.getId(), leftFragment, rightFragments)); } - public static void scheduleFetches(SubQuery subQuery, Map> fetches) { - subQuery.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, - subQuery.getId(), fetches)); + public static void scheduleFetches(Stage stage, Map> fetches) { + stage.taskScheduler.handle(new FetchScheduleEvent(TaskSchedulerEvent.EventType.T_SCHEDULE, + stage.getId(), fetches)); } public static Task newEmptyTask(TaskSchedulerContext schedulerContext, TaskAttemptScheduleContext taskContext, - SubQuery subQuery, int taskId) { - ExecutionBlock execBlock = subQuery.getBlock(); + Stage stage, int taskId) { + ExecutionBlock execBlock = stage.getBlock(); Task unit = new Task(schedulerContext.getMasterContext().getConf(), taskContext, QueryIdFactory.newTaskId(schedulerContext.getBlockId(), taskId), - schedulerContext.isLeafQuery(), subQuery.eventHandler); + schedulerContext.isLeafQuery(), stage.eventHandler); unit.setLogicalPlan(execBlock.getPlan()); - subQuery.addTask(unit); + stage.addTask(unit); return unit; } private static class ContainerLaunchTransition - implements SingleArcTransition { + implements SingleArcTransition { @Override - public void transition(SubQuery subQuery, SubQueryEvent event) { + public void transition(Stage stage, StageEvent event) { try { - SubQueryContainerAllocationEvent allocationEvent = - (SubQueryContainerAllocationEvent) event; + StageContainerAllocationEvent allocationEvent = + (StageContainerAllocationEvent) event; for (TajoContainer container : allocationEvent.getAllocatedContainer()) { TajoContainerId cId = container.getId(); - if (subQuery.containers.containsKey(cId)) { - subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), + if (stage.containers.containsKey(cId)) { + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), "Duplicated containers are allocated: " + cId.toString())); - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); } - subQuery.containers.put(cId, container); + stage.containers.put(cId, container); } - LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!"); - subQuery.eventHandler.handle( - new LaunchTaskRunnersEvent(subQuery.getId(), allocationEvent.getAllocatedContainer(), - subQuery.getContext().getQueryContext(), - CoreGsonHelper.toJson(subQuery.getBlock().getPlan(), LogicalNode.class)) + LOG.info("Stage (" + stage.getId() + ") has " + stage.containers.size() + " containers!"); + stage.eventHandler.handle( + new LaunchTaskRunnersEvent(stage.getId(), allocationEvent.getAllocatedContainer(), + stage.getContext().getQueryContext(), + CoreGsonHelper.toJson(stage.getBlock().getPlan(), LogicalNode.class)) ); - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_START)); } catch (Throwable t) { - subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), ExceptionUtils.getStackTrace(t))); - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); } } } @@ -1148,75 +1148,75 @@ public void transition(SubQuery subQuery, SubQueryEvent event) { * It is used in KILL_WAIT state against Contained Allocated event. * It just returns allocated containers to resource manager. */ - private static class AllocatedContainersCancelTransition implements SingleArcTransition { + private static class AllocatedContainersCancelTransition implements SingleArcTransition { @Override - public void transition(SubQuery subQuery, SubQueryEvent event) { + public void transition(Stage stage, StageEvent event) { try { - SubQueryContainerAllocationEvent allocationEvent = - (SubQueryContainerAllocationEvent) event; - subQuery.eventHandler.handle( + StageContainerAllocationEvent allocationEvent = + (StageContainerAllocationEvent) event; + stage.eventHandler.handle( new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, - subQuery.getId(), allocationEvent.getAllocatedContainer())); + stage.getId(), allocationEvent.getAllocatedContainer())); LOG.info(String.format("[%s] %d allocated containers are canceled", - subQuery.getId().toString(), + stage.getId().toString(), allocationEvent.getAllocatedContainer().size())); } catch (Throwable t) { - subQuery.eventHandler.handle(new SubQueryDiagnosticsUpdateEvent(subQuery.getId(), + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), ExceptionUtils.getStackTrace(t))); - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_INTERNAL_ERROR)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); } } } - private static class TaskCompletedTransition implements SingleArcTransition { + private static class TaskCompletedTransition implements SingleArcTransition { @Override - public void transition(SubQuery subQuery, - SubQueryEvent event) { - SubQueryTaskEvent taskEvent = (SubQueryTaskEvent) event; - Task task = subQuery.getTask(taskEvent.getTaskId()); + public void transition(Stage stage, + StageEvent event) { + StageTaskEvent taskEvent = (StageTaskEvent) event; + Task task = stage.getTask(taskEvent.getTaskId()); if (task == null) { // task failed LOG.error(String.format("Task %s is absent", taskEvent.getTaskId())); - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_FAILED)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); } else { - subQuery.completedTaskCount++; + stage.completedTaskCount++; if (taskEvent.getState() == TaskState.SUCCEEDED) { - subQuery.succeededObjectCount++; + stage.succeededObjectCount++; } else if (task.getState() == TaskState.KILLED) { - subQuery.killedObjectCount++; + stage.killedObjectCount++; } else if (task.getState() == TaskState.FAILED) { - subQuery.failedObjectCount++; + stage.failedObjectCount++; // if at least one task is failed, try to kill all tasks. - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_KILL)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)", - subQuery.getId(), - subQuery.getTotalScheduledObjectsCount(), - subQuery.succeededObjectCount, - subQuery.killedObjectCount, - subQuery.failedObjectCount)); - - if (subQuery.totalScheduledObjectsCount == - subQuery.succeededObjectCount + subQuery.killedObjectCount + subQuery.failedObjectCount) { - subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_SUBQUERY_COMPLETED)); + stage.getId(), + stage.getTotalScheduledObjectsCount(), + stage.succeededObjectCount, + stage.killedObjectCount, + stage.failedObjectCount)); + + if (stage.totalScheduledObjectsCount == + stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) { + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); } } } } - private static class KillTasksTransition implements SingleArcTransition { + private static class KillTasksTransition implements SingleArcTransition { @Override - public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - if(subQuery.getTaskScheduler() != null){ - subQuery.getTaskScheduler().stop(); + public void transition(Stage stage, StageEvent stageEvent) { + if(stage.getTaskScheduler() != null){ + stage.getTaskScheduler().stop(); } - for (Task task : subQuery.getTasks()) { - subQuery.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL)); + for (Task task : stage.getTasks()) { + stage.eventHandler.handle(new TaskEvent(task.getId(), TaskEventType.T_KILL)); } } } @@ -1236,8 +1236,8 @@ private void cleanup() { getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); } - this.finalSubQueryHistory = makeSubQueryHistory(); - this.finalSubQueryHistory.setTasks(makeTaskHistories()); + this.finalStageHistory = makeStageHistory(); + this.finalStageHistory.setTasks(makeTaskHistories()); } public List getHashShuffleIntermediateEntries() { @@ -1260,7 +1260,7 @@ protected void waitingIntermediateReport() { long elapsedTime = System.currentTimeMillis() - startTime; if (elapsedTime >= 120 * 1000) { LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms"); - abort(SubQueryState.FAILED); + abort(StageState.FAILED); return; } } @@ -1272,7 +1272,7 @@ public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks()); if (!report.getReportSuccess()) { LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage()); - abort(SubQueryState.FAILED); + abort(StageState.FAILED); return; } if (report.getIntermediateEntriesCount() > 0) { @@ -1288,56 +1288,55 @@ public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport } } - private static class SubQueryCompleteTransition - implements MultipleArcTransition { + private static class StageCompleteTransition implements MultipleArcTransition { @Override - public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - // TODO - Commit subQuery + public StageState transition(Stage stage, StageEvent stageEvent) { + // TODO - Commit Stage // TODO - records succeeded, failed, killed completed task // TODO - records metrics try { - LOG.info(String.format("subQuery completed - %s (total=%d, success=%d, killed=%d)", - subQuery.getId().toString(), - subQuery.getTotalScheduledObjectsCount(), - subQuery.getSucceededObjectCount(), - subQuery.killedObjectCount)); - - if (subQuery.killedObjectCount > 0 || subQuery.failedObjectCount > 0) { - if (subQuery.failedObjectCount > 0) { - subQuery.abort(SubQueryState.FAILED); - return SubQueryState.FAILED; - } else if (subQuery.killedObjectCount > 0) { - subQuery.abort(SubQueryState.KILLED); - return SubQueryState.KILLED; + LOG.info(String.format("Stage completed - %s (total=%d, success=%d, killed=%d)", + stage.getId().toString(), + stage.getTotalScheduledObjectsCount(), + stage.getSucceededObjectCount(), + stage.killedObjectCount)); + + if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) { + if (stage.failedObjectCount > 0) { + stage.abort(StageState.FAILED); + return StageState.FAILED; + } else if (stage.killedObjectCount > 0) { + stage.abort(StageState.KILLED); + return StageState.KILLED; } else { - LOG.error("Invalid State " + subQuery.getSynchronizedState() + " State"); - subQuery.abort(SubQueryState.ERROR); - return SubQueryState.ERROR; + LOG.error("Invalid State " + stage.getSynchronizedState() + " State"); + stage.abort(StageState.ERROR); + return StageState.ERROR; } } else { - subQuery.complete(); - return SubQueryState.SUCCEEDED; + stage.complete(); + return StageState.SUCCEEDED; } } catch (Throwable t) { LOG.error(t.getMessage(), t); - subQuery.abort(SubQueryState.ERROR); - return SubQueryState.ERROR; + stage.abort(StageState.ERROR); + return StageState.ERROR; } } } - private static class DiagnosticsUpdateTransition implements SingleArcTransition { + private static class DiagnosticsUpdateTransition implements SingleArcTransition { @Override - public void transition(SubQuery subQuery, SubQueryEvent event) { - subQuery.addDiagnostic(((SubQueryDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); + public void transition(Stage stage, StageEvent event) { + stage.addDiagnostic(((StageDiagnosticsUpdateEvent) event).getDiagnosticUpdate()); } } - private static class InternalErrorTransition implements SingleArcTransition { + private static class InternalErrorTransition implements SingleArcTransition { @Override - public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - subQuery.abort(SubQueryState.ERROR); + public void transition(Stage stage, StageEvent stageEvent) { + stage.abort(StageState.ERROR); } } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java similarity index 97% rename from tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java rename to tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java index effcfde42d..82a06fe4c5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQueryState.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/StageState.java @@ -18,7 +18,7 @@ package org.apache.tajo.master.querymaster; -public enum SubQueryState { +public enum StageState { NEW, INITED, RUNNING, diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java index 7de3933060..5475791da1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Task.java @@ -580,7 +580,7 @@ private static class KillNewTaskTransition implements SingleArcTransition sortSubQuery(Collection subQueries) { - List subQueryList = new ArrayList(subQueries); - Collections.sort(subQueryList, new Comparator() { + public static List sortStages(Collection stages) { + List stageList = new ArrayList(stages); + Collections.sort(stageList, new Comparator() { @Override - public int compare(SubQuery subQuery1, SubQuery subQuery2) { - long q1StartTime = subQuery1.getStartTime(); - long q2StartTime = subQuery2.getStartTime(); + public int compare(Stage stage1, Stage stage2) { + long q1StartTime = stage1.getStartTime(); + long q2StartTime = stage2.getStartTime(); q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime); int result = compareLong(q1StartTime, q2StartTime); if (result == 0) { - return subQuery1.getId().toString().compareTo(subQuery2.getId().toString()); + return stage1.getId().toString().compareTo(stage2.getId().toString()); } else { return result; } } }); - return subQueryList; + return stageList; } - public static List sortSubQueryHistory(Collection subQueries) { - List subQueryList = new ArrayList(subQueries); - Collections.sort(subQueryList, new Comparator() { + public static List sortStageHistories(Collection stages) { + List stageList = new ArrayList(stages); + Collections.sort(stageList, new Comparator() { @Override - public int compare(SubQueryHistory subQuery1, SubQueryHistory subQuery2) { - long q1StartTime = subQuery1.getStartTime(); - long q2StartTime = subQuery2.getStartTime(); + public int compare(StageHistory stage1, StageHistory stage2) { + long q1StartTime = stage1.getStartTime(); + long q2StartTime = stage2.getStartTime(); q1StartTime = (q1StartTime == 0 ? Long.MAX_VALUE : q1StartTime); q2StartTime = (q2StartTime == 0 ? Long.MAX_VALUE : q2StartTime); int result = compareLong(q1StartTime, q2StartTime); if (result == 0) { - return subQuery1.getExecutionBlockId().compareTo(subQuery2.getExecutionBlockId()); + return stage1.getExecutionBlockId().compareTo(stage2.getExecutionBlockId()); } else { return result; } } }); - return subQueryList; + return stageList; } public static String getMasterActiveLabel(MasterContext context) { diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java index 9fb427f501..932f58415a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryReader.java @@ -210,7 +210,7 @@ public List getTaskHistory(String queryId, String ebId) throws IOEx in.readFully(buf, 0, buf.length); - return SubQueryHistory.fromJsonTasks(new String(buf)); + return StageHistory.fromJsonTasks(new String(buf)); } finally { if (in != null) { in.close(); diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index 7e30f9c353..5934885d0a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -40,7 +40,7 @@ * * //query-list/query-list-.hist (TajoMaster's query list, hourly rolling) * /query-detail//query.hist (QueryMaster's query detail) - * /.hist (QueryMaster's subquery detail) + * /.hist (QueryMaster's stage detail) * //tasks/_/___.hist * History files are kept for "tajo.history.expiry-time-day" (default value is 7 days) */ @@ -267,7 +267,7 @@ private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Ex // QueryMaster's query detail history (json format) // //query-detail//query.hist - // QueryMaster's subquery detail history(proto binary format) + // QueryMaster's stage detail history(proto binary format) // //query-detail//.hist Path queryHistoryFile = getQueryHistoryFilePath(historyParentPath, queryHistory.getQueryId()); @@ -295,13 +295,13 @@ private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Ex } } - if (queryHistory.getSubQueryHistories() != null) { - for (SubQueryHistory subQueryHistory : queryHistory.getSubQueryHistories()) { - Path path = new Path(queryHistoryFile.getParent(), subQueryHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX); + if (queryHistory.getStageHistories() != null) { + for (StageHistory stageHistory : queryHistory.getStageHistories()) { + Path path = new Path(queryHistoryFile.getParent(), stageHistory.getExecutionBlockId() + HISTORY_FILE_POSTFIX); out = null; try { out = fs.create(path); - out.write(subQueryHistory.toTasksJson().getBytes()); + out.write(stageHistory.toTasksJson().getBytes()); LOG.info("Saving query unit: " + path); } finally { if (out != null) { diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java index 7a81b4bc2a..fdc45a1dbe 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/QueryHistory.java @@ -20,8 +20,9 @@ import com.google.gson.annotations.Expose; import org.apache.tajo.engine.json.CoreGsonHelper; +import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.QueryHistoryProto; -import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto; +import org.apache.tajo.ipc.ClientProtos.StageHistoryProto; import org.apache.tajo.json.GsonObject; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.KeyValueProto; @@ -42,7 +43,7 @@ public class QueryHistory implements GsonObject, History { @Expose private String distributedPlan; @Expose - private List subQueryHistories; + private List stageHistories; public String getQueryId() { return queryId; @@ -56,8 +57,8 @@ public void setQueryMaster(String queryMaster) { this.queryMaster = queryMaster; } - public void setSubQueryHistories(List subQueryHistories) { - this.subQueryHistories = subQueryHistories; + public void setStageHistories(List stageHistories) { + this.stageHistories = stageHistories; } public String getQueryMaster() { @@ -72,8 +73,8 @@ public void setHttpPort(int httpPort) { this.httpPort = httpPort; } - public List getSubQueryHistories() { - return subQueryHistories; + public List getStageHistories() { + return stageHistories; } public List getSessionVariables() { @@ -138,13 +139,13 @@ public QueryHistoryProto getProto() { builder.addAllSessionVariables(sessionProtos); - List subQueryHistoryProtos = new ArrayList(); - if (subQueryHistories != null) { - for (SubQueryHistory eachSubQuery: subQueryHistories) { - subQueryHistoryProtos.add((eachSubQuery.getProto())); + List stageHistoryProtos = new ArrayList(); + if (stageHistories != null) { + for (StageHistory eachStage: stageHistories) { + stageHistoryProtos.add((eachStage.getProto())); } } - builder.addAllSubQueryHistories(subQueryHistoryProtos); + builder.addAllStageHistories(stageHistoryProtos); return builder.build(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java b/tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java similarity index 95% rename from tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java rename to tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java index 0afdf5a42c..e760f86912 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/SubQueryHistory.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/StageHistory.java @@ -21,13 +21,13 @@ import com.google.gson.annotations.Expose; import com.google.gson.reflect.TypeToken; import org.apache.tajo.engine.json.CoreGsonHelper; -import org.apache.tajo.ipc.ClientProtos.SubQueryHistoryProto; +import org.apache.tajo.ipc.ClientProtos.StageHistoryProto; import org.apache.tajo.json.GsonObject; import java.util.ArrayList; import java.util.List; -public class SubQueryHistory implements GsonObject { +public class StageHistory implements GsonObject { @Expose private String executionBlockId; @Expose @@ -223,7 +223,7 @@ public void setTasks(List tasks) { @Override public String toJson() { - return CoreGsonHelper.toJson(this, SubQueryHistory.class); + return CoreGsonHelper.toJson(this, StageHistory.class); } public String toTasksJson() { @@ -242,8 +242,8 @@ public static List fromJsonTasks(String json) { }.getType()); } - public SubQueryHistoryProto getProto() { - SubQueryHistoryProto.Builder builder = SubQueryHistoryProto.newBuilder(); + public StageHistoryProto getProto() { + StageHistoryProto.Builder builder = StageHistoryProto.newBuilder(); builder.setExecutionBlockId(executionBlockId) .setState(state) .setStartTime(startTime) diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java index f055733794..8944eaeebb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoResourceAllocator.java @@ -37,10 +37,10 @@ import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.ContainerAllocationEvent; import org.apache.tajo.master.event.ContainerAllocatorEventType; -import org.apache.tajo.master.event.SubQueryContainerAllocationEvent; +import org.apache.tajo.master.event.StageContainerAllocationEvent; import org.apache.tajo.master.querymaster.QueryMasterTask; -import org.apache.tajo.master.querymaster.SubQuery; -import org.apache.tajo.master.querymaster.SubQueryState; +import org.apache.tajo.master.querymaster.Stage; +import org.apache.tajo.master.querymaster.StageState; import org.apache.tajo.master.rm.*; import org.apache.tajo.rpc.CallFuture; import org.apache.tajo.rpc.NettyClientBase; @@ -352,8 +352,8 @@ public void run() { containers.add(container); } - SubQueryState state = queryTaskContext.getSubQuery(executionBlockId).getSynchronizedState(); - if (!SubQuery.isRunningState(state)) { + StageState state = queryTaskContext.getStage(executionBlockId).getSynchronizedState(); + if (!Stage.isRunningState(state)) { try { List containerIds = new ArrayList(); for(TajoContainer eachContainer: containers) { @@ -368,9 +368,9 @@ public void run() { if (allocatedResources.size() > 0) { if(LOG.isDebugEnabled()) { - LOG.debug("SubQueryContainerAllocationEvent fire:" + executionBlockId); + LOG.debug("StageContainerAllocationEvent fire:" + executionBlockId); } - queryTaskContext.getEventHandler().handle(new SubQueryContainerAllocationEvent(executionBlockId, containers)); + queryTaskContext.getEventHandler().handle(new StageContainerAllocationEvent(executionBlockId, containers)); } numAllocatedContainers += allocatedResources.size(); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 0920619d1a..70a3202cff 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -162,7 +162,7 @@ public Task(String taskRunnerId, context.setState(TaskAttemptState.TA_PENDING); LOG.info("=================================="); - LOG.info("* Subquery " + request.getId() + " is initialized"); + LOG.info("* Stage " + request.getId() + " is initialized"); LOG.info("* InterQuery: " + interQuery + (interQuery ? ", Use " + this.shuffleType + " shuffle":"") + ", Fragments (num: " + request.getFragments().size() + ")" + @@ -734,24 +734,24 @@ private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IO final List types = params.get("type"); final List qids = params.get("qid"); final List taskIdList = params.get("ta"); - final List subQueryIds = params.get("sid"); + final List stageIds = params.get("sid"); final List partIds = params.get("p"); final List offsetList = params.get("offset"); final List lengthList = params.get("length"); - if (types == null || subQueryIds == null || qids == null || partIds == null) { - LOG.error("Invalid URI - Required queryId, type, subquery Id, and part id"); + if (types == null || stageIds == null || qids == null || partIds == null) { + LOG.error("Invalid URI - Required queryId, type, stage Id, and part id"); return null; } - if (qids.size() != 1 && types.size() != 1 || subQueryIds.size() != 1) { - LOG.error("Invalid URI - Required qids, type, taskIds, subquery Id, and part id"); + if (qids.size() != 1 && types.size() != 1 || stageIds.size() != 1) { + LOG.error("Invalid URI - Required qids, type, taskIds, stage Id, and part id"); return null; } String queryId = qids.get(0); String shuffleType = types.get(0); - String sid = subQueryIds.get(0); + String sid = stageIds.get(0); String partId = partIds.get(0); if (shuffleType.equals("r") && taskIdList == null) { @@ -767,10 +767,10 @@ private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IO LOG.info("PullServer request param: shuffleType=" + shuffleType + ", sid=" + sid + ", partId=" + partId + ", taskIds=" + taskIdList); - // The working directory of Tajo worker for each query, including subquery + // The working directory of Tajo worker for each query, including stage String queryBaseDir = queryId.toString() + "/output" + "/" + sid + "/"; - // If the subquery requires a range shuffle + // If the stage requires a range shuffle if (shuffleType.equals("r")) { String ta = taskIds.get(0); if (!executionBlockContext.getLocalDirAllocator().ifExists(queryBaseDir + ta + "/output/", conf)) { @@ -790,7 +790,7 @@ private FileChunk getLocalStoredFileChunk(URI fetchURI, TajoConf conf) throws IO return null; } - // If the subquery requires a hash shuffle or a scattered hash shuffle + // If the stage requires a hash shuffle or a scattered hash shuffle } else if (shuffleType.equals("h") || shuffleType.equals("s")) { int partParentId = HashShuffleAppenderManager.getPartParentId(Integer.parseInt(partId), (TajoConf) conf); String partPath = queryBaseDir + "hash-shuffle/" + partParentId + "/" + partId; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java index 1556a44e37..3092c47050 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskAttemptContext.java @@ -51,7 +51,7 @@ /** - * Contains the information about executing subquery. + * Contains the information about executing task attempt. */ public class TaskAttemptContext { private static final Log LOG = LogFactory.getLog(TaskAttemptContext.class); diff --git a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp b/tajo-core/src/main/resources/webapps/admin/querydetail.jsp index 41b0e8f116..099301ee3b 100644 --- a/tajo-core/src/main/resources/webapps/admin/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querydetail.jsp @@ -25,7 +25,7 @@ <%@ page import="java.text.SimpleDateFormat" %> <%@ page import="java.util.List" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <% @@ -36,8 +36,8 @@ String startTime = request.getParameter("startTime"); QueryHistory queryHistory = reader.getQueryHistory(queryId, Long.parseLong(startTime)); - List subQueryHistories = - queryHistory != null ? JSPUtil.sortSubQueryHistory(queryHistory.getSubQueryHistories()) : null; + List stageHistories = + queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); %> @@ -61,34 +61,34 @@ if (queryHistory == null) {
No Query history data.
<% } else { - if (subQueryHistories == null) { + if (stageHistories == null) { %> -
No SubQuery history data.
+
No Stage history data.
<% } else { %> <% - for(SubQueryHistory eachSubQuery: subQueryHistories) { - String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getExecutionBlockId() + "&startTime=" + startTime; + for(StageHistory eachStage: stageHistories) { + String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachStage.getExecutionBlockId() + "&startTime=" + startTime; %> - - - - - - - - + + + + + + + + <% } //end of for %>
IDStateStartedFinishedRunning timeProgressSucceeded/TotalFailed/Killed
<%=eachSubQuery.getExecutionBlockId()%><%=eachSubQuery.getState()%><%=df.format(eachSubQuery.getStartTime())%><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%<%=eachSubQuery.getSucceededObjectCount()%> / <%=eachSubQuery.getTotalScheduledObjectsCount()%><%=eachSubQuery.getFailedObjectCount()%> / <%=eachSubQuery.getKilledObjectCount()%><%=eachStage.getExecutionBlockId()%><%=eachStage.getState()%><%=df.format(eachStage.getStartTime())%><%=eachStage.getFinishTime() == 0 ? "-" : df.format(eachStage.getFinishTime())%><%=JSPUtil.getElapsedTime(eachStage.getStartTime(), eachStage.getFinishTime())%><%=JSPUtil.percentFormat(eachStage.getProgress())%>%<%=eachStage.getSucceededObjectCount()%> / <%=eachStage.getTotalScheduledObjectsCount()%><%=eachStage.getFailedObjectCount()%> / <%=eachStage.getKilledObjectCount()%>
<% - } //end of else [if (subQueryHistories == null)] + } //end of else [if (stageHistories == null)] %>

Applied Session Variables

diff --git a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp index ed97efff78..09d9e2ed3b 100644 --- a/tajo-core/src/main/resources/webapps/admin/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/admin/querytasks.jsp @@ -28,7 +28,7 @@ <%@ page import="org.apache.tajo.master.TajoMaster" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.master.rm.Worker" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.util.history.TaskHistory" %> @@ -43,14 +43,14 @@ QueryHistory queryHistory = reader.getQueryHistory(queryId); - List subQueryHistories = - queryHistory != null ? JSPUtil.sortSubQueryHistory(queryHistory.getSubQueryHistories()) : null; + List stageHistories = + queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; - SubQueryHistory subQuery = null; - if (subQueryHistories != null) { - for (SubQueryHistory eachSubQuery: subQueryHistories) { - if (eachSubQuery.getExecutionBlockId().equals(ebId)) { - subQuery = eachSubQuery; + StageHistory stage = null; + if (stageHistories != null) { + for (StageHistory eachStage: stageHistories) { + if (eachStage.getExecutionBlockId().equals(ebId)) { + stage = eachStage; break; } } @@ -92,12 +92,12 @@ long totalWriteBytes = 0; long totalWriteRows = 0; - if (subQuery != null) { - totalInputBytes = subQuery.getTotalInputBytes(); - totalReadBytes = subQuery.getTotalReadBytes(); - totalReadRows = subQuery.getTotalReadRows(); - totalWriteBytes = subQuery.getTotalWriteBytes(); - totalWriteRows = subQuery.getTotalWriteRows(); + if (stage != null) { + totalInputBytes = stage.getTotalInputBytes(); + totalReadBytes = stage.getTotalReadBytes(); + totalReadRows = stage.getTotalReadRows(); + totalWriteBytes = stage.getTotalWriteBytes(); + totalWriteRows = stage.getTotalWriteRows(); } List allTasks = reader.getTaskHistory(queryId, ebId); @@ -150,12 +150,12 @@

<%=ebId.toString()%>


-

<%=subQuery.getPlan()%>
+
<%=stage.getPlan()%>

- - - + + + diff --git a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp index ceb1c566ff..340eb9586a 100644 --- a/tajo-core/src/main/resources/webapps/worker/querydetail.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querydetail.jsp @@ -31,7 +31,7 @@ <%@ page import="java.util.Map" %> <%@ page import="org.apache.tajo.SessionVars" %> <%@ page import="org.apache.tajo.util.history.QueryHistory" %> -<%@ page import="org.apache.tajo.util.history.SubQueryHistory" %> +<%@ page import="org.apache.tajo.util.history.StageHistory" %> <%@ page import="org.apache.tajo.util.history.HistoryReader" %> <% @@ -61,8 +61,8 @@ return; } - List subQueryHistories = - queryHistory != null ? JSPUtil.sortSubQueryHistory(queryHistory.getSubQueryHistories()) : null; + List stageHistories = + queryHistory != null ? JSPUtil.sortStageHistories(queryHistory.getStageHistories()) : null; SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); %> @@ -86,26 +86,26 @@ if (runningQuery && query == null) { if (errorMessage != null && !errorMessage.isEmpty()) { out.write("

Message:

" + errorMessage + "
"); } -} else if (subQueryHistories == null) { - out.write("

Message:

No SubQueries
"); +} else if (stageHistories == null) { + out.write("

Message:

No Stages
"); } else { %>

<%=queryId.toString()%> [Query Plan]

Status:<%=subQuery.getState()%>
Started:<%=df.format(subQuery.getStartTime())%> ~ <%=subQuery.getFinishTime() == 0 ? "-" : df.format(subQuery.getFinishTime())%>
# Tasks:<%=numTasks%> (Local Tasks: <%=subQuery.getHostLocalAssigned()%>, Rack Local Tasks: <%=subQuery.getRackLocalAssigned()%>)
Status:<%=stage.getState()%>
Started:<%=df.format(stage.getStartTime())%> ~ <%=stage.getFinishTime() == 0 ? "-" : df.format(stage.getFinishTime())%>
# Tasks:<%=numTasks%> (Local Tasks: <%=stage.getHostLocalAssigned()%>, Rack Local Tasks: <%=stage.getRackLocalAssigned()%>)
Progress:<%=JSPUtil.percentFormat((float) (totalProgress / numTasks))%>%
# Shuffles:<%=numShuffles%>
Input Bytes:<%=FileUtil.humanReadableByteCount(totalInputBytes, false) + " (" + nf.format(totalInputBytes) + " B)"%>
<% -for(SubQueryHistory eachSubQuery: subQueryHistories) { - eachSubQuery.getSucceededObjectCount(); - String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachSubQuery.getExecutionBlockId(); +for(StageHistory eachStage: stageHistories) { + eachStage.getSucceededObjectCount(); + String detailLink = "querytasks.jsp?queryId=" + queryId + "&ebid=" + eachStage.getExecutionBlockId(); %> - - - - - - - + + + + + + + <% } //end of for diff --git a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp index ec860b999a..88de97d484 100644 --- a/tajo-core/src/main/resources/webapps/worker/queryplan.jsp +++ b/tajo-core/src/main/resources/webapps/worker/queryplan.jsp @@ -25,7 +25,7 @@ <%@ page import="org.apache.tajo.QueryId" %> <%@ page import="org.apache.tajo.util.TajoIdUtils" %> <%@ page import="org.apache.tajo.master.querymaster.QueryMasterTask" %> -<%@ page import="org.apache.tajo.master.querymaster.SubQuery" %> +<%@ page import="org.apache.tajo.master.querymaster.Stage" %> <%@ page import="org.apache.tajo.engine.planner.global.ExecutionBlock" %> <%@ page import="java.util.*" %> <%@ page import="org.apache.tajo.ExecutionBlockId" %> @@ -46,22 +46,22 @@ Query query = queryMasterTask.getQuery(); - Map subQueryMap = new HashMap(); + Map stageMap = new HashMap(); - for(SubQuery eachSubQuery: query.getSubQueries()) { - subQueryMap.put(eachSubQuery.getId(), eachSubQuery); + for(Stage eachStage : query.getStages()) { + stageMap.put(eachStage.getId(), eachStage); } - class SubQueryInfo { + class StageInfo { ExecutionBlock executionBlock; - SubQuery subQuery; + Stage stage; ExecutionBlockId parentId; int px; int py; int pos; // 0: mid 1: left 2: right - public SubQueryInfo(ExecutionBlock executionBlock, SubQuery subQuery, ExecutionBlockId parentId, int px, int py, int pos) { + public StageInfo(ExecutionBlock executionBlock, Stage stage, ExecutionBlockId parentId, int px, int py, int pos) { this.executionBlock = executionBlock; - this.subQuery = subQuery; + this.stage = stage; this.parentId = parentId; this.px = px; this.py = py; @@ -102,21 +102,21 @@ String curIdStr = null; int x=35, y=1; int pos; - List subQueryInfos = new ArrayList(); + List stageInfos = new ArrayList(); - subQueryInfos.add(new SubQueryInfo(masterPlan.getRoot(), null, null, x, y, 0)); + stageInfos.add(new StageInfo(masterPlan.getRoot(), null, null, x, y, 0)); - while (!subQueryInfos.isEmpty()) { - SubQueryInfo eachSubQueryInfo = subQueryInfos.remove(0); - curIdStr = eachSubQueryInfo.executionBlock.getId().toString(); + while (!stageInfos.isEmpty()) { + StageInfo eachStageInfo = stageInfos.remove(0); + curIdStr = eachStageInfo.executionBlock.getId().toString(); - y = eachSubQueryInfo.py + 13; - if (eachSubQueryInfo.pos == 0) { - x = eachSubQueryInfo.px; - } else if (eachSubQueryInfo.pos == 1) { - x = eachSubQueryInfo.px - 20; - } else if (eachSubQueryInfo.pos == 2) { - x = eachSubQueryInfo.px + 20; + y = eachStageInfo.py + 13; + if (eachStageInfo.pos == 0) { + x = eachStageInfo.px; + } else if (eachStageInfo.pos == 1) { + x = eachStageInfo.px - 20; + } else if (eachStageInfo.pos == 2) { + x = eachStageInfo.px + 20; } %> <% - List children = masterPlan.getChilds(eachSubQueryInfo.executionBlock.getId()); + List children = masterPlan.getChilds(eachStageInfo.executionBlock.getId()); if (children.size() == 1) { pos = 0; @@ -227,7 +227,7 @@ pos = 1; } for (ExecutionBlock child : children) { - subQueryInfos.add(new SubQueryInfo(child, subQueryMap.get(child.getId()), eachSubQueryInfo.executionBlock.getId(), x, y, pos++)); + stageInfos.add(new StageInfo(child, stageMap.get(child.getId()), eachStageInfo.executionBlock.getId(), x, y, pos++)); } } //end of while %> diff --git a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp index 265937cce8..3aef49dd0f 100644 --- a/tajo-core/src/main/resources/webapps/worker/querytasks.jsp +++ b/tajo-core/src/main/resources/webapps/worker/querytasks.jsp @@ -78,14 +78,14 @@ } Query query = queryMasterTask.getQuery(); - SubQuery subQuery = query.getSubQuery(ebid); + Stage stage = query.getStage(ebid); - if(subQuery == null) { + if(stage == null) { out.write(""); return; } - if(subQuery == null) { + if(stage == null) { %> "); return; } - if(subQuery == null) { + if(stage == null) { %>
IDStateStartedFinishedRunning timeProgressTasks
<%=eachSubQuery.getExecutionBlockId()%><%=eachSubQuery.getState()%><%=df.format(eachSubQuery.getStartTime())%><%=eachSubQuery.getFinishTime() == 0 ? "-" : df.format(eachSubQuery.getFinishTime())%><%=JSPUtil.getElapsedTime(eachSubQuery.getStartTime(), eachSubQuery.getFinishTime())%><%=JSPUtil.percentFormat(eachSubQuery.getProgress())%>%<%=eachSubQuery.getSucceededObjectCount()%>/<%=eachSubQuery.getTotalScheduledObjectsCount()%><%=eachStage.getExecutionBlockId()%><%=eachStage.getState()%><%=df.format(eachStage.getStartTime())%><%=eachStage.getFinishTime() == 0 ? "-" : df.format(eachStage.getFinishTime())%><%=JSPUtil.getElapsedTime(eachStage.getStartTime(), eachStage.getFinishTime())%><%=JSPUtil.percentFormat(eachStage.getProgress())%>%<%=eachStage.getSucceededObjectCount()%>/<%=eachStage.getTotalScheduledObjectsCount()%>