Skip to content

Commit

Permalink
Update SparkSQL in branch-1.0 to match master.
Browse files Browse the repository at this point in the history
  • Loading branch information
marmbrus committed Jun 13, 2014
1 parent eb2f5c5 commit c5d0adf
Show file tree
Hide file tree
Showing 8 changed files with 745 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,17 +220,21 @@ class SQLContext(@transient val sparkContext: SparkContext)
* final desired output requires complex expressions to be evaluated or when columns can be
* further eliminated after filtering has been done.
*
* The `prunePushedDownFilters` parameter is a function that removes those filters that have
* already been handled by the filter pushdown optimization.
*
* The required attributes for both filtering and expression evaluation are passed to the
* provided `scanBuilder` function so that it can avoid unnecessary column materialization.
*/
def pruneFilterProject(
projectList: Seq[NamedExpression],
filterPredicates: Seq[Expression],
prunePushedDownFilters: Seq[Expression] => Seq[Expression],
scanBuilder: Seq[Attribute] => SparkPlan): SparkPlan = {

val projectSet = projectList.flatMap(_.references).toSet
val filterSet = filterPredicates.flatMap(_.references).toSet
val filterCondition = filterPredicates.reduceLeftOption(And)
val filterCondition = prunePushedDownFilters(filterPredicates).reduceLeftOption(And)

// Right now we still use a projection even if the only evaluation is applying an alias
// to a column. Since this is a no-op, it could be avoided. However, using this
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,37 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
InsertIntoParquetTable(relation, planLater(child), overwrite=true)(sparkContext) :: Nil
case logical.InsertIntoTable(table: ParquetRelation, partition, child, overwrite) =>
InsertIntoParquetTable(table, planLater(child), overwrite)(sparkContext) :: Nil
case PhysicalOperation(projectList, filters, relation: ParquetRelation) =>
// TODO: Should be pushing down filters as well.
case PhysicalOperation(projectList, filters: Seq[Expression], relation: ParquetRelation) => {
val prunePushedDownFilters =
if (sparkContext.conf.getBoolean(ParquetFilters.PARQUET_FILTER_PUSHDOWN_ENABLED, true)) {
(filters: Seq[Expression]) => {
filters.filter { filter =>
// Note: filters cannot be pushed down to Parquet if they contain more complex
// expressions than simple "Attribute cmp Literal" comparisons. Here we remove
// all filters that have been pushed down. Note that a predicate such as
// "(A AND B) OR C" can result in "A OR C" being pushed down.
val recordFilter = ParquetFilters.createFilter(filter)
if (!recordFilter.isDefined) {
// First case: the pushdown did not result in any record filter.
true
} else {
// Second case: a record filter was created. We are conservative here: even if
// only "A" was pushed down while we check for "A AND B", we still want to keep
// the whole "A AND B" in the higher-level filter, not just "B".
!ParquetFilters.findExpression(recordFilter.get, filter).isDefined
}
}
}
} else {
identity[Seq[Expression]] _
}
pruneFilterProject(
projectList,
filters,
ParquetTableScan(_, relation, None)(sparkContext)) :: Nil
prunePushedDownFilters,
ParquetTableScan(_, relation, filters)(sparkContext)) :: Nil
}

case _ => Nil
}
}
Expand Down
Loading

0 comments on commit c5d0adf

Please sign in to comment.