Skip to content

Commit

Permalink
DRILL-6259: Support parquet filter push down for complex types
Browse files Browse the repository at this point in the history
close #1173
  • Loading branch information
arina-ielchiieva authored and Aman Sinha committed Mar 31, 2018
1 parent a264e7f commit 4ee5625
Show file tree
Hide file tree
Showing 24 changed files with 312 additions and 142 deletions.
Expand Up @@ -31,6 +31,7 @@
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.NullExpression; import org.apache.drill.common.expression.NullExpression;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.TypedNullConstant; import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
Expand Down Expand Up @@ -231,4 +232,9 @@ public Boolean visitConvertExpression(ConvertExpression e,
public Boolean visitParameter(ValueExpressions.ParameterExpression e, IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException { public Boolean visitParameter(ValueExpressions.ParameterExpression e, IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException {
return false; return false;
} }

@Override
public Boolean visitTypedFieldExpr(TypedFieldExpr e, IdentityHashMap<LogicalExpression, Object> value) throws RuntimeException {
return false;
}
} }
Expand Up @@ -44,6 +44,7 @@
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.NullExpression; import org.apache.drill.common.expression.NullExpression;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.TypedNullConstant; import org.apache.drill.common.expression.TypedNullConstant;
import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.ValueExpressions.BooleanExpression; import org.apache.drill.common.expression.ValueExpressions.BooleanExpression;
Expand Down Expand Up @@ -78,7 +79,6 @@
import org.apache.drill.exec.expr.fn.DrillFuncHolder; import org.apache.drill.exec.expr.fn.DrillFuncHolder;
import org.apache.drill.exec.expr.fn.ExceptionFunction; import org.apache.drill.exec.expr.fn.ExceptionFunction;
import org.apache.drill.exec.expr.fn.FunctionLookupContext; import org.apache.drill.exec.expr.fn.FunctionLookupContext;
import org.apache.drill.exec.expr.stat.TypedFieldExpr;
import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorAccessible; import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.resolver.FunctionResolver; import org.apache.drill.exec.resolver.FunctionResolver;
Expand Down Expand Up @@ -323,7 +323,6 @@ public LogicalExpression visitSchemaPath(SchemaPath path, FunctionLookupContext
} else { } else {
logger.warn("Unable to find value vector of path {}, returning null-int instance.", path); logger.warn("Unable to find value vector of path {}, returning null-int instance.", path);
return new TypedFieldExpr(path, Types.OPTIONAL_INT); return new TypedFieldExpr(path, Types.OPTIONAL_INT);
// return NullExpression.INSTANCE;
} }
} }
} }
Expand Down
Expand Up @@ -18,6 +18,8 @@


import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.LogicalExpressionBase; import org.apache.drill.common.expression.LogicalExpressionBase;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.visitors.ExprVisitor; import org.apache.drill.common.expression.visitors.ExprVisitor;
import org.apache.parquet.column.statistics.Statistics; import org.apache.parquet.column.statistics.Statistics;


Expand All @@ -29,6 +31,7 @@
* IS predicates for parquet filter pushdown. * IS predicates for parquet filter pushdown.
*/ */
public class ParquetIsPredicates { public class ParquetIsPredicates {

public static abstract class ParquetIsPredicate extends LogicalExpressionBase implements ParquetFilterPredicate { public static abstract class ParquetIsPredicate extends LogicalExpressionBase implements ParquetFilterPredicate {
protected final LogicalExpression expr; protected final LogicalExpression expr;


Expand All @@ -54,12 +57,22 @@ public <T, V, E extends Exception> T accept(ExprVisitor<T, V, E> visitor, V valu
* IS NULL predicate. * IS NULL predicate.
*/ */
public static class IsNullPredicate extends ParquetIsPredicate { public static class IsNullPredicate extends ParquetIsPredicate {
private final boolean isArray;

public IsNullPredicate(LogicalExpression expr) { public IsNullPredicate(LogicalExpression expr) {
super(expr); super(expr);
this.isArray = isArray(expr);
} }


@Override @Override
public boolean canDrop(RangeExprEvaluator evaluator) { public boolean canDrop(RangeExprEvaluator evaluator) {

// for arrays we are not able to define exact number of nulls
// [1,2,3] vs [1,2] -> in second case 3 is absent and thus it's null but statistics shows no nulls
if (isArray) {
return false;
}

Statistics exprStat = expr.accept(evaluator, null); Statistics exprStat = expr.accept(evaluator, null);


if (!ParquetPredicatesHelper.hasStats(exprStat)) { if (!ParquetPredicatesHelper.hasStats(exprStat)) {
Expand All @@ -73,6 +86,16 @@ public boolean canDrop(RangeExprEvaluator evaluator) {
return false; return false;
} }
} }

private boolean isArray(LogicalExpression expression) {
if (expression instanceof TypedFieldExpr) {
TypedFieldExpr typedFieldExpr = (TypedFieldExpr) expression;
SchemaPath schemaPath = typedFieldExpr.getPath();
return schemaPath.isArray();
}
return false;
}

} }


/** /**
Expand Down
Expand Up @@ -17,11 +17,11 @@
*/ */
package org.apache.drill.exec.expr.stat; package org.apache.drill.exec.expr.stat;


import com.google.common.base.Preconditions;
import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.fn.CastFunctions; import org.apache.drill.common.expression.fn.CastFunctions;
import org.apache.drill.common.expression.fn.FuncHolder; import org.apache.drill.common.expression.fn.FuncHolder;
Expand Down Expand Up @@ -70,17 +70,20 @@ public long getRowCount() {


@Override @Override
public Statistics visitUnknown(LogicalExpression e, Void value) throws RuntimeException { public Statistics visitUnknown(LogicalExpression e, Void value) throws RuntimeException {
if (e instanceof TypedFieldExpr) { // do nothing for the unknown expression
TypedFieldExpr fieldExpr = (TypedFieldExpr) e; return null;
final ColumnStatistics columnStatistics = columnStatMap.get(fieldExpr.getPath()); }
if (columnStatistics != null) {
return columnStatistics.getStatistics(); @Override
} else if (fieldExpr.getMajorType().equals(Types.OPTIONAL_INT)) { public Statistics visitTypedFieldExpr(TypedFieldExpr typedFieldExpr, Void value) throws RuntimeException {
// field does not exist. final ColumnStatistics columnStatistics = columnStatMap.get(typedFieldExpr.getPath());
IntStatistics intStatistics = new IntStatistics(); if (columnStatistics != null) {
intStatistics.setNumNulls(rowCount); // all values are nulls return columnStatistics.getStatistics();
return intStatistics; } else if (typedFieldExpr.getMajorType().equals(Types.OPTIONAL_INT)) {
} // field does not exist.
IntStatistics intStatistics = new IntStatistics();
intStatistics.setNumNulls(rowCount); // all values are nulls
return intStatistics;
} }
return null; return null;
} }
Expand Down
Expand Up @@ -18,6 +18,7 @@
package org.apache.drill.exec.planner.common; package org.apache.drill.exec.planner.common;


import java.util.AbstractList; import java.util.AbstractList;
import java.util.Collection;
import java.util.List; import java.util.List;


import com.google.common.collect.Lists; import com.google.common.collect.Lists;
Expand Down Expand Up @@ -178,22 +179,20 @@ private static boolean containIdentity(List<? extends RexNode> exps,
} }


/** /**
* Travesal RexNode to find the item/flattern operator. Continue search if RexNode has a * Travesal RexNode to find at least one operator in the given collection. Continue search if RexNode has a
* RexInputRef which refers to a RexNode in project expressions. * RexInputRef which refers to a RexNode in project expressions.
* *
* @param node : RexNode to search * @param node : RexNode to search
* @param projExprs : the list of project expressions. Empty list means there is No project operator underneath. * @param projExprs : the list of project expressions. Empty list means there is No project operator underneath.
* @param operators collection of operators to find
* @return : Return null if there is NONE; return the first appearance of item/flatten RexCall. * @return : Return null if there is NONE; return the first appearance of item/flatten RexCall.
*/ */
public static RexCall findItemOrFlatten( public static RexCall findOperators(final RexNode node, final List<RexNode> projExprs, final Collection<String> operators) {
final RexNode node,
final List<RexNode> projExprs) {
try { try {
RexVisitor<Void> visitor = RexVisitor<Void> visitor =
new RexVisitorImpl<Void>(true) { new RexVisitorImpl<Void>(true) {
public Void visitCall(RexCall call) { public Void visitCall(RexCall call) {
if ("item".equals(call.getOperator().getName().toLowerCase()) || if (operators.contains(call.getOperator().getName().toLowerCase())) {
"flatten".equals(call.getOperator().getName().toLowerCase())) {
throw new Util.FoundOne(call); /* throw exception to interrupt tree walk (this is similar to throw new Util.FoundOne(call); /* throw exception to interrupt tree walk (this is similar to
other utility methods in RexUtil.java */ other utility methods in RexUtil.java */
} }
Expand All @@ -208,8 +207,7 @@ public Void visitInputRef(RexInputRef inputRef) {
RexNode n = projExprs.get(index); RexNode n = projExprs.get(index);
if (n instanceof RexCall) { if (n instanceof RexCall) {
RexCall r = (RexCall) n; RexCall r = (RexCall) n;
if ("item".equals(r.getOperator().getName().toLowerCase()) || if (operators.contains(r.getOperator().getName().toLowerCase())) {
"flatten".equals(r.getOperator().getName().toLowerCase())) {
throw new Util.FoundOne(r); throw new Util.FoundOne(r);
} }
} }
Expand Down
Expand Up @@ -30,13 +30,23 @@
import org.apache.calcite.util.Pair; import org.apache.calcite.util.Pair;
import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.common.DrillRelOptUtil;


import java.util.ArrayList;
import java.util.Collection;
import java.util.List; import java.util.List;


public class DrillPushFilterPastProjectRule extends RelOptRule { public class DrillPushFilterPastProjectRule extends RelOptRule {


public final static RelOptRule INSTANCE = new DrillPushFilterPastProjectRule(); public final static RelOptRule INSTANCE = new DrillPushFilterPastProjectRule();


protected DrillPushFilterPastProjectRule() { private static final Collection<String> BANNED_OPERATORS;

static {
BANNED_OPERATORS = new ArrayList<>(2);
BANNED_OPERATORS.add("flatten");
BANNED_OPERATORS.add("item");
}

private DrillPushFilterPastProjectRule() {
super( super(
operand( operand(
LogicalFilter.class, LogicalFilter.class,
Expand All @@ -60,7 +70,7 @@ public void onMatch(RelOptRuleCall call) {




for (final RexNode pred : predList) { for (final RexNode pred : predList) {
if (DrillRelOptUtil.findItemOrFlatten(pred, projRel.getProjects()) == null) { if (DrillRelOptUtil.findOperators(pred, projRel.getProjects(), BANNED_OPERATORS) == null) {
qualifiedPredList.add(pred); qualifiedPredList.add(pred);
} else { } else {
unqualifiedPredList.add(pred); unqualifiedPredList.add(pred);
Expand Down
Expand Up @@ -19,8 +19,7 @@
import org.apache.drill.common.expression.BooleanOperator; import org.apache.drill.common.expression.BooleanOperator;
import org.apache.drill.common.expression.FunctionHolderExpression; import org.apache.drill.common.expression.FunctionHolderExpression;
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.TypedFieldExpr;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.common.expression.fn.CastFunctions; import org.apache.drill.common.expression.fn.CastFunctions;
import org.apache.drill.common.expression.fn.FuncHolder; import org.apache.drill.common.expression.fn.FuncHolder;
Expand All @@ -41,7 +40,6 @@
import org.apache.drill.exec.expr.stat.ParquetBooleanPredicates; import org.apache.drill.exec.expr.stat.ParquetBooleanPredicates;
import org.apache.drill.exec.expr.stat.ParquetComparisonPredicates; import org.apache.drill.exec.expr.stat.ParquetComparisonPredicates;
import org.apache.drill.exec.expr.stat.ParquetIsPredicates; import org.apache.drill.exec.expr.stat.ParquetIsPredicates;
import org.apache.drill.exec.expr.stat.TypedFieldExpr;
import org.apache.drill.exec.ops.UdfUtilities; import org.apache.drill.exec.ops.UdfUtilities;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
Expand All @@ -62,11 +60,12 @@ public class ParquetFilterBuilder extends AbstractExprVisitor<LogicalExpression,
/** /**
* @param expr materialized filter expression * @param expr materialized filter expression
* @param constantBoundaries set of constant expressions * @param constantBoundaries set of constant expressions
* @param udfUtilities * @param udfUtilities udf utilities
*
* @return logical expression
*/ */
public static LogicalExpression buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) { public static LogicalExpression buildParquetFilterPredicate(LogicalExpression expr, final Set<LogicalExpression> constantBoundaries, UdfUtilities udfUtilities) {
final LogicalExpression predicate = expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries); return expr.accept(new ParquetFilterBuilder(udfUtilities), constantBoundaries);
return predicate;
} }


private ParquetFilterBuilder(UdfUtilities udfUtilities) { private ParquetFilterBuilder(UdfUtilities udfUtilities) {
Expand All @@ -75,18 +74,15 @@ private ParquetFilterBuilder(UdfUtilities udfUtilities) {


@Override @Override
public LogicalExpression visitUnknown(LogicalExpression e, Set<LogicalExpression> value) { public LogicalExpression visitUnknown(LogicalExpression e, Set<LogicalExpression> value) {
if (e instanceof TypedFieldExpr && // for the unknown expression, do nothing
! containsArraySeg(((TypedFieldExpr) e).getPath()) &&
e.getMajorType().getMode() != TypeProtos.DataMode.REPEATED) {
// A filter is not qualified for push down, if
// 1. it contains an array segment : a.b[1], a.b[1].c.d
// 2. it's repeated type.
return e;
}

return null; return null;
} }


@Override
public LogicalExpression visitTypedFieldExpr(TypedFieldExpr typedFieldExpr, Set<LogicalExpression> value) throws RuntimeException {
return typedFieldExpr;
}

@Override @Override
public LogicalExpression visitIntConstant(ValueExpressions.IntExpression intExpr, Set<LogicalExpression> value) public LogicalExpression visitIntConstant(ValueExpressions.IntExpression intExpr, Set<LogicalExpression> value)
throws RuntimeException { throws RuntimeException {
Expand Down Expand Up @@ -161,18 +157,6 @@ public LogicalExpression visitBooleanOperator(BooleanOperator op, Set<LogicalExp
} }
} }


private boolean containsArraySeg(final SchemaPath schemaPath) {
PathSegment seg = schemaPath.getRootSegment();

while (seg != null) {
if (seg.isArray()) {
return true;
}
seg = seg.getChild();
}
return false;
}

private LogicalExpression getValueExpressionFromConst(ValueHolder holder, TypeProtos.MinorType type) { private LogicalExpression getValueExpressionFromConst(ValueHolder holder, TypeProtos.MinorType type) {
switch (type) { switch (type) {
case INT: case INT:
Expand Down Expand Up @@ -229,13 +213,9 @@ public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression
} }


if (CastFunctions.isCastFunction(funcName)) { if (CastFunctions.isCastFunction(funcName)) {
List<LogicalExpression> newArgs = new ArrayList(); List<LogicalExpression> newArgs = generateNewExpressions(funcHolderExpr.args, value);
for (LogicalExpression arg : funcHolderExpr.args) { if (newArgs == null) {
final LogicalExpression newArg = arg.accept(this, value); return null;
if (newArg == null) {
return null;
}
newArgs.add(newArg);
} }


return funcHolderExpr.copy(newArgs); return funcHolderExpr.copy(newArgs);
Expand All @@ -244,15 +224,22 @@ public LogicalExpression visitFunctionHolderExpression(FunctionHolderExpression
} }
} }


private LogicalExpression handleCompareFunction(FunctionHolderExpression functionHolderExpression, Set<LogicalExpression> value) { private List<LogicalExpression> generateNewExpressions(List<LogicalExpression> expressions, Set<LogicalExpression> value) {
List<LogicalExpression> newArgs = new ArrayList(); List<LogicalExpression> newExpressions = new ArrayList<>();

for (LogicalExpression arg : expressions) {
for (LogicalExpression arg : functionHolderExpression.args) { final LogicalExpression newArg = arg.accept(this, value);
LogicalExpression newArg = arg.accept(this, value);
if (newArg == null) { if (newArg == null) {
return null; return null;
} }
newArgs.add(newArg); newExpressions.add(newArg);
}
return newExpressions;
}

private LogicalExpression handleCompareFunction(FunctionHolderExpression functionHolderExpression, Set<LogicalExpression> value) {
List<LogicalExpression> newArgs = generateNewExpressions(functionHolderExpression.args, value);
if (newArgs == null) {
return null;
} }


String funcName = ((DrillSimpleFuncHolder) functionHolderExpression.getHolder()).getRegisteredNames()[0]; String funcName = ((DrillSimpleFuncHolder) functionHolderExpression.getHolder()).getRegisteredNames()[0];
Expand Down Expand Up @@ -306,19 +293,6 @@ private LogicalExpression handleIsFunction(FunctionHolderExpression functionHold
} }
} }


private LogicalExpression handleCastFunction(FunctionHolderExpression functionHolderExpression, Set<LogicalExpression> value) {
for (LogicalExpression arg : functionHolderExpression.args) {
LogicalExpression newArg = arg.accept(this, value);
if (newArg == null) {
return null;
}
}

String funcName = ((DrillSimpleFuncHolder) functionHolderExpression.getHolder()).getRegisteredNames()[0];

return null;
}

private static boolean isCompareFunction(String funcName) { private static boolean isCompareFunction(String funcName) {
return COMPARE_FUNCTIONS_SET.contains(funcName); return COMPARE_FUNCTIONS_SET.contains(funcName);
} }
Expand Down

0 comments on commit 4ee5625

Please sign in to comment.