DRILL-6413: Update ParquetFilterBuilder.visitBooleanOperator to handle simplified boolean expression

closes #1269
arina-ielchiieva committed May 18, 2018
1 parent 55f2599 commit 5d22d70
Showing 3 changed files with 27 additions and 17 deletions.
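
A one-paragraph reconstruction from the diff below, since the captured commit message does not spell it out: for a boolean column, Calcite simplifies a comparison such as `col_bln = true` down to a bare column reference. ParquetFilterBuilder could therefore hand back a plain field expression instead of a filter predicate, which the unconditional ParquetFilterPredicate cast in the group scan's applyFilter and the AND/OR child handling did not account for. The fix narrows the return type inside the builder and wraps bare boolean fields in an "is true" predicate so the branch can still be pushed down.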
File 1 of 3: the Parquet group scan's applyFilter.

@@ -255,8 +255,7 @@ public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtili
     logger.debug("materializedFilter : {}", ExpressionStringBuilder.toString(materializedFilter));

     Set<LogicalExpression> constantBoundaries = ConstantExpressionIdentifier.getConstantExpressionSet(materializedFilter);
-    filterPredicate = (ParquetFilterPredicate) ParquetFilterBuilder.buildParquetFilterPredicate(
-        materializedFilter, constantBoundaries, udfUtilities);
+    filterPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(materializedFilter, constantBoundaries, udfUtilities);

     if (filterPredicate == null) {
       return null;
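
With the narrowing moved into the builder (shown next), the call site reduces to assign-and-null-check. A sketch of the resulting contract; the local declaration is illustrative, since the hunk only shows an assignment:

    // The builder hands back a ready-to-use predicate or null; null means
    // this filter is not eligible for Parquet push-down.
    ParquetFilterPredicate filterPredicate = ParquetFilterBuilder.buildParquetFilterPredicate(
        materializedFilter, constantBoundaries, udfUtilities);
    if (filterPredicate == null) {
      return null;
    }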
File 2 of 3: ParquetFilterBuilder.java.

@@ -42,6 +42,7 @@
 import org.apache.drill.exec.expr.holders.VarDecimalHolder;
 import org.apache.drill.exec.expr.stat.ParquetBooleanPredicates;
 import org.apache.drill.exec.expr.stat.ParquetComparisonPredicates;
+import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
 import org.apache.drill.exec.expr.stat.ParquetIsPredicates;
 import org.apache.drill.exec.ops.UdfUtilities;
 import org.apache.drill.exec.util.DecimalUtility;
@@ -54,7 +55,7 @@

 /**
  * A visitor which visits a materialized logical expression, and build ParquetFilterPredicate
- * If a visitXXX method returns null, that means the corresponding filter branch is not qualified for pushdown.
+ * If a visitXXX method returns null, that means the corresponding filter branch is not qualified for push down.
  */
 public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression, Set<LogicalExpression>, RuntimeException> {
   static final Logger logger = LoggerFactory.getLogger(ParquetFilterBuilder.class);
@@ -66,12 +67,18 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
    * @param constantBoundaries set of constant expressions
    * @param udfUtilities udf utilities
    *
-   * @return logical expression
+   * @return parquet filter predicate
    */
-  public static LogicalExpression buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
-    return expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
+  public static ParquetFilterPredicate buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
+    LogicalExpression logicalExpression = expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
+    if (logicalExpression instanceof ParquetFilterPredicate) {
+      return (ParquetFilterPredicate) logicalExpression;
+    }
+    logger.debug("Logical expression {} was not qualified for filter push down", logicalExpression);
+    return null;
   }

   private ParquetFilterBuilder(UdfUtilities udfUtilities) {
     this.udfUtilities = udfUtilities;
   }
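
The instanceof guard plus debug log replaces what used to be an unchecked cast at the call site: anything the visitor produces that is not a ParquetFilterPredicate, for instance a bare field reference left behind by Calcite's simplification, now degrades to "no push-down" instead of risking a ClassCastException. A minimal self-contained model of this narrow-or-null pattern (hypothetical types, not Drill's):

    interface Expr {}
    interface Predicate extends Expr {}          // can prune row groups
    final class FieldRef implements Expr {}      // bare column reference

    final class Narrower {
      // Return the visited expression as a Predicate, or null when this
      // branch cannot take part in push-down.
      static Predicate narrow(Expr visited) {
        return (visited instanceof Predicate) ? (Predicate) visited : null;
      }
    }

Returning null rather than throwing keeps push-down strictly an optimization: an unqualified branch simply falls back to normal runtime filtering.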
@@ -150,6 +157,10 @@ public LogicalExpression visitBooleanOperator(BooleanOperator op, Set<LogicalExp
         return null;
       }
     } else {
+      if (childPredicate instanceof TypedFieldExpr) {
+        // Calcite simplifies `= true` expression to field name, wrap it with is true predicate
+        childPredicate = new ParquetIsPredicates.IsTruePredicate(childPredicate);
+      }
       childPredicates.add(childPredicate);
     }
   }
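
This is the heart of the fix. For a boolean column, Calcite rewrites `col_bln = true` to a bare column reference, so a child of an AND/OR can arrive as a TypedFieldExpr rather than a predicate. Wrapping it in IsTruePredicate restores predicate semantics. Roughly, for the new test query below (an illustrative tree, not a literal Drill plan dump):

    // WHERE col_bln = true AND unk_col = 'a', after Calcite simplification:
    //
    //   booleanAnd
    //     child 0: TypedFieldExpr(col_bln)     <- was the comparison `col_bln = true`
    //     child 1: comparison predicate on unk_col
    //
    // After the wrap, every child of the AND is a ParquetFilterPredicate:
    //
    //   booleanAnd
    //     child 0: IsTruePredicate(TypedFieldExpr(col_bln))
    //     child 1: comparison predicate on unk_col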
File 3 of 3: the Parquet filter push-down tests.

@@ -26,6 +26,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.format.converter.ParquetMetadataConverter;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.junit.Assert;
@@ -424,6 +425,9 @@ public void testBooleanPredicate() throws Exception {

     final String queryNotEqualFalse = "select col_bln from dfs.`parquetFilterPush/blnTbl` where not col_bln = false";
     testParquetFilterPD(queryNotEqualFalse, 4, 2, false);
+
+    final String queryEqualTrueWithAnd = "select col_bln from dfs.`parquetFilterPush/blnTbl` where col_bln = true and unk_col = 'a'";
+    testParquetFilterPD(queryEqualTrueWithAnd, 0, 2, false);
   }

   @Test // DRILL-5359
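
Reading the new assertion against the helper defined further down (its parameters are the query, the expected row count, the expected number of files, and whether the metadata cache file was used):

    // Annotated form of the new call (parameter names are inferred):
    testParquetFilterPD(queryEqualTrueWithAnd, /* rows */ 0, /* numFiles */ 2, /* usedMetadataFile */ false);

numFiles=2 means neither file is pruned (the unknown column blocks pruning), and zero rows come back at runtime. The point of the test is that planning survives the simplified `col_bln = true` branch, not that pruning gets smarter; that reading is inferred from the diff, not stated in the commit.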
@@ -445,10 +449,9 @@ public void testFilterWithItemFlatten() throws Exception {
   public void testMultiRowGroup() throws Exception {
     // multirowgroup is a parquet file with 2 rowgroups inside. One with a = 1 and the other with a = 2;
     // FilterPushDown should be able to remove the rowgroup with a = 1 from the scan operator.
-    final String sql = String.format("select * from dfs.`parquet/multirowgroup.parquet` where a > 1");
+    final String sql = "select * from dfs.`parquet/multirowgroup.parquet` where a > 1";
     final String[] expectedPlan = {"numRowGroups=1"};
-    final String[] excludedPlan = {};
-    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan, excludedPlan);
+    PlanTestBase.testPlanMatchingPatterns(sql, expectedPlan);
   }

 //////////////////////////////////////////////////////////////////////////////////////////////////
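
Two incidental cleanups land in that hunk: a String.format call with no format arguments is dropped, and the always-empty exclusion array gives way to a two-argument testPlanMatchingPatterns. Assuming PlanTestBase exposes both overloads (only the two-argument call is visible in this commit), the forms are equivalent:

    PlanTestBase.testPlanMatchingPatterns(sql, new String[] {"numRowGroups=1"});
    // previously, with an exclusion list that never excluded anything:
    PlanTestBase.testPlanMatchingPatterns(sql, new String[] {"numRowGroups=1"}, new String[] {});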
@@ -461,23 +464,20 @@ private void testParquetFilterPD(final String query, int expectedRowCount, int e
     String numFilesPattern = "numFiles=" + expectedNumFiles;
     String usedMetaPattern = "usedMetadataFile=" + usedMetadataFile;

-    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern}, new String[] {});
+    testPlanMatchingPatterns(query, new String[]{numFilesPattern, usedMetaPattern});
   }

-  private void testParquetRowGroupFilterEval(final ParquetMetadata footer, final String exprStr,
-      boolean canDropExpected) throws Exception{
+  private void testParquetRowGroupFilterEval(final ParquetMetadata footer, final String exprStr, boolean canDropExpected) throws Exception{
     final LogicalExpression filterExpr = parseExpr(exprStr);
     testParquetRowGroupFilterEval(footer, 0, filterExpr, canDropExpected);
   }

-  private void testParquetRowGroupFilterEval(final ParquetMetadata footer, final int rowGroupIndex,
-      final LogicalExpression filterExpr, boolean canDropExpected) throws Exception {
-    boolean canDrop = ParquetRGFilterEvaluator.evalFilter(filterExpr, footer, rowGroupIndex,
-        fragContext.getOptions(), fragContext);
+  private void testParquetRowGroupFilterEval(final ParquetMetadata footer, final int rowGroupIndex, final LogicalExpression filterExpr, boolean canDropExpected) {
+    boolean canDrop = ParquetRGFilterEvaluator.evalFilter(filterExpr, footer, rowGroupIndex, fragContext.getOptions(), fragContext);
     Assert.assertEquals(canDropExpected, canDrop);
   }

-  private ParquetMetadata getParquetMetaData(File file) throws IOException{
-    return ParquetFileReader.readFooter(new Configuration(fs.getConf()), new Path(file.toURI()));
+  private ParquetMetadata getParquetMetaData(File file) throws IOException{
+    return ParquetFileReader.readFooter(new Configuration(fs.getConf()), new Path(file.toURI()), ParquetMetadataConverter.NO_FILTER);
   }
 }
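
The added ParquetMetadataConverter.NO_FILTER argument asks parquet-mr for an unfiltered footer read, so metadata for every row group comes back, which the row-group-indexed assertions above rely on; the switch also moves off the two-argument readFooter overload, which newer parquet-mr releases deprecate. A sketch of the call shape:

    // Read the complete footer: all row groups, no offset or range filtering.
    ParquetMetadata footer = ParquetFileReader.readFooter(
        new Configuration(fs.getConf()), new Path(file.toURI()), ParquetMetadataConverter.NO_FILTER);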
