From bb34d42799ab92495f92beaa741c90aa53396367 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 17 Apr 2015 18:34:28 +0900 Subject: [PATCH 1/3] add shuffle type --- .../main/java/org/apache/tajo/master/TajoContainerProxy.java | 3 +++ tajo-core/src/main/proto/TajoWorkerProtocol.proto | 1 + 2 files changed, 4 insertions(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 1fda7d484c..5311de1668 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -32,6 +32,7 @@ import org.apache.tajo.master.event.TaskFatalErrorEvent; import org.apache.tajo.master.rm.TajoWorkerContainer; import org.apache.tajo.master.rm.TajoWorkerContainerId; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.querymaster.QueryMasterTask; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; @@ -102,6 +103,8 @@ private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContain tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); + PlanProto.ShuffleType type = context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType(); + TajoWorkerProtocol.RunExecutionBlockRequestProto request = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder() .setExecutionBlockId(executionBlockId.getProto()) diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index b8c9575b35..fe779a9721 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -201,6 +201,7 @@ message RunExecutionBlockRequestProto { required KeyValueSetProto queryContext = 6; required string planJson = 7; + required ShuffleType type = 8; } message ExecutionBlockListProto { From 24b291437fec9ed3796cd3ae57bd6142383cb51e Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Fri, 17 Apr 2015 21:27:32 +0900 Subject: [PATCH 2/3] TAJO-1560: HashShuffle report should be ignored when a succeed tasks are not included --- .../tajo/master/TajoContainerProxy.java | 8 +- .../org/apache/tajo/querymaster/Stage.java | 132 +++++++++++------- .../tajo/util/history/HistoryWriter.java | 30 ++-- .../tajo/worker/ExecutionBlockContext.java | 46 +++--- .../tajo/worker/TajoWorkerManagerService.java | 3 +- .../apache/tajo/worker/TaskRunnerManager.java | 16 ++- .../worker/event/TaskRunnerStartEvent.java | 10 +- .../src/main/proto/TajoWorkerProtocol.proto | 2 +- .../tajo/querymaster/TestKillQuery.java | 3 +- 9 files changed, 150 insertions(+), 100 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java index 5311de1668..2aac00561b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/TajoContainerProxy.java @@ -94,16 +94,15 @@ public void killTaskAttempt(TaskAttemptId taskAttemptId) { } private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContainer container) { - NettyClientBase tajoWorkerRpc = null; + NettyClientBase tajoWorkerRpc; try { - InetSocketAddress myAddr= context.getQueryMasterContext().getWorkerContext() - .getQueryMasterManagerService().getBindAddr(); InetSocketAddress addr = new InetSocketAddress(container.getNodeId().getHost(), container.getNodeId().getPort()); tajoWorkerRpc = RpcClientManager.getInstance().getClient(addr, TajoWorkerProtocol.class, true); TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerRpcClient = tajoWorkerRpc.getStub(); - PlanProto.ShuffleType type = context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType(); + PlanProto.ShuffleType shuffleType = + context.getQuery().getStage(executionBlockId).getDataChannel().getShuffleType(); TajoWorkerProtocol.RunExecutionBlockRequestProto request = TajoWorkerProtocol.RunExecutionBlockRequestProto.newBuilder() @@ -114,6 +113,7 @@ private void assignExecutionBlock(ExecutionBlockId executionBlockId, TajoContain .setQueryOutputPath(context.getStagingDir().toString()) .setQueryContext(queryContext.getProto()) .setPlanJson(planJson) + .setShuffleType(shuffleType) .build(); tajoWorkerRpcClient.startExecutionBlock(null, request, NullCallback.get()); diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 20add9fa9b..480c7ac9c2 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -239,7 +239,8 @@ StageEventType.SQ_KILL, new KillTasksTransition()) EnumSet.of( StageEventType.SQ_START, StageEventType.SQ_KILL, - StageEventType.SQ_CONTAINER_ALLOCATED)) + StageEventType.SQ_CONTAINER_ALLOCATED, + StageEventType.SQ_SHUFFLE_REPORT)) // Transitions from KILLED state .addTransition(StageState.KILLED, StageState.KILLED, @@ -1300,6 +1301,52 @@ protected void stopFinalization() { stopShuffleReceiver.set(true); } + private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType type) { + if(!checkIfNeedFinalizing(type)) return; + + TajoWorkerProtocol.ExecutionBlockReport report = event.getReport(); + + if (!report.getReportSuccess()) { + stopFinalization(); + LOG.error(getId() + ", " + type + " report are failed. Caused by:" + report.getReportErrorMessage()); + eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_FAILED)); + } + + completedShuffleTasks.addAndGet(report.getSucceededTasks()); + if (report.getIntermediateEntriesCount() > 0) { + for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) { + hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); + } + } + + if (completedShuffleTasks.get() >= succeededObjectCount) { + LOG.info(getId() + ", Finalized " + type + " reports: " + completedShuffleTasks.get()); + eventHandler.handle(new StageEvent(getId(), StageEventType.SQ_STAGE_COMPLETED)); + if (timeoutChecker != null) { + stopFinalization(); + synchronized (timeoutChecker){ + timeoutChecker.notifyAll(); + } + } + } else { + LOG.info(getId() + ", Received " + type + " reports " + + completedShuffleTasks.get() + "/" + succeededObjectCount); + } + } + + /** + * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should get report from worker nodes when ExecutionBlock is stopping. + */ + public static boolean checkIfNeedFinalizing(ShuffleType type) { + switch (type) { + case HASH_SHUFFLE: + case SCATTERED_HASH_SHUFFLE: + return true; + default: + return false; + } + } + private static class StageFinalizeTransition implements SingleArcTransition { @Override @@ -1310,71 +1357,50 @@ public void transition(final Stage stage, StageEvent event) { } stage.lastContactTime = System.currentTimeMillis(); + ShuffleType shuffleType = stage.getDataChannel().getShuffleType(); try { if (event instanceof StageShuffleReportEvent) { - - StageShuffleReportEvent finalizeEvent = (StageShuffleReportEvent) event; - TajoWorkerProtocol.ExecutionBlockReport report = finalizeEvent.getReport(); - - if (!report.getReportSuccess()) { - stage.stopFinalization(); - LOG.error(stage.getId() + ", Shuffle report are failed. Caused by:" + report.getReportErrorMessage()); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); - } - - stage.completedShuffleTasks.addAndGet(finalizeEvent.getReport().getSucceededTasks()); - if (report.getIntermediateEntriesCount() > 0) { - for (IntermediateEntryProto eachInterm : report.getIntermediateEntriesList()) { - stage.hashShuffleIntermediateEntries.add(new IntermediateEntry(eachInterm)); - } - } - - if (stage.completedShuffleTasks.get() >= stage.succeededObjectCount) { - LOG.info(stage.getId() + ", Finalized shuffle reports: " + stage.completedShuffleTasks.get()); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); - if (stage.timeoutChecker != null) { - stage.stopFinalization(); - synchronized (stage.timeoutChecker){ - stage.timeoutChecker.notifyAll(); - } - } - } else { - LOG.info(stage.getId() + ", Received shuffle report: " + - stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); - } - + stage.finalizeShuffleReport((StageShuffleReportEvent) event, shuffleType); } else { - LOG.info(String.format("Stage finalize - %s (total=%d, success=%d, killed=%d)", + LOG.info(String.format("Stage - %s finalize %s (total=%d, success=%d, killed=%d)", stage.getId().toString(), + shuffleType, stage.totalScheduledObjectsCount, stage.succeededObjectCount, stage.killedObjectCount)); stage.finalizeStage(); - LOG.info(stage.getId() + ", waiting for shuffle reports. expected Tasks:" + stage.succeededObjectCount); + if (checkIfNeedFinalizing(shuffleType)) { + /* wait for StageShuffleReportEvent from worker nodes */ + + LOG.info(stage.getId() + ", wait for " + shuffleType + " reports. expected Tasks:" + + stage.succeededObjectCount); /* FIXME implement timeout handler of stage and task */ - if (stage.timeoutChecker != null) { - stage.timeoutChecker = new Thread(new Runnable() { - @Override - public void run() { - while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) { - long elapsedTime = System.currentTimeMillis() - stage.lastContactTime; - if (elapsedTime > 120 * 1000) { - stage.stopFinalization(); - LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime - + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); - stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); - } - synchronized (this) { - try { - this.wait(1 * 1000); - } catch (InterruptedException e) { + if (stage.timeoutChecker != null) { + stage.timeoutChecker = new Thread(new Runnable() { + @Override + public void run() { + while (stage.getSynchronizedState() == StageState.FINALIZING && !Thread.interrupted()) { + long elapsedTime = System.currentTimeMillis() - stage.lastContactTime; + if (elapsedTime > 120 * 1000) { + stage.stopFinalization(); + LOG.error(stage.getId() + ": Timed out while receiving intermediate reports: " + elapsedTime + + " ms, report:" + stage.completedShuffleTasks.get() + "/" + stage.succeededObjectCount); + stage.eventHandler.handle(new StageEvent(stage.getId(), StageEventType.SQ_FAILED)); + } + synchronized (this) { + try { + this.wait(1 * 1000); + } catch (InterruptedException e) { + } } } } - } - }); - stage.timeoutChecker.start(); + }); + stage.timeoutChecker.start(); + } + } else { + stage.handle(new StageEvent(stage.getId(), StageEventType.SQ_STAGE_COMPLETED)); } } } catch (Throwable t) { diff --git a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java index f0c6c1134f..e8ba3046b9 100644 --- a/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java +++ b/tajo-core/src/main/java/org/apache/tajo/util/history/HistoryWriter.java @@ -132,7 +132,7 @@ public WriterFuture appendHistory(History history) { } /* asynchronously flush to history file */ - public synchronized WriterFuture appendAndFlush(History history) { + public WriterFuture appendAndFlush(History history) { WriterFuture future = new WriterFuture(history) { public void done(WriterHolder holder) { try { @@ -163,7 +163,7 @@ public synchronized void appendAndSync(History history) } /* Flushing the buffer */ - public synchronized void flushTaskHistories() { + public void flushTaskHistories() { if (historyQueue.size() > 0) { synchronized (writerThread) { writerThread.needTaskFlush.set(true); @@ -244,20 +244,16 @@ public void run() { cal.add(Calendar.HOUR_OF_DAY, -2); String closeTargetTime = df.format(cal.getTime()); List closingTargets = new ArrayList(); - synchronized (taskWriters) { - for (String eachWriterTime : taskWriters.keySet()) { - if (eachWriterTime.compareTo(closeTargetTime) <= 0) { - closingTargets.add(eachWriterTime); - } + + for (String eachWriterTime : taskWriters.keySet()) { + if (eachWriterTime.compareTo(closeTargetTime) <= 0) { + closingTargets.add(eachWriterTime); } } for (String eachWriterTime : closingTargets) { WriterHolder writerHolder; - synchronized (taskWriters) { - writerHolder = taskWriters.remove(eachWriterTime); - } - + writerHolder = taskWriters.remove(eachWriterTime); if (writerHolder != null) { LOG.info("Closing task history file: " + writerHolder.path); IOUtils.cleanup(LOG, writerHolder); @@ -340,7 +336,7 @@ private List> writeHistory(List//query-detail//query.hist @@ -381,7 +377,7 @@ private synchronized void writeQueryHistory(QueryHistory queryHistory) throws Ex } } - private synchronized WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception { + private WriterHolder writeQuerySummary(QueryInfo queryInfo) throws Exception { if(stopped.get()) return null; // writing to HDFS and rolling hourly @@ -409,7 +405,7 @@ private synchronized WriterHolder writeQuerySummary(QueryInfo queryInfo) throws return querySummaryWriter; } - private synchronized void rollingQuerySummaryWriter() throws Exception { + private void rollingQuerySummaryWriter() throws Exception { // finding largest file sequence SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHHmmss"); String currentDateTime = df.format(new Date(System.currentTimeMillis())); @@ -442,7 +438,7 @@ private void flushTaskHistories() { } } - private synchronized WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception { + private WriterHolder writeTaskHistory(TaskHistory taskHistory) throws Exception { SimpleDateFormat df = new SimpleDateFormat("yyyyMMddHH"); String taskStartTime = df.format(new Date(taskHistory.getStartTime())); @@ -536,14 +532,14 @@ static class WriterHolder implements Closeable { FSDataOutputStream out; @Override - public synchronized void close() throws IOException { + public void close() throws IOException { if (out != null) out.close(); } /* * Sync buffered data to DataNodes or disks (flush to disk devices). */ - private synchronized void flush() throws IOException { + private void flush() throws IOException { if (out != null) out.hsync(); } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java index 5ffc7a9700..fcf787eecb 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/ExecutionBlockContext.java @@ -20,6 +20,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import io.netty.channel.ConnectTimeoutException; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.FileSystem; @@ -34,6 +35,7 @@ import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.ipc.QueryMasterProtocol; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcClientManager; @@ -42,9 +44,6 @@ import org.apache.tajo.util.NetUtils; import org.apache.tajo.util.Pair; -import io.netty.channel.ConnectTimeoutException; -import io.netty.channel.EventLoopGroup; - import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -66,8 +65,6 @@ public class ExecutionBlockContext { public AtomicInteger killedTasksNum = new AtomicInteger(); public AtomicInteger failedTasksNum = new AtomicInteger(); - private EventLoopGroup loopGroup; - // for temporal or intermediate files private FileSystem localFS; // for input files private FileSystem defaultFS; @@ -90,6 +87,8 @@ public class ExecutionBlockContext { private AtomicBoolean stop = new AtomicBoolean(); + private PlanProto.ShuffleType shuffleType; + // It keeps all of the query unit attempts while a TaskRunner is running. private final ConcurrentMap tasks = Maps.newConcurrentMap(); @@ -97,7 +96,8 @@ public class ExecutionBlockContext { public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerContext, TaskRunnerManager manager, QueryContext queryContext, String plan, - ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster) throws Throwable { + ExecutionBlockId executionBlockId, WorkerConnectionInfo queryMaster, + PlanProto.ShuffleType shuffleType) throws Throwable { this.manager = manager; this.executionBlockId = executionBlockId; this.connManager = RpcClientManager.getInstance(); @@ -114,6 +114,7 @@ public ExecutionBlockContext(TajoConf conf, TajoWorker.WorkerContext workerConte this.plan = plan; this.resource = new ExecutionBlockSharedResource(); this.workerContext = workerContext; + this.shuffleType = shuffleType; } public void init() throws Throwable { @@ -193,10 +194,6 @@ public FileSystem getLocalFS() { return localFS; } - public FileSystem getDefaultFS() { - return defaultFS; - } - public LocalDirAllocator getLocalDirAllocator() { return workerContext.getLocalDirAllocator(); } @@ -264,13 +261,30 @@ public TajoWorker.WorkerContext getWorkerContext(){ return workerContext; } - private void sendExecutionBlockReport(ExecutionBlockReport reporter) throws Exception { + /** + * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should send report when this executionBlock stopping. + */ + protected void sendShuffleReport() throws Exception { + + switch (shuffleType) { + case HASH_SHUFFLE: + case SCATTERED_HASH_SHUFFLE: + sendHashShuffleReport(executionBlockId); + break; + case NONE_SHUFFLE: + case RANGE_SHUFFLE: + default: + break; + } + } + + private void sendHashShuffleReport(ExecutionBlockId ebId) throws Exception { + /* This case is that worker did not ran tasks */ + if(completedTasksNum.get() == 0) return; + NettyClientBase client = getQueryMasterConnection(); QueryMasterProtocol.QueryMasterProtocolService.Interface stub = client.getStub(); - stub.doneExecutionBlock(null, reporter, NullCallback.get()); - } - protected void reportExecutionBlock(ExecutionBlockId ebId) { ExecutionBlockReport.Builder reporterBuilder = ExecutionBlockReport.newBuilder(); reporterBuilder.setEbId(ebId.getProto()); reporterBuilder.setReportSuccess(true); @@ -281,7 +295,7 @@ protected void reportExecutionBlock(ExecutionBlockId ebId) { getWorkerContext().getHashShuffleAppenderManager().close(ebId); if (shuffles == null) { reporterBuilder.addAllIntermediateEntries(intermediateEntries); - sendExecutionBlockReport(reporterBuilder.build()); + stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); return; } @@ -334,7 +348,7 @@ protected void reportExecutionBlock(ExecutionBlockId ebId) { } } try { - sendExecutionBlockReport(reporterBuilder.build()); + stub.doneExecutionBlock(null, reporterBuilder.build(), NullCallback.get()); } catch (Throwable e) { // can't send report to query master LOG.fatal(e.getMessage(), e); diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 4a097725fc..71d96c4825 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -121,7 +121,8 @@ public void startExecutionBlock(RpcController controller, , new ExecutionBlockId(request.getExecutionBlockId()) , request.getContainerId() , new QueryContext(workerContext.getConf(), request.getQueryContext()), - request.getPlanJson() + request.getPlanJson(), + request.getShuffleType() )); done.run(TajoWorker.TRUE_PROTO); } catch (Throwable t) { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java index b3c28b381a..734a8a5b4b 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunnerManager.java @@ -32,7 +32,6 @@ import org.apache.tajo.worker.event.TaskRunnerStartEvent; import org.apache.tajo.worker.event.TaskRunnerStopEvent; -import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; @@ -155,8 +154,14 @@ public void handle(TaskRunnerEvent event) { if(context == null){ try { - context = new ExecutionBlockContext(getTajoConf(), getWorkerContext(), this, startEvent.getQueryContext(), - startEvent.getPlan(), startEvent.getExecutionBlockId(), startEvent.getQueryMaster()); + context = new ExecutionBlockContext(getTajoConf(), + getWorkerContext(), + this, + startEvent.getQueryContext(), + startEvent.getPlan(), + startEvent.getExecutionBlockId(), + startEvent.getQueryMaster(), + startEvent.getShuffleType()); context.init(); } catch (Throwable e) { LOG.fatal(e.getMessage(), e); @@ -178,10 +183,9 @@ public void handle(TaskRunnerEvent event) { if(executionBlockContext != null){ try { executionBlockContext.getSharedResource().releaseBroadcastCache(event.getExecutionBlockId()); - executionBlockContext.reportExecutionBlock(event.getExecutionBlockId()); - workerContext.getHashShuffleAppenderManager().close(event.getExecutionBlockId()); + executionBlockContext.sendShuffleReport(); workerContext.getTaskHistoryWriter().flushTaskHistories(); - } catch (IOException e) { + } catch (Exception e) { LOG.fatal(e.getMessage(), e); throw new RuntimeException(e); } finally { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java index ff63754ceb..908afa2293 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/event/TaskRunnerStartEvent.java @@ -21,6 +21,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.master.cluster.WorkerConnectionInfo; +import org.apache.tajo.plan.serder.PlanProto; public class TaskRunnerStartEvent extends TaskRunnerEvent { @@ -28,17 +29,20 @@ public class TaskRunnerStartEvent extends TaskRunnerEvent { private final WorkerConnectionInfo queryMaster; private final String containerId; private final String plan; + private final PlanProto.ShuffleType shuffleType; public TaskRunnerStartEvent(WorkerConnectionInfo queryMaster, ExecutionBlockId executionBlockId, String containerId, QueryContext context, - String plan) { + String plan, + PlanProto.ShuffleType shuffleType) { super(EventType.START, executionBlockId); this.queryMaster = queryMaster; this.containerId = containerId; this.queryContext = context; this.plan = plan; + this.shuffleType = shuffleType; } public WorkerConnectionInfo getQueryMaster() { @@ -56,4 +60,8 @@ public QueryContext getQueryContext() { public String getPlan() { return plan; } + + public PlanProto.ShuffleType getShuffleType() { + return shuffleType; + } } diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index fe779a9721..fddef8fa81 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -201,7 +201,7 @@ message RunExecutionBlockRequestProto { required KeyValueSetProto queryContext = 6; required string planJson = 7; - required ShuffleType type = 8; + required ShuffleType shuffleType = 8; } message ExecutionBlockListProto { diff --git a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java index 09be700dbc..b2e1ce9587 100644 --- a/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java +++ b/tajo-core/src/test/java/org/apache/tajo/querymaster/TestKillQuery.java @@ -211,7 +211,8 @@ public void testKillTask() throws Throwable { taskRequest.setInterQuery(); TaskAttemptId attemptId = new TaskAttemptId(tid, 1); - ExecutionBlockContext context = new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null); + ExecutionBlockContext context = + new ExecutionBlockContext(conf, null, null, new QueryContext(conf), null, eid, null, null); org.apache.tajo.worker.Task task = new Task("test", CommonTestingUtil.getTestDir(), attemptId, conf, context, taskRequest); From 118368b17b6feaf0945cda732f06942fe9a5c44c Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Sun, 19 Apr 2015 18:10:31 +0900 Subject: [PATCH 3/3] add more description --- tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java index 480c7ac9c2..d933fba1a5 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Stage.java @@ -1336,6 +1336,7 @@ private void finalizeShuffleReport(StageShuffleReportEvent event, ShuffleType ty /** * HASH_SHUFFLE, SCATTERED_HASH_SHUFFLE should get report from worker nodes when ExecutionBlock is stopping. + * RANGE_SHUFFLE report is sent from task reporter when a Task is finished in worker node. */ public static boolean checkIfNeedFinalizing(ShuffleType type) { switch (type) {