From 54cbad15ed17bf9c16b372fb3e5fdf6a8fee69ae Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Mon, 13 Mar 2017 17:14:47 +0100 Subject: [PATCH 1/2] Do not change output of a subquery --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 16 +++++++++++++--- .../RemoveRedundantAliasAndProjectSuite.scala | 10 ++++++++++ .../org/apache/spark/sql/SubquerySuite.scala | 14 ++++++++++++++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index caafa1c134cd4..8786fffc76e9d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -140,7 +140,8 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) object OptimizeSubqueries extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case s: SubqueryExpression => - s.withNewPlan(Optimizer.this.execute(s.plan)) + val ReturnAnswer(newPlan) = Optimizer.this.execute(ReturnAnswer(s.plan)) + s.withNewPlan(newPlan) } } } @@ -185,7 +186,10 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { // If the alias name is different from attribute name, we can't strip it either, or we // may accidentally change the output schema name of the root plan. case a @ Alias(attr: Attribute, name) - if a.metadata == Metadata.empty && name == attr.name && !blacklist.contains(attr) => + if a.metadata == Metadata.empty && + name == attr.name && + !blacklist.contains(attr) && + !blacklist.contains(a) => attr case a => a } @@ -193,10 +197,16 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { /** * Remove redundant alias expression from a LogicalPlan and its subtree. 
A blacklist is used to * prevent the removal of seemingly redundant aliases used to deduplicate the input for a (self) - * join. + * join or to prevent the removal of top-level subquery attributes. */ private def removeRedundantAliases(plan: LogicalPlan, blacklist: AttributeSet): LogicalPlan = { plan match { + // A ReturnAnswer node means that we want to keep the same output attributes. This means we + // cannot remove aliases that produce these attributes. A ReturnAnswer node is currently only + // used during sub query optimization. + case ReturnAnswer(child) => + ReturnAnswer(removeRedundantAliases(child, blacklist ++ child.outputSet)) + // A join has to be treated differently, because the left and the right side of the join are // not allowed to use the same attributes. We use a blacklist to prevent us from creating a // situation in which this happens; the rule will only remove an alias if its child diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala index c01ea01ec6808..40778a81b75f2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala @@ -116,4 +116,14 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper val expected = relation.window(Seq('b), Seq('a), Seq()).analyze comparePlans(optimized, expected) } + + test("do not remove output attributes from a return answer node") { + val relation = LocalRelation('a.int, 'b.int) + val query = ReturnAnswer( + relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze) + val optimized = Optimize.execute(query) + val expected = ReturnAnswer( + relation.select('a as "a", 'b).where('b < 10).select('a).analyze) + 
comparePlans(optimized, expected) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 25dbecb5894e4..fb92f314088d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -825,4 +825,18 @@ class SubquerySuite extends QueryTest with SharedSQLContext { Row(1) :: Row(0) :: Nil) } } + + test("SPARK-19933 Do not eliminate top-level aliases in sub-queries") { + withTempView("t1", "t2") { + spark.range(4).createOrReplaceTempView("t1") + checkAnswer( + sql("select * from t1 where id in (select id as id from t1)"), + Row(0) :: Row(1) :: Row(2) :: Row(3) :: Nil) + + spark.range(2).createOrReplaceTempView("t2") + checkAnswer( + sql("select * from t1 where id in (select id as id from t2)"), + Row(0) :: Row(1) :: Nil) + } + } } From 3e460b6c3c5544f856bd3ed4da69833894ef8c08 Mon Sep 17 00:00:00 2001 From: Herman van Hovell Date: Tue, 14 Mar 2017 10:57:46 +0100 Subject: [PATCH 2/2] Create a Subquery plan --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 +++++------ .../plans/logical/basicLogicalOperators.scala | 8 ++++++++ .../RemoveRedundantAliasAndProjectSuite.scala | 8 +++----- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 8786fffc76e9d..a4a0d072a76c4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -140,7 +140,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, conf: CatalystConf) object OptimizeSubqueries extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case s: SubqueryExpression 
=> - val ReturnAnswer(newPlan) = Optimizer.this.execute(ReturnAnswer(s.plan)) + val Subquery(newPlan) = Optimizer.this.execute(Subquery(s.plan)) s.withNewPlan(newPlan) } } @@ -201,11 +201,10 @@ object RemoveRedundantAliases extends Rule[LogicalPlan] { */ private def removeRedundantAliases(plan: LogicalPlan, blacklist: AttributeSet): LogicalPlan = { plan match { - // A ReturnAnswer node means that we want to keep the same output attributes. This means we - // cannot remove aliases that produce these attributes. A ReturnAnswer node is currently only - // used during sub query optimization. - case ReturnAnswer(child) => - ReturnAnswer(removeRedundantAliases(child, blacklist ++ child.outputSet)) + // We want to keep the same output attributes for subqueries. This means we cannot remove + // the aliases that produce these attributes. + case Subquery(child) => + Subquery(removeRedundantAliases(child, blacklist ++ child.outputSet)) // A join has to be treated differently, because the left and the right side of the join are // not allowed to use the same attributes. We use a blacklist to prevent us from creating a diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index 31b6ed48a2230..5cbf263d1ce42 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -38,6 +38,14 @@ case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = child.output } +/** + * This node is inserted at the top of a subquery when it is optimized. This makes sure we can + * recognize a subquery as such, and it allows us to write subquery-aware transformations.
+ */ +case class Subquery(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) override def maxRows: Option[Long] = child.maxRows diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala index 40778a81b75f2..1973b5abb462d 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/RemoveRedundantAliasAndProjectSuite.scala @@ -117,13 +117,11 @@ class RemoveRedundantAliasAndProjectSuite extends PlanTest with PredicateHelper comparePlans(optimized, expected) } - test("do not remove output attributes from a return answer node") { + test("do not remove output attributes from a subquery") { val relation = LocalRelation('a.int, 'b.int) - val query = ReturnAnswer( - relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze) + val query = Subquery(relation.select('a as "a", 'b as "b").where('b < 10).select('a).analyze) val optimized = Optimize.execute(query) - val expected = ReturnAnswer( - relation.select('a as "a", 'b).where('b < 10).select('a).analyze) + val expected = Subquery(relation.select('a as "a", 'b).where('b < 10).select('a).analyze) comparePlans(optimized, expected) } }