Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1104,19 +1104,23 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild))

// Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be
// pushed beneath must satisfy the following two conditions:
// pushed beneath must satisfy the following conditions:
// 1. All the expressions are part of window partitioning key. The expressions can be compound.
// 2. Deterministic
// 2. Deterministic.
// 3. Placed before any non-deterministic predicates.
case filter @ Filter(condition, w: Window)
if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) =>
val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references))
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
cond.references.subsetOf(partitionAttrs) && cond.deterministic &&
// This is for ensuring all the partitioning expressions have been converted to alias
// in Analyzer. Thus, we do not need to check if the expressions in conditions are
// the same as the expressions used in partitioning columns.
partitionAttrs.forall(_.isInstanceOf[Attribute])

val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(partitionAttrs)
}

val stayUp = rest ++ containingNonDeterministic

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val newWindow = w.copy(child = Filter(pushDownPredicate, w.child))
Expand All @@ -1135,11 +1139,16 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

// For each filter, expand the alias and check if the filter can be evaluated using
// attributes produced by the aggregate operator's child operator.
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
val replaced = replaceAlias(cond, aliasMap)
replaced.references.subsetOf(aggregate.child.outputSet) && replaced.deterministic
replaced.references.subsetOf(aggregate.child.outputSet)
}

val stayUp = rest ++ containingNonDeterministic

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems that it makes more sense to factor this snippet out as a helper method.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we factor the single line that computes the stayUp predicates? Or should we also put the computation of rest、containingNonDeterministic into a helper method? I'm not quite sure about that. Please tell me what should I update, thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

factor out a single line of code is definitely unnecessary, I think @liancheng means the whole related logic

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I've updated related code.

if (pushDown.nonEmpty) {
val pushDownPredicate = pushDown.reduce(And)
val replaced = replaceAlias(pushDownPredicate, aliasMap)
Expand All @@ -1153,9 +1162,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {

case filter @ Filter(condition, union: Union) =>
// Union could change the rows, so non-deterministic predicate can't be pushed down
val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond =>
cond.deterministic
}
val (pushDown, stayUp) = splitConjunctivePredicates(condition).span(_.deterministic)

if (pushDown.nonEmpty) {
val pushDownCond = pushDown.reduceLeft(And)
val output = union.output
Expand Down Expand Up @@ -1195,9 +1203,15 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper {
// come from grandchild.
// TODO: non-deterministic predicates could be pushed through some operators that do not change
// the rows.
val (pushDown, stayUp) = splitConjunctivePredicates(filter.condition).partition { cond =>
cond.deterministic && cond.references.subsetOf(grandchild.outputSet)
val (candidates, containingNonDeterministic) =
splitConjunctivePredicates(filter.condition).span(_.deterministic)

val (pushDown, rest) = candidates.partition { cond =>
cond.references.subsetOf(grandchild.outputSet)
}

val stayUp = rest ++ containingNonDeterministic

if (pushDown.nonEmpty) {
val newChild = insertFilter(pushDown.reduceLeft(And))
if (stayUp.nonEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -531,14 +531,14 @@ class FilterPushdownSuite extends PlanTest {
val originalQuery = {
testRelationWithArrayType
.generate(Explode('c_arr), true, false, Some("arr"))
.where(('b >= 5) && ('a + Rand(10).as("rnd") > 6))
.where(('b >= 5) && ('a + Rand(10).as("rnd") > 6) && ('c > 6))
}
val optimized = Optimize.execute(originalQuery.analyze)
val correctAnswer = {
testRelationWithArrayType
.where('b >= 5)
.generate(Explode('c_arr), true, false, Some("arr"))
.where('a + Rand(10).as("rnd") > 6)
.where('a + Rand(10).as("rnd") > 6 && 'c > 6)
.analyze
}

Expand Down Expand Up @@ -715,14 +715,14 @@ class FilterPushdownSuite extends PlanTest {
val testRelation2 = LocalRelation('d.int, 'e.int, 'f.int)

val originalQuery = Union(Seq(testRelation, testRelation2))
.where('a === 2L && 'b + Rand(10).as("rnd") === 3)
.where('a === 2L && 'b + Rand(10).as("rnd") === 3 && 'c > 5L)

val optimized = Optimize.execute(originalQuery.analyze)

val correctAnswer = Union(Seq(
testRelation.where('a === 2L),
testRelation2.where('d === 2L)))
.where('b + Rand(10).as("rnd") === 3)
.where('b + Rand(10).as("rnd") === 3 && 'c > 5L)
.analyze

comparePlans(optimized, correctAnswer)
Expand Down