Skip to content

Commit

Permalink
[SPARK-11878][SQL] Eliminate distribute by in case group by is presen…
Browse files Browse the repository at this point in the history
…t with exactly the same grouping expressi

For queries like :
select <> from table group by a distribute by a
we can eliminate distribute by ; since group by will anyways do a hash partitioning
Also applicable when user uses Dataframe API

Author: Yash Datta <Yash.Datta@guavus.com>

Closes apache#9858 from saucam/eliminatedistribute.
  • Loading branch information
Yash Datta authored and marmbrus committed Jan 6, 2016
1 parent 94c202c commit 9061e77
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -467,6 +467,12 @@ private[sql] case class EnsureRequirements(sqlContext: SQLContext) extends Rule[
}

def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case operator @ Exchange(partitioning, child, _) =>
child.children match {
case Exchange(childPartitioning, baseChild, _)::Nil =>
if (childPartitioning.guarantees(partitioning)) child else operator
case _ => operator
}
case operator: SparkPlan => ensureDistributionAndOrdering(operator)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,45 @@ class PlannerSuite extends SharedSQLContext {
}
}

test("EnsureRequirements eliminates Exchange if child has Exchange with same partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 5)
val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
assert(!childPartitioning.satisfies(distribution))
val inputPlan = Exchange(finalPartitioning,
DummySparkPlan(
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
requiredChildOrdering = Seq(Seq.empty)),
None)

val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: Exchange => true }.size == 2) {
fail(s"Topmost Exchange should have been eliminated:\n$outputPlan")
}
}

test("EnsureRequirements does not eliminate Exchange with different partitioning") {
val distribution = ClusteredDistribution(Literal(1) :: Nil)
// Number of partitions differ
val finalPartitioning = HashPartitioning(Literal(1) :: Nil, 8)
val childPartitioning = HashPartitioning(Literal(2) :: Nil, 5)
assert(!childPartitioning.satisfies(distribution))
val inputPlan = Exchange(finalPartitioning,
DummySparkPlan(
children = DummySparkPlan(outputPartitioning = childPartitioning) :: Nil,
requiredChildDistribution = Seq(distribution),
requiredChildOrdering = Seq(Seq.empty)),
None)

val outputPlan = EnsureRequirements(sqlContext).apply(inputPlan)
assertDistributionRequirementsAreSatisfied(outputPlan)
if (outputPlan.collect { case e: Exchange => true }.size == 1) {
fail(s"Topmost Exchange should not have been eliminated:\n$outputPlan")
}
}

// ---------------------------------------------------------------------------------------------
}

Expand Down

0 comments on commit 9061e77

Please sign in to comment.