From 5b55e875e3a8b060dd08c9d3a981062fd61dfda4 Mon Sep 17 00:00:00 2001 From: jinossy Date: Sun, 3 Aug 2014 23:07:58 +0900 Subject: [PATCH 1/2] TAJO-989: Cleanup of child blocks after parent execution block is complete --- .../tajo/master/querymaster/QueryMaster.java | 30 ++++++++- .../tajo/master/querymaster/SubQuery.java | 24 ++++++-- .../tajo/worker/TajoWorkerManagerService.java | 13 ++++ .../java/org/apache/tajo/worker/Task.java | 7 +-- .../org/apache/tajo/worker/TaskRunner.java | 21 ++++++- .../src/main/proto/TajoWorkerProtocol.proto | 5 ++ .../tajo/worker/TestDeletionService.java | 61 +++++++++++++++++++ 7 files changed, 151 insertions(+), 10 deletions(-) create mode 100644 tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java index f173c249ad..25af82f56a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/QueryMaster.java @@ -18,6 +18,7 @@ package org.apache.tajo.master.querymaster; +import com.google.common.collect.Lists; import com.google.common.collect.Maps; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -28,6 +29,7 @@ import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.tajo.QueryId; +import org.apache.tajo.TajoIdProtos; import org.apache.tajo.TajoProtos; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.GlobalPlanner; @@ -162,6 +164,30 @@ public void stop() { } } + protected void cleanupExecutionBlock(List executionBlockIds) { + LOG.info("cleanup executionBlocks : " + executionBlockIds); + NettyClientBase rpc = null; + List workers = getAllWorker(); + TajoWorkerProtocol.ExecutionBlockListProto.Builder builder = TajoWorkerProtocol.ExecutionBlockListProto.newBuilder(); + builder.addAllExecutionBlockId(Lists.newArrayList(executionBlockIds)); + TajoWorkerProtocol.ExecutionBlockListProto executionBlockListProto = builder.build(); + for (TajoMasterProtocol.WorkerResourceProto worker : workers) { + try { + if (worker.getPeerRpcPort() == 0) continue; + + rpc = connPool.getConnection(NetUtils.createSocketAddr(worker.getHost(), worker.getPeerRpcPort()), + TajoWorkerProtocol.class, true); + TajoWorkerProtocol.TajoWorkerProtocolService tajoWorkerProtocolService = rpc.getStub(); + + tajoWorkerProtocolService.cleanupExecutionBlocks(null, executionBlockListProto, NullCallback.get()); + } catch (Exception e) { + LOG.error(e.getMessage()); + } finally { + connPool.releaseConnection(rpc); + } + } + } + private void cleanup(QueryId queryId) { LOG.info("cleanup query resources : " + queryId); NettyClientBase rpc = null; @@ -338,7 +364,9 @@ public void stopQuery(QueryId queryId) { queryMasterTask.stop(); //if (!systemConf.get(CommonTestingUtil.TAJO_TEST, "FALSE").equalsIgnoreCase("TRUE") // && !workerContext.isYarnContainerMode()) { - cleanup(queryId); // TODO We will support yarn mode + if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { + cleanup(queryId); + } //} } catch (Exception e) { LOG.error(e.getMessage(), e); diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index f2e9dd56ad..3712d7b203 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -35,6 +35,7 @@ import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryIdFactory; import org.apache.tajo.QueryUnitId; +import org.apache.tajo.TajoIdProtos; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; @@ -363,7 +364,7 @@ public void addTask(QueryUnit task) { * It finalizes this subquery. It is only invoked when the subquery is succeeded. */ public void complete() { - cleanup(); + cleanup(getId()); finalizeStats(); setFinishTime(); eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED)); @@ -381,7 +382,7 @@ public void abort(SubQueryState finalState) { // - record SubQuery Finish Time // - CleanUp Tasks // - Record History - cleanup(); + cleanup(getId()); setFinishTime(); eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState)); } @@ -1004,7 +1005,8 @@ public void transition(SubQuery subQuery, SubQueryEvent event) { LOG.info("SubQuery (" + subQuery.getId() + ") has " + subQuery.containers.size() + " containers!"); subQuery.eventHandler.handle( new TaskRunnerGroupEvent(EventType.CONTAINER_REMOTE_LAUNCH, - subQuery.getId(), allocationEvent.getAllocatedContainer())); + subQuery.getId(), allocationEvent.getAllocatedContainer()) + ); subQuery.eventHandler.handle(new SubQueryEvent(subQuery.getId(), SubQueryEventType.SQ_START)); } catch (Throwable t) { @@ -1104,9 +1106,23 @@ public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { } } - private void cleanup() { + private void cleanup(ExecutionBlockId executionBlockId) { stopScheduler(); releaseContainers(); + + if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { + List childs = getMasterPlan().getChilds(executionBlockId); + List ebIds = Lists.newArrayList(); + for (ExecutionBlock executionBlock : childs){ + ebIds.add(executionBlock.getId().getProto()); + } + + try { + getContext().getQueryMasterContext().getQueryMaster().cleanupExecutionBlock(ebIds); + } catch (Throwable e) { + LOG.error(e); + } + } } private static class SubQueryCompleteTransition diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java index 13ef15d503..e77da70122 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TajoWorkerManagerService.java @@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.service.CompositeService; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoIdProtos; @@ -151,4 +152,16 @@ public void cleanup(RpcController controller, TajoIdProtos.QueryIdProto request, workerContext.cleanup(new QueryId(request).toString()); done.run(TajoWorker.TRUE_PROTO); } + + @Override + public void cleanupExecutionBlocks(RpcController controller, TajoWorkerProtocol.ExecutionBlockListProto request, + RpcCallback done) { + for (TajoIdProtos.ExecutionBlockIdProto executionBlockIdProto : request.getExecutionBlockIdList()) { + String inputDir = TaskRunner.getBaseInputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); + workerContext.cleanup(inputDir); + String outputDir = TaskRunner.getBaseOutputDir(new ExecutionBlockId(executionBlockIdProto)).toString(); + workerContext.cleanup(outputDir); + } + done.run(TajoWorker.TRUE_PROTO); + } } diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index 230c63aee8..bb23c7f2b3 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos; @@ -794,12 +795,10 @@ public void stopCommunicationThread() throws InterruptedException { } } } + public static Path getTaskAttemptDir(QueryUnitAttemptId quid) { Path workDir = - StorageUtil.concatPath( - quid.getQueryUnitId().getExecutionBlockId().getQueryId().toString(), - "in", - quid.getQueryUnitId().getExecutionBlockId().toString(), + StorageUtil.concatPath(TaskRunner.getBaseInputDir(quid.getQueryUnitId().getExecutionBlockId()), String.valueOf(quid.getQueryUnitId().getId()), String.valueOf(quid.getId())); return workDir; diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java index 3fcee06ab9..967619240a 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/TaskRunner.java @@ -45,6 +45,7 @@ import org.apache.tajo.rpc.NettyClientBase; import org.apache.tajo.rpc.NullCallback; import org.apache.tajo.rpc.RpcConnectionPool; +import org.apache.tajo.storage.StorageUtil; import org.apache.tajo.util.TajoIdUtils; import java.net.InetSocketAddress; @@ -172,6 +173,24 @@ public static String getId(ExecutionBlockId executionBlockId, ContainerId contai return executionBlockId + "," + containerId; } + public static Path getBaseOutputDir(ExecutionBlockId executionBlockId){ + Path workDir = + StorageUtil.concatPath( + executionBlockId.getQueryId().toString(), + "output", + String.valueOf(executionBlockId.getId())); + return workDir; + } + + public static Path getBaseInputDir(ExecutionBlockId executionBlockId) { + Path workDir = + StorageUtil.concatPath( + executionBlockId.getQueryId().toString(), + "in", + executionBlockId.toString()); + return workDir; + } + @Override public void init(Configuration conf) { this.systemConf = (TajoConf)conf; @@ -182,7 +201,7 @@ public void init(Configuration conf) { localFS = FileSystem.getLocal(conf); // the base dir for an output dir - baseDir = queryId.toString() + "/output" + "/" + executionBlockId.getId(); + baseDir = getBaseOutputDir(executionBlockId).toString(); // initialize LocalDirAllocator lDirAllocator = new LocalDirAllocator(ConfVars.WORKER_TEMPORAL_DIR.varname); diff --git a/tajo-core/src/main/proto/TajoWorkerProtocol.proto b/tajo-core/src/main/proto/TajoWorkerProtocol.proto index ce8ce86563..dc2b1d7623 100644 --- a/tajo-core/src/main/proto/TajoWorkerProtocol.proto +++ b/tajo-core/src/main/proto/TajoWorkerProtocol.proto @@ -179,6 +179,10 @@ message RunExecutionBlockRequestProto { optional string queryOutputPath = 6; } +message ExecutionBlockListProto { + repeated ExecutionBlockIdProto executionBlockId = 1; +} + service TajoWorkerProtocolService { rpc ping (QueryUnitAttemptIdProto) returns (BoolProto); @@ -186,6 +190,7 @@ service TajoWorkerProtocolService { rpc executeExecutionBlock(RunExecutionBlockRequestProto) returns (BoolProto); rpc killTaskAttempt(QueryUnitAttemptIdProto) returns (BoolProto); rpc cleanup(QueryIdProto) returns (BoolProto); + rpc cleanupExecutionBlocks(ExecutionBlockListProto) returns (BoolProto); } message EnforceProperty { diff --git a/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java new file mode 100644 index 0000000000..98251c1b85 --- /dev/null +++ b/tajo-core/src/test/java/org/apache/tajo/worker/TestDeletionService.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.worker; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestDeletionService { + DeletionService deletionService; + + @Before + public void setUp() throws Exception { + } + + @After + public void tearDown() { + if(deletionService != null){ + deletionService.stop(); + } + } + + @Test + public final void testTemporalDirectory() throws IOException, InterruptedException { + int delay = 1; + deletionService = new DeletionService(1, delay); + FileSystem fs = FileSystem.getLocal(new Configuration()); + Path tempPath = CommonTestingUtil.getTestDir(); + assertTrue(fs.exists(tempPath)); + deletionService.delete(tempPath); + assertTrue(fs.exists(tempPath)); + + Thread.sleep(delay * 2 * 1000); + assertFalse(fs.exists(tempPath)); + } +} From aeb9cd970adb25d626cde71062a5911ee9dd0c51 Mon Sep 17 00:00:00 2001 From: jinossy Date: Tue, 5 Aug 2014 11:53:22 +0900 Subject: [PATCH 2/2] removed parameter of SubQuery::cleanup and unused import --- .../org/apache/tajo/master/querymaster/SubQuery.java | 10 +++++----- .../src/main/java/org/apache/tajo/worker/Task.java | 1 - 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java index 3712d7b203..17efa21fb1 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/SubQuery.java @@ -364,7 +364,7 @@ public void addTask(QueryUnit task) { * It finalizes this subquery. It is only invoked when the subquery is succeeded. */ public void complete() { - cleanup(getId()); + cleanup(); finalizeStats(); setFinishTime(); eventHandler.handle(new SubQueryCompletedEvent(getId(), SubQueryState.SUCCEEDED)); @@ -382,7 +382,7 @@ public void abort(SubQueryState finalState) { // - record SubQuery Finish Time // - CleanUp Tasks // - Record History - cleanup(getId()); + cleanup(); setFinishTime(); eventHandler.handle(new SubQueryCompletedEvent(getId(), finalState)); } @@ -1106,12 +1106,12 @@ public void transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { } } - private void cleanup(ExecutionBlockId executionBlockId) { + private void cleanup() { stopScheduler(); releaseContainers(); if (!getContext().getConf().getBoolVar(TajoConf.ConfVars.TAJO_DEBUG)) { - List childs = getMasterPlan().getChilds(executionBlockId); + List childs = getMasterPlan().getChilds(getId()); List ebIds = Lists.newArrayList(); for (ExecutionBlock executionBlock : childs){ ebIds.add(executionBlock.getId().getProto()); @@ -1130,7 +1130,7 @@ private static class SubQueryCompleteTransition @Override public SubQueryState transition(SubQuery subQuery, SubQueryEvent subQueryEvent) { - // TODO - Commit subQuery & do cleanup + // TODO - Commit subQuery // TODO - records succeeded, failed, killed completed task // TODO - records metrics try { diff --git a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java index bb23c7f2b3..3a4536af9c 100644 --- a/tajo-core/src/main/java/org/apache/tajo/worker/Task.java +++ b/tajo-core/src/main/java/org/apache/tajo/worker/Task.java @@ -27,7 +27,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.*; import org.apache.hadoop.util.ReflectionUtils; -import org.apache.tajo.ExecutionBlockId; import org.apache.tajo.QueryUnitAttemptId; import org.apache.tajo.TajoConstants; import org.apache.tajo.TajoProtos;