From 9061e777fdcd5767718808e325e8953d484aa761 Mon Sep 17 00:00:00 2001
From: Yash Datta
Date: Wed, 6 Jan 2016 10:37:53 -0800
Subject: [PATCH] [SPARK-11878][SQL] Eliminate distribute by in case group by
 is present with exactly the same grouping expressions

For queries like:
select <> from table group by a distribute by a
we can eliminate the distribute by, since the group by will do a hash partitioning anyway.
Also applicable when the user uses the DataFrame API.

Author: Yash Datta

Closes #9858 from saucam/eliminatedistribute.
---
 .../apache/spark/sql/execution/Exchange.scala | 10 +++++
 .../spark/sql/execution/PlannerSuite.scala    | 39 +++++++++++++++++++
 2 files changed, 49 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 7b4161930b7d2..6b100577077c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -467,6 +467,16 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
   }
 
   def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
+    // Drop an Exchange when its child's only child is another Exchange whose partitioning
+    // already guarantees the requested one; the child subtree is returned in its place.
+    // NOTE(review): only the grandchild Exchange is inspected, not the child's own output
+    // partitioning -- presumably the operator in between preserves partitioning; confirm.
+    case operator @ Exchange(partitioning, child, _) =>
+      child.children match {
+        case Seq(Exchange(childPartitioning, _, _)) =>
+          if (childPartitioning.guarantees(partitioning)) child else operator
+        case _ => operator
+      }
     case operator: SparkPlan => ensureDistributionAndOrdering(operator)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 858e289c2716e..03a1b8e11d455 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -417,6 +417,45 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("EnsureRequirements eliminates Exchange if child has Exchange with same partitioning") {
+    val distribution = ClusteredDistribution(Literal(1) :: Nil)
+    val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
+    val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
+    assert(!childPartitioning.satisfies(distribution))
+    val inputPlan = Exchange(finalPartitioning,
+      DummySparkPlan(
+        children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
+        requiredChildDistribution = Seq(distribution),
+        requiredChildOrdering = Seq(Seq.empty)),
+      None)
+
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    // Assert the exact count so that an unexpected number of Exchanges also fails the test.
+    val numExchanges = outputPlan.collect { case e: Exchange => e }.size
+    assert(numExchanges == 1, s"Topmost Exchange should have been eliminated:\n$outputPlan")
+  }
+
+  test("EnsureRequirements does not eliminate Exchange with different partitioning") {
+    val distribution = ClusteredDistribution(Literal(1) :: Nil)
+    // Number of partitions differs
+    val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 8)
+    val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
+    assert(!childPartitioning.satisfies(distribution))
+    val inputPlan = Exchange(finalPartitioning,
+      DummySparkPlan(
+        children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
+        requiredChildDistribution = Seq(distribution),
+        requiredChildOrdering = Seq(Seq.empty)),
+      None)
+
+    val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
+    assertDistributionRequirementsAreSatisfied(outputPlan)
+    // Assert the exact count so that an unexpected number of Exchanges also fails the test.
+    val numExchanges = outputPlan.collect { case e: Exchange => e }.size
+    assert(numExchanges == 2, s"Topmost Exchange should not have been eliminated:\n$outputPlan")
+  }
+
   // ---------------------------------------------------------------------------------------------
 }
 