From 7c98709f0fcb06dfb675acae3d6489a6126f55b5 Mon Sep 17 00:00:00 2001 From: jinossy Date: Wed, 6 Aug 2014 17:43:35 +0900 Subject: [PATCH 1/6] TAJO-995: HiveMetaStoreClient wrapper should retry the connection --- .../tajo/catalog/store/HCatalogStore.java | 11 +++++----- .../store/HCatalogStoreClientPool.java | 20 +++++++++++++++---- .../tajo/catalog/store/HCatalogUtil.java | 6 ++++++ 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java index 6f483481fe..d27f19c51c 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStore.java @@ -48,6 +48,7 @@ import org.apache.tajo.exception.InternalException; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.thrift.TException; import java.io.IOException; import java.util.*; @@ -77,13 +78,13 @@ public HCatalogStore(final Configuration conf) throws InternalException { @Override public boolean existTable(final String databaseName, final String tableName) throws CatalogException { boolean exist = false; - org.apache.hadoop.hive.ql.metadata.Table table = null; + org.apache.hadoop.hive.ql.metadata.Table table; HCatalogStoreClientPool.HCatalogStoreClient client = null; // get table try { client = clientPool.getClient(); - table = HCatUtil.getTable(client.getHiveClient(), databaseName, tableName); + table = HCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); if (table != null) { exist = true; } @@ -118,7 +119,7 @@ public final CatalogProtos.TableDescProto getTable(String databaseName, final St // get hive table schema try { client = clientPool.getClient(); - table = HCatUtil.getTable(client.getHiveClient(), databaseName, tableName); + table = HCatalogUtil.getTable(client.getHiveClient(), databaseName, tableName); path = table.getPath(); } catch (NoSuchObjectException nsoe) { throw new CatalogException("Table not found. - tableName:" + tableName, nsoe); @@ -291,7 +292,7 @@ public final List getAllTableNames(String databaseName) throws CatalogEx try { client = clientPool.getClient(); return client.getHiveClient().getAllTables(databaseName); - } catch (MetaException e) { + } catch (TException e) { throw new CatalogException(e); } finally { if(client != null) client.release(); @@ -401,7 +402,7 @@ public Collection getAllDatabaseNames() throws CatalogException { try { client = clientPool.getClient(); return client.getHiveClient().getAllDatabases(); - } catch (MetaException e) { + } catch (TException e) { throw new CatalogException(e); } finally { if (client != null) { diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java index 2fff67c722..8ccb100b01 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogStoreClientPool.java @@ -16,7 +16,9 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; +import org.apache.hadoop.hive.metastore.*; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Table; import org.apache.log4j.Logger; import java.util.Iterator; @@ -41,12 +43,22 @@ public class HCatalogStoreClientPool { * connection pool. */ public class HCatalogStoreClient { - private final HiveMetaStoreClient hiveClient; + private final IMetaStoreClient hiveClient; public AtomicBoolean isInUse = new AtomicBoolean(false); private HCatalogStoreClient(HiveConf hiveConf) { try { - this.hiveClient = new HiveMetaStoreClient(hiveConf); + HiveMetaHookLoader hookLoader = new HiveMetaHookLoader() { + @Override + public HiveMetaHook getHook(Table table) throws MetaException { + /* metadata hook implementation, or null if this + * storage handler does not need any metadata notifications + */ + return null; + } + }; + + this.hiveClient = RetryingMetaStoreClient.getProxy(hiveConf, hookLoader, HiveMetaStoreClient.class.getName()); clientPool.add(this); LOG.info("MetaStoreClient created (size = " + clientPool.size() + ")"); } catch (Exception e) { @@ -58,7 +70,7 @@ private HCatalogStoreClient(HiveConf hiveConf) { /** * Returns the internal HiveMetaStoreClient object. */ - public HiveMetaStoreClient getHiveClient() { + public IMetaStoreClient getHiveClient() { return hiveClient; } diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java index 9e60768ad6..54fdb9db84 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hcatalog/src/main/java/org/apache/tajo/catalog/store/HCatalogUtil.java @@ -20,9 +20,11 @@ import com.google.common.base.Preconditions; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat; import org.apache.hadoop.hive.ql.io.HiveSequenceFileOutputFormat; import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.data.schema.HCatFieldSchema; @@ -30,6 +32,7 @@ import org.apache.tajo.catalog.exception.CatalogException; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; +import org.apache.thrift.TException; import parquet.hadoop.mapred.DeprecatedParquetOutputFormat; public class HCatalogUtil { @@ -137,4 +140,7 @@ public static String getStoreType(String fileFormat) { } } + public static Table getTable(IMetaStoreClient client, String dbName, String tableName) throws TException { + return new Table(client.getTable(dbName, tableName)); + } } From 97910caa09353eb395205b35b9417b2ae51d076a Mon Sep 17 00:00:00 2001 From: Jaehwa Jung Date: Fri, 26 Sep 2014 02:51:13 +0900 Subject: [PATCH 2/6] TAJO-1067: INSERT OVERWRITE INTO should not remove all partitions. --- .../java/org/apache/tajo/SessionVars.java | 4 + .../java/org/apache/tajo/conf/TajoConf.java | 5 +- .../apache/tajo/master/querymaster/Query.java | 133 ++++++++++++++++-- .../engine/query/TestTablePartitions.java | 84 +++++++++-- .../TestTajoCli/testHelpSessionVars.result | 1 + 5 files changed, 206 insertions(+), 21 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index cc875b2c1a..62e2cdfb0f 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -118,6 +118,10 @@ public enum SessionVars implements ConfigKey { // ResultSet ---------------------------------------------------------------- FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT), + // Column Partition + COLUMN_PARITION_REMOVE_ALL_PARTITIONS(ConfVars.$COLUMN_PARITION_REMOVE_ALL_PARTITIONS, "In Column partition, " + + "INSERT OVERWRITE INTO remove all partitions", DEFAULT), + //------------------------------------------------------------------------------- // Only for Unit Testing //------------------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index b5a9b506cb..48b22e8094 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -359,7 +359,10 @@ public static enum ConfVars implements ConfigKey { $BEHAVIOR_ARITHMETIC_ABORT("tajo.behavior.arithmetic-abort", false), // ResultSet --------------------------------------------------------- - $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200) + $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200), + + // Column Partition + $COLUMN_PARITION_REMOVE_ALL_PARTITIONS("tajo.column-paritition.remove.all-partitions", false) ; public final String varname; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 7063197b49..788b0881bd 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -431,20 +431,70 @@ public Path commitOutputData(Query query) { // Upon failed, it recovers the original table if possible. boolean movedToOldTable = false; boolean committed = false; + boolean removeAll = queryContext.getBool(SessionVars.COLUMN_PARITION_REMOVE_ALL_PARTITIONS); Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - try { - if (fs.exists(finalOutputDir)) { - fs.rename(finalOutputDir, oldTableDir); - movedToOldTable = fs.exists(oldTableDir); - } else { // if the parent does not exist, make its parent directory. - fs.mkdirs(finalOutputDir.getParent()); + + // INSERT OVERWRITE INTO always moves the result data into the original table location. + // As a result, all existing partitions have been removed. The query should not remove all partitions + // because existing partitions may be a data-set for a production cluster. + if (!removeAll && queryContext.hasPartition()) { + Map renameDirs = TUtil.newHashMap(); + Map recoveryDirs = TUtil.newHashMap(); + + try { + if (!fs.exists(finalOutputDir)) { + fs.mkdirs(finalOutputDir); + } + + visitPartitionedDirectory(fs, stagingResultDir, finalOutputDir, stagingResultDir.toString(), + renameDirs, oldTableDir); + + // Rename target partition directories + for(Map.Entry entry : renameDirs.entrySet()) { + // Backup existing data files for recovering + if (fs.exists(entry.getValue())) { + String recoveryPathString = entry.getValue().toString().replaceAll(finalOutputDir.toString(), + oldTableDir.toString()); + Path recoveryPath = new Path(recoveryPathString); + fs.rename(entry.getValue(), recoveryPath); + fs.exists(recoveryPath); + recoveryDirs.put(entry.getValue(), recoveryPath); + } + // Delete existing directory + fs.deleteOnExit(entry.getValue()); + // Rename staging directory to final output directory + fs.rename(entry.getKey(), entry.getValue()); + } + + } catch (IOException ioe) { + // Remove created dirs + for(Map.Entry entry : renameDirs.entrySet()) { + fs.deleteOnExit(entry.getValue()); + } + + // Recovery renamed dirs + for(Map.Entry entry : recoveryDirs.entrySet()) { + fs.deleteOnExit(entry.getValue()); + fs.rename(entry.getValue(), entry.getKey()); + } + throw new IOException(ioe.getMessage()); } - fs.rename(stagingResultDir, finalOutputDir); - committed = fs.exists(finalOutputDir); - } catch (IOException ioe) { - // recover the old table - if (movedToOldTable && !committed) { - fs.rename(oldTableDir, finalOutputDir); + } else { + try { + if (fs.exists(finalOutputDir)) { + fs.rename(finalOutputDir, oldTableDir); + movedToOldTable = fs.exists(oldTableDir); + } else { // if the parent does not exist, make its parent directory. + fs.mkdirs(finalOutputDir.getParent()); + } + + fs.rename(stagingResultDir, finalOutputDir); + committed = fs.exists(finalOutputDir); + } catch (IOException ioe) { + // recover the old table + if (movedToOldTable && !committed) { + fs.rename(oldTableDir, finalOutputDir); + } } } } else { @@ -494,6 +544,65 @@ public Path commitOutputData(Query query) { return finalOutputDir; } + /** + * This method rename staging directory to final output directory recursively. + * If there exists some data files, this delete it for duplicate data. + * + * + * @param fs + * @param stagingPath + * @param outputPath + * @param stagingParentPathString + * @throws IOException + */ + private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path outputPath, + String stagingParentPathString, + Map renameDirs, Path oldTableDir) throws IOException { + FileStatus[] files = fs.listStatus(stagingPath); + + for(FileStatus eachFile : files) { + if (eachFile.isDirectory()) { + Path oldPath = eachFile.getPath(); + + // Make recover directory. + String recoverPathString = oldPath.toString().replaceAll(stagingParentPathString, + oldTableDir.toString()); + Path recoveryPath = new Path(recoverPathString); + if (!fs.exists(recoveryPath)) { + fs.mkdirs(recoveryPath); + } + + visitPartitionedDirectory(fs, eachFile.getPath(), outputPath, stagingParentPathString, + renameDirs, oldTableDir); + // Find last order partition for renaming + String newPathString = oldPath.toString().replaceAll(stagingParentPathString, + outputPath.toString()); + Path newPath = new Path(newPathString); + if (!hasDirectory(fs, eachFile.getPath())) { + renameDirs.put(eachFile.getPath(), newPath); + } else { + if (!fs.exists(newPath)) { + fs.mkdirs(newPath); + } + } + } + } + } + + private boolean hasDirectory(FileSystem fs, Path path) throws IOException { + boolean retValue = false; + + FileStatus[] files = fs.listStatus(path); + for (FileStatus file : files) { + if (fs.isDirectory(file.getPath())) { + retValue = true; + break; + } + } + + return retValue; + } + private boolean verifyAllFileMoved(FileSystem fs, Path stagingPath) throws IOException { FileStatus[] files = fs.listStatus(stagingPath); if (files != null && files.length != 0) { diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 5bf29446d4..c13422f2f2 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -25,10 +25,7 @@ import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.DeflateCodec; -import org.apache.tajo.QueryId; -import org.apache.tajo.QueryTestCaseBase; -import org.apache.tajo.TajoConstants; -import org.apache.tajo.TajoTestingCluster; +import org.apache.tajo.*; import org.apache.tajo.catalog.CatalogService; import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; @@ -47,6 +44,7 @@ import org.apache.tajo.master.querymaster.SubQuery; import org.apache.tajo.storage.StorageConstants; import org.apache.tajo.util.KeyValueSet; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TajoWorker; import org.junit.Test; @@ -485,6 +483,76 @@ public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Ex "R,3,3,49.0\n" + "R,3,3,49.0\n"; assertEquals(expected, resultSetData); + + // insert overwrite into already exists partitioned table with COLUMN_PARITION_REMOVE_ALL_PARTITIONS is false + res = executeString("insert overwrite into " + tableName + + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem " + + " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1"); + res.close(); + + desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + assertTrue(fs.isDirectory(path)); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=17.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=2/col2=2/col3=38.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=2/col3=45.0"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=3/col2=3/col3=49.0"))); + + if (!testingCluster.isHCatalogStoreRunning()) { + // TODO: If there is existing another partition directory, we must add its rows number to result row numbers. + // assertEquals(6, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + " where col2 = 1"); + resultSetData = resultSetToString(res); + res.close(); + expected = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,1,1,17.0\n" + + "N,1,1,17.0\n" + + "N,1,1,30.0\n" + + "N,1,1,36.0\n" + + "N,1,1,36.0\n"; + + assertEquals(expected, resultSetData); + + Map sessionVariables = TUtil.newHashMap(); + sessionVariables.put(SessionVars.COLUMN_PARITION_REMOVE_ALL_PARTITIONS.keyname(), "true"); + client.updateSessionVariables(sessionVariables); + + // insert overwrite into already exists partitioned table with COLUMN_PARITION_REMOVE_ALL_PARTITIONS is true + res = executeString("insert overwrite into " + tableName + + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem " + + " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1"); + res.close(); + + desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + assertTrue(fs.isDirectory(path)); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1"))); + assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0"))); + + if (!testingCluster.isHCatalogStoreRunning()) { + assertEquals(1, desc.getStats().getNumRows().intValue()); + } + + res = executeString("select * from " + tableName + " where col2 = 1"); + resultSetData = resultSetToString(res); + res.close(); + expected = "col4,col1,col2,col3\n" + + "-------------------------------\n" + + "N,1,1,30.0\n"; + + assertEquals(expected, resultSetData); + + sessionVariables.put(SessionVars.COLUMN_PARITION_REMOVE_ALL_PARTITIONS.keyname(), "false"); + client.updateSessionVariables(sessionVariables); } @Test @@ -888,16 +956,16 @@ public final void TestSpecialCharPartitionKeys2() throws Exception { executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl"); - executeString("CREATE TABLE IF NOT EXISTS pTable947 (id int, name text) PARTITION BY COLUMN (type text)") + executeString("CREATE TABLE IF NOT EXISTS pTable948 (id int, name text) PARTITION BY COLUMN (type text)") .close(); - executeString("INSERT OVERWRITE INTO pTable947 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") + executeString("INSERT OVERWRITE INTO pTable948 SELECT l_orderkey, l_shipinstruct, l_shipmode FROM lineitemspecial") .close(); - ResultSet res = executeString("select * from pTable947 where type='RA:*?> Date: Sat, 27 Sep 2014 16:31:10 +0900 Subject: [PATCH 3/6] Remove a session variable for removing existing all partitions. --- .../java/org/apache/tajo/SessionVars.java | 4 --- .../java/org/apache/tajo/conf/TajoConf.java | 3 -- .../apache/tajo/master/querymaster/Query.java | 3 +- .../engine/query/TestTablePartitions.java | 31 ------------------- .../TestTajoCli/testHelpSessionVars.result | 1 - 5 files changed, 1 insertion(+), 41 deletions(-) diff --git a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java index 62e2cdfb0f..cc875b2c1a 100644 --- a/tajo-common/src/main/java/org/apache/tajo/SessionVars.java +++ b/tajo-common/src/main/java/org/apache/tajo/SessionVars.java @@ -118,10 +118,6 @@ public enum SessionVars implements ConfigKey { // ResultSet ---------------------------------------------------------------- FETCH_ROWNUM(ConfVars.$RESULT_SET_FETCH_ROWNUM, "Sets the number of rows at a time from Master", DEFAULT), - // Column Partition - COLUMN_PARITION_REMOVE_ALL_PARTITIONS(ConfVars.$COLUMN_PARITION_REMOVE_ALL_PARTITIONS, "In Column partition, " - + "INSERT OVERWRITE INTO remove all partitions", DEFAULT), - //------------------------------------------------------------------------------- // Only for Unit Testing //------------------------------------------------------------------------------- diff --git a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java index 48b22e8094..f9f5e4a0ba 100644 --- a/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java +++ b/tajo-common/src/main/java/org/apache/tajo/conf/TajoConf.java @@ -360,9 +360,6 @@ public static enum ConfVars implements ConfigKey { // ResultSet --------------------------------------------------------- $RESULT_SET_FETCH_ROWNUM("tajo.resultset.fetch.rownum", 200), - - // Column Partition - $COLUMN_PARITION_REMOVE_ALL_PARTITIONS("tajo.column-paritition.remove.all-partitions", false) ; public final String varname; diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 788b0881bd..baff284eb4 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -431,13 +431,12 @@ public Path commitOutputData(Query query) { // Upon failed, it recovers the original table if possible. boolean movedToOldTable = false; boolean committed = false; - boolean removeAll = queryContext.getBool(SessionVars.COLUMN_PARITION_REMOVE_ALL_PARTITIONS); Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); // INSERT OVERWRITE INTO always moves the result data into the original table location. // As a result, all existing partitions have been removed. The query should not remove all partitions // because existing partitions may be a data-set for a production cluster. - if (!removeAll && queryContext.hasPartition()) { + if (queryContext.hasPartition()) { Map renameDirs = TUtil.newHashMap(); Map recoveryDirs = TUtil.newHashMap(); diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index c13422f2f2..5acd75585a 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -522,37 +522,6 @@ public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Ex "N,1,1,36.0\n"; assertEquals(expected, resultSetData); - - Map sessionVariables = TUtil.newHashMap(); - sessionVariables.put(SessionVars.COLUMN_PARITION_REMOVE_ALL_PARTITIONS.keyname(), "true"); - client.updateSessionVariables(sessionVariables); - - // insert overwrite into already exists partitioned table with COLUMN_PARITION_REMOVE_ALL_PARTITIONS is true - res = executeString("insert overwrite into " + tableName - + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem " - + " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1"); - res.close(); - - desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); - assertTrue(fs.isDirectory(path)); - assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1"))); - assertTrue(fs.isDirectory(new Path(path.toUri() + "/col1=1/col2=1/col3=30.0"))); - - if (!testingCluster.isHCatalogStoreRunning()) { - assertEquals(1, desc.getStats().getNumRows().intValue()); - } - - res = executeString("select * from " + tableName + " where col2 = 1"); - resultSetData = resultSetToString(res); - res.close(); - expected = "col4,col1,col2,col3\n" + - "-------------------------------\n" + - "N,1,1,30.0\n"; - - assertEquals(expected, resultSetData); - - sessionVariables.put(SessionVars.COLUMN_PARITION_REMOVE_ALL_PARTITIONS.keyname(), "false"); - client.updateSessionVariables(sessionVariables); } @Test diff --git a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result index cdb04fe454..e6b12b1fcb 100644 --- a/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result +++ b/tajo-core/src/test/resources/results/TestTajoCli/testHelpSessionVars.result @@ -35,5 +35,4 @@ Available Session Variables: \set CODEGEN [true or false] - Runtime code generation enabled (experiment) \set ARITHABORT [true or false] - If true, a running query will be terminated when an overflow or divide-by-zero occurs. \set FETCH_ROWNUM [int value] - Sets the number of rows at a time from Master -\set COLUMN_PARITION_REMOVE_ALL_PARTITIONS [true or false] - In Column partition, INSERT OVERWRITE INTO remove all partitions \set DEBUG_ENABLED [true or false] - (debug only) debug mode enabled \ No newline at end of file From 948bbc01ea29abe4593313b182e051cc90ab715d Mon Sep 17 00:00:00 2001 From: Jaehwa Jung Date: Thu, 2 Oct 2014 17:07:42 +0900 Subject: [PATCH 4/6] Update unnecesarry codes. --- .../java/org/apache/tajo/master/querymaster/Query.java | 9 +++------ .../apache/tajo/engine/query/TestTablePartitions.java | 2 +- 2 files changed, 4 insertions(+), 7 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index baff284eb4..cf5e76a063 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -433,9 +433,6 @@ public Path commitOutputData(Query query) { boolean committed = false; Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); - // INSERT OVERWRITE INTO always moves the result data into the original table location. - // As a result, all existing partitions have been removed. The query should not remove all partitions - // because existing partitions may be a data-set for a production cluster. if (queryContext.hasPartition()) { Map renameDirs = TUtil.newHashMap(); Map recoveryDirs = TUtil.newHashMap(); @@ -460,7 +457,7 @@ public Path commitOutputData(Query query) { recoveryDirs.put(entry.getValue(), recoveryPath); } // Delete existing directory - fs.deleteOnExit(entry.getValue()); + fs.delete(entry.getValue(), true); // Rename staging directory to final output directory fs.rename(entry.getKey(), entry.getValue()); } @@ -473,7 +470,7 @@ public Path commitOutputData(Query query) { // Recovery renamed dirs for(Map.Entry entry : recoveryDirs.entrySet()) { - fs.deleteOnExit(entry.getValue()); + fs.delete(entry.getValue(), true); fs.rename(entry.getValue(), entry.getKey()); } throw new IOException(ioe.getMessage()); @@ -544,7 +541,7 @@ public Path commitOutputData(Query query) { } /** - * This method rename staging directory to final output directory recursively. + * This method sets a a rename map which includes renamed staging directory to final output directory recursively. * If there exists some data files, this delete it for duplicate data. * * diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 5acd75585a..da09129313 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -484,7 +484,7 @@ public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Ex "R,3,3,49.0\n"; assertEquals(expected, resultSetData); - // insert overwrite into already exists partitioned table with COLUMN_PARITION_REMOVE_ALL_PARTITIONS is false + // Check not to remove existing partition directories. res = executeString("insert overwrite into " + tableName + " select l_returnflag, l_orderkey, l_partkey, 30.0 as l_quantity from lineitem " + " where l_orderkey = 1 and l_partkey = 1 and l_linenumber = 1"); From 219a781233a38f18e3e2a7deab713e0edfa67755 Mon Sep 17 00:00:00 2001 From: Jaehwa Jung Date: Thu, 2 Oct 2014 17:16:54 +0900 Subject: [PATCH 5/6] deleteOnExit -> delete --- .../src/main/java/org/apache/tajo/master/querymaster/Query.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index cf5e76a063..fe94c13d39 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -465,7 +465,7 @@ public Path commitOutputData(Query query) { } catch (IOException ioe) { // Remove created dirs for(Map.Entry entry : renameDirs.entrySet()) { - fs.deleteOnExit(entry.getValue()); + fs.delete(entry.getValue(), true); } // Recovery renamed dirs From d314990a4c22138a62fb1c37bbf019f8af1bea07 Mon Sep 17 00:00:00 2001 From: Jaehwa Jung Date: Sun, 5 Oct 2014 22:04:16 +0900 Subject: [PATCH 6/6] Added more comments. --- .../org/apache/tajo/master/querymaster/Query.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java index 3fb9af88a3..78993652aa 100644 --- a/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/master/querymaster/Query.java @@ -418,7 +418,7 @@ public Path commitOutputData(Query query) { if (queryContext.isOutputOverwrite()) { // INSERT OVERWRITE INTO - // it moves the original table into the temporary location. + // It moves the original table into the temporary location. // Then it moves the new result table into the original table location. // Upon failed, it recovers the original table if possible. boolean movedToOldTable = false; @@ -426,7 +426,11 @@ public Path commitOutputData(Query query) { Path oldTableDir = new Path(queryContext.getStagingDir(), TajoConstants.INSERT_OVERWIRTE_OLD_TABLE_NAME); if (queryContext.hasPartition()) { + // This is a map for existing non-leaf directory to rename. A key is current directory and a value is + // renaming directory. Map renameDirs = TUtil.newHashMap(); + // This is a map for recovering existing partition directory. A key is current directory and a value is + // temporary directory to back up. Map recoveryDirs = TUtil.newHashMap(); try { @@ -533,7 +537,7 @@ public Path commitOutputData(Query query) { } /** - * This method sets a a rename map which includes renamed staging directory to final output directory recursively. + * This method sets a rename map which includes renamed staging directory to final output directory recursively. * If there exists some data files, this delete it for duplicate data. * * @@ -566,7 +570,7 @@ private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path out String newPathString = oldPath.toString().replaceAll(stagingParentPathString, outputPath.toString()); Path newPath = new Path(newPathString); - if (!hasDirectory(fs, eachFile.getPath())) { + if (!isLeafDirectory(fs, eachFile.getPath())) { renameDirs.put(eachFile.getPath(), newPath); } else { if (!fs.exists(newPath)) { @@ -577,7 +581,7 @@ private void visitPartitionedDirectory(FileSystem fs, Path stagingPath, Path out } } - private boolean hasDirectory(FileSystem fs, Path path) throws IOException { + private boolean isLeafDirectory(FileSystem fs, Path path) throws IOException { boolean retValue = false; FileStatus[] files = fs.listStatus(path);