From c4dedd21fd6a7986814d2da970e22762e8292aed Mon Sep 17 00:00:00 2001 From: gatorsmile Date: Wed, 20 Apr 2016 11:16:51 -0700 Subject: [PATCH] address comments and added more test cases. --- .../sql/catalyst/optimizer/Optimizer.scala | 13 +-- .../optimizer/FilterPushdownSuite.scala | 85 +++++++++++++++++-- 2 files changed, 88 insertions(+), 10 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6a1ff3f3e9935..39ff1ad269f4b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -959,15 +959,18 @@ object PushDownPredicate extends Rule[LogicalPlan] with PredicateHelper { project.copy(child = Filter(replaceAlias(condition, aliasMap), grandChild)) // Push [[Filter]] operators through [[Window]] operators. Parts of the predicate that can be - // pushed beneath must satisfy three conditions: - // 1. All the columns are part of window partitioning key. - // 2. Window partitioning key should be just a sequence of [[AttributeReference]]. - // 3. Deterministic + // pushed beneath must satisfy the following two conditions: + // 1. All the expressions are part of window partitioning key. The expressions can be compound. + // 2. Deterministic case filter @ Filter(condition, w: Window) if w.partitionSpec.forall(_.isInstanceOf[AttributeReference]) => val partitionAttrs = AttributeSet(w.partitionSpec.flatMap(_.references)) val (pushDown, stayUp) = splitConjunctivePredicates(condition).partition { cond => - cond.references.subsetOf(partitionAttrs) && cond.deterministic + cond.references.subsetOf(partitionAttrs) && cond.deterministic && + // This is for ensuring all the partitioning expressions have been converted to alias + // in Analyzer. 
Thus, we do not need to check if the expressions in conditions are + // the same as the expressions used in partitioning columns. + partitionAttrs.forall(_.isInstanceOf[Attribute]) } if (pushDown.nonEmpty) { val pushDownPredicate = pushDown.reduce(And) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala index 90c521a344206..4da7d45e35445 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala @@ -871,10 +871,39 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } + // complex predicates with the same references and the same expressions + // TODO: in Analyzer, to enable it, we need to convert the expression in conditions + // to the alias that is defined as the same expression + ignore("Window: predicate push down -- complex predicate with the same expressions") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .where('a + 'b > 1).select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil) + .select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + test("Window: no predicate push down -- predicates are not from partitioning keys") { - val winExpr = - 
windowExpr(count('b), windowSpec('a.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame)) + val winSpec = windowSpec( + partitionSpec = 'a.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + // No push down: the predicate is c > 1, but the partitioning key is (a, b). val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('c > 1) val correctAnswer = testRelation.select('a, 'b, 'c) .window(winExpr.as('window) :: Nil, 'a.attr :: 'b.attr :: Nil, 'b.asc :: Nil) @@ -883,12 +912,20 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } - test("Window: no predicate push down -- compound partition key") { - val winSpec = windowSpec('a.attr + 'b.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + test("Window: no predicate push down -- partial compound partition key") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is a > 1, but the partitioning key is (a + b, b) val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a > 1) - val winSpecAnalyzed = windowSpec('_w0.attr :: 'b.attr :: Nil, 'b.asc :: Nil, UnspecifiedFrame) + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: 'b.attr :: Nil, 'b.asc :: Nil) @@ -896,4 +933,42 @@ class FilterPushdownSuite extends PlanTest { comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) } + + test("Window: no predicate push down -- complex predicates containing non partitioning columns") { + val winSpec = + 
windowSpec(partitionSpec = 'b.attr :: Nil, orderSpec = 'b.asc :: Nil, UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + // No push down: the predicate is a + b > 1, but the partitioning key is b. + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a + 'b > 1) + val correctAnswer = testRelation + .select('a, 'b, 'c) + .window(winExpr.as('window) :: Nil, 'b.attr :: Nil, 'b.asc :: Nil) + .where('a + 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } + + // complex predicates with the same references but different expressions + test("Window: no predicate push down -- complex predicate with different expressions") { + val winSpec = windowSpec( + partitionSpec = 'a.attr + 'b.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExpr = windowExpr(count('b), winSpec) + + val winSpecAnalyzed = windowSpec( + partitionSpec = '_w0.attr :: Nil, + orderSpec = 'b.asc :: Nil, + UnspecifiedFrame) + val winExprAnalyzed = windowExpr(count('b), winSpecAnalyzed) + + // No push down: the predicate is a - b > 1, but the partitioning key is a + b. + val originalQuery = testRelation.select('a, 'b, 'c, winExpr.as('window)).where('a - 'b > 1) + val correctAnswer = testRelation.select('a, 'b, 'c, ('a + 'b).as("_w0")) + .window(winExprAnalyzed.as('window) :: Nil, '_w0 :: Nil, 'b.asc :: Nil) + .where('a - 'b > 1).select('a, 'b, 'c, 'window).analyze + + comparePlans(Optimize.execute(originalQuery.analyze), correctAnswer) + } }