From 040b60a980627f5d5c6cb21a61ba0758dea8cf29 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 14 Sep 2015 17:05:38 +0800 Subject: [PATCH 1/2] fix set optimization by eliminate empty project push down --- .../sql/catalyst/optimizer/Optimizer.scala | 8 ++++++-- .../optimizer/SetOperationPushDownSuite.scala | 19 ++++++++++++++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 0f4caec7451a2..7b4e4bb952975 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -137,7 +137,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(pushToRight(condition, rewrites), right)) // Push down projection into intersect - case Project(projectList, i @ Intersect(left, right)) => + // SPARK-10539: Pushed down empty project list would make all the rows + // in both child to empty row, which is against Intersect's semantic + case Project(projectList, i @ Intersect(left, right)) if projectList.length > 0 => val rewrites = buildRewrites(i) Intersect( Project(projectList, left), @@ -151,7 +153,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(pushToRight(condition, rewrites), right)) // Push down projection into except - case Project(projectList, e @ Except(left, right)) => + // SPARK-10539: Pushed down empty project list would make all the rows + // in both child to empty row, which is against Except's semantic + case Project(projectList, e @ Except(left, right)) if projectList.length > 0 => val rewrites = buildRewrites(e) Except( Project(projectList, left), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala index 49c979bc7d72c..856b2e5e77ded 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala @@ -78,5 +78,22 @@ class SetOperationPushDownSuite extends PlanTest { comparePlans(unionOptimized, unionCorrectAnswer) comparePlans(intersectOptimized, intersectCorrectAnswer) - comparePlans(exceptOptimized, exceptCorrectAnswer) } + comparePlans(exceptOptimized, exceptCorrectAnswer) + } + + test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { + val intersectQuery = testIntersect.select() + val exceptQuery = testExcept.select() + + val intersectOptimized = Optimize.execute(intersectQuery.analyze) + val exceptOptimized = Optimize.execute(exceptQuery.analyze) + + val intersectCorrectAnswer = + Intersect(testRelation, testRelation2).select().analyze + val exceptCorrectAnswer = + Except(testRelation, testRelation2).select().analyze + + comparePlans(intersectOptimized, intersectCorrectAnswer) + comparePlans(exceptOptimized, exceptCorrectAnswer) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c167999af580e..be6e0b186fff4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -907,4 +907,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(row.getDouble(1) - row.getDouble(3) === 0.0 +- 0.001) } } + + test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { + val df1 = (1 to 100).map(Tuple1.apply).toDF("i") + val df2 = (1 to 30).map(Tuple1.apply).toDF("i") + val intersect = df1.intersect(df2) + val except = df1.except(df2) + assert(intersect.count() === 30) + assert(except.count() === 70) + } } From ce6ed80f1d3b7138664010a415a8501ea68dcd28 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 15 Sep 2015 00:30:38 +0800 Subject: [PATCH 2/2] Project should not be pushed down through Intersect or Except --- .../sql/catalyst/optimizer/Optimizer.scala | 18 ----------- .../optimizer/SetOperationPushDownSuite.scala | 30 ++++--------------- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- 3 files changed, 7 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 7b4e4bb952975..5b4359894c79d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -136,30 +136,12 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(condition, left), Filter(pushToRight(condition, rewrites), right)) - // Push down projection into intersect - // SPARK-10539: Pushed down empty project list would make all the rows - // in both child to empty row, which is against Intersect's semantic - case Project(projectList, i @ Intersect(left, right)) if projectList.length > 0 => - val rewrites = buildRewrites(i) - Intersect( - Project(projectList, left), - Project(projectList.map(pushToRight(_, rewrites)), right)) - // Push down filter into except case Filter(condition, e @ Except(left, right)) => val rewrites = buildRewrites(e) Except( Filter(condition, left), Filter(pushToRight(condition, rewrites), right)) - - // Push down projection into except - // SPARK-10539: Pushed down empty project list would make all the rows - // in both child to empty row, which is against Except's semantic - case Project(projectList, e @ Except(left, right)) if projectList.length > 0 => - val rewrites = buildRewrites(e) - Except( - Project(projectList, left), - Project(projectList.map(pushToRight(_, rewrites)), right)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala index 856b2e5e77ded..3fca47a023dc6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala @@ -60,40 +60,22 @@ class SetOperationPushDownSuite extends PlanTest { comparePlans(exceptOptimized, exceptCorrectAnswer) } - test("union/intersect/except: project to each side") { + test("union: project to each side") { val unionQuery = testUnion.select('a) - val intersectQuery = testIntersect.select('b, 'c) - val exceptQuery = testExcept.select('a, 'b, 'c) - val unionOptimized = Optimize.execute(unionQuery.analyze) - val intersectOptimized = Optimize.execute(intersectQuery.analyze) - val exceptOptimized = Optimize.execute(exceptQuery.analyze) - val unionCorrectAnswer = Union(testRelation.select('a), testRelation2.select('d)).analyze - val intersectCorrectAnswer = - Intersect(testRelation.select('b, 'c), testRelation2.select('e, 'f)).analyze - val exceptCorrectAnswer = - Except(testRelation.select('a, 'b, 'c), testRelation2.select('d, 'e, 'f)).analyze - comparePlans(unionOptimized, unionCorrectAnswer) - comparePlans(intersectOptimized, intersectCorrectAnswer) - comparePlans(exceptOptimized, exceptCorrectAnswer) } - test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { - val intersectQuery = testIntersect.select() - val exceptQuery = testExcept.select() + test("SPARK-10539: Project should not be pushed down through Intersect or Except") { + val intersectQuery = testIntersect.select('b, 'c) + val exceptQuery = testExcept.select('a, 'b, 'c) val intersectOptimized = Optimize.execute(intersectQuery.analyze) val exceptOptimized = Optimize.execute(exceptQuery.analyze) - val intersectCorrectAnswer = - Intersect(testRelation, testRelation2).select().analyze - val exceptCorrectAnswer = - Except(testRelation, testRelation2).select().analyze - - comparePlans(intersectOptimized, intersectCorrectAnswer) - comparePlans(exceptOptimized, exceptCorrectAnswer) + comparePlans(intersectOptimized, intersectQuery.analyze) + comparePlans(exceptOptimized, exceptQuery.analyze) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index be6e0b186fff4..1370713975f2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -908,7 +908,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { + test("SPARK-10539: Project should not be pushed down through Intersect or Except") { val df1 = (1 to 100).map(Tuple1.apply).toDF("i") val df2 = (1 to 30).map(Tuple1.apply).toDF("i") val intersect = df1.intersect(df2)