Skip to content

Commit

Permalink
DRILL-6865: Filter is not removed from the plan when parquet table fu…
Browse files Browse the repository at this point in the history
…lly matches the filter

closes #1552
  • Loading branch information
vvysotskyi committed Nov 26, 2018
1 parent d1a082c commit 99a3d76
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 42 deletions.
Expand Up @@ -85,6 +85,8 @@ public abstract class AbstractParquetGroupScan extends AbstractFileGroupScan {


private List<EndpointAffinity> endpointAffinities; private List<EndpointAffinity> endpointAffinities;
private ParquetGroupScanStatistics parquetGroupScanStatistics; private ParquetGroupScanStatistics parquetGroupScanStatistics;
// whether all row groups of this group scan fully match the filter
private boolean matchAllRowGroups = false;


protected AbstractParquetGroupScan(String userName, protected AbstractParquetGroupScan(String userName,
List<SchemaPath> columns, List<SchemaPath> columns,
Expand All @@ -111,6 +113,7 @@ protected AbstractParquetGroupScan(AbstractParquetGroupScan that) {
this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet); this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
this.entries = that.entries == null ? null : new ArrayList<>(that.entries); this.entries = that.entries == null ? null : new ArrayList<>(that.entries);
this.readerConfig = that.readerConfig; this.readerConfig = that.readerConfig;
this.matchAllRowGroups = that.matchAllRowGroups;
} }


@JsonProperty @JsonProperty
Expand All @@ -135,6 +138,11 @@ public ParquetReaderConfig getReaderConfig() {
return readerConfig; return readerConfig;
} }


@JsonIgnore
public boolean isMatchAllRowGroups() {
return matchAllRowGroups;
}

@JsonIgnore @JsonIgnore
@Override @Override
public Collection<String> getFiles() { public Collection<String> getFiles() {
Expand Down Expand Up @@ -229,15 +237,12 @@ public void setFilter(LogicalExpression filter) {
} }


@Override @Override
public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, public AbstractParquetGroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) { FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {


if (rowGroupInfos.size() == 1 || if (!parquetTableMetadata.isRowGroupPrunable() ||
! (parquetTableMetadata.isRowGroupPrunable()) || rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD)) {
rowGroupInfos.size() > optionManager.getOption(PlannerSettings.PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD) // Stop pruning for 2 cases:
) {
// Stop pruning for 3 cases:
// - 1 single parquet file,
// - metadata does not have proper format to support row group level filter pruning, // - metadata does not have proper format to support row group level filter pruning,
// - # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD. // - # of row groups is beyond PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD.
return null; return null;
Expand All @@ -253,6 +258,8 @@ public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtili
return null; return null;
} }


boolean matchAllRowGroupsLocal = true;

for (RowGroupInfo rowGroup : rowGroupInfos) { for (RowGroupInfo rowGroup : rowGroupInfos) {
final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns); final ColumnExplorer columnExplorer = new ColumnExplorer(optionManager, columns);
List<String> partitionValues = getPartitionValues(rowGroup); List<String> partitionValues = getPartitionValues(rowGroup);
Expand All @@ -270,16 +277,27 @@ public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtili
if (match == ParquetFilterPredicate.RowsMatch.NONE) { if (match == ParquetFilterPredicate.RowsMatch.NONE) {
continue; // No row comply to the filter => drop the row group continue; // No row comply to the filter => drop the row group
} }
rowGroup.setRowsMatch(match); // for the case when any of row groups partially matches the filter,
// matchAllRowGroupsLocal should be set to false
if (matchAllRowGroupsLocal) {
matchAllRowGroupsLocal = match == ParquetFilterPredicate.RowsMatch.ALL;
}


qualifiedRGs.add(rowGroup); qualifiedRGs.add(rowGroup);
} }


if (qualifiedRGs.size() == rowGroupInfos.size() ) { if (qualifiedRGs.size() == rowGroupInfos.size()) {
// There is no reduction of rowGroups. Return the original groupScan. // There is no reduction of rowGroups. Return the original groupScan.
logger.debug("applyFilter() does not have any pruning!"); logger.debug("applyFilter() does not have any pruning!");
matchAllRowGroups = matchAllRowGroupsLocal;
return null; return null;
} else if (qualifiedRGs.size() == 0) { } else if (qualifiedRGs.size() == 0) {
if (rowGroupInfos.size() == 1) {
// For the case when group scan has single row group and it was filtered,
// no need to create new group scan with the same row group.
return null;
}
matchAllRowGroupsLocal = false;
logger.debug("All row groups have been filtered out. Add back one to get schema from scanner."); logger.debug("All row groups have been filtered out. Add back one to get schema from scanner.");
RowGroupInfo rg = rowGroupInfos.iterator().next(); RowGroupInfo rg = rowGroupInfos.iterator().next();
qualifiedRGs.add(rg); qualifiedRGs.add(rg);
Expand All @@ -289,7 +307,9 @@ public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtili
ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size()); ExpressionStringBuilder.toString(filterExpr), rowGroupInfos.size(), qualifiedRGs.size());


try { try {
return cloneWithRowGroupInfos(qualifiedRGs); AbstractParquetGroupScan cloneGroupScan = cloneWithRowGroupInfos(qualifiedRGs);
cloneGroupScan.matchAllRowGroups = matchAllRowGroupsLocal;
return cloneGroupScan;
} catch (IOException e) { } catch (IOException e) {
logger.warn("Could not apply filter prune due to Exception : {}", e); logger.warn("Could not apply filter prune due to Exception : {}", e);
return null; return null;
Expand Down
Expand Up @@ -28,9 +28,7 @@
import org.apache.drill.common.expression.LogicalExpression; import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.common.expression.ValueExpressions; import org.apache.drill.common.expression.ValueExpressions;
import org.apache.drill.exec.expr.stat.ParquetFilterPredicate; import org.apache.drill.exec.expr.stat.ParquetFilterPredicate;
import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch;
import org.apache.drill.exec.ops.OptimizerRulesContext; import org.apache.drill.exec.ops.OptimizerRulesContext;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.planner.common.DrillRelOptUtil; import org.apache.drill.exec.planner.common.DrillRelOptUtil;
import org.apache.drill.exec.planner.logical.DrillOptiq; import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext; import org.apache.drill.exec.planner.logical.DrillParseContext;
Expand Down Expand Up @@ -174,14 +172,35 @@ protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel pro




Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null; Stopwatch timer = logger.isDebugEnabled() ? Stopwatch.createStarted() : null;
final GroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext, AbstractParquetGroupScan newGroupScan = groupScan.applyFilter(conditionExp, optimizerContext,
optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions()); optimizerContext.getFunctionRegistry(), optimizerContext.getPlannerSettings().getOptions());
if (timer != null) { if (timer != null) {
logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS)); logger.debug("Took {} ms to apply filter on parquet row groups. ", timer.elapsed(TimeUnit.MILLISECONDS));
timer.stop(); timer.stop();
} }


if (newGroupScan == null ) { // For the case when newGroupScan wasn't created, the old one may
// fully match the filter for the case when row group pruning did not happen.
if (newGroupScan == null) {
if (groupScan.isMatchAllRowGroups()) {
RelNode child = project == null ? scan : project;
// If current row group fully matches filter,
// but row group pruning did not happen, remove the filter.
if (nonConvertedPredList.size() == 0) {
call.transformTo(child);
} else if (nonConvertedPredList.size() == predList.size()) {
// None of the predicates participated in filter pushdown.
return;
} else {
// If some of the predicates weren't used in the filter, creates new filter with them
// on top of current scan. Excludes the case when all predicates weren't used in the filter.
call.transformTo(filter.copy(filter.getTraitSet(), child,
RexUtil.composeConjunction(
filter.getCluster().getRexBuilder(),
nonConvertedPredList,
true)));
}
}
return; return;
} }


Expand All @@ -191,27 +210,17 @@ protected void doOnMatch(RelOptRuleCall call, FilterPrel filter, ProjectPrel pro
newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode)); newNode = project.copy(project.getTraitSet(), Collections.singletonList(newNode));
} }


if (newGroupScan instanceof AbstractParquetGroupScan) { if (newGroupScan.isMatchAllRowGroups()) {
RowsMatch matchAll = RowsMatch.ALL; // creates filter from the expressions which can't be pushed to the scan
List<RowGroupInfo> rowGroupInfos = ((AbstractParquetGroupScan) newGroupScan).rowGroupInfos; if (nonConvertedPredList.size() > 0) {
for (RowGroupInfo rowGroup : rowGroupInfos) { newNode = filter.copy(filter.getTraitSet(), newNode,
if (rowGroup.getRowsMatch() != RowsMatch.ALL) { RexUtil.composeConjunction(
matchAll = RowsMatch.SOME; filter.getCluster().getRexBuilder(),
break; nonConvertedPredList,
} true));
}
if (matchAll == ParquetFilterPredicate.RowsMatch.ALL) {
// creates filter from the expressions which can't be pushed to the scan
if (nonConvertedPredList.size() > 0) {
newNode = filter.copy(filter.getTraitSet(), newNode,
RexUtil.composeConjunction(
filter.getCluster().getRexBuilder(),
nonConvertedPredList,
true));
}
call.transformTo(newNode);
return;
} }
call.transformTo(newNode);
return;
} }


final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode)); final RelNode newFilter = filter.copy(filter.getTraitSet(), Collections.singletonList(newNode));
Expand Down
Expand Up @@ -19,7 +19,6 @@


import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty; import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.drill.exec.expr.stat.ParquetFilterPredicate.RowsMatch;
import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS; import org.apache.drill.exec.store.dfs.ReadEntryFromHDFS;
import org.apache.drill.exec.store.dfs.easy.FileWork; import org.apache.drill.exec.store.dfs.easy.FileWork;
import org.apache.drill.exec.store.schedule.CompleteWork; import org.apache.drill.exec.store.schedule.CompleteWork;
Expand All @@ -36,7 +35,6 @@ public class RowGroupInfo extends ReadEntryFromHDFS implements CompleteWork, Fil
private List<? extends ColumnMetadata> columns; private List<? extends ColumnMetadata> columns;
private long rowCount; // rowCount = -1 indicates to include all rows. private long rowCount; // rowCount = -1 indicates to include all rows.
private long numRecordsToRead; private long numRecordsToRead;
private RowsMatch rowsMatch = RowsMatch.SOME;


@JsonCreator @JsonCreator
public RowGroupInfo(@JsonProperty("path") String path, public RowGroupInfo(@JsonProperty("path") String path,
Expand Down Expand Up @@ -96,8 +94,4 @@ public List<? extends ColumnMetadata> getColumns() {
public void setColumns(List<? extends ColumnMetadata> columns) { public void setColumns(List<? extends ColumnMetadata> columns) {
this.columns = columns; this.columns = columns;
} }

public RowsMatch getRowsMatch() { return rowsMatch; }

public void setRowsMatch(RowsMatch rowsMatch) { this.rowsMatch = rowsMatch; }
} }
Expand Up @@ -649,6 +649,15 @@ public void testMinTrueMaxTrue() throws Exception {
assertEquals(RowsMatch.ALL, isNotFalse.matches(re)); assertEquals(RowsMatch.ALL, isNotFalse.matches(re));
} }


@Test
public void testParquetSingleRowGroupFilterRemoving() throws Exception {
test("create table dfs.tmp.`singleRowGroupTable` as select * from cp.`tpch/nation.parquet`");

String query = "select * from dfs.tmp.`singleRowGroupTable` where n_nationkey > -1";

testParquetFilterPruning(query, 25, 1, new String[]{"Filter\\("});
}

////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
// Some test helper functions. // Some test helper functions.
////////////////////////////////////////////////////////////////////////////////////////////////// //////////////////////////////////////////////////////////////////////////////////////////////////
Expand Down

0 comments on commit 99a3d76

Please sign in to comment.