From 33c8008efe667ec25c07f34529a4113a4f9a5a9b Mon Sep 17 00:00:00 2001
From: Liang-Chi Hsieh
Date: Fri, 7 Aug 2015 17:03:53 +0800
Subject: [PATCH] Use Exchange to perform shuffle for Repartition operator.

---
 .../org/apache/spark/sql/execution/SparkStrategies.scala | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index c5aaebe673225..a474bf9bcad65 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -312,7 +312,11 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
         throw new IllegalStateException(
           "logical distinct operator should have been replaced by aggregate in the optimizer")
       case logical.Repartition(numPartitions, shuffle, child) =>
-        execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
+        if (shuffle) {
+          execution.Exchange(HashPartitioning(child.output, numPartitions), planLater(child)) :: Nil
+        } else {
+          execution.Repartition(numPartitions, shuffle, planLater(child)) :: Nil
+        }
       case logical.SortPartitions(sortExprs, child) =>
         // This sort only sorts tuples within a partition. Its requiredDistribution will be
         // an UnspecifiedDistribution.