From c364dbc08d0cb5d87beb5e8c1d37573c9cdb9c77 Mon Sep 17 00:00:00 2001 From: Jinho Kim Date: Thu, 3 Sep 2015 00:34:24 +0900 Subject: [PATCH] TAJO-993: Cleanup the result data in HDFS after query finished. --- .../java/org/apache/tajo/conf/TajoConf.java | 14 ++++ .../apache/tajo/master/TestQueryResult.java | 74 +++++++++++++++++++ .../testTableResultOnClose.result | 3 + .../testTemporalResultOnClose.result | 7 ++ .../NonForwardQueryResultFileScanner.java | 9 +++ 5 files changed, 107 insertions(+) create mode 100644 tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java create mode 100644 tajo-core-tests/src/test/resources/results/TestQueryResult/testTableResultOnClose.result create mode 100644 tajo-core-tests/src/test/resources/results/TestQueryResult/testTemporalResultOnClose.result diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 75826e69a2..b50ce81624 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.ConfigKey; +import org.apache.tajo.QueryId; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoConstants; import org.apache.tajo.service.BaseServiceTracker; @@ -773,6 +774,19 @@ public static Path getDefaultRootStagingDir(TajoConf conf) throws IOException { return new Path(stagingDirString); } + /** + * It returns the temporal query directory + * An example dir is
<pre>/{staging-dir}/{queryId}/RESULT</pre>
. + * + * @param conf the Tajo configuration used to resolve the root staging directory + * @param queryId the id of the query whose result directory is resolved + * @throws IOException if {@link #getDefaultRootStagingDir(TajoConf)} fails + */ + public static Path getTemporalResultDir(TajoConf conf, QueryId queryId) throws IOException { + Path queryDir = new Path(getDefaultRootStagingDir(conf), queryId.toString()); + return new Path(queryDir, TajoConstants.RESULT_DIR_NAME); + } + public static Path getQueryHistoryDir(TajoConf conf) throws IOException { String historyDirString = conf.getVar(ConfVars.HISTORY_QUERY_DIR); if (!hasScheme(historyDirString)) { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java new file mode 100644 index 0000000000..6775c84dbb --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/master/TestQueryResult.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.master; + +import org.apache.tajo.QueryId; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.TajoConstants; +import org.apache.tajo.conf.TajoConf; +import org.apache.tajo.jdbc.FetchResultSet; +import org.apache.tajo.jdbc.TajoMemoryResultSet; +import org.apache.hadoop.fs.Path; +import org.junit.Test; + +import java.sql.ResultSet; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class TestQueryResult extends QueryTestCaseBase { + + public TestQueryResult() { + super(TajoConstants.DEFAULT_DATABASE_NAME); + } + + @Test + public final void testTemporalResultOnClose() throws Exception { + + ResultSet res = executeString("select l_orderkey, l_partkey from lineitem where 1=1;"); + QueryId queryId = getQueryId(res); + Path resultPath = TajoConf.getTemporalResultDir(testingCluster.getConfiguration(), queryId); + assertTrue(testingCluster.getDefaultFileSystem().exists(resultPath)); + + assertResultSet(res); + cleanupQuery(res); + assertFalse(testingCluster.getDefaultFileSystem().exists(resultPath)); + } + + @Test + public final void testTableResultOnClose() throws Exception { + + ResultSet res = executeString("select * from lineitem limit 1"); + QueryId queryId = getQueryId(res); + Path resultPath = TajoConf.getTemporalResultDir(testingCluster.getConfiguration(), queryId); + assertFalse(testingCluster.getDefaultFileSystem().exists(resultPath)); + assertResultSet(res); + cleanupQuery(res); + } + + private QueryId getQueryId(ResultSet resultSet) { + if (resultSet instanceof TajoMemoryResultSet) { + return ((TajoMemoryResultSet) resultSet).getQueryId(); + } else if (resultSet instanceof FetchResultSet) { + return ((FetchResultSet) resultSet).getQueryId(); + } else { + throw new IllegalArgumentException(resultSet.toString()); + } + } +} diff --git a/tajo-core-tests/src/test/resources/results/TestQueryResult/testTableResultOnClose.result 
b/tajo-core-tests/src/test/resources/results/TestQueryResult/testTableResultOnClose.result new file mode 100644 index 0000000000..dd9349f2bb --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestQueryResult/testTableResultOnClose.result @@ -0,0 +1,3 @@ +l_orderkey,l_partkey,l_suppkey,l_linenumber,l_quantity,l_extendedprice,l_discount,l_tax,l_returnflag,l_linestatus,l_shipdate,l_commitdate,l_receiptdate,l_shipinstruct,l_shipmode,l_comment +------------------------------- +1,1,7706,1,17.0,21168.23,0.04,0.02,N,O,1996-03-13,1996-02-12,1996-03-22,DELIVER IN PERSON,TRUCK,egular courts above the \ No newline at end of file diff --git a/tajo-core-tests/src/test/resources/results/TestQueryResult/testTemporalResultOnClose.result b/tajo-core-tests/src/test/resources/results/TestQueryResult/testTemporalResultOnClose.result new file mode 100644 index 0000000000..13785365c7 --- /dev/null +++ b/tajo-core-tests/src/test/resources/results/TestQueryResult/testTemporalResultOnClose.result @@ -0,0 +1,7 @@ +l_orderkey,l_partkey +------------------------------- +1,1 +1,1 +2,2 +3,2 +3,3 \ No newline at end of file diff --git a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java index 877e32bd77..9e132b0f34 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/exec/NonForwardQueryResultFileScanner.java @@ -40,6 +40,7 @@ import org.apache.tajo.storage.fragment.FragmentConvertor; import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; +import org.apache.hadoop.fs.Path; import java.io.IOException; import java.util.ArrayList; @@ -117,6 +118,14 @@ public void close() throws Exception { scanExec.close(); scanExec = null; } + + //remove temporal final output + if (!tajoConf.getBoolVar(TajoConf.ConfVars.$DEBUG_ENABLED)) { + 
Path temporalResultDir = TajoConf.getTemporalResultDir(tajoConf, queryId); + if (tableDesc.getUri().equals(temporalResultDir.toUri())) { + temporalResultDir.getParent().getFileSystem(tajoConf).delete(temporalResultDir, true); + } + } } public List getNextRows(int fetchRowNum) throws IOException {