From a9aa64388bae6afffecb27c432d9cf3de77aeba0 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Mon, 14 Sep 2015 17:05:38 +0800 Subject: [PATCH 1/3] fix set optimization by eliminate empty project push down --- .../sql/catalyst/optimizer/Optimizer.scala | 8 ++++++-- .../optimizer/SetOperationPushDownSuite.scala | 19 ++++++++++++++++++- .../org/apache/spark/sql/DataFrameSuite.scala | 9 +++++++++ 3 files changed, 33 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 648a65e7c0eb3..22191fbc605be 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -137,7 +137,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(pushToRight(condition, rewrites), right)) // Push down projection into intersect - case Project(projectList, i @ Intersect(left, right)) => + // SPARK-10539: Pushed down empty project list would make all the rows + // in both child to empty row, which is against Intersect's semantic + case Project(projectList, i @ Intersect(left, right)) if projectList.length > 0 => val rewrites = buildRewrites(i) Intersect( Project(projectList, left), @@ -151,7 +153,9 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(pushToRight(condition, rewrites), right)) // Push down projection into except - case Project(projectList, e @ Except(left, right)) => + // SPARK-10539: Pushed down empty project list would make all the rows + // in both child to empty row, which is against Except's semantic + case Project(projectList, e @ Except(left, right)) if projectList.length > 0 => val rewrites = buildRewrites(e) Except( Project(projectList, left), diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala index 49c979bc7d72c..856b2e5e77ded 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala @@ -78,5 +78,22 @@ class SetOperationPushDownSuite extends PlanTest { comparePlans(unionOptimized, unionCorrectAnswer) comparePlans(intersectOptimized, intersectCorrectAnswer) - comparePlans(exceptOptimized, exceptCorrectAnswer) } + comparePlans(exceptOptimized, exceptCorrectAnswer) + } + + test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { + val intersectQuery = testIntersect.select() + val exceptQuery = testExcept.select() + + val intersectOptimized = Optimize.execute(intersectQuery.analyze) + val exceptOptimized = Optimize.execute(exceptQuery.analyze) + + val intersectCorrectAnswer = + Intersect(testRelation, testRelation2).select().analyze + val exceptCorrectAnswer = + Except(testRelation, testRelation2).select().analyze + + comparePlans(intersectOptimized, intersectCorrectAnswer) + comparePlans(exceptOptimized, exceptCorrectAnswer) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c167999af580e..be6e0b186fff4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -907,4 +907,13 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { assert(row.getDouble(1) - row.getDouble(3) === 0.0 +- 0.001) } } + + test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { + val df1 = (1 to 100).map(Tuple1.apply).toDF("i") + val df2 = (1 to 30).map(Tuple1.apply).toDF("i") + val intersect = df1.intersect(df2) + val except = df1.except(df2) + assert(intersect.count() === 30) + assert(except.count() === 70) + } } From f9a3b70c7f4c73c7f0572c31a90bb7f7b4698ef7 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Tue, 15 Sep 2015 00:30:38 +0800 Subject: [PATCH 2/3] Project should not be pushed down through Intersect or Except --- .../sql/catalyst/optimizer/Optimizer.scala | 18 ----------- .../optimizer/SetOperationPushDownSuite.scala | 30 ++++--------------- .../org/apache/spark/sql/DataFrameSuite.scala | 2 +- 3 files changed, 7 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 22191fbc605be..5823754502ecf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -136,30 +136,12 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(condition, left), Filter(pushToRight(condition, rewrites), right)) - // Push down projection into intersect - // SPARK-10539: Pushed down empty project list would make all the rows - // in both child to empty row, which is against Intersect's semantic - case Project(projectList, i @ Intersect(left, right)) if projectList.length > 0 => - val rewrites = buildRewrites(i) - Intersect( - Project(projectList, left), - Project(projectList.map(pushToRight(_, rewrites)), right)) - // Push down filter into except case Filter(condition, e @ Except(left, right)) => val rewrites = buildRewrites(e) Except( Filter(condition, left), Filter(pushToRight(condition, rewrites), right)) - - // Push down projection into except - // SPARK-10539: Pushed down empty project list would make all the rows - // in both child to empty row, which is against Except's semantic - case Project(projectList, e @ Except(left, right)) if projectList.length > 0 => - val rewrites = buildRewrites(e) - Except( - Project(projectList, left), - Project(projectList.map(pushToRight(_, rewrites)), right)) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala index 856b2e5e77ded..3fca47a023dc6 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SetOperationPushDownSuite.scala @@ -60,40 +60,22 @@ class SetOperationPushDownSuite extends PlanTest { comparePlans(exceptOptimized, exceptCorrectAnswer) } - test("union/intersect/except: project to each side") { + test("union: project to each side") { val unionQuery = testUnion.select('a) - val intersectQuery = testIntersect.select('b, 'c) - val exceptQuery = testExcept.select('a, 'b, 'c) - val unionOptimized = Optimize.execute(unionQuery.analyze) - val intersectOptimized = Optimize.execute(intersectQuery.analyze) - val exceptOptimized = Optimize.execute(exceptQuery.analyze) - val unionCorrectAnswer = Union(testRelation.select('a), testRelation2.select('d)).analyze - val intersectCorrectAnswer = - Intersect(testRelation.select('b, 'c), testRelation2.select('e, 'f)).analyze - val exceptCorrectAnswer = - Except(testRelation.select('a, 'b, 'c), testRelation2.select('d, 'e, 'f)).analyze - comparePlans(unionOptimized, unionCorrectAnswer) - comparePlans(intersectOptimized, intersectCorrectAnswer) - comparePlans(exceptOptimized, exceptCorrectAnswer) } - test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { - val intersectQuery = testIntersect.select() - val exceptQuery = testExcept.select() + test("SPARK-10539: Project should not be pushed down through Intersect or Except") { + val intersectQuery = testIntersect.select('b, 'c) + val exceptQuery = testExcept.select('a, 'b, 'c) val intersectOptimized = Optimize.execute(intersectQuery.analyze) val exceptOptimized = Optimize.execute(exceptQuery.analyze) - val intersectCorrectAnswer = - Intersect(testRelation, testRelation2).select().analyze - val exceptCorrectAnswer = - Except(testRelation, testRelation2).select().analyze - - comparePlans(intersectOptimized, intersectCorrectAnswer) - comparePlans(exceptOptimized, exceptCorrectAnswer) + comparePlans(intersectOptimized, intersectQuery.analyze) + comparePlans(exceptOptimized, exceptQuery.analyze) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index be6e0b186fff4..1370713975f2f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -908,7 +908,7 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } - test("SPARK-10539: Empty project list should not be pushed down through intersect or except") { + test("SPARK-10539: Project should not be pushed down through Intersect or Except") { val df1 = (1 to 100).map(Tuple1.apply).toDF("i") val df2 = (1 to 30).map(Tuple1.apply).toDF("i") val intersect = df1.intersect(df2) From 1f56d2e626ed75d4ce96ffd70929243eed4c8143 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Fri, 18 Sep 2015 11:13:05 -0700 Subject: [PATCH 3/3] Add comments. --- .../sql/catalyst/optimizer/Optimizer.scala | 23 +++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 5823754502ecf..324f40a051c38 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -85,7 +85,22 @@ object SamplePushDown extends Rule[LogicalPlan] { } /** - * Pushes operations to either side of a Union, Intersect or Except. + * Pushes certain operations to both sides of a Union, Intersect or Except operator. + * Operations that are safe to pushdown are listed as follows. + * Union: + * Right now, Union means UNION ALL, which does not de-duplicate rows. So, it is + * safe to pushdown Filters and Projections through it. Once we add UNION DISTINCT, + * we will not be able to pushdown Projections. + * + * Intersect: + * It is not safe to pushdown Projections through it because we need to get the + * intersect of rows by comparing the entire rows. It is fine to pushdown Filters + * because we will not have non-deterministic expressions. + * + * Except: + * It is not safe to pushdown Projections through it because we need to get the + * intersect of rows by comparing the entire rows. It is fine to pushdown Filters + * because we will not have non-deterministic expressions. */ object SetOperationPushDown extends Rule[LogicalPlan] { @@ -122,21 +137,21 @@ object SetOperationPushDown extends Rule[LogicalPlan] { Filter(condition, left), Filter(pushToRight(condition, rewrites), right)) - // Push down projection into union + // Push down projection through UNION ALL case Project(projectList, u @ Union(left, right)) => val rewrites = buildRewrites(u) Union( Project(projectList, left), Project(projectList.map(pushToRight(_, rewrites)), right)) - // Push down filter into intersect + // Push down filter through INTERSECT case Filter(condition, i @ Intersect(left, right)) => val rewrites = buildRewrites(i) Intersect( Filter(condition, left), Filter(pushToRight(condition, rewrites), right)) - // Push down filter into except + // Push down filter through EXCEPT case Filter(condition, e @ Except(left, right)) => val rewrites = buildRewrites(e) Except(