[SPARK-10978] [SQL] Allow data sources to eliminate filters #9399

Closed

Changes from 6 commits
@@ -43,7 +43,8 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
l,
projects,
filters,
(a, f) => toCatalystRDD(l, a, t.buildScan(a, f))) :: Nil
(requestedColumns, allPredicates, _) =>
toCatalystRDD(l, requestedColumns, t.buildScan(requestedColumns, allPredicates))) :: Nil

case PhysicalOperation(projects, filters, l @ LogicalRelation(t: PrunedFilteredScan, _)) =>
pruneFilterProject(
@@ -266,47 +267,75 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
relation,
projects,
filterPredicates,
(requestedColumns, pushedFilters) => {
scanBuilder(requestedColumns, selectFilters(pushedFilters).toArray)
(requestedColumns, _, pushedFilters) => {
scanBuilder(requestedColumns, pushedFilters.toArray)
})
}

// Based on Catalyst expressions.
// Based on Catalyst expressions. The `scanBuilder` function accepts three arguments:
//
// 1. A `Seq[Attribute]`, containing all required column attributes, used to handle traits like
// `PrunedFilteredScan`.
// 2. A `Seq[Expression]`, containing all gathered Catalyst filter expressions, used by
// `CatalystScan`.
// 3. A `Seq[Filter]`, containing all data source `Filter`s that are converted from (possibly a
// subset of) Catalyst filter expressions and can be handled by `relation`.
Contributor: So, Seq[Expression] is used for data sources that understand Catalyst expressions, and Seq[Filter] is used for data sources that only understand the Filter API? If so, can we make it clear that 2 and 3 will not be used together?

Contributor (Author): The Seq[Expression] argument is only used to handle CatalystScan, which is kept only for experimental purposes; no built-in concrete data source implements CatalystScan now. The Seq[Filter] argument is used to handle all other relation traits that support filter push-down, e.g. PrunedFilteredScan and HadoopFsRelation. Added comments to explain this.
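For reference, a sketch of the two consumer signatures the comment distinguishes, as they appear in Spark's sources package at this point (treat exact modifiers as approximate):

```scala
// CatalystScan consumes the raw Catalyst expressions (the Seq[Expression] argument).
// It is kept only for experimentation and is unstable across Spark versions.
trait CatalystScan {
  def buildScan(requiredColumns: Seq[Attribute], filters: Seq[Expression]): RDD[Row]
}

// PrunedFilteredScan (and similar traits) consume the translated
// sources.Filters (the Seq[Filter] argument) instead.
trait PrunedFilteredScan {
  def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
}
```

A relation implements one of these traits, never both, which is why arguments 2 and 3 are never consumed together.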

protected def pruneFilterProjectRaw(
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression]) => RDD[InternalRow]) = {
relation: LogicalRelation,
projects: Seq[NamedExpression],
filterPredicates: Seq[Expression],
scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter]) => RDD[InternalRow]) = {
Contributor: It is not obvious that we need both Seq[Expression] and Seq[Filter]. Can you add comments to explain what these are?

val projectSet = AttributeSet(projects.flatMap(_.references))
val filterSet = AttributeSet(filterPredicates.flatMap(_.references))
val filterCondition = filterPredicates.reduceLeftOption(expressions.And)

val pushedFilters = filterPredicates.map { _ transform {
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
val candidatePredicates = filterPredicates.map { _ transform {
case a: AttributeReference => relation.attributeMap(a) // Match original case of attributes.
}}

val (unhandledPredicates, pushedFilters) =
selectFilters(relation.relation, candidatePredicates)

// A set of column attributes that are only referenced by pushed down filters. We can eliminate
// them from requested columns.
val handledSet = {
val handledPredicates = filterPredicates.filterNot(unhandledPredicates.contains)
val unhandledSet = AttributeSet(unhandledPredicates.flatMap(_.references))
AttributeSet(handledPredicates.flatMap(_.references)) --
(projectSet ++ unhandledSet).map(relation.attributeMap)
}
Contributor: Let's add comments?


// Combines all Catalyst filter `Expression`s that are either not convertible to data source
// `Filter`s or cannot be handled by `relation`.
val filterCondition = unhandledPredicates.reduceLeftOption(expressions.And)

if (projects.map(_.toAttribute) == projects &&
projectSet.size == projects.size &&
filterSet.subsetOf(projectSet)) {
// When it is possible to just use column pruning to get the right projection and
// when the columns of this projection are enough to evaluate all filter conditions,
// just do a scan followed by a filter, with no extra project.
val requestedColumns =
projects.asInstanceOf[Seq[Attribute]] // Safe due to if above.
.map(relation.attributeMap) // Match original case of attributes.
val requestedColumns = projects
// Safe due to if above.
.asInstanceOf[Seq[Attribute]]
// Match original case of attributes.
.map(relation.attributeMap)
// Don't request columns that are only referenced by pushed filters.
.filterNot(handledSet.contains)

val scan = execution.PhysicalRDD.createFromDataSource(
projects.map(_.toAttribute),
scanBuilder(requestedColumns, pushedFilters),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
Contributor: Here, it is not really necessary to pass in candidatePredicates, because a data source may reject some filters. We can just pass in the equivalent forms of pushedFilters, right?

Contributor: I mean, pushedFilters contains all filters, in the Filter form, that can be handled by the data source; why not change candidatePredicates to the Catalyst filters that can be handled by the data source?

Contributor: I think I understand what's going on. Actually, pushedFilters also contains filters that cannot be handled by the data source, and candidatePredicates contains filters that cannot be converted to the public Filter interface.

Contributor (Author): Yeah, right, it's a little bit tricky. Adding a comment to explain this.
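To restate the distinction in one place, a hedged sketch with hypothetical predicate names (not from this PR):

```scala
// Suppose the query carries three Catalyst predicates:
//   p1 -- translatable to a sources.Filter and handled by the relation
//   p2 -- translatable, but rejected via relation.unhandledFilters
//   p3 -- has no sources.Filter equivalent at all
//
// candidatePredicates = Seq(p1, p2, p3)    // every Catalyst predicate, for CatalystScan
// pushedFilters       = Seq(f1, f2)        // translated forms of p1 and p2
// filterCondition     = Some(And(p3, p2))  // re-applied by Spark on top of the scan
```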

relation.relation)
filterCondition.map(execution.Filter(_, scan)).getOrElse(scan)
} else {
val requestedColumns = (projectSet ++ filterSet).map(relation.attributeMap).toSeq
// Don't request columns that are only referenced by pushed filters.
val requestedColumns =
(projectSet ++ filterSet -- handledSet).map(relation.attributeMap).toSeq

val scan = execution.PhysicalRDD.createFromDataSource(
requestedColumns,
scanBuilder(requestedColumns, pushedFilters),
scanBuilder(requestedColumns, candidatePredicates, pushedFilters),
relation.relation)
execution.Project(projects, filterCondition.map(execution.Filter(_, scan)).getOrElse(scan))
}
@@ -334,11 +363,12 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
}

/**
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s,
* and convert them.
* Tries to translate a Catalyst [[Expression]] into a data source [[Filter]].
*
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def selectFilters(filters: Seq[Expression]) = {
def translate(predicate: Expression): Option[Filter] = predicate match {
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
predicate match {
case expressions.EqualTo(a: Attribute, Literal(v, t)) =>
Some(sources.EqualTo(a.name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), a: Attribute) =>
@@ -387,16 +417,16 @@ private[sql] object DataSourceStrategy extends Strategy with Logging {
Some(sources.IsNotNull(a.name))

case expressions.And(left, right) =>
(translate(left) ++ translate(right)).reduceOption(sources.And)
(translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)

case expressions.Or(left, right) =>
for {
leftFilter <- translate(left)
rightFilter <- translate(right)
leftFilter <- translateFilter(left)
rightFilter <- translateFilter(right)
} yield sources.Or(leftFilter, rightFilter)

case expressions.Not(child) =>
translate(child).map(sources.Not)
translateFilter(child).map(sources.Not)

case expressions.StartsWith(a: Attribute, Literal(v: UTF8String, StringType)) =>
Some(sources.StringStartsWith(a.name, v.toString))
@@ -409,7 +439,48 @@

case _ => None
}
}
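A sketch of the asymmetry between the logical connectives, read off the code above (`translatable` and `untranslatable` are hypothetical Catalyst expressions):

```scala
// And degrades gracefully: if only one side translates,
// (translateFilter(left) ++ translateFilter(right)).reduceOption(sources.And)
// keeps whichever side succeeded.
translateFilter(expressions.And(translatable, untranslatable))
// => Some(<translation of `translatable`>)

// Or and Not are all-or-nothing: pushing only one disjunct, or negating a
// partial translation, would incorrectly drop rows, so any untranslatable
// subtree yields None.
translateFilter(expressions.Or(translatable, untranslatable))  // => None
translateFilter(expressions.Not(untranslatable))               // => None
```

Pushing half of an `And` is safe only because `filterCondition` is re-applied on top of the scan.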

/**
* Selects Catalyst predicate [[Expression]]s which are convertible into data source [[Filter]]s
* and can be handled by `relation`.
*
* @return A pair of `Seq[Expression]` and `Seq[Filter]`. The first element contains all Catalyst
* predicate [[Expression]]s that are either not convertible or cannot be handled by
* `relation`. The second element contains all converted data source [[Filter]]s.
 */

Contributor: I think we need to emphasize that the second element does contain Filters that will be rejected by the data source.

Contributor (Author): Actually, including rejected filters here isn't quite intuitive; they were included only for testing convenience. So I decided to remove them. Namely, the second element should only contain convertible and accepted filters.
protected[sql] def selectFilters(
relation: BaseRelation,
predicates: Seq[Expression]): (Seq[Expression], Seq[Filter]) = {

// For conciseness, all Catalyst filter expressions of type `expressions.Expression` below are
// called `predicate`s, while all data source filters of type `sources.Filter` are simply called
// `filter`s.

val translated: Seq[(Expression, Filter)] =
for {
predicate <- predicates
filter <- translateFilter(predicate)
} yield predicate -> filter

// A map from original Catalyst expressions to corresponding translated data source filters.
val translatedMap: Map[Expression, Filter] = translated.toMap

// Catalyst predicate expressions that cannot be translated to data source filters.
val unrecognizedPredicates = predicates.filterNot(translatedMap.contains)

// Data source filters that cannot be handled by `relation`
val unhandledFilters = relation.unhandledFilters(translatedMap.values.toArray).toSet

// Catalyst predicate expressions that can be translated to data source filters, but cannot be
// handled by `relation`.
val unhandledPredicates = for {
(predicate, filter) <- translated if unhandledFilters.contains(filter)
} yield predicate

// Translated data source filters, whether or not `relation` can handle them
val (_, translatedFilters) = translated.unzip

filters.flatMap(translate)
(unrecognizedPredicates ++ unhandledPredicates, translatedFilters)
}
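A small sketch of the resulting contract as implemented in this commit (predicate names are hypothetical; per the review thread, rejected filters are slated to be dropped from the second element in a later commit):

```scala
// p1: translatable and accepted; p2: translatable but rejected by
// relation.unhandledFilters; p3: not expressible as a sources.Filter.
val (unhandled, translated) = selectFilters(relation, Seq(p1, p2, p3))
// unhandled  == Seq(p3, p2)  -- Spark re-evaluates these after the scan
// translated == Seq(f1, f2)  -- every successfully translated filter,
//                               including the rejected f2 at this commit
```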
}
@@ -233,6 +233,15 @@ abstract class BaseRelation {
* @since 1.4.0
*/
def needConversion: Boolean = true

/**
* Given an array of [[Filter]]s, returns an array of [[Filter]]s that this data source relation
* cannot handle. Spark SQL will apply all returned [[Filter]]s against rows returned by this
* data source relation.
*
* @since 1.6.0
*/
def unhandledFilters(filters: Array[Filter]): Array[Filter] = filters
}
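As a usage sketch, a hypothetical relation that can evaluate only equality predicates natively. Everything returned from `unhandledFilters` is re-applied by Spark SQL, so over-reporting is safe while under-reporting silently produces wrong results:

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter}
import org.apache.spark.sql.types.StructType

// Hypothetical: a key-value backed relation that can push down only EqualTo.
class KeyValueRelation(val sqlContext: SQLContext, val schema: StructType)
  extends BaseRelation {

  // Report every filter we cannot evaluate; Spark re-applies exactly these.
  override def unhandledFilters(filters: Array[Filter]): Array[Filter] =
    filters.filterNot(_.isInstanceOf[EqualTo])
}
```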

/**
Expand Down
@@ -59,7 +59,7 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
}.flatten
assert(analyzedPredicate.nonEmpty)

val selectedFilters = DataSourceStrategy.selectFilters(analyzedPredicate)
val selectedFilters = analyzedPredicate.flatMap(DataSourceStrategy.translateFilter)
assert(selectedFilters.nonEmpty)

selectedFilters.foreach { pred =>