From 1c725cf071247d04b59bfe73f2e28adcfa489be3 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 11 Jan 2016 13:46:14 +0900 Subject: [PATCH 01/20] TAJO-2048: QueryMaster and TajoWorker should support the exception propagation. --- .../org/apache/tajo/exception/ErrorUtil.java | 21 ++++++++++ .../tajo/exception/ReturnStateUtil.java | 13 ++++++- .../tajo/cli/tsql/TestTajoCliNegatives.java | 2 +- .../tajo/worker/MockExecutionBlock.java | 2 +- .../org/apache/tajo/master/QueryInfo.java | 2 + .../org/apache/tajo/master/QueryManager.java | 5 ++- .../tajo/master/TajoMasterClientService.java | 11 ++++-- .../master/event/StageTaskFailedEvent.java | 39 +++++++++++++++++++ .../master/event/TaskFatalErrorEvent.java | 17 ++++---- .../master/event/TaskTAttemptFailedEvent.java | 36 +++++++++++++++++ .../querymaster/DefaultTaskScheduler.java | 4 +- .../org/apache/tajo/querymaster/Query.java | 21 +++++++--- .../apache/tajo/querymaster/QueryMaster.java | 8 +++- .../tajo/querymaster/QueryMasterTask.java | 5 ++- .../org/apache/tajo/querymaster/Stage.java | 28 +++++++++++-- .../org/apache/tajo/querymaster/Task.java | 17 ++++---- .../apache/tajo/querymaster/TaskAttempt.java | 8 ++-- .../tajo/worker/ExecutionBlockContext.java | 11 +++--- .../org/apache/tajo/worker/TaskContainer.java | 2 +- .../org/apache/tajo/worker/TaskExecutor.java | 3 +- .../java/org/apache/tajo/worker/TaskImpl.java | 16 ++------ tajo-core/src/main/proto/ResourceProtos.proto | 7 ++-- 22 files changed, 211 insertions(+), 67 deletions(-) create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java create mode 100644 tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java index 025a20cac1..f32e90d5ae 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java @@ -18,6 +18,8 @@ package org.apache.tajo.exception; +import org.apache.commons.lang.exception.ExceptionUtils; +import org.apache.tajo.error.Errors; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.error.Stacktrace; @@ -41,4 +43,23 @@ public static Stacktrace.StackTrace convertStacktrace(Throwable t) { } return builder.build(); } + + public static Errors.SerializedException convertException(Throwable t) { + Errors.SerializedException.Builder builder = Errors.SerializedException.newBuilder(); + + if (ExceptionUtil.isExceptionWithResultCode(t)) { + DefaultTajoException tajoException = (DefaultTajoException) t; + builder.setReturnCode(tajoException.getErrorCode()); + builder.setMessage(tajoException.getMessage()); + } else { + Throwable rootCause = ExceptionUtils.getRootCause(t); + if(rootCause != null) t = rootCause; + + builder.setReturnCode(ResultCode.INTERNAL_ERROR); + builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); + } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + builder.setTimestamp(System.currentTimeMillis()); + return builder.build(); + } } diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index 3257f46b2a..a9df8fd815 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -20,6 +20,7 @@ import com.google.common.base.Preconditions; import org.apache.tajo.QueryId; +import org.apache.tajo.error.Errors; import org.apache.tajo.error.Errors.ResultCode; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.ReturnState; import org.apache.tajo.rpc.protocolrecords.PrimitiveProtos.StringListResponse; @@ -83,9 +84,19 @@ public static ReturnState returnError(Throwable t) { } else { builder.setReturnCode(ResultCode.INTERNAL_ERROR); builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); - builder.setStackTrace(ErrorUtil.convertStacktrace(t)); } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + return builder.build(); + } + public static ReturnState returnError(Errors.SerializedException e) { + ReturnState.Builder builder = ReturnState.newBuilder(); + + builder.setReturnCode(e.getReturnCode()); + builder.setMessage(e.getMessage()); + if (e.hasStackTrace()) { + builder.setStackTrace(e.getStackTrace()); + } return builder.build(); } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java index 6d939defda..fcf4546c7f 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCliNegatives.java @@ -141,6 +141,6 @@ public void testQueryFailureOfSimpleQuery() throws Exception { public void testQueryFailure() throws Exception { setVar(tajoCli, SessionVars.CLI_FORMATTER_CLASS, TajoCliOutputTestFormatter.class.getName()); assertScriptFailure("select fail(3, l_orderkey, 'testQueryFailure') from default.lineitem where l_orderkey > 0" , - "ERROR: Internal error. Please check out log files in ${tajo_install_dir}/logs directory.\n"); + "ERROR: internal error: testQueryFailure\n"); } } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java index 7d7fb1a1a0..cbc4312de9 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/worker/MockExecutionBlock.java @@ -36,7 +36,7 @@ public void init() throws Throwable { } @Override - public void fatalError(TaskAttemptId taskAttemptId, String message) { + public void fatalError(TaskAttemptId taskAttemptId, Throwable throwable) { } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java index 6c324d9985..eba633ec3d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryInfo.java @@ -44,6 +44,8 @@ public class QueryInfo implements GsonObject, History, Comparable { private volatile long startTime; @Expose private volatile long finishTime; + + @Deprecated @Expose private String lastMessage; @Expose diff --git a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java index 24ed830eb9..b4f1d66ac7 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/QueryManager.java @@ -346,7 +346,10 @@ private QueryInfo makeQueryInfoFromHeartbeat(ResourceProtos.TajoHeartbeatRequest queryInfo.setQueryMaster(connectionInfo.getHost()); queryInfo.setQueryMasterPort(connectionInfo.getQueryMasterPort()); queryInfo.setQueryMasterclientPort(connectionInfo.getClientPort()); - queryInfo.setLastMessage(queryHeartbeat.getStatusMessage()); + if(queryHeartbeat.hasError()) { + //TODO set error instead of last message + queryInfo.setLastMessage(queryHeartbeat.getError().getMessage()); + } queryInfo.setQueryState(queryHeartbeat.getState()); queryInfo.setProgress(queryHeartbeat.getQueryProgress()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index bb042299cd..1fc894abef 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -39,10 +39,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.QueryNotFoundException; -import org.apache.tajo.exception.ReturnStateUtil; -import org.apache.tajo.exception.UnavailableTableLocationException; -import org.apache.tajo.exception.UndefinedDatabaseException; +import org.apache.tajo.exception.*; import org.apache.tajo.ipc.ClientProtos; import org.apache.tajo.ipc.ClientProtos.*; import org.apache.tajo.ipc.TajoMasterClientProtocol; @@ -61,6 +58,7 @@ import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.ProtoUtil; +import org.apache.tajo.util.StringUtils; import java.net.InetSocketAddress; import java.util.*; @@ -505,6 +503,11 @@ public GetQueryStatusResponse getQueryStatus(RpcController controller, GetQueryS builder.setFinishTime(queryInfo.getFinishTime()); } else { builder.setFinishTime(System.currentTimeMillis()); + + if(!StringUtils.isEmpty(queryInfo.getLastMessage())) { + builder.setErrorMessage(queryInfo.getLastMessage()); + } + builder.setState(OK); } } else { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java new file mode 100644 index 0000000000..731134e4fc --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/StageTaskFailedEvent.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskId; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.master.TaskState; + +/** + * Event Class: From Task to Stage + */ +public class StageTaskFailedEvent extends StageTaskEvent { + private final SerializedException exception; + + public StageTaskFailedEvent(TaskId taskId, SerializedException exception) { + super(taskId, TaskState.FAILED); + this.exception = exception; + } + + public SerializedException getException() { + return exception; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java index d50fcb889a..351a42bc94 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskFatalErrorEvent.java @@ -20,22 +20,23 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.ResourceProtos.TaskFatalErrorReport; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; public class TaskFatalErrorEvent extends TaskAttemptEvent { - private final String message; + private final SerializedException error; public TaskFatalErrorEvent(TaskFatalErrorReport report) { - super(new TaskAttemptId(report.getId()), - TaskAttemptEventType.TA_FATAL_ERROR); - this.message = report.getErrorMessage(); + super(new TaskAttemptId(report.getId()), TaskAttemptEventType.TA_FATAL_ERROR); + this.error = report.getError(); } - public TaskFatalErrorEvent(TaskAttemptId attemptId, String message) { + public TaskFatalErrorEvent(TaskAttemptId attemptId, Throwable e) { super(attemptId, TaskAttemptEventType.TA_FATAL_ERROR); - this.message = message; + this.error = ErrorUtil.convertException(e); } - public String errorMessage() { - return message; + public SerializedException getError() { + return error; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java new file mode 100644 index 0000000000..77e0c4aa7f --- /dev/null +++ b/tajo-core/src/main/java/org/apache/tajo/master/event/TaskTAttemptFailedEvent.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.master.event; + +import org.apache.tajo.TaskAttemptId; +import org.apache.tajo.error.Errors.SerializedException; + +public class TaskTAttemptFailedEvent extends TaskTAttemptEvent { + private final SerializedException exception; + + public TaskTAttemptFailedEvent(TaskAttemptId attemptId, + SerializedException exception) { + super(attemptId, TaskEventType.T_ATTEMPT_FAILED); + this.exception = exception; + } + + public SerializedException getException() { + return exception; + } +} diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 8535912d7f..27959952f2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -116,11 +116,11 @@ public void run() { break; } else { LOG.fatal(e.getMessage(), e); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, e); } } catch (Throwable e) { LOG.fatal(e.getMessage(), e); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, e); break; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 599931df3e..9d0f208cf6 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -44,6 +44,8 @@ import org.apache.tajo.engine.planner.global.ExecutionQueue; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.master.event.*; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.util.PlannerUtil; @@ -79,11 +81,12 @@ public class Query implements EventHandler { private long startTime; private long finishTime; private TableDesc resultDesc; - private int completedStagesCount = 0; - private int succeededStagesCount = 0; - private int killedStagesCount = 0; - private int failedStagesCount = 0; - private int erroredStagesCount = 0; + private volatile int completedStagesCount = 0; + private volatile int succeededStagesCount = 0; + private volatile int killedStagesCount = 0; + private volatile int failedStagesCount = 0; + private volatile int erroredStagesCount = 0; + private volatile SerializedException failureReason; private final List diagnostics = new ArrayList<>(); // Internal Variables @@ -341,6 +344,10 @@ public void clearPartitions() { } } + public SerializedException getFailureReason() { + return failureReason; + } + public List getDiagnostics() { readLock.lock(); try { @@ -530,6 +537,8 @@ private QueryState finalizeQuery(Query query, QueryCompletedEvent event) { query.clearPartitions(); } } catch (Throwable e) { + LOG.fatal(e.getMessage(), e); + query.failureReason = ErrorUtil.convertException(e); query.eventHandler.handle(new QueryDiagnosticsUpdateEvent(query.id, ExceptionUtils.getStackTrace(e))); return QueryState.QUERY_ERROR; } @@ -791,8 +800,10 @@ public void transition(Query query, QueryEvent event) { query.killedStagesCount++; } else if (castEvent.getState() == StageState.FAILED) { query.failedStagesCount++; + query.failureReason = query.getStage(castEvent.getExecutionBlockId()).getFailureReason(); } else if (castEvent.getState() == StageState.ERROR) { query.erroredStagesCount++; + query.failureReason = query.getStage(castEvent.getExecutionBlockId()).getFailureReason(); } else { LOG.error(String.format("Invalid Stage (%s) State %s at %s", castEvent.getExecutionBlockId().toString(), castEvent.getState().name(), query.getSynchronizedState().name())); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java index 7104fb9423..adc7b089e8 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMaster.java @@ -37,7 +37,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; import org.apache.tajo.engine.query.QueryContext; -import org.apache.tajo.exception.ReturnStateUtil; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.ipc.QueryCoordinatorProtocol; import org.apache.tajo.ipc.QueryCoordinatorProtocol.QueryCoordinatorProtocolService; import org.apache.tajo.master.event.QueryStartEvent; @@ -351,9 +351,13 @@ private TajoHeartbeatRequest buildTajoHeartBeat(QueryMasterTask queryMasterTask) builder.setResultDesc(queryMasterTask.getQuery().getResultDesc().getProto()); } builder.setQueryProgress(queryMasterTask.getQuery().getProgress()); + + if(queryMasterTask.getQuery().getFailureReason() != null) { + builder.setError(queryMasterTask.getQuery().getFailureReason()); + } } if (queryMasterTask.isInitError()) { - builder.setStatusMessage(ReturnStateUtil.returnError(queryMasterTask.getInitError()).getMessage()); + builder.setError(ErrorUtil.convertException(queryMasterTask.getInitError())); } return builder.build(); } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java index 995a8e5416..6030d9058a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/QueryMasterTask.java @@ -37,6 +37,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.MasterPlan; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.master.event.*; @@ -266,12 +267,12 @@ protected void killTaskAttempt(int workerId, TaskAttemptId taskAttemptId) { if(!callFuture.get().getValue()){ getEventHandler().handle( - new TaskFatalErrorEvent(taskAttemptId, "Can't kill task :" + taskAttemptId)); + new TaskFatalErrorEvent(taskAttemptId, new TajoInternalError("Can't kill task :" + taskAttemptId))); } } catch (Exception e) { /* Node RPC failure */ LOG.error(e.getMessage(), e); - getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e.getMessage())); + getEventHandler().handle(new TaskFatalErrorEvent(taskAttemptId, e)); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 5f050bfc15..b037a29072 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -42,7 +42,10 @@ import org.apache.tajo.engine.planner.global.DataChannel; import org.apache.tajo.engine.planner.global.ExecutionBlock; import org.apache.tajo.engine.planner.global.MasterPlan; +import org.apache.tajo.error.Errors.SerializedException; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.exception.TajoException; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.TajoWorkerProtocol; import org.apache.tajo.master.TaskState; import org.apache.tajo.master.event.*; @@ -63,6 +66,7 @@ import org.apache.tajo.unit.StorageUnit; import org.apache.tajo.util.KeyValueSet; import org.apache.tajo.util.RpcParameterFactory; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.history.StageHistory; import org.apache.tajo.util.history.TaskHistory; @@ -289,6 +293,7 @@ StageEventType.SQ_KILL, new KillTasksTransition()) private volatile int succeededObjectCount = 0; private volatile int killedObjectCount = 0; private volatile int failedObjectCount = 0; + private volatile SerializedException failureReason; private TaskSchedulerContext schedulerContext; private List hashShuffleIntermediateEntries = Lists.newArrayList(); private AtomicInteger completedShuffleTasks = new AtomicInteger(0); @@ -412,6 +417,10 @@ public int getCompletedTaskCount() { return completedTaskCount; } + public SerializedException getFailureReason() { + return failureReason; + } + public ExecutionBlock getBlock() { return block; } @@ -533,18 +542,26 @@ public void complete() { eventHandler.handle(new StageCompletedEvent(getId(), StageState.SUCCEEDED)); } + public void abort(StageState finalState) { + abort(finalState, null); + } + /** * It finalizes this stage. Unlike {@link Stage#complete()}, * it is invoked when a stage is abnormally finished. * * @param finalState The final stage state + * @param reason The failure reason, if exist */ - public void abort(StageState finalState) { + public void abort(StageState finalState, Throwable reason) { // TODO - // - committer.abortStage(...) // - record Stage Finish Time // - CleanUp Tasks // - Record History + if(reason != null) + failureReason = ErrorUtil.convertException(reason); + cleanup(); setFinishTime(); eventHandler.handle(new StageCompletedEvent(getId(), finalState)); @@ -1229,7 +1246,9 @@ public void transition(Stage stage, } else if (task.getState() == TaskState.KILLED) { stage.killedObjectCount++; } else if (task.getState() == TaskState.FAILED) { + StageTaskFailedEvent failedEvent = TUtil.checkTypeAndGet(event, StageTaskFailedEvent.class); stage.failedObjectCount++; + stage.failureReason = failedEvent.getException(); // if at least one task is failed, try to kill all tasks. stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_KILL)); } @@ -1415,8 +1434,9 @@ public StageState transition(Stage stage, StageEvent stageEvent) { stage.abort(StageState.KILLED); return StageState.KILLED; } else { - LOG.error("Invalid State " + stage.getSynchronizedState() + " State"); - stage.abort(StageState.ERROR); + TajoInternalError error = new TajoInternalError("Invalid State " + stage.getSynchronizedState() + " State"); + LOG.error(error.getMessage(), error); + stage.abort(StageState.ERROR, error); return StageState.ERROR; } } else { @@ -1425,7 +1445,7 @@ public StageState transition(Stage stage, StageEvent stageEvent) { } } catch (Throwable t) { LOG.error(t.getMessage(), t); - stage.abort(StageState.ERROR); + stage.abort(StageState.ERROR, t); return StageState.ERROR; } } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java index 9d038acad7..c4f5a877b2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Task.java @@ -45,6 +45,7 @@ import org.apache.tajo.storage.fragment.Fragment; import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.Pair; +import org.apache.tajo.util.TUtil; import org.apache.tajo.util.TajoIdUtils; import org.apache.tajo.util.history.TaskHistory; @@ -624,10 +625,8 @@ public void transition(Task task, private static class AttemptFailedTransition implements SingleArcTransition { @Override public void transition(Task task, TaskEvent event) { - if (!(event instanceof TaskTAttemptEvent)) { - throw new IllegalArgumentException("event should be a TaskTAttemptEvent type."); - } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) event; + TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(event, TaskTAttemptFailedEvent.class); + LOG.info("============================================================="); LOG.info(">>> Task Failed: " + attemptEvent.getTaskAttemptId() + " <<<"); LOG.info("============================================================="); @@ -635,7 +634,7 @@ public void transition(Task task, TaskEvent event) { task.finishedAttempts++; task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); + task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), attemptEvent.getException())); } } @@ -644,10 +643,8 @@ private static class AttemptFailedOrRetryTransition implements @Override public TaskState transition(Task task, TaskEvent taskEvent) { - if (!(taskEvent instanceof TaskTAttemptEvent)) { - throw new IllegalArgumentException("taskEvent should be a TaskTAttemptEvent type."); - } - TaskTAttemptEvent attemptEvent = (TaskTAttemptEvent) taskEvent; + TaskTAttemptFailedEvent attemptEvent = TUtil.checkTypeAndGet(taskEvent, TaskTAttemptFailedEvent.class); + task.failedAttempts++; task.finishedAttempts++; boolean retry = task.failedAttempts < task.maxAttempts; @@ -663,7 +660,7 @@ public TaskState transition(Task task, TaskEvent taskEvent) { } } else { task.finishTask(); - task.eventHandler.handle(new StageTaskEvent(task.getId(), TaskState.FAILED)); + task.eventHandler.handle(new StageTaskFailedEvent(task.getId(), attemptEvent.getException())); return TaskState.FAILED; } diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java index b5ffc030b5..ed3002af12 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/TaskAttempt.java @@ -409,7 +409,7 @@ public void transition(TaskAttempt taskAttempt, taskAttempt.fillTaskStatistics(report); taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_SUCCEEDED)); } catch (Throwable t) { - taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t.getMessage())); + taskAttempt.eventHandler.handle(new TaskFatalErrorEvent(taskAttempt.getId(), t)); taskAttempt.addDiagnosticInfo(ExceptionUtils.getStackTrace(t)); } } @@ -432,10 +432,10 @@ public void transition(TaskAttempt taskAttempt, TaskAttemptEvent event) { throw new IllegalArgumentException("event should be a TaskFatalErrorEvent type."); } TaskFatalErrorEvent errorEvent = (TaskFatalErrorEvent) event; - taskAttempt.eventHandler.handle(new TaskTAttemptEvent(taskAttempt.getId(), TaskEventType.T_ATTEMPT_FAILED)); - taskAttempt.addDiagnosticInfo(errorEvent.errorMessage()); + taskAttempt.eventHandler.handle(new TaskTAttemptFailedEvent(taskAttempt.getId(), errorEvent.getError())); + taskAttempt.addDiagnosticInfo(errorEvent.getError().getMessage()); LOG.error(taskAttempt.getId() + " FROM " + taskAttempt.getWorkerConnectionInfo().getHost() - + " >> " + errorEvent.errorMessage()); + + " >> " + errorEvent.getError().getMessage()); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 098567a8c4..e675d7063d 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -38,8 +38,9 @@ import org.apache.tajo.TaskAttemptId; import org.apache.tajo.TaskId; import org.apache.tajo.conf.TajoConf; -import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.exception.ErrorUtil; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.serder.PlanProto; @@ -308,13 +309,13 @@ public Map getTaskHistories() { return taskHistories; } - public void fatalError(TaskAttemptId taskAttemptId, String message) { - if (message == null) { - message = "No error message"; + public void fatalError(TaskAttemptId taskAttemptId, Throwable error) { + if (error == null) { + error = new TajoInternalError("No error message"); } TaskFatalErrorReport.Builder builder = TaskFatalErrorReport.newBuilder() .setId(taskAttemptId.getProto()) - .setErrorMessage(message); + .setError(ErrorUtil.convertException(error)); try { //If QueryMaster does not responding, current execution block should be stop diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java index bd28bb73f0..ac37258bfb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskContainer.java @@ -71,7 +71,7 @@ public void run() { if (task != null) { try { task.abort(); - task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e.getMessage()); + task.getExecutionBlockContext().fatalError(task.getTaskContext().getTaskId(), e); } catch (Throwable t) { LOG.fatal(t.getMessage(), t); } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java index 57f3cd9653..7476580a9c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskExecutor.java @@ -31,6 +31,7 @@ import org.apache.tajo.conf.TajoConf; import org.apache.tajo.conf.TajoConf.ConfVars; import org.apache.tajo.engine.query.TaskRequestImpl; +import org.apache.tajo.exception.TajoInternalError; import org.apache.tajo.resource.NodeResource; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.event.NodeResourceDeallocateEvent; @@ -157,7 +158,7 @@ protected Task createTask(ExecutionBlockContext executionBlockContext, if (executionBlockContext.getTasks().containsKey(taskAttemptId)) { String errorMessage = "Duplicate Task Attempt: " + taskAttemptId; LOG.error(errorMessage); - executionBlockContext.fatalError(taskAttemptId, errorMessage); + executionBlockContext.fatalError(taskAttemptId, new TajoInternalError(errorMessage)); } else { task = new TaskImpl(new TaskRequestImpl(taskRequest), executionBlockContext); executionBlockContext.getTasks().put(task.getTaskContext().getTaskId(), task); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java index 6d9639cdd1..55eb02ab15 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskImpl.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.commons.lang.exception.ExceptionUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -41,6 +40,7 @@ import org.apache.tajo.engine.planner.physical.PhysicalExec; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.engine.query.TaskRequest; +import org.apache.tajo.exception.ErrorUtil; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; import org.apache.tajo.plan.function.python.TajoScriptEngine; @@ -448,18 +448,10 @@ public void run() throws Exception { executionBlockContext.killedTasksNum.incrementAndGet(); } else { context.setState(TaskAttemptState.TA_FAILED); - TaskFatalErrorReport.Builder errorBuilder = - TaskFatalErrorReport.newBuilder() - .setId(getId().getProto()); - if (error != null) { - if (error.getMessage() == null) { - errorBuilder.setErrorMessage(error.getClass().getCanonicalName()); - } else { - errorBuilder.setErrorMessage(error.getMessage()); - } - errorBuilder.setErrorTrace(ExceptionUtils.getStackTrace(error)); - } + TaskFatalErrorReport.Builder errorBuilder = TaskFatalErrorReport.newBuilder(); + errorBuilder.setId(getId().getProto()); + errorBuilder.setError(ErrorUtil.convertException(error)); queryMasterStub.fatalError(null, errorBuilder.build(), NullCallback.get()); executionBlockContext.failedTasksNum.incrementAndGet(); } diff --git a/tajo-core/src/main/proto/ResourceProtos.proto b/tajo-core/src/main/proto/ResourceProtos.proto index 3643a97613..74a475efb2 100644 --- a/tajo-core/src/main/proto/ResourceProtos.proto +++ b/tajo-core/src/main/proto/ResourceProtos.proto @@ -26,6 +26,8 @@ import "TajoIdProtos.proto"; import "CatalogProtos.proto"; import "PrimitiveProtos.proto"; import "Plan.proto"; +import "errors.proto"; +import "stacktrace.proto"; enum ResponseCommand { NORMAL = 1; // ping @@ -114,8 +116,7 @@ message TaskCompletionReport { message TaskFatalErrorReport { required TaskAttemptIdProto id = 1; - optional string error_message = 2; - optional string error_trace = 3; + required tajo.error.SerializedException error = 2; } message FailureIntermediateProto { @@ -215,7 +216,7 @@ message TajoHeartbeatRequest { optional QueryIdProto query_id = 2; optional QueryState state = 3; optional TableDescProto result_desc = 4; - optional string status_message = 5; + optional tajo.error.SerializedException error = 5; optional float query_progress = 6; } From bbbeba86545ba48c89c5d000b7e954c2244b7772 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 11 Jan 2016 18:07:30 +0900 Subject: [PATCH 02/20] fix NPE --- .../main/java/org/apache/tajo/exception/ReturnStateUtil.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index a9df8fd815..a4e23d2980 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -85,7 +85,10 @@ public static ReturnState returnError(Throwable t) { builder.setReturnCode(ResultCode.INTERNAL_ERROR); builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); } - builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + + if (t.getStackTrace() != null) { + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); + } return builder.build(); } From 251f64688f1b21367ae9c5c3e83353a62808dcb2 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 11 Jan 2016 18:32:59 +0900 Subject: [PATCH 03/20] Fix unknown source --- .../src/main/java/org/apache/tajo/exception/ErrorUtil.java | 4 ++-- .../main/java/org/apache/tajo/exception/ReturnStateUtil.java | 4 +--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java index f32e90d5ae..9a71bd69eb 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ErrorUtil.java @@ -34,9 +34,9 @@ public static boolean isFailed(ResultCode code) { public static Stacktrace.StackTrace convertStacktrace(Throwable t) { Stacktrace.StackTrace.Builder builder = Stacktrace.StackTrace.newBuilder(); - for (StackTraceElement element: t.getStackTrace()) { + for (StackTraceElement element : t.getStackTrace()) { builder.addElement(Stacktrace.StackTrace.Element.newBuilder() - .setFilename(element.getFileName()) + .setFilename(element.getFileName() == null ? "(Unknown Source)" : element.getFileName()) .setFunction(element.getClassName() + "::" + element.getMethodName()) .setLine(element.getLineNumber()) ); diff --git a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java index a4e23d2980..152442deea 100644 --- a/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java +++ b/tajo-common/src/main/java/org/apache/tajo/exception/ReturnStateUtil.java @@ -86,9 +86,7 @@ public static ReturnState returnError(Throwable t) { builder.setMessage(ErrorMessages.getInternalErrorMessage(t)); } - if (t.getStackTrace() != null) { - builder.setStackTrace(ErrorUtil.convertStacktrace(t)); - } + builder.setStackTrace(ErrorUtil.convertStacktrace(t)); return builder.build(); } From 9739ad3150cfb76b3641102c1255041243194a25 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 12:51:32 +0900 Subject: [PATCH 04/20] remove new instance --- .../engine/planner/physical/RangeShuffleFileWriteExec.java | 3 +-- .../java/org/apache/tajo/querymaster/DefaultTaskScheduler.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java index e4217b30ba..776a7834be 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/RangeShuffleFileWriteExec.java @@ -28,7 +28,6 @@ import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.SortSpec; import org.apache.tajo.catalog.TableMeta; -import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.KeyProjector; import org.apache.tajo.plan.logical.ShuffleFileWriteNode; import org.apache.tajo.plan.util.PlannerUtil; @@ -74,7 +73,7 @@ public void init() throws IOException { keySchema = PlannerUtil.sortSpecsToSchema(sortSpecs); keyProjector = new KeyProjector(inSchema, keySchema.toArray()); - BSTIndex bst = new BSTIndex(new TajoConf()); + BSTIndex bst = new BSTIndex(context.getConf()); this.comp = new BaseTupleComparator(keySchema, sortSpecs); Path storeTablePath = new Path(context.getWorkDir(), "output"); LOG.info("Output data directory: " + storeTablePath); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java index 27959952f2..be30af2f95 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/DefaultTaskScheduler.java @@ -97,7 +97,7 @@ public DefaultTaskScheduler(TaskSchedulerContext context, Stage stage) { @Override public void init(Configuration conf) { tajoConf = TUtil.checkTypeAndGet(conf, TajoConf.class); - rpcParams = RpcParameterFactory.get(new TajoConf()); + rpcParams = RpcParameterFactory.get(tajoConf); scheduledRequests = new ScheduledRequests(); minTaskMemory = tajoConf.getIntVar(TajoConf.ConfVars.TASK_RESOURCE_MINIMUM_MEMORY); From 76b2338c86244e299bcc35caf4f253a5a286c629 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 14:01:34 +0900 Subject: [PATCH 05/20] TAJO-2048 --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 2f752fdfc5..71d0f82914 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.12.0 - unreleased NEW FEATURES From bf5ab4f5f5bce8f64753d342bf4045c48637a1f1 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 14:01:53 +0900 Subject: [PATCH 06/20] Trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 71d0f82914..2f752fdfc5 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.12.0 - unreleased NEW FEATURES From 60ac9ae75b6d1c42b407834e4b3a2cff27c074f7 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 15:16:35 +0900 Subject: [PATCH 07/20] Trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 2f752fdfc5..71d0f82914 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.12.0 - unreleased NEW FEATURES From ed1398ebf920961a2dd791ffb6d1a4a22ca1f792 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 15:16:51 +0900 Subject: [PATCH 08/20] Trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 71d0f82914..2f752fdfc5 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.12.0 - unreleased NEW FEATURES From c43fa9df0aac71895f62bda3ad6c607d904b21db Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 16:14:40 +0900 Subject: [PATCH 09/20] Investigate failure --- .../java/org/apache/tajo/ha/HdfsServiceTracker.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java index 120d61cd58..0a14eb5746 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java @@ -282,12 +282,24 @@ private InetSocketAddress getHostAddress(int type) { public synchronized void delete() throws IOException { stopped = true; + if (checkerThread != null) { + checkerThread.interrupt(); + try { + checkerThread.join(); + } catch (InterruptedException ie) { + LOG.warn("Ignore InterruptedException from PingChecker.join"); + } + } + if (ShutdownHookManager.get().isShutdownInProgress()) return; String fileName = masterName.replaceAll(":", "_"); + fs.cancelDeleteOnExit(new Path(activePath, fileName)); fs.delete(new Path(activePath, fileName), false); + fs.cancelDeleteOnExit(new Path(activePath, fileName)); fs.delete(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE), false); + fs.cancelDeleteOnExit(new Path(backupPath, fileName)); fs.delete(new Path(backupPath, fileName), false); } From 6f620a973b4b74ad15080dbd63655eb90e1dd26b Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 16:58:43 +0900 Subject: [PATCH 10/20] Trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 3d49873441..529c4e325d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.12.0 - unreleased NEW FEATURES From 4b38102c3053d103eabe8d6f3c8e735112e2aa00 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 16:59:05 +0900 Subject: [PATCH 11/20] Trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 529c4e325d..3d49873441 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.12.0 - unreleased NEW FEATURES From 98e7d8dc96c58b7d835cdb72673c4e2e001ce6c3 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 17:28:46 +0900 Subject: [PATCH 12/20] Trigger CI --- .../java/org/apache/tajo/master/TajoMasterClientService.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index 1fc894abef..c3fd7d1b65 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -107,7 +107,7 @@ public void serviceStart() throws Exception { @Override public void serviceStop() throws Exception { if (server != null) { - server.shutdown(); + server.shutdown(true); } super.serviceStop(); } From 2d0ce7743e42c7329580d66c4178cef149ae19fc Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 17:29:17 +0900 Subject: [PATCH 13/20] Trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 3d49873441..529c4e325d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.12.0 - unreleased NEW FEATURES From a3927676ff5a4ec950fa9fb27a030e002d008796 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 17:29:35 +0900 Subject: [PATCH 14/20] Trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 529c4e325d..3d49873441 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.12.0 - unreleased NEW FEATURES From 13db89f6a7125b05324ea6946f43440d6c5a8597 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 18:08:33 +0900 Subject: [PATCH 15/20] Trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 3d49873441..529c4e325d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.12.0 - unreleased NEW FEATURES From 3e852c3c607afabd8c0c5187ed15d07ccdc95849 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 18:08:50 +0900 Subject: [PATCH 16/20] Trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 529c4e325d..3d49873441 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.12.0 - unreleased NEW FEATURES From a1c2dd1b1628587bdb9bb8b5b0abd791a1e45cac Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 18:09:11 +0900 Subject: [PATCH 17/20] Trigger CI --- CHANGES | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGES b/CHANGES index 3d49873441..529c4e325d 100644 --- a/CHANGES +++ b/CHANGES @@ -1,5 +1,6 @@ Tajo Change Log + Release 0.12.0 - unreleased NEW FEATURES From 24a4794380a195f156ea17d8e91f593855afde43 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 18:09:27 +0900 Subject: [PATCH 18/20] Trigger CI --- CHANGES | 1 - 1 file changed, 1 deletion(-) diff --git a/CHANGES b/CHANGES index 529c4e325d..3d49873441 100644 --- a/CHANGES +++ b/CHANGES @@ -1,6 +1,5 @@ Tajo Change Log - Release 0.12.0 - unreleased NEW FEATURES From 6796e88e78e61e8db4d9695b0a34820fd2d661b1 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Mon, 18 Jan 2016 19:24:19 +0900 Subject: [PATCH 19/20] revert investigation of HA failure --- .../java/org/apache/tajo/ha/HdfsServiceTracker.java | 12 ------------ 1 file changed, 12 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java index 0a14eb5746..120d61cd58 100644 --- a/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java +++ b/tajo-core/src/main/java/org/apache/tajo/ha/HdfsServiceTracker.java @@ -282,24 +282,12 @@ private InetSocketAddress getHostAddress(int type) { public synchronized void delete() throws IOException { stopped = true; - if (checkerThread != null) { - checkerThread.interrupt(); - try { - checkerThread.join(); - } catch (InterruptedException ie) { - LOG.warn("Ignore InterruptedException from PingChecker.join"); - } - } - if (ShutdownHookManager.get().isShutdownInProgress()) return; String fileName = masterName.replaceAll(":", "_"); - fs.cancelDeleteOnExit(new Path(activePath, fileName)); fs.delete(new Path(activePath, fileName), false); - fs.cancelDeleteOnExit(new Path(activePath, fileName)); fs.delete(new Path(activePath, HAConstants.ACTIVE_LOCK_FILE), false); - fs.cancelDeleteOnExit(new Path(backupPath, fileName)); fs.delete(new Path(backupPath, fileName), false); } From 74f4b8dcb685ef645b94f4219bc7ae003a7b7ccd Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 22 Jan 2016 11:05:48 +0900 Subject: [PATCH 20/20] remove duplicate code and refactor useless code --- .../apache/tajo/master/TajoMasterClientService.java | 1 - .../main/java/org/apache/tajo/querymaster/Stage.java | 10 +++------- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java index c3fd7d1b65..bfba51d70b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoMasterClientService.java @@ -507,7 +507,6 @@ public GetQueryStatusResponse getQueryStatus(RpcController controller, GetQueryS if(!StringUtils.isEmpty(queryInfo.getLastMessage())) { builder.setErrorMessage(queryInfo.getLastMessage()); } - builder.setState(OK); } } else { Session session = context.getSessionManager().getSession(request.getSessionId().getId()); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index b037a29072..85086e616a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -1426,18 +1426,14 @@ public StageState transition(Stage stage, StageEvent stageEvent) { stage.getSucceededObjectCount(), stage.killedObjectCount)); - if (stage.killedObjectCount > 0 || stage.failedObjectCount > 0) { + // If the current stage are failed, next stages receives SQ_KILL event + if (stage.killedObjectCount + stage.failedObjectCount > 0) { if (stage.failedObjectCount > 0) { stage.abort(StageState.FAILED); return StageState.FAILED; - } else if (stage.killedObjectCount > 0) { + } else { stage.abort(StageState.KILLED); return StageState.KILLED; - } else { - TajoInternalError error = new TajoInternalError("Invalid State " + stage.getSynchronizedState() + " State"); - LOG.error(error.getMessage(), error); - stage.abort(StageState.ERROR, error); - return StageState.ERROR; } } else { stage.complete();