From 2a0b095bb5a1f9b551a417028cc4ca3c06766501 Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 9 Jan 2015 01:04:39 +0900 Subject: [PATCH 1/4] TAJO-1251: Query is hanging occasionally by shuffle report. --- .../java/org/apache/tajo/conf/TajoConf.java | 3 +- .../tajo/master/event/StageEventType.java | 3 +- .../tajo/master/event/StageFinalizeEvent.java | 38 ++++ .../org/apache/tajo/querymaster/Query.java | 6 +- .../QueryMasterManagerService.java | 2 +- .../org/apache/tajo/querymaster/Stage.java | 211 ++++++++++++------ .../apache/tajo/querymaster/StageState.java | 1 + tajo-dist/pom.xml | 28 +++ 8 files changed, 223 insertions(+), 69 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index ab11ddddef..74a9271418 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -188,7 +188,8 @@ public static enum ConfVars implements ConfigKey { /** how many launching TaskRunners in parallel */ YARN_RM_QUERY_MASTER_MEMORY_MB("tajo.querymaster.memory-mb", 512, Validators.min("64")), YARN_RM_QUERY_MASTER_DISKS("tajo.yarn-rm.querymaster.disks", 1), - YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", 16), + YARN_RM_TASKRUNNER_LAUNCH_PARALLEL_NUM("tajo.yarn-rm.parallel-task-runner-launcher-num", + Runtime.getRuntime().availableProcessors() * 2), YARN_RM_WORKER_NUMBER_PER_NODE("tajo.yarn-rm.max-worker-num-per-node", 8), // Query Configuration diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java index fa808d4183..6ac3b7559c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java @@ -34,7 +34,8 @@ public enum StageEventType { SQ_TASK_COMPLETED, SQ_FAILED, - // Producer: Completed + // Producer: Stage + SQ_STAGE_FINALIZE, SQ_STAGE_COMPLETED, // Producer: Any component diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java new file mode 100644 index 0000000000..bdbf1152fc --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.ExecutionBlockId; +import org.apache.tajo.ipc.TajoWorkerProtocol; + +/** + * Event Class: From Stage to Stage + */ +public class StageFinalizeEvent extends StageEvent { + private TajoWorkerProtocol.ExecutionBlockReport report; + + public StageFinalizeEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) { + super(executionBlockId, StageEventType.SQ_STAGE_FINALIZE); + this.report = report; + } + + public TajoWorkerProtocol.ExecutionBlockReport getReport() { + return report; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 293269424c..eeff254475 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -659,8 +659,8 @@ public void transition(Query query, QueryEvent event) { // if a stage is succeeded and a query is running if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. - hasNext(query)) { // there remains at least one stage. - query.getStage(castEvent.getExecutionBlockId()).waitingIntermediateReport(); + hasNext(query)) { + // there remains at least one stage. executeNextBlock(query); } else { // if a query is completed due to finished, kill, failure, or error query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); @@ -692,6 +692,8 @@ private static class KillAllStagesTransition implements SingleArcTransition { private long startTime; private long finishTime; + private AtomicLong lastContactTime = new AtomicLong(); + private Thread timeoutChecker; volatile Map tasks = new ConcurrentHashMap(); volatile Map containers = new ConcurrentHashMap { private static final AllocatedContainersCancelTransition CONTAINERS_CANCEL_TRANSITION = new AllocatedContainersCancelTransition(); private static final StageCompleteTransition STAGE_COMPLETED_TRANSITION = new StageCompleteTransition(); + private static final StageFinalizeTransition STAGE_FINALIZE_TRANSITION = new StageFinalizeTransition(); private StateMachine stateMachine; protected static final StateMachineFactory stateMachineFactory = - new StateMachineFactory (StageState.NEW) + new StateMachineFactory(StageState.NEW) // Transitions from NEW state .addTransition(StageState.NEW, @@ -135,7 +140,7 @@ public class Stage implements EventHandler { StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from INITED state + // Transitions from INITED state .addTransition(StageState.INITED, StageState.RUNNING, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION) @@ -148,13 +153,16 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from RUNNING state + // Transitions from RUNNING state .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION) .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) + .addTransition(StageState.RUNNING, StageState.FINALIZE, + StageEventType.SQ_STAGE_FINALIZE, + STAGE_FINALIZE_TRANSITION) .addTransition(StageState.RUNNING, EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), StageEventType.SQ_STAGE_COMPLETED, @@ -171,11 +179,11 @@ StageEventType.SQ_KILL, new KillTasksTransition()) .addTransition(StageState.RUNNING, StageState.ERROR, StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able Transition + // Ignore-able Transition .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_START) - // Transitions from KILL_WAIT state + // Transitions from KILL_WAIT state .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -198,6 +206,24 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) + // Transitions from FINALIZE state + .addTransition(StageState.FINALIZE, StageState.FINALIZE, + StageEventType.SQ_STAGE_FINALIZE, + STAGE_FINALIZE_TRANSITION) + .addTransition(StageState.FINALIZE, + EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), + StageEventType.SQ_STAGE_COMPLETED, + STAGE_COMPLETED_TRANSITION) + .addTransition(StageState.FINALIZE, StageState.FINALIZE, + StageEventType.SQ_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(StageState.FINALIZE, StageState.ERROR, + StageEventType.SQ_INTERNAL_ERROR, + INTERNAL_ERROR_TRANSITION) + // Ignore-able Transition + .addTransition(StageState.FINALIZE, StageState.KILLED, + StageEventType.SQ_KILL) + // Transitions from SUCCEEDED state .addTransition(StageState.SUCCEEDED, StageState.SUCCEEDED, StageEventType.SQ_CONTAINER_ALLOCATED, @@ -215,7 +241,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_KILL, StageEventType.SQ_CONTAINER_ALLOCATED)) - // Transitions from KILLED state + // Transitions from KILLED state .addTransition(StageState.KILLED, StageState.KILLED, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -233,7 +259,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_FAILED)) - // Transitions from FAILED state + // Transitions from FAILED state .addTransition(StageState.FAILED, StageState.FAILED, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -243,7 +269,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) .addTransition(StageState.FAILED, StageState.ERROR, StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions + // Ignore-able transitions .addTransition(StageState.FAILED, StageState.FAILED, EnumSet.of( StageEventType.SQ_START, @@ -251,14 +277,14 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_FAILED)) - // Transitions from ERROR state + // Transitions from ERROR state .addTransition(StageState.ERROR, StageState.ERROR, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) .addTransition(StageState.ERROR, StageState.ERROR, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - // Ignore-able transitions + // Ignore-able transitions .addTransition(StageState.ERROR, StageState.ERROR, EnumSet.of( StageEventType.SQ_START, @@ -273,14 +299,14 @@ StageEventType.SQ_KILL, new KillTasksTransition()) private final Lock writeLock; private int totalScheduledObjectsCount; - private int succeededObjectCount = 0; private int completedTaskCount = 0; - private int succeededTaskCount = 0; + private int succeededObjectCount = 0; private int killedObjectCount = 0; private int failedObjectCount = 0; private TaskSchedulerContext schedulerContext; - private List hashShuffleIntermediateEntries = new ArrayList(); - private AtomicInteger completeReportReceived = new AtomicInteger(0); + private List hashShuffleIntermediateEntries = Lists.newArrayList(); + private AtomicInteger completedShuffleTasks = new AtomicInteger(0); + private AtomicBoolean stopShuffleReceiver = new AtomicBoolean(); private StageHistory finalStageHistory; public Stage(QueryMasterTask.QueryMasterTaskContext context, MasterPlan masterPlan, ExecutionBlock block) { @@ -465,10 +491,16 @@ private StageHistory makeStageHistory() { } /** - * It finalizes this stage. It is only invoked when the stage is succeeded. + * It finalizes this stage. It is only invoked when the stage is finalizing. */ - public void complete() { + public void finalizeStage() { cleanup(); + } + + /** + * It complete this stage. It is only invoked when the stage is succeeded. + */ + public void complete() { finalizeStats(); setFinishTime(); eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); @@ -652,7 +684,7 @@ private void stopScheduler() { } private void releaseContainers() { - // If there are still live TaskRunners, try to kill the containers. + // If there are still live TaskRunners, try to kill the containers. and send the shuffle report request eventHandler.handle(new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_CLEANUP, getId(), containers.values())); } @@ -684,6 +716,7 @@ private void finalizeStats() { @Override public void handle(StageEvent event) { + lastContactTime.set(System.currentTimeMillis()); if (LOG.isDebugEnabled()) { LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState=" + getSynchronizedState()); @@ -751,6 +784,7 @@ public void run() { LOG.info(stage.totalScheduledObjectsCount + " objects are scheduled"); if (stage.getTaskScheduler().remainingScheduledObjectNum() == 0) { // if there is no tasks + stage.finalizeStage(); stage.complete(); } else { if(stage.getSynchronizedState() == StageState.INITED) { @@ -1192,16 +1226,19 @@ public void transition(Stage stage, stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } - LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)", - stage.getId(), - stage.getTotalScheduledObjectsCount(), - stage.succeededObjectCount, - stage.killedObjectCount, - stage.failedObjectCount)); - - if (stage.totalScheduledObjectsCount == - stage.succeededObjectCount + stage.killedObjectCount + stage.failedObjectCount) { - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + if (stage.totalScheduledObjectsCount == stage.completedTaskCount) { + if (stage.succeededObjectCount == stage.completedTaskCount) { + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_FINALIZE)); + } else { + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + } + } else { + LOG.info(String.format("[%s] Task Completion Event (Total: %d, Success: %d, Killed: %d, Failed: %d)", + stage.getId(), + stage.totalScheduledObjectsCount, + stage.succeededObjectCount, + stage.killedObjectCount, + stage.failedObjectCount)); } } } @@ -1244,48 +1281,94 @@ public List getHashShuffleIntermediateEntries() { return hashShuffleIntermediateEntries; } - protected void waitingIntermediateReport() { - LOG.info(getId() + ", waiting IntermediateReport: expectedTaskNum=" + completeReportReceived.get()); - synchronized(completeReportReceived) { - long startTime = System.currentTimeMillis(); - while (true) { - if (completeReportReceived.get() >= tasks.size()) { - LOG.info(getId() + ", completed waiting IntermediateReport"); - return; - } else { - try { - completeReportReceived.wait(10 * 1000); - } catch (InterruptedException e) { + protected void stopFinalization() { + stopShuffleReceiver.set(true); + } + + private static class StageFinalizeTransition implements SingleArcTransition { + + @Override + public void transition(final Stage stage, StageEvent event) { + //If a shuffle report are failed, remaining reports will ignore + if (stage.stopShuffleReceiver.get()) { + return; + } + + stage.lastContactTime.set(System.currentTimeMillis()); + try { + if (event instanceof StageFinalizeEvent) { + + StageFinalizeEvent finalizeEvent = (StageFinalizeEvent) event; + TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport(); + + if (!report.getReportSuccess()) { + stage.stopFinalization(); + LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage()); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); } - long elapsedTime = System.currentTimeMillis() - startTime; - if (elapsedTime >= 120 * 1000) { - LOG.error(getId() + ", Timeout while receiving intermediate reports: " + elapsedTime + " ms"); - abort(StageState.FAILED); - return; + + stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks()); + if (report.getIntermediateEntriesCount() > 0) { + for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) { + stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); + } } - } - } - } - } - public void receiveExecutionBlockReport(TajoWorkerProtocol.ExecutionBlockReport report) { - LOG.info(getId() + ", receiveExecutionBlockReport:" + report.getSucceededTasks()); - if (!report.getReportSuccess()) { - LOG.error(getId() + ", ExecutionBlock final report fail cause:" + report.getReportErrorMessage()); - abort(StageState.FAILED); - return; - } - if (report.getIntermediateEntriesCount() > 0) { - synchronized (hashShuffleIntermediateEntries) { - for (IntermediateEntryProto eachInterm: report.getIntermediateEntriesList()) { - hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); + if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) { + LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get()); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); + if (stage.timeoutChecker != null) { + stage.stopFinalization(); + synchronized (stage.timeoutChecker){ + stage.timeoutChecker.notifyAll(); + } + } + } else { + LOG.info(stage.getId() + ", Received shuffle report: " + + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); + } + + } else { + LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)", + stage.getId().toString(), + stage.totalScheduledObjectsCount, + stage.succeededObjectCount, + stage.killedObjectCount)); + stage.finalizeStage(); + LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount); + + /* FIXME implement timeout handler of stage and task */ + if (stage.timeoutChecker != null) { + stage.timeoutChecker = new Thread(new Runnable() { + @Override + public void run() { + while (stage.getSynchronizedState() == StageState.FINALIZE && !Thread.interrupted()) { + long elapsedTime = System.currentTimeMillis() - stage.lastContactTime.get(); + if (elapsedTime > 120 * 1000) { + stage.stopFinalization(); + LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime + + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); + } + synchronized (this) { + try { + this.wait(1 * 1000); + } catch (InterruptedException e) { + } + } + } + } + }); + stage.timeoutChecker.start(); + } } + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + stage.stopFinalization(); + stage.eventHandler.handle(new StageDiagnosticsUpdateEvent(stage.getId(), t.getMessage())); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_INTERNAL_ERROR)); } } - synchronized(completeReportReceived) { - completeReportReceived.addAndGet(report.getSucceededTasks()); - completeReportReceived.notifyAll(); - } } private static class StageCompleteTransition implements MultipleArcTransition { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java index 2fd62be4f2..07a87b191e 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java @@ -22,6 +22,7 @@ public enum StageState { NEW, INITED, RUNNING, + FINALIZE, SUCCEEDED, FAILED, KILL_WAIT, diff --git a/tajo-dist/pom.xml b/tajo-dist/pom.xml index d469ba9d32..3df26815a2 100644 --- a/tajo-dist/pom.xml +++ b/tajo-dist/pom.xml @@ -207,6 +207,34 @@ false + + + org.apache.hadoop + hadoop-yarn-common + + + org.apache.hadoop + hadoop-common + + + commons-el + commons-el + + + tomcat + jasper-runtime + + + tomcat + jasper-compiler + + + org.mortbay.jetty + jsp-2.1-jetty + + + + From 9a201d3d0ffaa36453ae665fe6855a33aeff1f46 Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 9 Jan 2015 01:12:02 +0900 Subject: [PATCH 2/4] TAJO-1251: Query is hanging occasionally by shuffle report. --- .../org/apache/tajo/querymaster/Query.java | 4 +--- .../org/apache/tajo/querymaster/Stage.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 12 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index eeff254475..060e620f4d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -659,8 +659,7 @@ public void transition(Query query, QueryEvent event) { // if a stage is succeeded and a query is running if (castEvent.getState() == StageState.SUCCEEDED && // latest stage succeeded query.getSynchronizedState() == QueryState.QUERY_RUNNING && // current state is not in KILL_WAIT, FAILED, or ERROR. - hasNext(query)) { - // there remains at least one stage. + hasNext(query)) { // there remains at least one stage. executeNextBlock(query); } else { // if a query is completed due to finished, kill, failure, or error query.eventHandler.handle(new QueryCompletedEvent(castEvent.getExecutionBlockId(), castEvent.getState())); @@ -692,7 +691,6 @@ private static class KillAllStagesTransition implements SingleArcTransition { StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from INITED state + // Transitions from INITED state .addTransition(StageState.INITED, StageState.RUNNING, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION) @@ -153,7 +153,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from RUNNING state + // Transitions from RUNNING state .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINER_LAUNCH_TRANSITION) @@ -179,11 +179,11 @@ StageEventType.SQ_KILL, new KillTasksTransition()) .addTransition(StageState.RUNNING, StageState.ERROR, StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able Transition + // Ignore-able Transition .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_START) - // Transitions from KILL_WAIT state + // Transitions from KILL_WAIT state .addTransition(StageState.KILL_WAIT, StageState.KILL_WAIT, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -241,7 +241,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_KILL, StageEventType.SQ_CONTAINER_ALLOCATED)) - // Transitions from KILLED state + // Transitions from KILLED state .addTransition(StageState.KILLED, StageState.KILLED, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -259,7 +259,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_FAILED)) - // Transitions from FAILED state + // Transitions from FAILED state .addTransition(StageState.FAILED, StageState.FAILED, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) @@ -269,7 +269,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) .addTransition(StageState.FAILED, StageState.ERROR, StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Ignore-able transitions + // Ignore-able transitions .addTransition(StageState.FAILED, StageState.FAILED, EnumSet.of( StageEventType.SQ_START, @@ -277,14 +277,14 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_CONTAINER_ALLOCATED, StageEventType.SQ_FAILED)) - // Transitions from ERROR state + // Transitions from ERROR state .addTransition(StageState.ERROR, StageState.ERROR, StageEventType.SQ_CONTAINER_ALLOCATED, CONTAINERS_CANCEL_TRANSITION) .addTransition(StageState.ERROR, StageState.ERROR, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - // Ignore-able transitions + // Ignore-able transitions .addTransition(StageState.ERROR, StageState.ERROR, EnumSet.of( StageEventType.SQ_START, From e94215ce69c341be260bbd6a705e6db105edefcc Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 9 Jan 2015 16:29:31 +0900 Subject: [PATCH 3/4] rename to FINALIZING --- .../tajo/master/event/StageEventType.java | 2 +- .../tajo/master/event/StageFinalizeEvent.java | 2 +- .../org/apache/tajo/querymaster/Stage.java | 45 ++++++++++--------- .../apache/tajo/querymaster/StageState.java | 2 +- 4 files changed, 26 insertions(+), 25 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java index 6ac3b7559c..763d42655a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageEventType.java @@ -35,7 +35,7 @@ public enum StageEventType { SQ_FAILED, // Producer: Stage - SQ_STAGE_FINALIZE, + SQ_SHUFFLE_REPORT, SQ_STAGE_COMPLETED, // Producer: Any component diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java index bdbf1152fc..cd0ebd8d74 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java @@ -28,7 +28,7 @@ public class StageFinalizeEvent extends StageEvent { private TajoWorkerProtocol.ExecutionBlockReport report; public StageFinalizeEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) { - super(executionBlockId, StageEventType.SQ_STAGE_FINALIZE); + super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT); this.report = report; } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 8b0f64ccd0..27517447ee 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -50,22 +50,24 @@ import org.apache.tajo.ipc.TajoWorkerProtocol.DistinctGroupbyEnforcer.MultipleAggregationStage; import org.apache.tajo.ipc.TajoWorkerProtocol.EnforceProperty; import org.apache.tajo.ipc.TajoWorkerProtocol.IntermediateEntryProto; -import org.apache.tajo.master.*; +import org.apache.tajo.master.LaunchTaskRunnersEvent; +import org.apache.tajo.master.TaskRunnerGroupEvent; import org.apache.tajo.master.TaskRunnerGroupEvent.EventType; +import org.apache.tajo.master.TaskState; +import org.apache.tajo.master.container.TajoContainer; +import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.master.event.*; import org.apache.tajo.master.event.TaskAttemptToSchedulerEvent.TaskAttemptScheduleContext; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.util.PlannerUtil; import org.apache.tajo.querymaster.Task.IntermediateEntry; -import org.apache.tajo.master.container.TajoContainer; -import org.apache.tajo.master.container.TajoContainerId; import org.apache.tajo.storage.FileStorageManager; -import org.apache.tajo.plan.util.PlannerUtil; -import org.apache.tajo.plan.logical.*; import org.apache.tajo.storage.StorageManager; import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; -import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.util.history.StageHistory; +import org.apache.tajo.util.history.TaskHistory; import org.apache.tajo.worker.FetchImpl; import java.io.IOException; @@ -73,7 +75,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -104,7 +105,7 @@ public class Stage implements EventHandler { private long startTime; private long finishTime; - private AtomicLong lastContactTime = new AtomicLong(); + private volatile long lastContactTime; private Thread timeoutChecker; volatile Map tasks = new ConcurrentHashMap(); @@ -160,8 +161,8 @@ StageEventType.SQ_KILL, new KillTasksTransition()) .addTransition(StageState.RUNNING, StageState.RUNNING, StageEventType.SQ_TASK_COMPLETED, TASK_COMPLETED_TRANSITION) - .addTransition(StageState.RUNNING, StageState.FINALIZE, - StageEventType.SQ_STAGE_FINALIZE, + .addTransition(StageState.RUNNING, StageState.FINALIZING, + StageEventType.SQ_SHUFFLE_REPORT, STAGE_FINALIZE_TRANSITION) .addTransition(StageState.RUNNING, EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), @@ -206,22 +207,22 @@ StageEventType.SQ_KILL, new KillTasksTransition()) StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) - // Transitions from FINALIZE state - .addTransition(StageState.FINALIZE, StageState.FINALIZE, - StageEventType.SQ_STAGE_FINALIZE, + // Transitions from FINALIZING state + .addTransition(StageState.FINALIZING, StageState.FINALIZING, + StageEventType.SQ_SHUFFLE_REPORT, STAGE_FINALIZE_TRANSITION) - .addTransition(StageState.FINALIZE, + .addTransition(StageState.FINALIZING, EnumSet.of(StageState.SUCCEEDED, StageState.FAILED), StageEventType.SQ_STAGE_COMPLETED, STAGE_COMPLETED_TRANSITION) - .addTransition(StageState.FINALIZE, StageState.FINALIZE, + .addTransition(StageState.FINALIZING, StageState.FINALIZING, StageEventType.SQ_DIAGNOSTIC_UPDATE, DIAGNOSTIC_UPDATE_TRANSITION) - .addTransition(StageState.FINALIZE, StageState.ERROR, + .addTransition(StageState.FINALIZING, StageState.ERROR, StageEventType.SQ_INTERNAL_ERROR, INTERNAL_ERROR_TRANSITION) // Ignore-able Transition - .addTransition(StageState.FINALIZE, StageState.KILLED, + .addTransition(StageState.FINALIZING, StageState.KILLED, StageEventType.SQ_KILL) // Transitions from SUCCEEDED state @@ -716,7 +717,7 @@ private void finalizeStats() { @Override public void handle(StageEvent event) { - lastContactTime.set(System.currentTimeMillis()); + lastContactTime = System.currentTimeMillis(); if (LOG.isDebugEnabled()) { LOG.debug("Processing " + event.getStageId() + " of type " + event.getType() + ", preState=" + getSynchronizedState()); @@ -1228,7 +1229,7 @@ public void transition(Stage stage, if (stage.totalScheduledObjectsCount == stage.completedTaskCount) { if (stage.succeededObjectCount == stage.completedTaskCount) { - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_FINALIZE)); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_SHUFFLE_REPORT)); } else { stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); } @@ -1294,7 +1295,7 @@ public void transition(final Stage stage, StageEvent event) { return; } - stage.lastContactTime.set(System.currentTimeMillis()); + stage.lastContactTime = System.currentTimeMillis(); try { if (event instanceof StageFinalizeEvent) { @@ -1342,8 +1343,8 @@ public void transition(final Stage stage, StageEvent event) { stage.timeoutChecker = new Thread(new Runnable() { @Override public void run() { - while (stage.getSynchronizedState() == StageState.FINALIZE && !Thread.interrupted()) { - long elapsedTime = System.currentTimeMillis() - stage.lastContactTime.get(); + while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) { + long elapsedTime = System.currentTimeMillis() - stage.lastContactTime; if (elapsedTime > 120 * 1000) { stage.stopFinalization(); LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java index 07a87b191e..2d68332fba 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/StageState.java @@ -22,7 +22,7 @@ public enum StageState { NEW, INITED, RUNNING, - FINALIZE, + FINALIZING, SUCCEEDED, FAILED, KILL_WAIT, From a038a3a8ab886f0ed294fb7c3a9399f4e05b29f0 Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 9 Jan 2015 17:55:38 +0900 Subject: [PATCH 4/4] rename to StageShuffleReportEvent --- ...StageFinalizeEvent.java => StageShuffleReportEvent.java} | 6 +++--- .../apache/tajo/querymaster/QueryMasterManagerService.java | 2 +- .../src/main/java/org/apache/tajo/querymaster/Stage.java | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) rename tajo-core/src/main/java/org/apache/tajo/master/event/{StageFinalizeEvent.java => StageShuffleReportEvent.java} (81%) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java similarity index 81% rename from tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java rename to tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java index cd0ebd8d74..924fb59540 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/StageFinalizeEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageShuffleReportEvent.java @@ -22,12 +22,12 @@ import org.apache.tajo.ipc.TajoWorkerProtocol; /** - * Event Class: From Stage to Stage + * Event Class: From {@link org.apache.tajo.querymaster.QueryMasterManagerService} to Stage */ -public class StageFinalizeEvent extends StageEvent { +public class StageShuffleReportEvent extends StageEvent { private TajoWorkerProtocol.ExecutionBlockReport report; - public StageFinalizeEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) { + public StageShuffleReportEvent(ExecutionBlockId executionBlockId, TajoWorkerProtocol.ExecutionBlockReport report) { super(executionBlockId, StageEventType.SQ_SHUFFLE_REPORT); this.report = report; } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java index 34f181bed9..85cc553d84 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterManagerService.java @@ -220,7 +220,7 @@ public void doneExecutionBlock( QueryMasterTask queryMasterTask = queryMaster.getQueryMasterTask(new QueryId(request.getEbId().getQueryId())); if (queryMasterTask != null) { ExecutionBlockId ebId = new ExecutionBlockId(request.getEbId()); - queryMasterTask.getEventHandler().handle(new StageFinalizeEvent(ebId, request)); + queryMasterTask.getEventHandler().handle(new StageShuffleReportEvent(ebId, request)); } done.run(TajoWorker.TRUE_PROTO); } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 27517447ee..1ea7051b7f 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -1297,9 +1297,9 @@ public void transition(final Stage stage, StageEvent event) { stage.lastContactTime = System.currentTimeMillis(); try { - if (event instanceof StageFinalizeEvent) { + if (event instanceof StageShuffleReportEvent) { - StageFinalizeEvent finalizeEvent = (StageFinalizeEvent) event; + StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event; TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport(); if (!report.getReportSuccess()) {