From 2f7d2442d025d226b17b6b512050d39ebd339c99 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Wed, 14 Oct 2015 18:42:20 +0900 Subject: [PATCH 1/8] TAJO-1927: PartitionedTableRewriter should use a local variable to remember the volume of partitions. --- .../rules/PartitionedTableRewriter.java | 94 ++++++++++++++----- 1 file changed, 70 insertions(+), 24 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index cf54f7b4c7..7cc1639405 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -41,6 +41,7 @@ import org.apache.tajo.plan.visitor.BasicLogicalPlanVisitor; import org.apache.tajo.storage.Tuple; import org.apache.tajo.storage.VTuple; +import org.apache.tajo.util.Pair; import org.apache.tajo.util.StringUtils; import java.io.IOException; @@ -48,7 +49,6 @@ public class PartitionedTableRewriter implements LogicalPlanRewriteRule { private CatalogService catalog; - private long totalVolume; private static final Log LOG = LogFactory.getLog(PartitionedTableRewriter.class); @@ -110,7 +110,7 @@ public String toString() { } } - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private FilteredPartition findFilteredPaths(OverridableConf queryContext, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { @@ -127,11 +127,13 @@ public String toString() { * @return * @throws IOException */ - private Path [] findFilteredPaths(OverridableConf queryContext, String tableName, + private FilteredPartition findFilteredPaths(OverridableConf queryContext, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { + FilteredPartition filteredPartition = new FilteredPartition(); + Pair pair = null; Path [] filteredPaths = null; FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); String [] splits = CatalogUtil.splitFQTableName(tableName); @@ -141,17 +143,17 @@ public String toString() { if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + pair = findFilteredPartitionFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + pair = findFilteredPartitionsByPartitionDesc(partitions); } } else { if (catalog.existPartitions(splits[0], splits[1])) { PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); partitions = catalog.getPartitionsByAlgebra(request); - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + pair = findFilteredPartitionsByPartitionDesc(partitions); } else { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + pair = findFilteredPartitionFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } } } catch (UnsupportedException ue) { @@ -160,15 +162,20 @@ public String toString() { LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - filteredPaths = findFilteredPathsFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + pair = findFilteredPartitionFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } else { - filteredPaths = findFilteredPathsByPartitionDesc(partitions); + pair = findFilteredPartitionsByPartitionDesc(partitions); } scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); } - LOG.info("Filtered directory or files: " + filteredPaths.length); - return filteredPaths; + filteredPartition.setPartitionPaths(pair.getFirst()); + filteredPartition.setTotalVolume(pair.getSecond()); + if (partitions != null) { + filteredPartition.setPartitions(partitions); + } + LOG.info("Filtered directory or files: " + pair.getFirst().length); + return filteredPartition; } /** @@ -177,14 +184,15 @@ public String toString() { * @param partitions * @return */ - private Path[] findFilteredPathsByPartitionDesc(List partitions) { + private Pair findFilteredPartitionsByPartitionDesc(List partitions) { Path [] filteredPaths = new Path[partitions.size()]; + long totalVolume = 0L; for (int i = 0; i < partitions.size(); i++) { PartitionDescProto partition = partitions.get(i); filteredPaths[i] = new Path(partition.getPath()); totalVolume += partition.getNumBytes(); } - return filteredPaths; + return new Pair<>(filteredPaths, totalVolume); } /** @@ -198,10 +206,12 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti * @return * @throws IOException */ - private Path [] findFilteredPathsFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, + private Pair findFilteredPartitionFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException{ + Pair pair = null; Path [] filteredPaths = null; PathFilter [] filters; + long totalVolume = 0L; if (conjunctiveForms == null) { filters = buildAllAcceptingPathFilters(partitionColumns); @@ -210,13 +220,17 @@ private Path[] findFilteredPathsByPartitionDesc(List partiti } // loop from one to the number of partition columns - filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); + pair = getPathArrayAndTotalVolume(fs.listStatus(tablePath, filters[0])); + filteredPaths = pair.getFirst(); + totalVolume += pair.getSecond(); for (int i = 1; i < partitionColumns.size(); i++) { // Get all file status matched to a ith level path filter. - filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); + pair = getPathArrayAndTotalVolume(fs.listStatus(filteredPaths, filters[i])); + filteredPaths = pair.getFirst(); + totalVolume += pair.getSecond(); } - return filteredPaths; + return new Pair<>(filteredPaths, totalVolume); } /** @@ -318,17 +332,18 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( return filters; } - private Path [] toPathArray(FileStatus[] fileStatuses) { + private Pair getPathArrayAndTotalVolume(FileStatus[] fileStatuses) { Path [] paths = new Path[fileStatuses.length]; + long totalVolume = 0L; for (int i = 0; i < fileStatuses.length; i++) { FileStatus fileStatus = fileStatuses[i]; paths[i] = fileStatus.getPath(); totalVolume += fileStatus.getLen(); } - return paths; + return new Pair<>(paths, totalVolume); } - public Path [] findFilteredPartitionPaths(OverridableConf queryContext, ScanNode scanNode) throws IOException, + public FilteredPartition findFilteredPartition(OverridableConf queryContext, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { TableDesc table = scanNode.getTableDesc(); @@ -496,11 +511,12 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP } try { - Path [] filteredPaths = findFilteredPartitionPaths(queryContext, scanNode); - plan.addHistory("PartitionTableRewriter chooses " + filteredPaths.length + " of partitions"); + FilteredPartition filteredPartition = findFilteredPartition(queryContext, scanNode); + plan.addHistory("PartitionTableRewriter chooses " + filteredPartition.getPartitionPaths().length + + " of partitions"); PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); - rewrittenScanNode.init(scanNode, filteredPaths); - rewrittenScanNode.getTableDesc().getStats().setNumBytes(totalVolume); + rewrittenScanNode.init(scanNode, filteredPartition.getPartitionPaths()); + rewrittenScanNode.getTableDesc().getStats().setNumBytes(filteredPartition.getTotalVolume()); // if it is topmost node, set it as the rootnode of this block. if (stack.empty() || block.getRoot().equals(scanNode)) { @@ -515,4 +531,34 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP return null; } } + + class FilteredPartition { + private Path[] partitionPaths; + private List partitions; + private long totalVolume; + + public Path[] getPartitionPaths() { + return partitionPaths; + } + + public void setPartitionPaths(Path[] partitionPaths) { + this.partitionPaths = partitionPaths; + } + + public List getPartitions() { + return partitions; + } + + public void setPartitions(List partitions) { + this.partitions = partitions; + } + + public long getTotalVolume() { + return totalVolume; + } + + public void setTotalVolume(long totalVolume) { + this.totalVolume = totalVolume; + } + } } From be76f7b02719820ba2d2c3f7c993ab6db9acf3bd Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 15 Oct 2015 14:28:05 +0900 Subject: [PATCH 2/8] Add unit test cases for PartitionedTableRewriter::findFilteredPartitionInfo --- .../planner/TestPartitionedTableRewriter.java | 122 ++++++++++++++++++ .../rewrite/rules/FilteredPartitionInfo.java | 53 ++++++++ .../rules/PartitionedTableRewriter.java | 76 +++++------ 3 files changed, 204 insertions(+), 47 deletions(-) create mode 100644 tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java create mode 100644 tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java new file mode 100644 index 0000000000..38c7a08bad --- /dev/null +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.engine.planner; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.LocalTajoTestingUtility; +import org.apache.tajo.OverridableConf; +import org.apache.tajo.QueryTestCaseBase; +import org.apache.tajo.algebra.Expr; +import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.TableDesc; +import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.engine.query.QueryContext; +import org.apache.tajo.plan.LogicalPlan; +import org.apache.tajo.plan.logical.*; +import org.apache.tajo.plan.rewrite.rules.FilteredPartitionInfo; +import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; +import org.apache.tajo.util.CommonTestingUtil; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +public class TestPartitionedTableRewriter extends QueryTestCaseBase { + + @Test + public final void testFindFilteredPartitionInfo() throws Exception { + String tableName = "testPartitionPruningUsingDirectories".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName("\"TestPartitionedTableRewriter\"", tableName); + + executeString( + "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from default.lineitem"); + + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), tableName); + assertNotNull(tableDesc); + + // Get all partitions + verifyFilteredPartitionInfo("SELECT * FROM " + canonicalTableName + " ORDER BY key", false); + + // Get partition with filter condition + verifyFilteredPartitionInfo("SELECT * FROM " + canonicalTableName + " WHERE key = 17.0 ORDER BY key", true); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + } + + private void verifyFilteredPartitionInfo(String sql, boolean hasFilterCondition) throws Exception { + Expr expr = sqlParser.parse(sql); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + ScanNode scanNode = null; + if(!hasFilterCondition) { + assertEquals(NodeType.SCAN, sortNode.getChild().getType()); + scanNode = sortNode.getChild(); + } else { + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + } + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + if (!hasFilterCondition) { + assertEquals(5, filteredPaths.length); + assertEquals("key=17.0", filteredPaths[0].getName()); + assertEquals("key=36.0", filteredPaths[1].getName()); + assertEquals("key=38.0", filteredPaths[2].getName()); + assertEquals("key=45.0", filteredPaths[3].getName()); + assertEquals("key=49.0", filteredPaths[4].getName()); + + assertEquals(filteredPartitionInfo.getTotalVolume(), 20L); + } else { + assertEquals(1, filteredPaths.length); + assertEquals("key=17.0", filteredPaths[0].getName()); + + assertEquals(filteredPartitionInfo.getTotalVolume(), 4L); + } + + assertEquals(filteredPaths.length, filteredPartitionInfo.getPartitions().size()); + for(int i = 0; i < filteredPaths.length; i++) { + PartitionDescProto partition = filteredPartitionInfo.getPartitions().get(i); + assertEquals(filteredPaths[i].toString(), partition.getPath()); + } + } +} diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java new file mode 100644 index 0000000000..eda68bf198 --- /dev/null +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.plan.rewrite.rules; + +import org.apache.hadoop.fs.Path; +import org.apache.tajo.catalog.proto.CatalogProtos; + +import java.util.List; + +public class FilteredPartitionInfo { + private Path[] partitionPaths; + private List partitions; + private long totalVolume; + + public Path[] getPartitionPaths() { + return partitionPaths; + } + + public void setPartitionPaths(Path[] partitionPaths) { + this.partitionPaths = partitionPaths; + } + + public List getPartitions() { + return partitions; + } + + public void setPartitions(List partitions) { + this.partitions = partitions; + } + + public long getTotalVolume() { + return totalVolume; + } + + public void setTotalVolume(long totalVolume) { + this.totalVolume = totalVolume; + } +} diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 7cc1639405..9d87e2eb90 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -18,6 +18,7 @@ package org.apache.tajo.plan.rewrite.rules; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.commons.logging.Log; @@ -110,11 +111,21 @@ public String toString() { } } - private FilteredPartition findFilteredPaths(OverridableConf queryContext, String tableName, + @VisibleForTesting + public CatalogService getCatalog() { + return catalog; + } + + @VisibleForTesting + public void setCatalog(CatalogService catalog) { + this.catalog = catalog; + } + + public FilteredPartitionInfo findFilteredPartitionInfo(OverridableConf queryContext, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - return findFilteredPaths(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); + return findFilteredPartitionInfo(queryContext, tableName, partitionColumns, conjunctiveForms, tablePath, null); } /** @@ -127,12 +138,12 @@ private FilteredPartition findFilteredPaths(OverridableConf queryContext, String * @return * @throws IOException */ - private FilteredPartition findFilteredPaths(OverridableConf queryContext, String tableName, + private FilteredPartitionInfo findFilteredPartitionInfo(OverridableConf queryContext, String tableName, Schema partitionColumns, EvalNode [] conjunctiveForms, Path tablePath, ScanNode scanNode) throws IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { - FilteredPartition filteredPartition = new FilteredPartition(); + FilteredPartitionInfo filteredPartition = new FilteredPartitionInfo(); Pair pair = null; Path [] filteredPaths = null; FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); @@ -143,17 +154,17 @@ private FilteredPartition findFilteredPaths(OverridableConf queryContext, String if (conjunctiveForms == null) { partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - pair = findFilteredPartitionFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + pair = findFilteredPartitionInfoFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } else { - pair = findFilteredPartitionsByPartitionDesc(partitions); + pair = findFilteredPartitionInfosByPartitionDesc(partitions); } } else { if (catalog.existPartitions(splits[0], splits[1])) { PartitionsByAlgebraProto request = getPartitionsAlgebraProto(splits[0], splits[1], conjunctiveForms); partitions = catalog.getPartitionsByAlgebra(request); - pair = findFilteredPartitionsByPartitionDesc(partitions); + pair = findFilteredPartitionInfosByPartitionDesc(partitions); } else { - pair = findFilteredPartitionFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + pair = findFilteredPartitionInfoFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } } } catch (UnsupportedException ue) { @@ -162,9 +173,9 @@ private FilteredPartition findFilteredPaths(OverridableConf queryContext, String LOG.warn(ue.getMessage()); partitions = catalog.getPartitionsOfTable(splits[0], splits[1]); if (partitions.isEmpty()) { - pair = findFilteredPartitionFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); + pair = findFilteredPartitionInfoFromFileSystem(partitionColumns, conjunctiveForms, fs, tablePath); } else { - pair = findFilteredPartitionsByPartitionDesc(partitions); + pair = findFilteredPartitionInfosByPartitionDesc(partitions); } scanNode.setQual(AlgebraicUtil.createSingletonExprFromCNF(conjunctiveForms)); } @@ -184,7 +195,7 @@ private FilteredPartition findFilteredPaths(OverridableConf queryContext, String * @param partitions * @return */ - private Pair findFilteredPartitionsByPartitionDesc(List partitions) { + private Pair findFilteredPartitionInfosByPartitionDesc(List partitions) { Path [] filteredPaths = new Path[partitions.size()]; long totalVolume = 0L; for (int i = 0; i < partitions.size(); i++) { @@ -206,7 +217,7 @@ private Pair findFilteredPartitionsByPartitionDesc(List findFilteredPartitionFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, + private Pair findFilteredPartitionInfoFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException{ Pair pair = null; Path [] filteredPaths = null; @@ -343,8 +354,9 @@ private Pair getPathArrayAndTotalVolume(FileStatus[] fileStatuses) return new Pair<>(paths, totalVolume); } - public FilteredPartition findFilteredPartition(OverridableConf queryContext, ScanNode scanNode) throws IOException, - UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, + @VisibleForTesting + public FilteredPartitionInfo findFilteredPartitionInfo(OverridableConf queryContext, ScanNode scanNode) throws + IOException, UndefinedDatabaseException, UndefinedTableException, UndefinedPartitionMethodException, UndefinedOperatorException, UnsupportedException { TableDesc table = scanNode.getTableDesc(); PartitionMethodDesc partitionDesc = scanNode.getTableDesc().getPartitionMethod(); @@ -384,10 +396,10 @@ public FilteredPartition findFilteredPartition(OverridableConf queryContext, Sca } if (indexablePredicateSet.size() > 0) { // There are at least one indexable predicates - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, + return findFilteredPartitionInfo(queryContext, table.getName(), paritionValuesSchema, indexablePredicateSet.toArray(new EvalNode[indexablePredicateSet.size()]), new Path(table.getUri()), scanNode); } else { // otherwise, we will get all partition paths. - return findFilteredPaths(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); + return findFilteredPartitionInfo(queryContext, table.getName(), paritionValuesSchema, null, new Path(table.getUri())); } } @@ -511,7 +523,7 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP } try { - FilteredPartition filteredPartition = findFilteredPartition(queryContext, scanNode); + FilteredPartitionInfo filteredPartition = findFilteredPartitionInfo(queryContext, scanNode); plan.addHistory("PartitionTableRewriter chooses " + filteredPartition.getPartitionPaths().length + " of partitions"); PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); @@ -531,34 +543,4 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP return null; } } - - class FilteredPartition { - private Path[] partitionPaths; - private List partitions; - private long totalVolume; - - public Path[] getPartitionPaths() { - return partitionPaths; - } - - public void setPartitionPaths(Path[] partitionPaths) { - this.partitionPaths = partitionPaths; - } - - public List getPartitions() { - return partitions; - } - - public void setPartitions(List partitions) { - this.partitions = partitions; - } - - public long getTotalVolume() { - return totalVolume; - } - - public void setTotalVolume(long totalVolume) { - this.totalVolume = totalVolume; - } - } } From e3caa4e6e3258077ef138ec632952533cfa193f7 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 15 Oct 2015 14:30:13 +0900 Subject: [PATCH 3/8] Add parameter to TestPartitionedTableRewriter::verifyFilteredPartitionInfo --- .../planner/TestPartitionedTableRewriter.java | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java index 38c7a08bad..ebb15eeeaf 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -52,16 +52,21 @@ public final void testFindFilteredPartitionInfo() throws Exception { TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), tableName); assertNotNull(tableDesc); + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); + // Get all partitions - verifyFilteredPartitionInfo("SELECT * FROM " + canonicalTableName + " ORDER BY key", false); + verifyFilteredPartitionInfo(rewriter, "SELECT * FROM " + canonicalTableName + " ORDER BY key", false); // Get partition with filter condition - verifyFilteredPartitionInfo("SELECT * FROM " + canonicalTableName + " WHERE key = 17.0 ORDER BY key", true); + verifyFilteredPartitionInfo(rewriter, "SELECT * FROM " + canonicalTableName + " WHERE key = 17.0 ORDER BY key", + true); executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); } - private void verifyFilteredPartitionInfo(String sql, boolean hasFilterCondition) throws Exception { + private void verifyFilteredPartitionInfo(PartitionedTableRewriter rewriter, String sql, boolean hasFilterCondition) + throws Exception { Expr expr = sqlParser.parse(sql); QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); LogicalPlan newPlan = planner.createPlan(defaultContext, expr); @@ -89,8 +94,6 @@ private void verifyFilteredPartitionInfo(String sql, boolean hasFilterCondition) scanNode.setQual(selNode.getQual()); } - PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); - rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); From 9d4513cc82de42407619c28e46f635270ec371e5 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 15 Oct 2015 14:33:45 +0900 Subject: [PATCH 4/8] Remove unused variable --- .../apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 9d87e2eb90..06c6777f52 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -145,7 +145,6 @@ private FilteredPartitionInfo findFilteredPartitionInfo(OverridableConf queryCon FilteredPartitionInfo filteredPartition = new FilteredPartitionInfo(); Pair pair = null; - Path [] filteredPaths = null; FileSystem fs = tablePath.getFileSystem(queryContext.getConf()); String [] splits = CatalogUtil.splitFQTableName(tableName); List partitions = null; From 74d3a8722f801cbf9656497cb4a10cd91a3a48a0 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 16 Oct 2015 10:24:56 +0900 Subject: [PATCH 5/8] Trigger for travis CI build --- .../apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index 06c6777f52..c26eb61bb3 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -525,6 +525,7 @@ public Object visitScan(OverridableConf queryContext, LogicalPlan plan, LogicalP FilteredPartitionInfo filteredPartition = findFilteredPartitionInfo(queryContext, scanNode); plan.addHistory("PartitionTableRewriter chooses " + filteredPartition.getPartitionPaths().length + " of partitions"); + PartitionedTableScanNode rewrittenScanNode = plan.createNode(PartitionedTableScanNode.class); rewrittenScanNode.init(scanNode, filteredPartition.getPartitionPaths()); rewrittenScanNode.getTableDesc().getStats().setNumBytes(filteredPartition.getTotalVolume()); From 2df871409f0b85fb97cb3e6e7c09e5b7d5b11bea Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 22 Oct 2015 12:01:30 +0900 Subject: [PATCH 6/8] Add unit test cases for verifying PartitionedTableRewriter --- .../planner/TestPartitionedTableRewriter.java | 423 +++++++++++++++--- 1 file changed, 369 insertions(+), 54 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java index ebb15eeeaf..0bd475127b 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -18,55 +18,228 @@ package org.apache.tajo.engine.planner; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.tajo.LocalTajoTestingUtility; import org.apache.tajo.OverridableConf; import org.apache.tajo.QueryTestCaseBase; import org.apache.tajo.algebra.Expr; import org.apache.tajo.catalog.CatalogUtil; +import org.apache.tajo.catalog.Schema; import org.apache.tajo.catalog.TableDesc; -import org.apache.tajo.catalog.proto.CatalogProtos.PartitionDescProto; +import org.apache.tajo.catalog.TableMeta; +import org.apache.tajo.catalog.partition.PartitionMethodDesc; +import org.apache.tajo.catalog.proto.CatalogProtos; +import org.apache.tajo.common.TajoDataTypes; +import org.apache.tajo.conf.TajoConf; import org.apache.tajo.engine.query.QueryContext; import org.apache.tajo.plan.LogicalPlan; import org.apache.tajo.plan.logical.*; import org.apache.tajo.plan.rewrite.rules.FilteredPartitionInfo; import org.apache.tajo.plan.rewrite.rules.PartitionedTableRewriter; import org.apache.tajo.util.CommonTestingUtil; +import org.apache.tajo.util.FileUtil; +import org.apache.tajo.util.KeyValueSet; +import org.junit.AfterClass; +import org.junit.BeforeClass; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; public class TestPartitionedTableRewriter extends QueryTestCaseBase { - @Test - public final void testFindFilteredPartitionInfo() throws Exception { - String tableName = "testPartitionPruningUsingDirectories".toLowerCase(); - String canonicalTableName = CatalogUtil.getCanonicalTableName("\"TestPartitionedTableRewriter\"", tableName); + final static String PARTITION_TABLE_NAME = "tb_partition"; + final static String MULTIPLE_PARTITION_TABLE_NAME = "tb_multiple_partition"; - executeString( - "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) " - + " as select l_orderkey, l_partkey, l_quantity from default.lineitem"); + @BeforeClass + public static void setUp() throws Exception { + FileSystem fs = FileSystem.get(conf); + Path rootDir = TajoConf.getWarehouseDir(testingCluster.getConfiguration()); - TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), tableName); + Schema schema = new Schema(); + schema.addColumn("n_nationkey", TajoDataTypes.Type.INT8); + schema.addColumn("n_name", TajoDataTypes.Type.TEXT); + schema.addColumn("n_regionkey", TajoDataTypes.Type.INT8); + + TableMeta meta = CatalogUtil.newTableMeta("TEXT", new KeyValueSet()); + + createExternalTableIncludedOnePartitionKeyColumn(fs, rootDir, schema, meta); + createExternalTableIncludedMultiplePartitionKeyColumns(fs, rootDir, schema, meta); + } + + private static void createExternalTableIncludedOnePartitionKeyColumn(FileSystem fs, Path rootDir, Schema schema, + TableMeta meta) throws Exception { + Schema partSchema = new Schema(); + partSchema.addColumn("key", TajoDataTypes.Type.TEXT); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc("TestPartitionedTableRewriter", PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key", partSchema); + + Path tablePath = new Path(rootDir, PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + client.createExternalTable(PARTITION_TABLE_NAME, schema, tablePath.toUri(), meta, partitionMethodDesc); + + TableDesc tableDesc = client.getTableDesc(PARTITION_TABLE_NAME); + assertNotNull(tableDesc); + + Path path = new Path(tableDesc.getUri().toString() + "/key=part123"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part456"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key=part789"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + private static void createExternalTableIncludedMultiplePartitionKeyColumns(FileSystem fs, Path rootDir, + Schema schema, TableMeta meta) throws Exception { + Schema partSchema = new Schema(); + partSchema.addColumn("key1", TajoDataTypes.Type.TEXT); + partSchema.addColumn("key2", TajoDataTypes.Type.TEXT); + partSchema.addColumn("key3", TajoDataTypes.Type.INT8); + + PartitionMethodDesc partitionMethodDesc = + new PartitionMethodDesc("TestPartitionedTableRewriter", MULTIPLE_PARTITION_TABLE_NAME, + CatalogProtos.PartitionType.COLUMN, "key1,key2,key3", partSchema); + + Path tablePath = new Path(rootDir, MULTIPLE_PARTITION_TABLE_NAME); + fs.mkdirs(tablePath); + + client.createExternalTable(MULTIPLE_PARTITION_TABLE_NAME, schema, tablePath.toUri(), meta, partitionMethodDesc); + + TableDesc tableDesc = client.getTableDesc(MULTIPLE_PARTITION_TABLE_NAME); assertNotNull(tableDesc); + Path path = new Path(tableDesc.getUri().toString() + "/key1=part123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=1"); + fs.mkdirs(path); + FileUtil.writeTextToFile("1|ARGENTINA|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part123/key2=supp123/key3=2"); + fs.mkdirs(path); + FileUtil.writeTextToFile("2|BRAZIL|1", new Path(path, "data")); + + path = new Path(tableDesc.getUri().toString() + "/key1=part789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789"); + fs.mkdirs(path); + path = new Path(tableDesc.getUri().toString() + "/key1=part789/key2=supp789/key3=3"); + fs.mkdirs(path); + FileUtil.writeTextToFile("3|CANADA|1", new Path(path, "data")); + } + + @AfterClass + public static void tearDown() throws Exception { + client.executeQuery("DROP TABLE IF EXISTS " + PARTITION_TABLE_NAME + " PURGE;"); + client.executeQuery("DROP TABLE IF EXISTS " + MULTIPLE_PARTITION_TABLE_NAME + " PURGE;"); + } + + @Test + public void testFilterIncludePartitionKeyColumn() throws Exception { + Expr expr = sqlParser.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part456' ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertEquals("key=part456", filteredPaths[0].getName()); + } + + @Test + public void testWithoutAnyFilters() throws Exception { + Expr expr = sqlParser.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SCAN, sortNode.getChild().getType()); + ScanNode scanNode = sortNode.getChild(); + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); - rewriter.setCatalog(catalog); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); - // Get all partitions - verifyFilteredPartitionInfo(rewriter, "SELECT * FROM " + canonicalTableName + " ORDER BY key", false); + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); - // Get partition with filter condition - verifyFilteredPartitionInfo(rewriter, "SELECT * FROM " + canonicalTableName + " WHERE key = 17.0 ORDER BY key", - true); + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + assertEquals(3, filteredPaths.length); + assertEquals("key=part123", filteredPaths[0].getName()); + assertEquals("key=part456", filteredPaths[1].getName()); + assertEquals("key=part789", filteredPaths[2].getName()); + } - executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); + @Test + public void testFilterIncludeNonExistingPartitionValue() throws Exception { + Expr expr = sqlParser.parse("SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE key = 'part123456789'"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + assertEquals(0, filteredPartitionInfo.getPartitionPaths().length); } - private void verifyFilteredPartitionInfo(PartitionedTableRewriter rewriter, String sql, boolean hasFilterCondition) - throws Exception { + @Test + public void testFilterIncludeNonPartitionKeyColumn() throws Exception { + String sql = "SELECT * FROM " + PARTITION_TABLE_NAME + " WHERE n_nationkey = 1"; Expr expr = sqlParser.parse(sql); QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); LogicalPlan newPlan = planner.createPlan(defaultContext, expr); @@ -77,49 +250,191 @@ private void verifyFilteredPartitionInfo(PartitionedTableRewriter rewriter, Stri ProjectionNode projNode = root.getChild(); + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + assertEquals(3, filteredPaths.length); + assertEquals("key=part123", filteredPaths[0].getName()); + assertEquals("key=part456", filteredPaths[1].getName()); + assertEquals("key=part789", filteredPaths[2].getName()); + } + + @Test + public void testFilterIncludeEveryPartitionKeyColumn() throws Exception { + Expr expr = sqlParser.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part789' and key2 = 'supp789' and key3=3"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SELECTION, projNode.getChild().getType()); + SelectionNode selNode = projNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo= rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + assertEquals(1, filteredPaths.length); + assertEquals("key3=3", filteredPaths[0].getName()); + assertEquals("key2=supp789", filteredPaths[0].getParent().getName()); + assertEquals("key1=part789", filteredPaths[0].getParent().getParent().getName()); + } + + @Test + public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception { + Expr expr = sqlParser.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and key2 = 'supp123' order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + assertEquals(2, filteredPaths.length); + + assertEquals("key3=1", filteredPaths[0].getName()); + assertEquals("key2=supp123", filteredPaths[0].getParent().getName()); + assertEquals("key1=part123", filteredPaths[0].getParent().getParent().getName()); + + assertEquals("key3=2", filteredPaths[1].getName()); + assertEquals("key2=supp123", filteredPaths[1].getParent().getName()); + assertEquals("key1=part123", filteredPaths[1].getParent().getParent().getName()); + } + + @Test + public void testFilterIncludeNonPartitionKeyColumns() throws Exception { + Expr expr = sqlParser.parse("SELECT * FROM " + MULTIPLE_PARTITION_TABLE_NAME + + " WHERE key1 = 'part123' and n_nationkey >= 2 order by n_nationkey"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + + assertEquals(NodeType.SORT, projNode.getChild().getType()); + SortNode sortNode = projNode.getChild(); + + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); + + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); + + FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); + assertNotNull(filteredPartitionInfo); + + Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); + assertEquals(2, filteredPaths.length); + + assertEquals("key3=1", filteredPaths[0].getName()); + assertEquals("key2=supp123", filteredPaths[0].getParent().getName()); + assertEquals("key1=part123", filteredPaths[0].getParent().getParent().getName()); + + assertEquals("key3=2", filteredPaths[1].getName()); + assertEquals("key2=supp123", filteredPaths[1].getParent().getName()); + assertEquals("key1=part123", filteredPaths[1].getParent().getParent().getName()); + } + + @Test + public final void testPartitionPruningWitCTAS() throws Exception { + String tableName = "testPartitionPruningUsingDirectories".toLowerCase(); + String canonicalTableName = CatalogUtil.getCanonicalTableName("\"TestPartitionedTableRewriter\"", tableName); + + executeString( + "create table " + canonicalTableName + "(col1 int4, col2 int4) partition by column(key float8) " + + " as select l_orderkey, l_partkey, l_quantity from default.lineitem"); + + TableDesc tableDesc = catalog.getTableDesc(getCurrentDatabase(), tableName); + assertNotNull(tableDesc); + + // With a filter which checks a partition key column + Expr expr = sqlParser.parse("SELECT * FROM " + canonicalTableName + " WHERE key <= 40.0 ORDER BY key"); + QueryContext defaultContext = LocalTajoTestingUtility.createDummyContext(testingCluster.getConfiguration()); + LogicalPlan newPlan = planner.createPlan(defaultContext, expr); + LogicalNode plan = newPlan.getRootBlock().getRoot(); + + assertEquals(NodeType.ROOT, plan.getType()); + LogicalRootNode root = (LogicalRootNode) plan; + + ProjectionNode projNode = root.getChild(); + assertEquals(NodeType.SORT, projNode.getChild().getType()); SortNode sortNode = projNode.getChild(); - ScanNode scanNode = null; - if(!hasFilterCondition) { - assertEquals(NodeType.SCAN, sortNode.getChild().getType()); - scanNode = sortNode.getChild(); - } else { - assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); - SelectionNode selNode = sortNode.getChild(); - assertTrue(selNode.hasQual()); + assertEquals(NodeType.SELECTION, sortNode.getChild().getType()); + SelectionNode selNode = sortNode.getChild(); + assertTrue(selNode.hasQual()); - assertEquals(NodeType.SCAN, selNode.getChild().getType()); - scanNode = selNode.getChild(); - scanNode.setQual(selNode.getQual()); - } + assertEquals(NodeType.SCAN, selNode.getChild().getType()); + ScanNode scanNode = selNode.getChild(); + scanNode.setQual(selNode.getQual()); + PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); assertNotNull(filteredPartitionInfo); Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); - if (!hasFilterCondition) { - assertEquals(5, filteredPaths.length); - assertEquals("key=17.0", filteredPaths[0].getName()); - assertEquals("key=36.0", filteredPaths[1].getName()); - assertEquals("key=38.0", filteredPaths[2].getName()); - assertEquals("key=45.0", filteredPaths[3].getName()); - assertEquals("key=49.0", filteredPaths[4].getName()); - - assertEquals(filteredPartitionInfo.getTotalVolume(), 20L); - } else { - assertEquals(1, filteredPaths.length); - assertEquals("key=17.0", filteredPaths[0].getName()); - - assertEquals(filteredPartitionInfo.getTotalVolume(), 4L); - } - - assertEquals(filteredPaths.length, filteredPartitionInfo.getPartitions().size()); - for(int i = 0; i < filteredPaths.length; i++) { - PartitionDescProto partition = filteredPartitionInfo.getPartitions().get(i); - assertEquals(filteredPaths[i].toString(), partition.getPath()); - } + assertEquals(3, filteredPaths.length); + assertEquals("key=17.0", filteredPaths[0].getName()); + assertEquals("key=36.0", filteredPaths[1].getName()); + assertEquals("key=38.0", filteredPaths[2].getName()); + + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); } } From 3cc15bf5250c9dc21b14d416289a01160c630bb8 Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Thu, 22 Oct 2015 12:05:06 +0900 Subject: [PATCH 7/8] Remove the list of partitions in FilteredPartitionInfo --- .../tajo/plan/rewrite/rules/FilteredPartitionInfo.java | 9 --------- .../plan/rewrite/rules/PartitionedTableRewriter.java | 3 --- 2 files changed, 12 deletions(-) diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java index eda68bf198..b76a3713cf 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/FilteredPartitionInfo.java @@ -24,7 +24,6 @@ public class FilteredPartitionInfo { private Path[] partitionPaths; - private List partitions; private long totalVolume; public Path[] getPartitionPaths() { @@ -35,14 +34,6 @@ public void setPartitionPaths(Path[] partitionPaths) { this.partitionPaths = partitionPaths; } - public List getPartitions() { - return partitions; - } - - public void setPartitions(List partitions) { - this.partitions = partitions; - } - public long getTotalVolume() { return totalVolume; } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index c26eb61bb3..ce4804bee3 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -181,9 +181,6 @@ private FilteredPartitionInfo findFilteredPartitionInfo(OverridableConf queryCon filteredPartition.setPartitionPaths(pair.getFirst()); filteredPartition.setTotalVolume(pair.getSecond()); - if (partitions != null) { - filteredPartition.setPartitions(partitions); - } LOG.info("Filtered directory or files: " + pair.getFirst().length); return filteredPartition; } From 3a3d88b64429454afd6bbc02dd1df5bc4b28766a Mon Sep 17 00:00:00 2001 From: JaeHwa Jung Date: Fri, 23 Oct 2015 11:42:54 +0900 Subject: [PATCH 8/8] Fix table volume calculation bug and add unit test cases --- .../planner/TestPartitionedTableRewriter.java | 24 +++++++++++++ .../rules/PartitionedTableRewriter.java | 34 ++++++++++++------- 2 files changed, 45 insertions(+), 13 deletions(-) diff --git a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java index 0bd475127b..0b6a0d37e7 100644 --- a/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java +++ b/tajo-core-tests/src/test/java/org/apache/tajo/engine/planner/TestPartitionedTableRewriter.java @@ -167,6 +167,7 @@ public void testFilterIncludePartitionKeyColumn() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -175,6 +176,8 @@ public void testFilterIncludePartitionKeyColumn() throws Exception { Path[] filteredPaths = filteredPartitionInfo.getPartitionPaths(); assertEquals(1, filteredPaths.length); assertEquals("key=part456", filteredPaths[0].getName()); + + assertEquals(10L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -196,6 +199,7 @@ public void testWithoutAnyFilters() throws Exception { ScanNode scanNode = sortNode.getChild(); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -206,6 +210,8 @@ public void testWithoutAnyFilters() throws Exception { assertEquals("key=part123", filteredPaths[0].getName()); assertEquals("key=part456", filteredPaths[1].getName()); assertEquals("key=part789", filteredPaths[2].getName()); + + assertEquals(33L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -229,12 +235,15 @@ public void testFilterIncludeNonExistingPartitionValue() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); assertNotNull(filteredPartitionInfo); assertEquals(0, filteredPartitionInfo.getPartitionPaths().length); + + assertEquals(0L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -259,6 +268,7 @@ public void testFilterIncludeNonPartitionKeyColumn() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -269,6 +279,8 @@ public void testFilterIncludeNonPartitionKeyColumn() throws Exception { assertEquals("key=part123", filteredPaths[0].getName()); assertEquals("key=part456", filteredPaths[1].getName()); assertEquals("key=part789", filteredPaths[2].getName()); + + assertEquals(33L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -293,6 +305,7 @@ public void testFilterIncludeEveryPartitionKeyColumn() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo= rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -303,6 +316,8 @@ public void testFilterIncludeEveryPartitionKeyColumn() throws Exception { assertEquals("key3=3", filteredPaths[0].getName()); assertEquals("key2=supp789", filteredPaths[0].getParent().getName()); assertEquals("key1=part789", filteredPaths[0].getParent().getParent().getName()); + + assertEquals(10L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -330,6 +345,7 @@ public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -345,6 +361,8 @@ public void testFilterIncludeSomeOfPartitionKeyColumns() throws Exception { assertEquals("key3=2", filteredPaths[1].getName()); assertEquals("key2=supp123", filteredPaths[1].getParent().getName()); assertEquals("key1=part123", filteredPaths[1].getParent().getParent().getName()); + + assertEquals(23L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -372,6 +390,7 @@ public void testFilterIncludeNonPartitionKeyColumns() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -387,6 +406,8 @@ public void testFilterIncludeNonPartitionKeyColumns() throws Exception { assertEquals("key3=2", filteredPaths[1].getName()); assertEquals("key2=supp123", filteredPaths[1].getParent().getName()); assertEquals("key1=part123", filteredPaths[1].getParent().getParent().getName()); + + assertEquals(23L, filteredPartitionInfo.getTotalVolume()); } @Test @@ -424,6 +445,7 @@ public final void testPartitionPruningWitCTAS() throws Exception { scanNode.setQual(selNode.getQual()); PartitionedTableRewriter rewriter = new PartitionedTableRewriter(); + rewriter.setCatalog(catalog); OverridableConf conf = CommonTestingUtil.getSessionVarsForTest(); FilteredPartitionInfo filteredPartitionInfo = rewriter.findFilteredPartitionInfo(conf, scanNode); @@ -435,6 +457,8 @@ public final void testPartitionPruningWitCTAS() throws Exception { assertEquals("key=36.0", filteredPaths[1].getName()); assertEquals("key=38.0", filteredPaths[2].getName()); + assertEquals(12L, filteredPartitionInfo.getTotalVolume()); + executeString("DROP TABLE " + canonicalTableName + " PURGE").close(); } } diff --git a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java index ce4804bee3..69ad468f84 100644 --- a/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java +++ b/tajo-plan/src/main/java/org/apache/tajo/plan/rewrite/rules/PartitionedTableRewriter.java @@ -215,10 +215,8 @@ private Pair findFilteredPartitionInfosByPartitionDesc(List findFilteredPartitionInfoFromFileSystem(Schema partitionColumns, EvalNode [] conjunctiveForms, FileSystem fs, Path tablePath) throws IOException{ - Pair pair = null; Path [] filteredPaths = null; PathFilter [] filters; - long totalVolume = 0L; if (conjunctiveForms == null) { filters = buildAllAcceptingPathFilters(partitionColumns); @@ -227,17 +225,29 @@ private Pair findFilteredPartitionInfoFromFileSystem(Schema partit } // loop from one to the number of partition columns - pair = getPathArrayAndTotalVolume(fs.listStatus(tablePath, filters[0])); - filteredPaths = pair.getFirst(); - totalVolume += pair.getSecond(); + filteredPaths = toPathArray(fs.listStatus(tablePath, filters[0])); for (int i = 1; i < partitionColumns.size(); i++) { // Get all file status matched to a ith level path filter. - pair = getPathArrayAndTotalVolume(fs.listStatus(filteredPaths, filters[i])); - filteredPaths = pair.getFirst(); - totalVolume += pair.getSecond(); + filteredPaths = toPathArray(fs.listStatus(filteredPaths, filters[i])); } - return new Pair<>(filteredPaths, totalVolume); + + Long totalVolume = getTableVolume(fs, filteredPaths); + Pair pair = new Pair<>(filteredPaths, totalVolume); + return pair; + } + + private Long getTableVolume(FileSystem fs, Path[] paths) { + Long totalVolume = 0L; + try { + for (Path input : paths) { + ContentSummary summary = fs.getContentSummary(input); + totalVolume += summary.getLength(); + } + } catch (Throwable e) { + throw new TajoInternalError(e); + } + return totalVolume; } /** @@ -339,15 +349,13 @@ public static PartitionsByAlgebraProto getPartitionsAlgebraProto( return filters; } - private Pair getPathArrayAndTotalVolume(FileStatus[] fileStatuses) { + private Path [] toPathArray(FileStatus[] fileStatuses) { Path [] paths = new Path[fileStatuses.length]; - long totalVolume = 0L; for (int i = 0; i < fileStatuses.length; i++) { FileStatus fileStatus = fileStatuses[i]; paths[i] = fileStatus.getPath(); - totalVolume += fileStatus.getLen(); } - return new Pair<>(paths, totalVolume); + return paths; } @VisibleForTesting