From f10864b95e99d55fea580e369a1c0d7df0758ecb Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 14 Oct 2015 10:30:46 +0900 Subject: [PATCH 1/6] Add list of partitions to LogicalNode --- .../plan/logical/PartitionedTableScanNode.java | 16 ++++++++++++++-- .../plan/serder/LogicalNodeDeserializer.java | 1 + .../tajo/plan/serder/LogicalNodeSerializer.java | 4 ++++ tajo-plan/src/main/proto/Plan.proto | 1 + 4 files changed, 20 insertions(+), 2 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java index a4bb94ced9..ab565e8170 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java @@ -22,13 +22,17 @@ import com.google.gson.annotations.Expose; import org.apache.hadoop.fs.Path; import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.plan.PlanString; import org.apache.tajo.plan.Target; import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.util.TUtil; +import java.util.List; + public class PartitionedTableScanNode extends ScanNode { @Expose Path [] inputPaths; + @Expose List partitions; public PartitionedTableScanNode(int pid) { super(pid, NodeType.PARTITIONS_SCAN); @@ -54,8 +58,16 @@ public void setInputPaths(Path [] paths) { public Path [] getInputPaths() { return inputPaths; } - - public String toString() { + + public List getPartitions() { + return partitions; + } + + public void setPartitions(List partitions) { + this.partitions = partitions; + } + + public String toString() { StringBuilder sb = new StringBuilder("Partitions Scan (table=").append(getTableName()); if (hasAlias()) { sb.append(", alias=").append(alias); diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java index 2051dfb0de..8879fc8e10 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeDeserializer.java @@ -469,6 +469,7 @@ private static PartitionedTableScanNode convertPartitionScan(OverridableConf con paths[i] = new Path(partitionScanProto.getPaths(i)); } partitionedScan.setInputPaths(paths); + partitionedScan.setPartitions(partitionScanProto.getPartitionsList()); return partitionedScan; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java index 7907668ef4..67623c91fe 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/serder/LogicalNodeSerializer.java @@ -484,6 +484,10 @@ public LogicalNode visitPartitionedTableScan(SerializeContext context, LogicalPl partitionScan.addAllPaths(pathStrs); } + if (node.getPartitions() != null) { + partitionScan.addAllPartitions(node.getPartitions()); + } + PlanProto.LogicalNode.Builder nodeBuilder = createNodeBuilder(context, node); nodeBuilder.setScan(scanBuilder); nodeBuilder.setPartitionScan(partitionScan); diff --git a/tajo-plan/src/main/proto/Plan.proto b/tajo-plan/src/main/proto/Plan.proto index 0cd0c32dcc..4cbbd4de22 100644 --- a/tajo-plan/src/main/proto/Plan.proto +++ b/tajo-plan/src/main/proto/Plan.proto @@ -117,6 +117,7 @@ message ScanNode { message PartitionScanSpec { repeated string paths = 1; + repeated PartitionDescProto partitions = 2; } message IndexScanSpec { From fa0e8011c9bd13edfd5c026b53f3471bf551c278 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 14 Oct 2015 11:18:56 +0900 Subject: [PATCH 2/6] Get a partition key value from a PartitionDescProto --- .../engine/planner/physical/SeqScanExec.java | 29 ++++++- .../logical/PartitionedTableScanNode.java | 19 +++++ .../rules/PartitionedTableRewriter.java | 75 +++++++++++++------ 3 files changed, 100 insertions(+), 23 deletions(-) diff --git a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java index 2572e1db8d..953aa591df 100644 --- a/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java +++ b/tajo-core/src/main/java/org/apache/tajo/engine/planner/physical/SeqScanExec.java @@ -18,6 +18,9 @@ package org.apache.tajo.engine.planner.physical; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IOUtils; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; @@ -32,6 +35,7 @@ import org.apache.tajo.plan.expr.EvalNode; import org.apache.tajo.plan.expr.EvalTreeUtil; import org.apache.tajo.plan.expr.FieldEval; +import org.apache.tajo.plan.logical.PartitionedTableScanNode; import org.apache.tajo.plan.logical.ScanNode; import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.plan.util.PlannerUtil; @@ -100,9 +104,30 @@ private void rewriteColumnPartitionedTableSchema() throws IOException { if (fragments != null && fragments.length > 0) { List fileFragments = FragmentConvertor.convert(FileFragment.class, fragments); + Path partitionPath = fileFragments.get(0).getPath(); + int startIdx = partitionPath.toString().indexOf(PartitionedTableRewriter.getColumnPartitionPathPrefix + (columnPartitionSchema)); + + PartitionedTableScanNode partitionedTableScanNode = (PartitionedTableScanNode) plan; + // Get a partition key value from a PartitionDescProto + if (startIdx == -1) { + partitionedTableScanNode.initPartitionMap(); + FileSystem fs = partitionPath.getFileSystem(context.getConf()); + FileStatus fileStatus = fs.getFileStatus(partitionPath); + String path = null; + if (fileStatus.isDirectory()) { + path = partitionPath.toString(); + } else { + path = partitionPath.getParent().toString(); + } + CatalogProtos.PartitionDescProto partitionDescProto = partitionedTableScanNode.getPartitionDescProto(path); + partitionRow = PartitionedTableRewriter.buildTupleFromPartitionDescProto(columnPartitionSchema, + partitionDescProto, false); // Get a partition key value from a given path - partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( - columnPartitionSchema, fileFragments.get(0).getPath(), false); + } else { + partitionRow = PartitionedTableRewriter.buildTupleFromPartitionPath( + columnPartitionSchema, fileFragments.get(0).getPath(), false); + } } // Targets or search conditions may contain column references. diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java index ab565e8170..c53bec4561 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/logical/PartitionedTableScanNode.java @@ -29,11 +29,14 @@ import org.apache.tajo.util.TUtil; import java.util.List; +import java.util.Map; public class PartitionedTableScanNode extends ScanNode { @Expose Path [] inputPaths; @Expose List partitions; + private Map partitionMap; + public PartitionedTableScanNode(int pid) { super(pid, NodeType.PARTITIONS_SCAN); } @@ -67,6 +70,22 @@ public void setPartitions(List partitions) { this.partitions = partitions; } + public void initPartitionMap() { + if (partitionMap == null) { + partitionMap = TUtil.newHashMap(); + } else { + partitionMap.clear(); + } + + for(PartitionDescProto partition : partitions) { + partitionMap.put(partition.getPath(), partition); + } + } + + public PartitionDescProto getPartitionDescProto(String path) { + return partitionMap.get(path); + } + public String toString() { StringBuilder sb = new StringBuilder("Partitions Scan (table=").append(getTableName()); if (hasAlias()) { diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index cf54f7b4c7..ba0812ea28 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -26,8 +26,10 @@ import org.apache.tajo.OverridableConf; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionsByAlgebraProto; import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.datum.DatumFactory; import org.apache.tajo.datum.NullDatum; import org.apache.tajo.exception.*; @@ -41,7 +43,9 @@ import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.StringUtils; +import org.apache.tajo.util.TUtil; import java.io.IOException; import java.util.*; @@ -110,11 +114,11 @@ public String toString() { } } - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, - Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) + private Pair> findFilteredPatitionPair(OverridableConf queryContext, + String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - return findFilteredPaths(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); + return findFilteredPatitionPair(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); } /** @@ -127,7 +131,7 @@ public String toString() { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private Pair> findFilteredPatitionPair(OverridableConf queryContext, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { @@ -141,17 +145,17 @@ public String toString() { if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + filteredPaths = findFilteredPatitionPairFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + filteredPaths = findFilteredPatitionPairByPartitionDesc(partitions); } } else { if (catalog.existPartitions(splits[0], splits[1])) { PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); partitions = catalog.getPartitionsByAlgebra(request); - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + filteredPaths = findFilteredPatitionPairByPartitionDesc(partitions); } else { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + filteredPaths = findFilteredPatitionPairFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } } } catch (UnsupportedException ue) { @@ -160,15 +164,17 @@ public String toString() { LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + filteredPaths = findFilteredPatitionPairFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + filteredPaths = findFilteredPatitionPairByPartitionDesc(partitions); } scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); } LOG.info("Filtered directory or files: " + filteredPaths.length); - return filteredPaths; + + Pair> pair = new Pair<>(filteredPaths, partitions); + return pair; } /** @@ -177,7 +183,7 @@ public String toString() { * @param partitions * @return */ - private Path[] findFilteredPathsByPartitionDesc(List partitions) { + private Path[] findFilteredPatitionPairByPartitionDesc(List partitions) { Path [] filteredPaths = new Path[partitions.size()]; for (int i = 0; i < partitions.size(); i++) { PartitionDescProto partition = partitions.get(i); @@ -198,7 +204,7 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti * @return * @throws IOException */ - private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, + private Path [] findFilteredPatitionPairFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException{ Path [] filteredPaths = null; PathFilter [] filters; @@ -328,9 +334,9 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( return paths; } - public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, - UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, - UndefinedOperatorException, UnsupportedException { + public Pair> findFilteredPartitionPair(OverridableConf queryContext, + ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, + UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -369,10 +375,10 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, + return findFilteredPatitionPair(queryContext, table.getName(), paritionValuesSchema, indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()), scanNode); } else { // otherwise, we will get all partition paths. - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); + return findFilteredPatitionPair(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); } } @@ -472,6 +478,32 @@ public static Tuple buildTupleFromPartitionPath(Schema partitionColumnSchema, Pa return tuple; } + public static Tuple buildTupleFromPartitionDescProto(Schema partitionColumnSchema, PartitionDescProto partition, + boolean beNullIfFile) throws IOException { + List partitonKeys = partition.getPartitionKeysList(); + + // true means this is a file. + if (beNullIfFile && partitionColumnSchema.size() < partitonKeys.size()) { + return null; + } + + Tuple tuple = new VTuple(partitionColumnSchema.size()); + int i = 0; + for (; i < partitonKeys.size() && i < partitionColumnSchema.size(); i++) { + int columnId = partitionColumnSchema.getColumnIdByName(partitonKeys.get(i).getColumnName()); + Column keyColumn = partitionColumnSchema.getColumn(columnId); + tuple.put(columnId, DatumFactory.createFromString(keyColumn.getDataType(), StringUtils.unescapePathName + (partitonKeys.get(i).getPartitionValue()))); + } + + for (; i < partitionColumnSchema.size(); i++) { + tuple.put(i, NullDatum.get()); + } + + return tuple; + } + + /** * Get a prefix of column partition path. For example, consider a column partition (col1, col2). * Then, you will get a string 'col1='. @@ -496,10 +528,11 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP } try { - Path [] filteredPaths = findFilteredPartitionPaths(queryContext, scanNode); - plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions"); + Pair> pair = findFilteredPartitionPair(queryContext, scanNode); + plan.addHistory("PartitionTableRewriter chooses " + pair.getFirst().length + " of partitions"); PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); - rewrittenScanNode.init(scanNode, filteredPaths); + rewrittenScanNode.init(scanNode, pair.getFirst()); + rewrittenScanNode.setPartitions(pair.getSecond()); rewrittenScanNode.getTableDesc().getStats().setNumBytes(totalVolume); // if it is topmost node, set it as the rootnode of this block. From 78c4f5262eb803d54f8256c5f54f2e63f641b793 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 15 Oct 2015 14:50:21 +0900 Subject: [PATCH 3/6] Add unit test cases (commented out) --- .../engine/query/TestTablePartitions.java | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index e8ca0da31a..91069ce308 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -1833,6 +1833,28 @@ public void testAbnormalDirectories() throws Exception { res.close(); assertEquals(expectedResult, result); + // Invalid directory name +// executeString("alter table " + tableName + " add partition (key = 70.0)" +// + " location '" + tableDesc.getUri().toString() + "/tajo1'").close(); +// +// executeString("INSERT INTO LOCATION '" + tableDesc.getUri().toString() + "/tajo1'" +// + " SELECT col1, col2 FROM " + sortedTableName); +// +// res = executeString("SELECT * FROM " + tableName + " ORDER BY col1, col2 desc, key desc;"); +// result = resultSetToString(res); +// expectedResult = "col1,col2,key\n" + +// "-------------------------------\n" + +// "1,1,70.0\n" + +// "1,1,17.0\n" + +// "2,2,70.0\n" + +// "2,2,38.0\n" + +// "3,3,70.0\n" + +// "3,3,49.0\n" + +// "3,2,70.0\n" + +// "3,2,45.0\n"; +// res.close(); +// assertEquals(expectedResult, result); + executeString("DROP TABLE " + sortedTableName + " PURGE").close(); executeString("DROP TABLE " + externalTableName).close(); executeString("DROP TABLE " + tableName + " PURGE").close(); From 137f6f223d41c4b6a794a0a6187c3cbce4d7c070 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 15 Oct 2015 15:12:44 +0900 Subject: [PATCH 4/6] Restore alter table add partition --- .../catalog/store/TestHiveCatalogStore.java | 2 +- .../apache/tajo/catalog/CatalogServer.java | 5 ----- .../org/apache/tajo/catalog/TestCatalog.java | 2 +- .../TestCatalogAgainstCaseSensitivity.java | 2 +- .../tajo/catalog/TestCatalogExceptions.java | 20 +++---------------- .../org/apache/tajo/cli/tools/TajoDump.java | 11 +++++----- .../apache/tajo/cli/tools/TestTajoDump.java | 11 +++++----- .../org/apache/tajo/cli/tsql/TestTajoCli.java | 12 ----------- .../engine/planner/TestLogicalPlanner.java | 2 +- .../tajo/engine/query/TestAlterTable.java | 2 +- .../tajo/parser/sql/TestSQLAnalyzer.java | 7 +------ .../TestTajoDump/testPartitionsDump.result | 5 +++-- .../org/apache/tajo/jdbc/TestTajoJdbc.java | 2 +- .../plan/verifier/PreLogicalPlanVerifier.java | 10 ---------- 14 files changed, 24 insertions(+), 69 deletions(-) diff --git a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java index 43de047fa9..e79fec5ffc 100644 --- a/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java +++ b/tajo-catalog/tajo-catalog-drivers/tajo-hive/src/test/java/org/apache/tajo/catalog/store/TestHiveCatalogStore.java @@ -219,7 +219,7 @@ public void testTableWithNullValue() throws Exception { } - // TODO: This should be added at TAJO-1891 + @Test public void testAddTableByPartition() throws Exception { TableMeta meta = new TableMeta("TEXT", new KeyValueSet()); diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java index 060cb78a64..58d0fc40a5 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/CatalogServer.java @@ -458,11 +458,6 @@ public ReturnState alterTable(RpcController controller, AlterTableDescProto prot return errInsufficientPrivilege("alter a table in database '" + split[0] + "'"); } - // TODO: This should be removed at TAJO-1891 - if (proto.getAlterTableType() == CatalogProtos.AlterTableType.ADD_PARTITION) { - return errFeatureNotImplemented("ADD PARTTIION"); - } - wlock.lock(); try { diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java index 606f39a382..36f98cf017 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalog.java @@ -777,7 +777,7 @@ public final void testAddAndDeleteTablePartitionByRange() throws Exception { assertFalse(catalog.existsTable(tableName)); } - // TODO: This should be added at TAJO-1891 + @Test public final void testAddAndDeleteTablePartitionByColumn() throws Exception { Schema schema = new Schema(); schema.addColumn("id", Type.INT4) diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java index 99a7b43536..5e8571d3f0 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogAgainstCaseSensitivity.java @@ -163,7 +163,7 @@ public void testTable() throws Exception { } } - // TODO: This should be added at TAJO-1891 + @Test public void testTablePartition() throws Exception { ////////////////////////////////////////////////////////////////////////////// // Test add partition diff --git a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java index 5d4065673b..2aaf418a9a 100644 --- a/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java +++ b/tajo-catalog/tajo-catalog-server/src/test/java/org/apache/tajo/catalog/TestCatalogExceptions.java @@ -157,7 +157,7 @@ public void testUpdateTableStatsOfUndefinedTable() throws Exception { build()); } - // TODO: This should be added at TAJO-1891 + @Test public void testAddPartitionWithWrongUri() throws Exception { // TODO: currently, wrong uri does not occur any exception. String partitionName = "DaTe=/=AaA"; @@ -171,7 +171,7 @@ public void testAddPartitionWithWrongUri() throws Exception { catalog.alterTable(alterTableDesc); } - // TODO: This should be added at TAJO-1891 + @Test(expected = DuplicatePartitionException.class) public void testAddDuplicatePartition() throws Exception { String partitionName = "DaTe=bBb/dAtE=AaA"; PartitionDesc partitionDesc = CatalogTestingUtil.buildPartitionDesc(partitionName); @@ -194,7 +194,7 @@ public void testAddDuplicatePartition() throws Exception { catalog.alterTable(alterTableDesc); } - // TODO: This should be added at TAJO-1891 + @Test(expected = UndefinedTableException.class) public void testAddPartitionToUndefinedTable() throws Exception { String partitionName = "DaTe=bBb/dAtE=AaA"; PartitionDesc partitionDesc = CatalogTestingUtil.buildPartitionDesc(partitionName); @@ -207,20 +207,6 @@ public void testAddPartitionToUndefinedTable() throws Exception { catalog.alterTable(alterTableDesc); } - // TODO: This should be removed at TAJO-1891 - @Test(expected = NotImplementedException.class) - public void testAddPartitionNotimplementedException() throws Exception { - String partitionName = "DaTe=/=AaA"; - PartitionDesc partitionDesc = CatalogTestingUtil.buildPartitionDesc(partitionName); - - AlterTableDesc alterTableDesc = new AlterTableDesc(); - alterTableDesc.setTableName(CatalogUtil.buildFQName("TestDatabase1", "TestPartition1")); - alterTableDesc.setPartitionDesc(partitionDesc); - alterTableDesc.setAlterTableType(AlterTableType.ADD_PARTITION); - - catalog.alterTable(alterTableDesc); - } - @Test(expected = UndefinedPartitionException.class) public void testDropUndefinedPartition() throws Exception { String partitionName = "DaTe=undefined/dAtE=undefined"; diff --git a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java index 7fa1705567..3a492034aa 100644 --- a/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java +++ b/tajo-cli/src/main/java/org/apache/tajo/cli/tools/TajoDump.java @@ -23,6 +23,7 @@ import org.apache.tajo.auth.UserRoleInfo; import org.apache.tajo.catalog.*; import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; import org.apache.tajo.client.TajoClient; import org.apache.tajo.client.TajoClientImpl; import org.apache.tajo.conf.TajoConf; @@ -191,12 +192,10 @@ private static void dumpDatabase(TajoClient client, String databaseName, PrintWr writer.write(String.format("-- Table Partitions: %s%n", tableName)); writer.write("--\n"); // TODO: This should be improved at TAJO-1891 -// List partitionProtos = client.getPartitionsOfTable(fqName); -// for (PartitionDescProto eachPartitionProto : partitionProtos) { -// writer.write(DDLBuilder.buildDDLForAddPartition(table, eachPartitionProto)); -// } - writer.write(String.format("ALTER TABLE %s REPAIR PARTITION;", - CatalogUtil.denormalizeIdentifier(databaseName) + "." + CatalogUtil.denormalizeIdentifier(tableName))); + List partitionProtos = client.getPartitionsOfTable(fqName); + for (PartitionDescProto eachPartitionProto : partitionProtos) { + writer.write(DDLBuilder.buildDDLForAddPartition(table, eachPartitionProto)); + } writer.write("\n\n"); } diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java index 13936e6853..365aca6f2d 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tools/TestTajoDump.java @@ -113,16 +113,17 @@ public void testPartitionsDump() throws Exception { + " partition by column(\"col3\" int4, \"col4\" int4)" ); - // TODO: This should be improved at TAJO-1891 -// executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName3\"" + -// " ADD PARTITION (\"col3\" = 1 , \"col4\" = 2)"); -// executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName4\"" + -// " ADD PARTITION (\"col3\" = 'tajo' , \"col4\" = '2015-09-01')"); + executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName3\"" + + " ADD PARTITION (\"col3\" = 1 , \"col4\" = 2)"); + executeString("create table \"" + getCurrentDatabase() + "\".\"TableName4\"" + " (\"col1\" int4, \"col2\" int4) " + " partition by column(\"col3\" TEXT, \"col4\" date)" ); + executeString("ALTER TABLE \"" + getCurrentDatabase() + "\".\"TableName4\"" + + " ADD PARTITION (\"col3\" = 'tajo' , \"col4\" = '2015-09-01')"); + try { UserRoleInfo userInfo = UserRoleInfo.getCurrentUser(); ByteArrayOutputStream bos = new ByteArrayOutputStream(); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java index 0503d5d218..ec68294177 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/cli/tsql/TestTajoCli.java @@ -473,19 +473,7 @@ public void testNonForwardQueryPause() throws Exception { } } - // TODO: This should be removed at TAJO-1891 @Test - public void testAddPartitionNotimplementedException() throws Exception { - String tableName = CatalogUtil.normalizeIdentifier("testAddPartitionNotimplementedException"); - tajoCli.executeScript("create table " + tableName + " (col1 int4, col2 int4) partition by column(key float8)"); - tajoCli.executeScript("alter table " + tableName + " add partition (key2 = 0.1)"); - - String consoleResult; - consoleResult = new String(out.toByteArray()); - assertOutputResult(consoleResult); - } - - // TODO: This should be added at TAJO-1891 public void testAlterTableAddDropPartition() throws Exception { String tableName = CatalogUtil.normalizeIdentifier("testAlterTableAddPartition"); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java index 0f09c0cd69..a91a1782c8 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestLogicalPlanner.java @@ -1286,7 +1286,7 @@ public final void testAlterTableRepairPartiton() throws TajoException { "ALTER TABLE partitioned_table DROP PARTITION (col1 = '2015' , col2 = '01', col3 = '11' )", //1 }; - // TODO: This should be added at TAJO-1891 + @Test public final void testAddPartitionAndDropPartition() throws TajoException { String tableName = CatalogUtil.normalizeIdentifier("partitioned_table"); String qualifiedTableName = CatalogUtil.buildFQName(DEFAULT_DATABASE_NAME, tableName); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java index 4c9f367223..63c2a0e7f0 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestAlterTable.java @@ -88,7 +88,7 @@ public final void testAlterTableSetProperty() throws Exception { cleanupQuery(after_res); } - // TODO: This should be added at TAJO-1891 + @Test public final void testAlterTableAddPartition() throws Exception { executeDDL("create_partitioned_table.sql", null); diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java b/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java index e2968a8664..3a577e824f 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/parser/sql/TestSQLAnalyzer.java @@ -75,12 +75,7 @@ public Collection getResourceFiles(String subdir) throws URISyntaxExceptio new Predicate() { @Override public boolean apply(@Nullable FileStatus input) { - // TODO: This should be removed at TAJO-1891 - if (input.getPath().getName().indexOf("add_partition") > -1) { - return false; - } else { - return input.isFile(); - } + return input.isFile(); } } ); diff --git a/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result b/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result index 677b5f21ec..696bb191cc 100644 --- a/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result +++ b/tajo-core-tests/src/test/resources/results/TestTajoDump/testPartitionsDump.result @@ -17,7 +17,8 @@ CREATE TABLE "TestTajoDump"."TableName3" (col1 INT4, col2 INT4) USING TEXT WITH -- -- Table Partitions: TableName3 -- -ALTER TABLE "TestTajoDump"."TableName3" REPAIR PARTITION; +ALTER TABLE "TestTajoDump"."TableName3" ADD IF NOT EXISTS PARTITION (col3=1,col4=2) LOCATION '${partition.path1}/col3=1/col4=2'; + @@ -29,4 +30,4 @@ CREATE TABLE "TestTajoDump"."TableName4" (col1 INT4, col2 INT4) USING TEXT WITH -- -- Table Partitions: TableName4 -- -ALTER TABLE "TestTajoDump"."TableName4" REPAIR PARTITION; \ No newline at end of file +ALTER TABLE "TestTajoDump"."TableName4" ADD IF NOT EXISTS PARTITION (col3='tajo',col4='2015-09-01') LOCATION '${partition.path2}/col3=tajo/col4=2015-09-01'; \ No newline at end of file diff --git a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java index 80c6e86784..33b3727c38 100644 --- a/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java +++ b/tajo-jdbc/src/test/java/org/apache/tajo/jdbc/TestTajoJdbc.java @@ -578,7 +578,7 @@ public void testSortWithDateTime() throws Exception { } } - // TODO: This should be added at TAJO-1891 + @Test public void testAlterTableAddPartition() throws Exception { Statement stmt = null; ResultSet resultSet = null; diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java index 2a50ca4ec8..400b302d8f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/verifier/PreLogicalPlanVerifier.java @@ -354,14 +354,4 @@ public Expr visitInsert(Context context, Stack stack, Insert expr) throws return expr; } - - // TODO: This should be removed at TAJO-1891 - @Override - public Expr visitAlterTable(Context context, Stack stack, AlterTable expr) throws TajoException { - if (expr.getAlterTableOpType() == AlterTableOpType.ADD_PARTITION) { - context.state.addVerification(new NotImplementedException("ADD PARTITION")); - } - - return expr; - } } From 8ebe368009af74ac6ae304265d6586d2dee893ab Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 15 Oct 2015 16:38:34 +0900 Subject: [PATCH 5/6] Fix bugs and add unit test cases for arbitrary partition path --- .../tajo/catalog/store/AbstractDBStore.java | 2 + .../engine/query/TestTablePartitions.java | 142 +++++++++++++++--- 2 files changed, 122 insertions(+), 22 deletions(-) diff --git a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java index 176f569f14..6cf74c6545 100644 --- a/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java +++ b/tajo-catalog/tajo-catalog-server/src/main/java/org/apache/tajo/catalog/store/AbstractDBStore.java @@ -2357,6 +2357,8 @@ public List getPartitionsByAlgebra(PartitionsByAlgebraProto builder.setPath(res.getString("PATH")); builder.setNumBytes(res.getLong(COL_PARTITION_BYTES)); + setPartitionKeys(res.getInt(COL_PARTITIONS_PK), builder); + partitions.add(builder.build()); } } catch (SQLException se) { diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java index 91069ce308..95f9cb6ba1 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/query/TestTablePartitions.java @@ -1833,30 +1833,128 @@ public void testAbnormalDirectories() throws Exception { res.close(); assertEquals(expectedResult, result); - // Invalid directory name -// executeString("alter table " + tableName + " add partition (key = 70.0)" -// + " location '" + tableDesc.getUri().toString() + "/tajo1'").close(); -// -// executeString("INSERT INTO LOCATION '" + tableDesc.getUri().toString() + "/tajo1'" -// + " SELECT col1, col2 FROM " + sortedTableName); -// -// res = executeString("SELECT * FROM " + tableName + " ORDER BY col1, col2 desc, key desc;"); -// result = resultSetToString(res); -// expectedResult = "col1,col2,key\n" + -// "-------------------------------\n" + -// "1,1,70.0\n" + -// "1,1,17.0\n" + -// "2,2,70.0\n" + -// "2,2,38.0\n" + -// "3,3,70.0\n" + -// "3,3,49.0\n" + -// "3,2,70.0\n" + -// "3,2,45.0\n"; -// res.close(); -// assertEquals(expectedResult, result); - executeString("DROP TABLE " + sortedTableName + " PURGE").close(); executeString("DROP TABLE " + externalTableName).close(); executeString("DROP TABLE " + tableName + " PURGE").close(); } + + @Test + public void testArbitraryPartitionPath() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path path = null; + ContentSummary contentSummary = null; + + PartitionDescProto partitionDescProto = null; + ResultSet res = null; + String result = null, expectedResult = null; + + String tableName = CatalogUtil.normalizeIdentifier("testAbnormalDirectories"); + if (nodeType == NodeType.INSERT) { + executeString( + "create table " + tableName + " (col1 int4, col2 float8) partition by column(key int4) ").close(); + executeString( + "insert overwrite into " + tableName + " select l_orderkey, l_quantity, l_partkey " + + " from lineitem where l_partkey > 2").close(); + } else { + executeString( + "create table " + tableName + "(col1 int4, col2 float8) partition by column(key int4) " + + " as select l_orderkey, l_quantity, l_partkey from lineitem where l_partkey > 2").close(); + } + + TableDesc tableDesc = catalog.getTableDesc(DEFAULT_DATABASE_NAME, tableName); + + // Check the number of rows + res = executeString("SELECT count(*) as cnt FROM " + tableName); + result = resultSetToString(res); + expectedResult = "cnt\n" + + "-------------------------------\n" + + "1\n"; + res.close(); + assertEquals(expectedResult, result); + + // Make arbitrary partition path + path = new Path(tableDesc.getUri().toString() + "/part12345"); + + executeString("INSERT INTO LOCATION '" + path + "'" + + " select l_orderkey, l_quantity, l_partkey from lineitem where l_partkey = 1"); + + executeString("alter table " + tableName + " add partition (key = 1)" + + " location '" + path.toString() + "'").close(); + + partitionDescProto = catalog.getPartition(getCurrentDatabase(), tableName, "key=1"); + contentSummary = fs.getContentSummary(path); + assertEquals(contentSummary.getLength(), partitionDescProto.getNumBytes()); + assertEquals(path.toString(), partitionDescProto.getPath()); + + path = new Path(tableDesc.getUri().toString() + "/part6789"); + + executeString("INSERT INTO LOCATION '" + path + "'" + + " select l_orderkey, l_quantity, l_partkey from lineitem where l_partkey = 2"); + + executeString("alter table " + tableName + " add partition (key = 2)" + + " location '" + path.toString() + "'").close(); + + partitionDescProto = catalog.getPartition(getCurrentDatabase(), tableName, "key=2"); + contentSummary = fs.getContentSummary(path); + assertEquals(contentSummary.getLength(), partitionDescProto.getNumBytes()); + assertEquals(path.toString(), partitionDescProto.getPath()); + + // Check the number of rows + res = executeString("SELECT count(*) as cnt FROM " + tableName); + result = resultSetToString(res); + expectedResult = "cnt\n" + + "-------------------------------\n" + + "5\n"; + res.close(); + assertEquals(expectedResult, result); + + // Equal operator + res = executeString("SELECT * FROM " + tableName + " where key = 1 order by col2"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,17.0,1\n" + + "1,36.0,1\n" ; + res.close(); + assertEquals(expectedResult, result); + + // Less than equals + res = executeString("SELECT * FROM " + tableName + " where key <= 2 order by key, col2"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,17.0,1\n" + + "1,36.0,1\n" + + "2,38.0,2\n" + + "3,45.0,2\n" ; + res.close(); + assertEquals(expectedResult, result); + + // In + res = executeString("SELECT * FROM " + tableName + " where key in (1, 3) order by key, col2"); + result = resultSetToString(res); + expectedResult = "col1,col2,key\n" + + "-------------------------------\n" + + "1,17.0,1\n" + + "1,36.0,1\n" + + "3,49.0,3\n" ; + res.close(); + assertEquals(expectedResult, result); + + // Join + res = executeString( + " select col1,col2,key, p_partkey, p_name from \"" + getCurrentDatabase() + "\"." + tableName + + ", part where key = " + "p_partkey and col1 >= 2 order by key, col2"); + + result = resultSetToString(res); + expectedResult = "col1,col2,key,p_partkey,p_name\n" + + "-------------------------------\n" + + "2,38.0,2,2,blush thistle blue yellow saddle\n" + + "3,45.0,2,2,blush thistle blue yellow saddle\n" + + "3,49.0,3,3,spring green yellow purple cornsilk\n" ; + res.close(); + assertEquals(expectedResult, result); + + executeString("DROP TABLE " + tableName + " PURGE").close(); + } } From 2c3d014f2144bf4b313819ee38c926645674ffdc Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 16 Oct 2015 10:24:05 +0900 Subject: [PATCH 6/6] Trigger for travis CI build --- .../apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index ba0812ea28..2130e0401f 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -530,6 +530,7 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP try { Pair> pair = findFilteredPartitionPair(queryContext, scanNode); plan.addHistory("PartitionTableRewriter chooses " + pair.getFirst().length + " of partitions"); + PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); rewrittenScanNode.init(scanNode, pair.getFirst()); rewrittenScanNode.setPartitions(pair.getSecond());