Skip to content

Commit

Permalink
DRILL-2568: Drop filter plan node if all conjuncts have been pushed into the scan as part of partition pruning.
Browse files Browse the repository at this point in the history

Check presence in children list before adding.

Split prune condition into conjuncts before removing from the original condition.  Add plan checking and row count validation to TestPartitionFilter tests.

Move more tests from TestPrune into TestPartitionFilter and added plan checking and results validation.

Remove TestPrune since these tests are subsumed by the ones in TestPartitionFilter.
  • Loading branch information
Aman Sinha committed Mar 28, 2015
1 parent 20efb2f commit c11fcf7
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 90 deletions.
Expand Up @@ -71,7 +71,9 @@ public SqlOperator getOp() {
return booleanOp; return booleanOp;
} }
public void addChild(RexNode n) { public void addChild(RexNode n) {
children.add(n); if (!children.contains(n)) {
children.add(n);
}
} }
public List<RexNode> getChildren() { public List<RexNode> getChildren() {
return children; return children;
Expand Down
Expand Up @@ -58,6 +58,7 @@
import org.eigenbase.relopt.RelOptRuleOperand; import org.eigenbase.relopt.RelOptRuleOperand;
import org.eigenbase.relopt.RelOptUtil; import org.eigenbase.relopt.RelOptUtil;
import org.eigenbase.rex.RexNode; import org.eigenbase.rex.RexNode;
import org.eigenbase.rex.RexUtil;


import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
Expand Down Expand Up @@ -232,8 +233,11 @@ record = 0;
} }
} }


boolean canDropFilter = true;

if(newFiles.isEmpty()){ if(newFiles.isEmpty()){
newFiles.add(files.get(0)); newFiles.add(files.get(0));
canDropFilter = false;
} }


if(newFiles.size() == files.size()){ if(newFiles.size() == files.size()){
Expand All @@ -242,6 +246,10 @@ record = 0;


logger.debug("Pruned {} => {}", files, newFiles); logger.debug("Pruned {} => {}", files, newFiles);


List<RexNode> conjuncts = RelOptUtil.conjunctions(condition);
List<RexNode> pruneConjuncts = RelOptUtil.conjunctions(pruneCondition);
conjuncts.removeAll(pruneConjuncts);
RexNode newCondition = RexUtil.composeConjunction(filterRel.getCluster().getRexBuilder(), conjuncts, false);


final FileSelection newFileSelection = new FileSelection(newFiles, origSelection.getSelection().selectionRoot, true); final FileSelection newFileSelection = new FileSelection(newFiles, origSelection.getSelection().selectionRoot, true);
final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection); final FileGroupScan newScan = ((FileGroupScan)scanRel.getGroupScan()).clone(newFileSelection);
Expand All @@ -259,8 +267,12 @@ record = 0;
inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel)); inputRel = projectRel.copy(projectRel.getTraitSet(), Collections.singletonList(inputRel));
} }


final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel)); if (newCondition.isAlwaysTrue() && canDropFilter) {
call.transformTo(newFilter); call.transformTo(inputRel);
} else {
final RelNode newFilter = filterRel.copy(filterRel.getTraitSet(), Collections.singletonList(inputRel));
call.transformTo(newFilter);
}


}catch(Exception e){ }catch(Exception e){
logger.warn("Exception while trying to prune partition.", e); logger.warn("Exception while trying to prune partition.", e);
Expand Down
111 changes: 71 additions & 40 deletions exec/java-exec/src/test/java/org/apache/drill/TestPartitionFilter.java
Expand Up @@ -17,79 +17,86 @@
*/ */
package org.apache.drill; package org.apache.drill;



import static org.junit.Assert.assertEquals;

import org.apache.drill.common.util.TestTools; import org.apache.drill.common.util.TestTools;
import org.junit.Test; import org.junit.Test;


public class TestPartitionFilter extends BaseTestQuery{ public class TestPartitionFilter extends PlanTestBase {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPartitionFilter.class); static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestPartitionFilter.class);


static final String WORKING_PATH = TestTools.getWorkingPath(); static final String WORKING_PATH = TestTools.getWorkingPath();
static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources"; static final String TEST_RES_PATH = WORKING_PATH + "/src/test/resources";


@Test //Parquet: basic test with dir0 and dir1 filters in different orders private static void testExcludeFilter(String query, int expectedNumFiles,
String excludedFilterPattern, int expectedRowCount) throws Exception {
int actualRowCount = testSql(query);
assertEquals(expectedRowCount, actualRowCount);
String numFilesPattern = "numFiles=" + expectedNumFiles;
testPlanMatchingPatterns(query, new String[]{numFilesPattern}, new String[]{excludedFilterPattern});
}

// Runs the query, checks the returned row count, and then verifies that the
// physical plan scans the expected number of files while still retaining the
// given filter pattern (i.e. the filter was NOT fully pushed into the scan).
private static void testIncludeFilter(String query, int expectedNumFiles,
    String includedFilterPattern, int expectedRowCount) throws Exception {
  final int rowCount = testSql(query);
  assertEquals(expectedRowCount, rowCount);
  final String filesPattern = "numFiles=" + expectedNumFiles;
  // Both the file count and the filter pattern must appear in the plan.
  testPlanMatchingPatterns(query, new String[] {filesPattern, includedFilterPattern}, new String[] {});
}

@Test //Parquet: basic test with dir0 and dir1 filters
public void testPartitionFilter1_Parquet() throws Exception { public void testPartitionFilter1_Parquet() throws Exception {
String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where dir0=1994 and dir1='Q1'", TEST_RES_PATH); String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where dir0=1994 and dir1='Q1'", TEST_RES_PATH);
String query2 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where dir1='Q1' and dir0=1994", TEST_RES_PATH); testExcludeFilter(query, 1, "Filter", 10);
test(query1);
test(query2);
} }


@Test //Json: basic test with dir0 and dir1 filters in different orders @Test //Json: basic test with dir0 and dir1 filters
public void testPartitionFilter1_Json() throws Exception { public void testPartitionFilter1_Json() throws Exception {
String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where dir0=1994 and dir1='Q1'", TEST_RES_PATH); String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where dir0=1994 and dir1='Q1'", TEST_RES_PATH);
String query2 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where dir1='Q1' and dir0=1994", TEST_RES_PATH); testExcludeFilter(query, 1, "Filter", 10);
test(query1);
test(query2);
} }


@Test //CSV: basic test with dir0 and dir1 filters in different orders @Test //CSV: basic test with dir0 and dir1 filters in
public void testPartitionFilter1_Csv() throws Exception { public void testPartitionFilter1_Csv() throws Exception {
String query1 = String.format("select * from dfs_test.`%s/multilevel/csv` where dir0=1994 and dir1='Q1'", TEST_RES_PATH); String query = String.format("select * from dfs_test.`%s/multilevel/csv` where dir0=1994 and dir1='Q1'", TEST_RES_PATH);
String query2 = String.format("select * from dfs_test.`%s/multilevel/csv` where dir1='Q1' and dir0=1994", TEST_RES_PATH); testExcludeFilter(query, 1, "Filter", 10);
test(query1);
test(query2);
} }


@Test //Parquet: partition filters are combined with regular columns in an AND @Test //Parquet: partition filters are combined with regular columns in an AND
public void testPartitionFilter2_Parquet() throws Exception { public void testPartitionFilter2_Parquet() throws Exception {
String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where o_custkey < 1000 and dir0=1994 and dir1='Q1'", TEST_RES_PATH); String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where o_custkey < 1000 and dir0=1994 and dir1='Q1'", TEST_RES_PATH);
String query2 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where dir1='Q1' and o_custkey < 1000 and dir0=1994", TEST_RES_PATH); testIncludeFilter(query, 1, "Filter", 5);
test(query1);
test(query2);
} }


@Test //Json: partition filters are combined with regular columns in an AND @Test //Json: partition filters are combined with regular columns in an AND
public void testPartitionFilter2_Json() throws Exception { public void testPartitionFilter2_Json() throws Exception {
String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where o_custkey < 1000 and dir0=1994 and dir1='Q1'", TEST_RES_PATH); String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where o_custkey < 1000 and dir0=1994 and dir1='Q1'", TEST_RES_PATH);
String query2 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where dir1='Q1' and o_custkey < 1000 and dir0=1994", TEST_RES_PATH); testIncludeFilter(query, 1, "Filter", 5);
test(query1);
test(query2);
} }


@Test //CSV: partition filters are combined with regular columns in an AND @Test //CSV: partition filters are combined with regular columns in an AND
public void testPartitionFilter2_Csv() throws Exception { public void testPartitionFilter2_Csv() throws Exception {
String query1 = String.format("select * from dfs_test.`%s/multilevel/csv` where columns[1] < 1000 and dir0=1994 and dir1='Q1'", TEST_RES_PATH); String query = String.format("select * from dfs_test.`%s/multilevel/csv` where columns[1] < 1000 and dir0=1994 and dir1='Q1'", TEST_RES_PATH);
String query2 = String.format("select * from dfs_test.`%s/multilevel/csv` where dir1='Q1' and columns[1] < 1000 and dir0=1994", TEST_RES_PATH); testIncludeFilter(query, 1, "Filter", 5);
test(query1);
test(query2);
} }


@Test //Parquet: partition filters are ANDed and belong to a top-level OR @Test //Parquet: partition filters are ANDed and belong to a top-level OR
public void testPartitionFilter3_Parquet() throws Exception { public void testPartitionFilter3_Parquet() throws Exception {
String query1 = String.format("select * from dfs_test.`%s/multilevel/parquet` where (dir0=1994 and dir1='Q1' and o_custkey < 500) or (dir0=1995 and dir1='Q2' and o_custkey > 500)", TEST_RES_PATH); String query = String.format("select * from dfs_test.`%s/multilevel/parquet` where (dir0=1994 and dir1='Q1' and o_custkey < 500) or (dir0=1995 and dir1='Q2' and o_custkey > 500)", TEST_RES_PATH);
test(query1); testIncludeFilter(query, 2, "Filter", 8);
} }


@Test //Json: partition filters are ANDed and belong to a top-level OR @Test //Json: partition filters are ANDed and belong to a top-level OR
public void testPartitionFilter3_Json() throws Exception { public void testPartitionFilter3_Json() throws Exception {
String query1 = String.format("select * from dfs_test.`%s/multilevel/json` where (dir0=1994 and dir1='Q1' and o_custkey < 500) or (dir0=1995 and dir1='Q2' and o_custkey > 500)", TEST_RES_PATH); String query = String.format("select * from dfs_test.`%s/multilevel/json` where (dir0=1994 and dir1='Q1' and o_custkey < 500) or (dir0=1995 and dir1='Q2' and o_custkey > 500)", TEST_RES_PATH);
test(query1); testIncludeFilter(query, 2, "Filter", 8);
} }


@Test //CSV: partition filters are ANDed and belong to a top-level OR @Test //CSV: partition filters are ANDed and belong to a top-level OR
public void testPartitionFilter3_Csv() throws Exception { public void testPartitionFilter3_Csv() throws Exception {
String query1 = String.format("select * from dfs_test.`%s/multilevel/csv` where (dir0=1994 and dir1='Q1' and columns[1] < 500) or (dir0=1995 and dir1='Q2' and columns[1] > 500)", TEST_RES_PATH); String query = String.format("select * from dfs_test.`%s/multilevel/csv` where (dir0=1994 and dir1='Q1' and columns[1] < 500) or (dir0=1995 and dir1='Q2' and columns[1] > 500)", TEST_RES_PATH);
test(query1); testIncludeFilter(query, 2, "Filter", 8);
} }


@Test //Parquet: filters contain join conditions and partition filters @Test //Parquet: filters contain join conditions and partition filters
Expand All @@ -112,20 +119,44 @@ public void testPartitionFilter4_Csv() throws Exception {


@Test // Parquet: IN filter @Test // Parquet: IN filter
public void testPartitionFilter5_Parquet() throws Exception { public void testPartitionFilter5_Parquet() throws Exception {
String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where dir0 in (1995, 1996)", TEST_RES_PATH); String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/parquet` where dir0 in (1995, 1996)", TEST_RES_PATH);
test(query1); testExcludeFilter(query, 8, "Filter", 80);
} }


@Test // Json: IN filter @Test // Json: IN filter
public void testPartitionFilter5_Json() throws Exception { public void testPartitionFilter5_Json() throws Exception {
String query1 = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where dir0 in (1995, 1996)", TEST_RES_PATH); String query = String.format("select dir0, dir1, o_custkey, o_orderdate from dfs_test.`%s/multilevel/json` where dir0 in (1995, 1996)", TEST_RES_PATH);
test(query1); testExcludeFilter(query, 8, "Filter", 80);
} }


@Test // CSV: IN filter @Test // CSV: IN filter
public void testPartitionFilter5_Csv() throws Exception { public void testPartitionFilter5_Csv() throws Exception {
String query1 = String.format("select * from dfs_test.`%s/multilevel/csv` where dir0 in (1995, 1996)", TEST_RES_PATH); String query = String.format("select * from dfs_test.`%s/multilevel/csv` where dir0 in (1995, 1996)", TEST_RES_PATH);
test(query1); testExcludeFilter(query, 8, "Filter", 80);
}

@Test // Parquet: one side of OR has partition filter only, other side has both partition filter and non-partition filter
public void testPartitionFilter6_Parquet() throws Exception {
  // The non-partition predicate (o_totalprice) means the filter must remain in the plan.
  final String sql = String.format(
      "select * from dfs_test.`%s/multilevel/parquet` where (dir0=1995 and o_totalprice < 40000) or dir0=1996",
      TEST_RES_PATH);
  testIncludeFilter(sql, 8, "Filter", 46);
}

@Test // Parquet: trivial case with 1 partition filter
public void testPartitionFilter7_Parquet() throws Exception {
  // A single dir0 equality prunes to one year's worth of files and the
  // filter itself should be dropped from the plan entirely.
  final String sql = String.format(
      "select * from dfs_test.`%s/multilevel/parquet` where dir0=1995", TEST_RES_PATH);
  testExcludeFilter(sql, 4, "Filter", 40);
}

@Test // Parquet: partition filter on subdirectory only
public void testPartitionFilter8_Parquet() throws Exception {
  // Pruning on dir1 alone (no dir0 predicate) still selects matching
  // quarters across all years; the filter node should be eliminated.
  final String sql = String.format(
      "select * from dfs_test.`%s/multilevel/parquet` where dir1 in ('Q1','Q4')", TEST_RES_PATH);
  testExcludeFilter(sql, 6, "Filter", 60);
}

@Test // Parquet: partition filter on subdirectory only plus non-partition filter
public void testPartitionFilter9_Parquet() throws Exception {
String query = String.format("select * from dfs_test.`%s/multilevel/parquet` where dir1 in ('Q1','Q4') and o_totalprice < 40000", TEST_RES_PATH);
testIncludeFilter(query, 6, "Filter", 9);
} }


} }

This file was deleted.

0 comments on commit c11fcf7

Please sign in to comment.