Skip to content

Commit

Permalink
SPARK-1487 [SQL] Support record filtering via predicate pushdown in P…
Browse files Browse the repository at this point in the history
…arquet

Simple filter predicates such as LessThan, GreaterThan, etc., where one side is a literal and the other one a NamedExpression are now pushed down to the underlying ParquetTableScan. Here are some results for a microbenchmark with a simple schema of six fields of different types where most records failed the test:

             | Uncompressed  | Compressed
-------------| ------------- | -------------
File size  |     10 GB  | 2 GB
Speedup |      2         | 1.8

Since mileage may vary I added a new option to SparkConf:

`org.apache.spark.sql.parquet.filter.pushdown`

Default value would be `true` and setting it to `false` disables the pushdown. When most rows are expected to pass the filter or when there are few fields performance can be better when pushdown is disabled. The default should fit situations with a reasonable number of (possibly nested) fields where not too many records on average pass the filter.

Because of an issue with Parquet ([see here](https://github.com/Parquet/parquet-mr/issues/371])) currently only predicates on non-nullable attributes are pushed down. If one would know that for a given table no optional fields have missing values one could also allow overriding this.

Author: Andre Schumacher <andre.schumacher@iki.fi>

Closes #511 from AndreSchumacher/parquet_filter and squashes the following commits:

16bfe83 [Andre Schumacher] Removing leftovers from merge during rebase
7b304ca [Andre Schumacher] Fixing formatting
c36d5cb [Andre Schumacher] Scalastyle
3da98db [Andre Schumacher] Second round of review feedback
7a78265 [Andre Schumacher] Fixing broken formatting in ParquetFilter
a86553b [Andre Schumacher] First round of code review feedback
b0f7806 [Andre Schumacher] Optimizing imports in ParquetTestData
85fea2d [Andre Schumacher] Adding SparkConf setting to disable filter predicate pushdown
f0ad3cf [Andre Schumacher] Undoing changes not needed for this PR
210e9cb [Andre Schumacher] Adding disjunctive filter predicates
a93a588 [Andre Schumacher] Adding unit test for filtering
6d22666 [Andre Schumacher] Extending ParquetFilters
93e8192 [Andre Schumacher] First commit Parquet record filtering
  • Loading branch information
AndreSchumacher authored and rxin committed May 16, 2014
1 parent 032d663 commit 40d6acd
Show file tree
Hide file tree
Showing 5 changed files with 731 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,35 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
// TODO: Should be pushing down filters as well.
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
val remainingFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
filters.filter {
// Note: filters cannot be pushed down to Parquet if they contain more complex
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
// all filters that have been pushed down. Note that a predicate such as
// "(A AND B) OR C" can result in "A OR C" being pushed down.
filter =>
val recordFilter = ParquetFilters.createFilter(filter)
if (!recordFilter.isDefined) {
// First case: the pushdown did not result in any record filter.
true
} else {
// Second case: a record filter was created; here we are conservative in
// the sense that even if "A" was pushed and we check for "A AND B" we
// still want to keep "A AND B" in the higher-level filter, not just "B".
!ParquetFilters.findExpression(recordFilter.get, filter).isDefined
}
}
} else {
filters
}
pruneFilterProject(
projectList,
filters,
ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
remainingFilters,
ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
}

case _ => Nil
}
}
Expand Down
Loading

0 comments on commit 40d6acd

Please sign in to comment.