From cac83e7ecb5a0c0fff097eda6e438819d9658d73 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 17 May 2017 10:24:24 -0700 Subject: [PATCH 1/3] bugfix --- .../spark/sql/catalyst/plans/QueryPlan.scala | 6 ++-- .../InferFiltersFromConstraintsSuite.scala | 28 +++++++++---------- 2 files changed, 16 insertions(+), 18 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 959fcf7c7548e..e1243e8459c84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -82,10 +82,10 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } // Collect aliases from expressions, so we may avoid producing recursive constraints. - private lazy val aliasMap = AttributeMap( - (expressions ++ children.flatMap(_.expressions)).collect { + protected lazy val aliasMap: AttributeMap[Expression] = AttributeMap( + expressions.collect { case a: Alias => (a.toAttribute, a.child) - }) + } ++ children.flatMap(_.aliasMap)) /** * Infers an additional set of constraints from a given set of equality constraints. diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index c8fe37462726a..3f7e21e3457bd 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -33,7 +33,8 @@ class InferFiltersFromConstraintsSuite extends PlanTest { PushPredicateThroughJoin, PushDownPredicate, InferFiltersFromConstraints(conf), - CombineFilters) :: Nil + CombineFilters, + BooleanSimplification) :: Nil } object OptimizeWithConstraintPropagationDisabled extends RuleExecutor[LogicalPlan] { @@ -172,7 +173,8 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val t1 = testRelation.subquery('t1) val t2 = testRelation.subquery('t2) - val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)).as("t") + val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)) + .select('int_col, 'd, 'a).as("t") .join(t2, Inner, Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr @@ -180,22 +182,18 @@ class InferFiltersFromConstraintsSuite extends PlanTest { .analyze val correctAnswer = t1 .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a))) - && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) && 'a <=> 'a - && Coalesce(Seq('a, 'a)) <=> 'b && Coalesce(Seq('a, 'a)) <=> Coalesce(Seq('a, 'a)) - && 'a === 'b && IsNotNull(Coalesce(Seq('a, 'b))) && 'a === Coalesce(Seq('a, 'b)) - && Coalesce(Seq('a, 'b)) <=> Coalesce(Seq('b, 'b)) && Coalesce(Seq('a, 'b)) === 'b + && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) + && Coalesce(Seq('b, 'b)) <=> 'a && 'a === 'b && IsNotNull(Coalesce(Seq('a, 'b))) + && 'a === Coalesce(Seq('a, 'b)) && Coalesce(Seq('a, 'b)) === 'b && IsNotNull('b) && IsNotNull(Coalesce(Seq('b, 'b))) - && 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b)) - && Coalesce(Seq('b, 'b)) <=> Coalesce(Seq('b, 'b)) && 'b <=> 'b) - .select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)).as("t") + && 'b === Coalesce(Seq('b, 'b)) && 'b <=> Coalesce(Seq('b, 'b))) + .select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)) + .select('int_col, 'd, 'a).as("t") .join(t2 .where(IsNotNull('a) && IsNotNull(Coalesce(Seq('a, 'a))) - && 'a === Coalesce(Seq('a, 'a)) && 'a <=> Coalesce(Seq('a, 'a)) && 'a <=> 'a - && Coalesce(Seq('a, 'a)) <=> Coalesce(Seq('a, 'a))), Inner, - Some("t.a".attr === "t2.a".attr - && "t.d".attr === "t2.a".attr - && "t.int_col".attr === "t2.a".attr - && Coalesce(Seq("t.d".attr, "t.d".attr)) <=> "t.int_col".attr)) + && 'a <=> Coalesce(Seq('a, 'a)) && 'a === Coalesce(Seq('a, 'a)) && 'a <=> 'a), Inner, + Some("t.a".attr === "t2.a".attr && "t.d".attr === "t2.a".attr + && "t.int_col".attr === "t2.a".attr)) .analyze val optimized = Optimize.execute(originalQuery) comparePlans(optimized, correctAnswer) From 3890b91f42205d4db19349af459c6511ab81daf1 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 17 May 2017 14:58:26 -0700 Subject: [PATCH 2/3] minor change --- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index e1243e8459c84..3b3d2d7e8decf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -82,7 +82,7 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT } // Collect aliases from expressions, so we may avoid producing recursive constraints. - protected lazy val aliasMap: AttributeMap[Expression] = AttributeMap( + private lazy val aliasMap: AttributeMap[Expression] = AttributeMap( expressions.collect { case a: Alias => (a.toAttribute, a.child) } ++ children.flatMap(_.aliasMap)) From aa16ab38fc0e0c80b179a5860f477c3650f64609 Mon Sep 17 00:00:00 2001 From: Xingbo Jiang Date: Wed, 17 May 2017 16:47:28 -0700 Subject: [PATCH 3/3] update comments. --- .../scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala | 3 ++- .../catalyst/optimizer/InferFiltersFromConstraintsSuite.scala | 4 ++++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 3b3d2d7e8decf..d3f822bf7eb0e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -81,7 +81,8 @@ abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends TreeNode[PlanT case _ => Seq.empty[Attribute] } - // Collect aliases from expressions, so we may avoid producing recursive constraints. + // Collect aliases from expressions of the whole tree rooted by the current QueryPlan node, so + // we may avoid producing recursive constraints. private lazy val aliasMap: AttributeMap[Expression] = AttributeMap( expressions.collect { case a: Alias => (a.toAttribute, a.child) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala index 3f7e21e3457bd..9a4bcdb011435 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/InferFiltersFromConstraintsSuite.scala @@ -173,7 +173,11 @@ class InferFiltersFromConstraintsSuite extends PlanTest { val t1 = testRelation.subquery('t1) val t2 = testRelation.subquery('t2) + // We should prevent `Coalese(a, b)` from recursively creating complicated constraints through + // the constraint inference procedure. val originalQuery = t1.select('a, 'b.as('d), Coalesce(Seq('a, 'b)).as('int_col)) + // We hide an `Alias` inside the child's child's expressions, to cover the situation reported + // in [SPARK-20700]. .select('int_col, 'd, 'a).as("t") .join(t2, Inner, Some("t.a".attr === "t2.a".attr