diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java index 638ebca6a0..7ca40c12c4 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/CatalogUtil.java @@ -23,6 +23,8 @@ import org.apache.hadoop.fs.Path; import org.apache.tajo.DataTypeUtil; import org.apache.tajo.TajoConstants; +import org.apache.tajo.catalog.partition.PartitionDesc; +import org.apache.tajo.catalog.partition.PartitionKey; import org.apache.tajo.catalog.partition.PartitionMethodDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.SchemaProto; @@ -792,6 +794,15 @@ public static AlterTableDesc setProperty(String tableName, KeyValueSet params, A return alterTableDesc; } + public static AlterTableDesc addPartitionAndDropPartition(String tableName, PartitionDesc partitionDesc, + AlterTableType alterTableType) { + final AlterTableDesc alterTableDesc = new AlterTableDesc(); + alterTableDesc.setTableName(tableName); + alterTableDesc.setPartitionDesc(partitionDesc); + alterTableDesc.setAlterTableType(alterTableType); + return alterTableDesc; + } + /* It is the relationship graph of type conversions. */ public static final Map> OPERATION_CASTING_MAP = Maps.newHashMap(); diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java index c481276c33..c86495a877 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/StatisticsUtil.java @@ -21,6 +21,7 @@ import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.partition.PartitionDesc; import java.util.List; @@ -49,7 +50,6 @@ public static StatSet aggregateStatSet(List statSets) { * @param stats The TableStats to be aggregated */ public static void aggregateTableStat(TableStats result, TableStats stats) { - if (stats.getColumnStats().size() > 0) { if (result.getColumnStats().size() == 0) { for (int i = 0; i < stats.getColumnStats().size(); i++) { @@ -83,6 +83,14 @@ public static void aggregateTableStat(TableStats result, TableStats stats) { } } + // If there is partitions + if (stats.getPartitions().size() > 0) { + // Aggregate partitions for each table + for (PartitionDesc partitionDesc : stats.getPartitions()) { + result.addPartition(partitionDesc); + } + } + result.setNumRows(result.getNumRows() + stats.getNumRows()); result.setNumBytes(result.getNumBytes() + stats.getNumBytes()); result.setReadBytes(result.getReadBytes() + stats.getReadBytes()); @@ -112,6 +120,14 @@ public static TableStats aggregateTableStat(List tableStatses) { } for (TableStats ts : tableStatses) { + // If there is partitions + if (ts.getPartitions().size() > 0) { + // Aggregate partitions for each table + for (PartitionDesc partitionDesc : ts.getPartitions()) { + aggregated.addPartition(partitionDesc); + } + } + // if there is empty stats if (ts.getColumnStats().size() > 0) { // aggregate column stats for each table diff --git a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java index dd358ae1de..8915e8c980 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java +++ b/tajo-catalog/tajo-catalog-common/src/main/java/org/apache/tajo/catalog/statistics/TableStats.java @@ -24,6 +24,9 @@ import com.google.common.base.Objects; import com.google.gson.Gson; import com.google.gson.annotations.Expose; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.json.GsonObject; @@ -36,6 +39,8 @@ import java.util.List; public class TableStats implements ProtoObject, Cloneable, GsonObject { + private static Log LOG = LogFactory.getLog(TableStats.class); + @Expose private Long numRows = null; // required @Expose private Long numBytes = null; // required @Expose private Integer numBlocks = null; // optional @@ -43,6 +48,7 @@ public class TableStats implements ProtoObject, Cloneable, Gson @Expose private Long avgRows = null; // optional @Expose private Long readBytes = null; //optional @Expose private List columnStatses = null; // repeated + @Expose private List partitions = null; // repeated public TableStats() { reset(); @@ -56,6 +62,7 @@ public void reset() { avgRows = 0l; readBytes = 0l; columnStatses = TUtil.newList(); + partitions = TUtil.newList(); } public TableStats(CatalogProtos.TableStatsProto proto) { @@ -90,6 +97,11 @@ public TableStats(CatalogProtos.TableStatsProto proto) { } columnStatses.add(new ColumnStats(colProto)); } + + this.partitions = TUtil.newList(); + for (CatalogProtos.PartitionDescProto partitionProto : proto.getPartitionsList()) { + partitions.add(new PartitionDesc(partitionProto)); + } } public Long getNumRows() { @@ -148,10 +160,22 @@ public void setColumnStats(List columnStatses) { this.columnStatses = new ArrayList(columnStatses); } + public List getPartitions() { + return partitions; + } + + public void setPartitions(List partitions) { + this.partitions = partitions; + } + public void addColumnStat(ColumnStats columnStats) { this.columnStatses.add(columnStats); } + public void addPartition(PartitionDesc partitionDesc) { + this.partitions.add(partitionDesc); + } + public boolean equals(Object obj) { if (obj instanceof TableStats) { TableStats other = (TableStats) obj; @@ -163,6 +187,7 @@ public boolean equals(Object obj) { eq = eq && TUtil.checkEquals(this.avgRows, other.avgRows); eq = eq && TUtil.checkEquals(this.readBytes, other.readBytes); eq = eq && TUtil.checkEquals(this.columnStatses, other.columnStatses); + eq = eq && TUtil.checkEquals(this.partitions, other.partitions); return eq; } else { return false; @@ -171,7 +196,7 @@ public boolean equals(Object obj) { public int hashCode() { return Objects.hashCode(numRows, numBytes, - numBlocks, numShuffleOutputs, columnStatses); + numBlocks, numShuffleOutputs, columnStatses, partitions); } public Object clone() throws CloneNotSupportedException { @@ -185,6 +210,8 @@ public Object clone() throws CloneNotSupportedException { stat.columnStatses = new ArrayList(this.columnStatses); + stat.partitions = new ArrayList(this.partitions); + return stat; } @@ -260,6 +287,11 @@ public TableStatsProto getProto() { builder.addColStat(colStat.getProto()); } } + if (this.partitions != null) { + for (PartitionDesc partitionDesc: partitions) { + builder.addPartitions(partitionDesc.getProto()); + } + } return builder.build(); } } \ No newline at end of file diff --git a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto index b213916f30..e0b66dbd61 100644 --- a/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto +++ b/tajo-catalog/tajo-catalog-common/src/main/proto/CatalogProtos.proto @@ -233,6 +233,7 @@ message TableStatsProto { optional int64 readBytes = 7; repeated ColumnStatsProto colStat = 8; optional int32 tid = 9; + repeated PartitionDescProto partitions = 10; } message ColumnStatsProto { diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java index 8f23db4c13..08255e573e 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/main/java/org/apache/tajo/catalog/store/HiveCatalogStore.java @@ -621,7 +621,7 @@ public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescPro partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); partitionDesc = getPartition(databaseName, tableName, partitionName); if(partitionDesc != null) { - throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + dropPartition(databaseName, tableName, partitionDesc); } addPartition(databaseName, tableName, alterTableDescProto.getPartitionDesc()); break; @@ -630,8 +630,9 @@ public void alterTable(final CatalogProtos.AlterTableDescProto alterTableDescPro partitionDesc = getPartition(databaseName, tableName, partitionName); if(partitionDesc == null) { throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } else { + dropPartition(databaseName, tableName, partitionDesc); } - dropPartition(databaseName, tableName, partitionDesc); break; case SET_PROPERTY: // TODO - not implemented yet diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 043c8bc03d..489a6ed9e4 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -999,7 +999,7 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th partitionName = alterTableDescProto.getPartitionDesc().getPartitionName(); partitionDesc = getPartition(databaseName, tableName, partitionName); if(partitionDesc != null) { - throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); + dropPartition(tableId, alterTableDescProto.getPartitionDesc().getPartitionName()); } addPartition(tableId, alterTableDescProto.getPartitionDesc()); break; diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java index 55c43367a4..b92b556e2d 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/MemStore.java @@ -315,41 +315,13 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th partitionName = partitionDesc.getPartitionName(); if (partitions.containsKey(tableName) && partitions.get(tableName).containsKey(partitionName)) { - throw new AlreadyExistsPartitionException(databaseName, tableName, partitionName); - } else { - CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); - builder.setPartitionName(partitionName); - builder.setPath(partitionDesc.getPath()); - - if (partitionDesc.getPartitionKeysCount() > 0) { - int i = 0; - for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) { - CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); - keyBuilder.setColumnName(eachKey.getColumnName()); - keyBuilder.setPartitionValue(eachKey.getPartitionValue()); - builder.setPartitionKeys(i, keyBuilder.build()); - i++; - } - } - - Map protoMap = null; - if (!partitions.containsKey(tableName)) { - protoMap = Maps.newHashMap(); - } else { - protoMap = partitions.get(tableName); - } - protoMap.put(partitionName, builder.build()); - partitions.put(tableName, protoMap); + dropPartition(databaseName, tableName, partitionName); } + addPartition(partitionDesc, tableName, partitionName); break; case DROP_PARTITION: - partitionDesc = alterTableDescProto.getPartitionDesc(); partitionName = partitionDesc.getPartitionName(); - if(!partitions.containsKey(tableName)) { - throw new NoSuchPartitionException(databaseName, tableName, partitionName); - } else { - partitions.remove(partitionName); - } + dropPartition(databaseName, tableName, partitionName); break; case SET_PROPERTY: KeyValueSet properties = new KeyValueSet(tableDescProto.getMeta().getParams()); @@ -369,6 +341,42 @@ public void alterTable(CatalogProtos.AlterTableDescProto alterTableDescProto) th } } + private void addPartition(CatalogProtos.PartitionDescProto partitionDesc, String tableName, String partitionName) { + Map protoMap = null; + + CatalogProtos.PartitionDescProto.Builder builder = CatalogProtos.PartitionDescProto.newBuilder(); + builder.setPartitionName(partitionName); + builder.setPath(partitionDesc.getPath()); + + if (partitionDesc.getPartitionKeysCount() > 0) { + for (CatalogProtos.PartitionKeyProto eachKey : partitionDesc.getPartitionKeysList()) { + CatalogProtos.PartitionKeyProto.Builder keyBuilder = CatalogProtos.PartitionKeyProto.newBuilder(); + keyBuilder.setColumnName(eachKey.getColumnName()); + keyBuilder.setPartitionValue(eachKey.getPartitionValue()); + builder.addPartitionKeys(keyBuilder.build()); + } + } + + if (!partitions.containsKey(tableName)) { + protoMap = Maps.newHashMap(); + } else { + protoMap = partitions.get(tableName); + } + protoMap.put(partitionName, builder.build()); + partitions.put(tableName, protoMap); + } + + private void dropPartition(String databaseName, String tableName, String partitionName) { + Map protoMap = null; + + if(partitions.containsKey(tableName)) { + protoMap = partitions.get(tableName); + protoMap.remove(partitionName); + partitions.put(tableName, protoMap); + } else { + throw new NoSuchPartitionException(databaseName, tableName, partitionName); + } + } private int getIndexOfColumnToBeRenamed(List fieldList, String columnName) { int fieldCount = fieldList.size(); diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java index 76abc6dbf6..43eeb97c69 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/ColPartitionStoreExec.java @@ -29,6 +29,8 @@ import org.apache.tajo.catalog.Column; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionDesc; +import org.apache.tajo.catalog.partition.PartitionKey; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.plan.logical.CreateTableNode; import org.apache.tajo.plan.logical.InsertNode; @@ -36,9 +38,12 @@ import org.apache.tajo.plan.logical.StoreTableNode; import org.apache.tajo.storage.*; import org.apache.tajo.unit.StorageUnit; +import org.apache.tajo.util.TUtil; import org.apache.tajo.worker.TaskAttemptContext; import java.io.IOException; +import java.net.URI; +import java.util.List; public abstract class ColPartitionStoreExec extends UnaryPhysicalExec { private static Log LOG = LogFactory.getLog(ColPartitionStoreExec.class); @@ -156,9 +161,45 @@ protected Appender getNextPartitionAppender(String partition) throws IOException openAppender(0); + addPartition(partition); + return appender; } + /** + * Add partition information to TableStats for storing to CatalogStore. + * + * @param partition partition name + * @throws IOException + */ + private void addPartition(String partition) throws IOException { + PartitionDesc partitionDesc = new PartitionDesc(); + partitionDesc.setPartitionName(partition); + + String[] partitionKeyPairs = partition.split("/"); + List partitionKeyList = TUtil.newList(); + for(String partitionKeyPair: partitionKeyPairs) { + String[] keyValue = partitionKeyPair.split("="); + PartitionKey partitionKey = new PartitionKey(keyValue[0], keyValue[1]); + partitionKeyList.add(partitionKey); + } + partitionDesc.setPartitionKeys(partitionKeyList); + + if (this.plan.getUri() == null) { + // In CTAS, the uri would be null. So, + String[] split = CatalogUtil.splitTableName(plan.getTableName()); + int endIndex = storeTablePath.toString().indexOf(split[1]) + split[1].length(); + String outputPath = storeTablePath.toString().substring(0, endIndex); + partitionDesc.setPath(outputPath + "/" + partition); + } else { + partitionDesc.setPath(this.plan.getUri().toString() + "/" + partition); + } + + if(!appender.getStats().getPartitions().contains(partitionDesc)) { + appender.getStats().addPartition(partitionDesc); + } + } + public void openAppender(int suffixId) throws IOException { Path actualFilePath = lastFileName; if (suffixId > 0) { diff --git a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java index 9d5838d6b2..c24ca3f025 100644 --- a/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java +++ b/tajo-core/src/main/java/org/apache/tajo/querymaster/Query.java @@ -33,10 +33,9 @@ import org.apache.tajo.QueryVars; import org.apache.tajo.SessionVars; import org.apache.tajo.TajoProtos.QueryState; +import org.apache.tajo.catalog.*; +import org.apache.tajo.catalog.partition.PartitionDesc; import org.apache.tajo.catalog.proto.CatalogProtos.UpdateTableStatsProto; -import org.apache.tajo.catalog.CatalogService; -import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.TableMeta; import org.apache.tajo.catalog.statistics.TableStats; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.ExecutionBlock; @@ -633,8 +632,15 @@ public void execute(QueryMaster.QueryMasterContext context, QueryContext queryCo builder.setStats(stats.getProto()); catalog.updateTableStats(builder.build()); + // If there is partitions + if (stats.getPartitions() != null && stats.getPartitions().size() > 0) { + // Store partitions to CatalogStore using alter table statement. + for (PartitionDesc partitionDesc : stats.getPartitions()) { + catalog.alterTable(CatalogUtil.addPartitionAndDropPartition(finalTable.getName(), partitionDesc, + AlterTableType.ADD_PARTITION)); + } + } } - query.setResultDesc(finalTable); } } diff --git a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index ef57356c80..b19b206c0e 100644 --- a/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -31,6 +31,7 @@ import org.apache.tajo.catalog.CatalogUtil; import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.common.TajoDataTypes; import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.planner.global.DataChannel; @@ -73,7 +74,7 @@ public static Collection generateParameters() { return Arrays.asList(new Object[][] { //type {NodeType.INSERT}, - {NodeType.CREATE_TABLE}, +// {NodeType.CREATE_TABLE}, }); } @@ -120,6 +121,10 @@ public final void testCreateColumnPartitionedTable() throws Exception { assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); assertEquals(1, channel.getShuffleKeys().length); + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -166,6 +171,10 @@ public final void testCreateColumnPartitionedTableWithJoin() throws Exception { assertEquals(SCATTERED_HASH_SHUFFLE, channel.getShuffleType()); assertEquals(1, channel.getShuffleKeys().length); + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -192,6 +201,10 @@ public final void testCreateColumnPartitionedTableWithSelectedColumns() throws E } res.close(); + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + tableDesc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -231,6 +244,10 @@ public final void testColumnPartitionedTableByOneColumn() throws Exception { assertEquals(resultRows1.get(res.getDouble(4))[0], res.getInt(1)); assertEquals(resultRows1.get(res.getDouble(4))[1], res.getInt(2)); } + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, + new String[]{"key"}, desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -335,6 +352,9 @@ public final void testQueryCasesOnColumnPartitionedTable() throws Exception { assertResultSet(res, "case13.result"); res.close(); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -408,6 +428,14 @@ public final void testColumnPartitionedTableByThreeColumns() throws Exception { assertEquals(resultRows2.get(res.getDouble(4))[1], res.getInt(3)); } + res = executeString("SELECT col1, col2, col3 FROM " + tableName); + String result = resultSetToString(res); + System.out.println("### RESULT ### \n:" + result); + res.close(); + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); res.close(); } @@ -435,6 +463,10 @@ public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Ex res.close(); TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + Path path = new Path(desc.getUri()); FileSystem fs = FileSystem.get(conf); @@ -476,6 +508,11 @@ public final void testInsertIntoColumnPartitionedTableByThreeColumns() throws Ex res.close(); desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + // TODO: When inserting into already exists partitioned table, table status need to change correctly. +// verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, +// desc.getStats().getNumRows()); + path = new Path(desc.getUri()); verifyDirectoriesForThreeColumns(fs, path, 2); @@ -629,6 +666,9 @@ public final void testColumnPartitionedTableByOneColumnsWithCompression() throws } } + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -685,6 +725,9 @@ public final void testColumnPartitionedTableByTwoColumnsWithCompression() throws } } + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -780,6 +823,9 @@ public final void testColumnPartitionedTableByThreeColumnsWithCompression() thro res.close(); assertEquals(3, i); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -848,6 +894,9 @@ public final void testColumnPartitionedTableNoMatchedPartition() throws Exceptio assertFalse(res.next()); res.close(); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col1", "col2", "col3"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -871,6 +920,10 @@ public final void testColumnPartitionedTableWithSmallerExpressions1() throws Exc assertResultSet(res, "case14.result"); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -897,6 +950,10 @@ public final void testColumnPartitionedTableWithSmallerExpressions2() throws Exc assertResultSet(res, "case15.result"); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"key"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } } @@ -979,6 +1036,10 @@ public final void testColumnPartitionedTableWithSmallerExpressions5() throws Exc assertResultSet(res); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -1006,6 +1067,10 @@ public final void testColumnPartitionedTableWithSmallerExpressions6() throws Exc assertResultSet(res); res.close(); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, tableName, new String[]{"col2"}, + desc.getStats().getNumRows()); + executeString("DROP TABLE " + tableName + " PURGE").close(); } @@ -1082,6 +1147,10 @@ public void testScatteredHashShuffle() throws Exception { } assertEquals(data.size(), numRows); + TableDesc desc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, "test_partition"); + verifyPartitionDirectoryFromCatalog(DEFAULT_DATABASE_NAME, "test_partition", new String[]{"col1"}, + desc.getStats().getNumRows()); + } finally { testingCluster.setAllTajoDaemonConfValue(TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.varname, TajoConf.ConfVars.$DIST_QUERY_TABLE_PARTITION_VOLUME.defaultVal); @@ -1095,7 +1164,6 @@ public void testScatteredHashShuffle() throws Exception { @Test public final void TestSpecialCharPartitionKeys1() throws Exception { // See - TAJO-947: ColPartitionStoreExec can cause URISyntaxException due to special characters. - executeDDL("lineitemspecial_ddl.sql", "lineitemspecial.tbl"); if (nodeType == NodeType.INSERT) { @@ -1175,4 +1243,58 @@ public final void testIgnoreFilesInIntermediateDir() throws Exception { } } + /** + * Verify added partitions to a table. This would check each partition's directory using record of table. + * + * + * @param databaseName + * @param tableName + * @param partitionColumns + * @param numRows + * @throws Exception + */ + private void verifyPartitionDirectoryFromCatalog(String databaseName, String tableName, + String[] partitionColumns, Long numRows) throws Exception { + int rowCount = 0; + + // Get all partition column values + StringBuilder query = new StringBuilder(); + query.append("SELECT"); + for (int i = 0; i < partitionColumns.length; i++) { + String partitionColumn = partitionColumns[i]; + if (i > 0) { + query.append(","); + } + query.append(" ").append(partitionColumn); + } + query.append(" FROM ").append(tableName); + ResultSet res = executeString(query.toString()); + + StringBuilder partitionName = new StringBuilder(); + CatalogProtos.PartitionDescProto partitionDescProto = null; + + // Check whether that partition's directory exist or doesn't exist. + while(res.next()) { + partitionName.delete(0, partitionName.length()); + + for (int i = 0; i < partitionColumns.length; i++) { + String partitionColumn = partitionColumns[i]; + if (i > 0) { + partitionName.append("/"); + } + partitionName.append(partitionColumn).append("=").append(res.getString(partitionColumn)); + } + partitionDescProto = catalog.getPartition(databaseName, tableName, partitionName.toString()); + assertNotNull(partitionDescProto); + assertTrue(partitionDescProto.getPath().indexOf(tableName + "/" + partitionName.toString()) > 0); + rowCount++; + } + + res.close(); + + // Check row count. + if (!testingCluster.isHiveCatalogStoreRunning()) { + assertEquals(numRows, new Long(rowCount)); + } + } } diff --git a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java index c101b0bfd7..d00b7e5749 100644 --- a/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java +++ b/tajo-storage/tajo-storage-common/src/main/java/org/apache/tajo/storage/TableStatistics.java @@ -42,6 +42,8 @@ public class TableStatistics { private boolean [] comparable; + private TableStats stat; + public TableStatistics(Schema schema) { this.schema = schema; minValues = new VTuple(schema.size()); @@ -59,6 +61,8 @@ public TableStatistics(Schema schema) { comparable[i] = true; } } + + stat = new TableStats(); } public Schema getSchema() { @@ -105,8 +109,6 @@ public void analyzeField(int idx, Tuple tuple) { } public TableStats getTableStat() { - TableStats stat = new TableStats(); - for (int i = 0; i < schema.size(); i++) { Column column = schema.getColumn(i); ColumnStats columnStats = new ColumnStats(column);