From 81313e49b6267550a1fcb2856ba9d93958d3bc34 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Thu, 29 Oct 2015 14:09:24 -0700 Subject: [PATCH 1/3] [SPARK-11410] [SQL] Add APIs to provide functionality similar to Hive's DISTRIBUTE BY and SORT BY DISTRIBUTE BY allows the user to hash partition the data by specified exprs. It also allows for optional sorting within each resulting partition. There is no required relationship between the exprs for partitioning and sorting (i.e. one does not need to be a prefix of the other). This patch adds two APIs to DataFrames which can be used together to provide this functionality: 1. distributeBy() which partitions the data frame into a specified number of partitions using the partitioning exprs. 2. localSort() which sorts each partition using the provided sorting exprs. To get the DISTRIBUTE BY functionality, the user simply does: df.distributeBy(...).localSort(...) --- .../catalyst/plans/logical/partitioning.scala | 8 ++ .../org/apache/spark/sql/DataFrame.scala | 51 ++++++-- .../spark/sql/execution/SparkStrategies.scala | 7 ++ .../org/apache/spark/sql/DataFrameSuite.scala | 116 ++++++++++++++++++ 4 files changed, 173 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala index 1f76b03bcb0f6..b78421eb2de60 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala @@ -38,3 +38,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan) */ case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan) extends RedistributeData + +/** + * This method repartitions data using [[Expression]]s into `numPartitions`. If numPartitions is + * less than zero, a default is used. Otherwise this behaves identically to RepartitionByExpression.
+ */ +case class PartitionByExpression(partitionExpressions: Seq[Expression], + child: LogicalPlan, numPartitions: Int = -1) + extends RedistributeData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index aa817a037ef5e..895f0110aa382 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -241,6 +241,18 @@ class DataFrame private[sql]( sb.toString() } + private[sql] def sortInternal(global: Boolean, sortExprs: Seq[Column]): DataFrame = { + val sortOrder: Seq[SortOrder] = sortExprs.map { col => + col.expr match { + case expr: SortOrder => + expr + case expr: Expression => + SortOrder(expr, Ascending) + } + } + Sort(sortOrder, global = global, logicalPlan) + } + override def toString: String = { try { schema.map(f => s"${f.name}: ${f.dataType.simpleString}").mkString("[", ", ", "]") @@ -633,15 +645,7 @@ class DataFrame private[sql]( */ @scala.annotation.varargs def sort(sortExprs: Column*): DataFrame = { - val sortOrder: Seq[SortOrder] = sortExprs.map { col => - col.expr match { - case expr: SortOrder => - expr - case expr: Expression => - SortOrder(expr, Ascending) - } - } - Sort(sortOrder, global = true, logicalPlan) + sortInternal(true, sortExprs) } /** @@ -662,6 +666,35 @@ class DataFrame private[sql]( @scala.annotation.varargs def orderBy(sortExprs: Column*): DataFrame = sort(sortExprs : _*) + /** + * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into + * `numPartitions` + * `numPartitions` can be < 0 to preserve the current number of partitions. + * @group dfops + * @since 1.6.0 + */ + def distributeBy(partitionExprs: Seq[Column], numPartitions: Int = -1): DataFrame = { + PartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, numPartitions) + } + + /** + * Returns a new [[DataFrame]] with each partition sorted by the given expressions. + * @group dfops + * @since 1.6.0 + */ + @scala.annotation.varargs + def localSort(sortCol: String, sortCols: String*): DataFrame = localSort((sortCol +: sortCols).map(Column(_)) : _*) + + /** + * Returns a new [[DataFrame]] with each partition sorted by the given expressions. + * @group dfops + * @since 1.6.0 + */ + @scala.annotation.varargs + def localSort(sortExprs: Column*): DataFrame = { + sortInternal(false, sortExprs) + } + /** * Selects column based on the column name and return it as a [[Column]]. * Note that the column name can also reference to a nested column like `a.b`.
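The two new methods are designed to be chained: distributeBy controls how rows are hash partitioned and localSort orders rows within each resulting partition. Below is a minimal usage sketch against the API as added in this patch; it assumes a Spark 1.6-era `sqlContext` is in scope (as in spark-shell), and the generated `key`/`value` columns and the partition count of 5 are illustrative, not part of the patch.

```scala
import org.apache.spark.sql.functions.col

// Illustrative input: one hundred rows spread over ten distinct keys.
val df = sqlContext.range(0, 100).selectExpr("id % 10 AS key", "id AS value")

// Rough equivalent of HiveQL `SELECT * FROM t DISTRIBUTE BY key SORT BY value DESC`:
// hash partition by `key` into 5 partitions, then sort rows within each partition.
val distributed = df.distributeBy(col("key") :: Nil, 5).localSort(col("value").desc)

// The Exchange introduced by distributeBy determines the partition count.
assert(distributed.rdd.partitions.length == 5)
```

As the commit message notes, the partitioning expressions and the sort expressions are independent; neither needs to be a prefix of the other.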
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 86d1d390f1918..1d8ac848541bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -457,6 +457,13 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil case logical.RepartitionByExpression(expressions, child) => execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil + case logical.PartitionByExpression(expressions, child, nPartitions) => + val p = if (nPartitions < 0) { + numPartitions + } else { + nPartitions + } + execution.Exchange(HashPartitioning(expressions, p), planLater(child)) :: Nil case e @ EvaluatePython(udf, child, _) => BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index c9d6e19d2ce93..2593ff5579589 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -19,6 +19,11 @@ package org.apache.spark.sql import java.io.File +import org.apache.spark.SparkException +import org.apache.spark.sql.execution.Exchange +import org.apache.spark.sql.execution.aggregate.TungstenAggregate +import org.apache.spark.sql.test.SQLTestData.TestData2 + import scala.language.postfixOps import scala.util.Random @@ -997,4 +1002,115 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { } } } + + /** + * Verifies that there is no Exchange between the Aggregations for `df` + */ + private def verifyNonExchangingAgg(df: DataFrame) = { + var atFirstAgg: Boolean = false + df.queryExecution.executedPlan.foreach { + case agg: TungstenAggregate => { + atFirstAgg = !atFirstAgg + } + case _ => { + if (atFirstAgg) { + fail("Should not have operators between the two aggregations") + } + } + } + } + + /** + * Verifies that there is an Exchange between the Aggregations for `df` + */ + private def verifyExchangingAgg(df: DataFrame) = { + var atFirstAgg: Boolean = false + df.queryExecution.executedPlan.foreach { + case agg: TungstenAggregate => { + if (atFirstAgg) { + fail("Should not have back to back Aggregates") + } + atFirstAgg = true + } + case e: Exchange => atFirstAgg = false + case _ => + } + } + + test("distributeBy") { + val original = testData.repartition(1) + assert(original.rdd.partitions.length == 1) + val df = original.distributeBy(Column("key") :: Nil, 5) + assert(df.rdd.partitions.length == 5) + checkAnswer(original.select(), df.select()) + + val df2 = original.distributeBy(Column("key") :: Nil, 10) + assert(df2.rdd.partitions.length == 10) + checkAnswer(original.select(), df2.select()) + + // Group by the column we are distributed by. This should generate a plan with no exchange + // between the aggregates + val df3 = testData.distributeBy(Column("key") :: Nil).groupBy("key").count() + verifyNonExchangingAgg(df3) + verifyNonExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil) + .groupBy("key", "value").count()) + + // Grouping by just the first distributeBy expr, need to exchange. 
+ verifyExchangingAgg(testData.distributeBy(Column("key") :: Column("value") :: Nil) + .groupBy("key").count()) + + val data = sqlContext.sparkContext.parallelize( + (1 to 100).map(i => TestData2(i % 10, i))).toDF() + + // Distribute and order by. + val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc) + // Walk each partition and verify that it is sorted descending and not globally sorted. + df4.rdd.foreachPartition(p => { + var previousValue: Int = -1 + var globallyOrdered: Boolean = true + p.foreach(r => { + val v: Int = r.getInt(1) + if (previousValue != -1) { + if (previousValue < v) throw new SparkException("Partition is not ordered.") + if (v + 1 != previousValue) globallyOrdered = false + } + previousValue = v + }) + if (globallyOrdered) throw new SparkException("Partition should not be globally ordered") + }) + + // Distribute and order by with multiple order bys + val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc) + // Walk each partition and verify that it is sorted descending and not globally sorted. + df5.rdd.foreachPartition(p => { + var previousValue: Int = -1 + var globallyOrdered: Boolean = true + p.foreach(r => { + val v: Int = r.getInt(1) + if (previousValue != -1) { + if (previousValue > v) throw new SparkException("Partition is not ordered.") + if (v - 1 != previousValue) globallyOrdered = false + } + previousValue = v + }) + if (globallyOrdered) throw new SparkException("Partition should not be globally ordered") + }) + + // Distribute into one partition and order by. This *should* be globally sorted. + val df6 = data.distributeBy(Column("a") :: Nil, 1).localSort($"b".asc) + // Walk each partition and verify that it is sorted descending and not globally sorted. + df6.rdd.foreachPartition(p => { + var previousValue: Int = -1 + var globallyOrdered: Boolean = true + p.foreach(r => { + val v: Int = r.getInt(1) + if (previousValue != -1) { + if (previousValue > v) throw new SparkException("Partition is not ordered.") + if (v - 1 != previousValue) globallyOrdered = false + } + previousValue = v + }) + if (!globallyOrdered) throw new SparkException("Partition should be globally ordered") + }) + } } From e6c05e273a25b28ae2cc0560dc7aa1fb32bbbe89 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Thu, 29 Oct 2015 17:22:45 -0700 Subject: [PATCH 2/3] Review comments from Yin. --- .../catalyst/plans/logical/partitioning.scala | 19 +++------ .../org/apache/spark/sql/DataFrame.scala | 17 ++++++-- .../spark/sql/execution/SparkStrategies.scala | 15 ++----- .../org/apache/spark/sql/DataFrameSuite.scala | 39 +++++++++---------- 4 files changed, 42 insertions(+), 48 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala index b78421eb2de60..ae1c5678be571 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala @@ -31,18 +31,11 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan) extends RedistributeData /** - * This method repartitions data using [[Expression]]s, and receives information about the - * number of partitions during execution. Used when a specific ordering or distribution is - * expected by the consumer of the query result. 
Use [[Repartition]] for RDD-like + * This method repartitions data using [[Expression]]s into `numPartitions`, and receives + * information about the number of partitions during execution. Used when a specific ordering or + * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like * `coalesce` and `repartition`. + * If `numPartitions` is not specified, the partition in `child` is preserved. */ -case class RepartitionByExpression(partitionExpressions: Seq[Expression], child: LogicalPlan) - extends RedistributeData - -/** - * This method repartitions data using [[Expression]]s into `numPartitions`. If numPartitions is - * less than zero, a default is used. Otherwise this behaves identically to RepartitionByExpression. - */ -case class PartitionByExpression(partitionExpressions: Seq[Expression], - child: LogicalPlan, numPartitions: Int = -1) - extends RedistributeData +case class RepartitionByExpression(partitionExpressions: Seq[Expression], + child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 895f0110aa382..53ad3c0266cdb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -668,13 +668,22 @@ class DataFrame private[sql]( /** * Returns a new [[DataFrame]] partitioned by the given partitioning expressions into - * `numPartitions` - * `numPartitions` can be < 0 to preserve the current number of partitions. + * `numPartitions`. The resulting DataFrame is hash partitioned. * @group dfops * @since 1.6.0 */ - def distributeBy(partitionExprs: Seq[Column], numPartitions: Int = -1): DataFrame = { - PartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, numPartitions) + def distributeBy(partitionExprs: Seq[Column], numPartitions: Int): DataFrame = { + RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, Some(numPartitions)) + } + + /** + * Returns a new [[DataFrame]] partitioned by the given partitioning expressions preserving + * the existing number of partitions. The resulting DataFrame is hash partitioned. 
+ * @group dfops + * @since 1.6.0 + */ + def distributeBy(partitionExprs: Seq[Column]): DataFrame = { + RepartitionByExpression(partitionExprs.map { _.expr }, logicalPlan, None) } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 1d8ac848541bb..f4464e0b916f8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -27,8 +27,7 @@ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.columnar.{InMemoryColumnarTableScan, InMemoryRelation} import org.apache.spark.sql.execution.datasources.{CreateTableUsing, CreateTempTableUsing, DescribeCommand => LogicalDescribeCommand, _} import org.apache.spark.sql.execution.{DescribeCommand => RunnableDescribeCommand} -import org.apache.spark.sql.types._ -import org.apache.spark.sql.{SQLContext, Strategy, execution} +import org.apache.spark.sql.{Strategy, execution} private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { self: SparkPlanner => @@ -455,15 +454,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { generator, join = join, outer = outer, g.output, planLater(child)) :: Nil case logical.OneRowRelation => execution.PhysicalRDD(Nil, singleRowRdd, "OneRowRelation") :: Nil - case logical.RepartitionByExpression(expressions, child) => - execution.Exchange(HashPartitioning(expressions, numPartitions), planLater(child)) :: Nil - case logical.PartitionByExpression(expressions, child, nPartitions) => - val p = if (nPartitions < 0) { - numPartitions - } else { - nPartitions - } - execution.Exchange(HashPartitioning(expressions, p), planLater(child)) :: Nil + case logical.RepartitionByExpression(expressions, child, nPartitions) => + execution.Exchange(HashPartitioning( + expressions, nPartitions.getOrElse(numPartitions)), planLater(child)) :: Nil case e @ EvaluatePython(udf, child, _) => BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "PhysicalRDD") :: Nil diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 2593ff5579589..3f66ef56f9076 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -20,20 +20,18 @@ package org.apache.spark.sql import java.io.File import org.apache.spark.SparkException +import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation import org.apache.spark.sql.execution.Exchange import org.apache.spark.sql.execution.aggregate.TungstenAggregate +import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SQLTestData.TestData2 +import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext} +import org.apache.spark.sql.types._ +import org.scalatest.Matchers._ import scala.language.postfixOps import scala.util.Random -import org.scalatest.Matchers._ - -import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation -import org.apache.spark.sql.functions._ -import org.apache.spark.sql.types._ -import org.apache.spark.sql.test.{ExamplePointUDT, ExamplePoint, SharedSQLContext} - class DataFrameSuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -1037,7 +1035,7 @@ class DataFrameSuite extends 
QueryTest with SharedSQLContext { } } - test("distributeBy") { + test("distributeBy and localSort") { val original = testData.repartition(1) assert(original.rdd.partitions.length == 1) val df = original.distributeBy(Column("key") :: Nil, 5) @@ -1064,53 +1062,54 @@ class DataFrameSuite extends QueryTest with SharedSQLContext { // Distribute and order by. val df4 = data.distributeBy(Column("a") :: Nil).localSort($"b".desc) - // Walk each partition and verify that it is sorted descending and not globally sorted. + // Walk each partition and verify that it is sorted descending and does not contain all + // the values. df4.rdd.foreachPartition(p => { var previousValue: Int = -1 - var globallyOrdered: Boolean = true + var allSequential: Boolean = true p.foreach(r => { val v: Int = r.getInt(1) if (previousValue != -1) { if (previousValue < v) throw new SparkException("Partition is not ordered.") - if (v + 1 != previousValue) globallyOrdered = false + if (v + 1 != previousValue) allSequential = false } previousValue = v }) - if (globallyOrdered) throw new SparkException("Partition should not be globally ordered") + if (allSequential) throw new SparkException("Partition should not be globally ordered") }) // Distribute and order by with multiple order bys val df5 = data.distributeBy(Column("a") :: Nil, 2).localSort($"b".asc, $"a".asc) - // Walk each partition and verify that it is sorted descending and not globally sorted. + // Walk each partition and verify that it is sorted ascending df5.rdd.foreachPartition(p => { var previousValue: Int = -1 - var globallyOrdered: Boolean = true + var allSequential: Boolean = true p.foreach(r => { val v: Int = r.getInt(1) if (previousValue != -1) { if (previousValue > v) throw new SparkException("Partition is not ordered.") - if (v - 1 != previousValue) globallyOrdered = false + if (v - 1 != previousValue) allSequential = false } previousValue = v }) - if (globallyOrdered) throw new SparkException("Partition should not be globally ordered") + if (allSequential) throw new SparkException("Partition should not be all sequential") }) - // Distribute into one partition and order by. This *should* be globally sorted. + // Distribute into one partition and order by. This partition should contain all the values. val df6 = data.distributeBy(Column("a") :: Nil, 1).localSort($"b".asc) // Walk each partition and verify that it is sorted descending and not globally sorted. df6.rdd.foreachPartition(p => { var previousValue: Int = -1 - var globallyOrdered: Boolean = true + var allSequential: Boolean = true p.foreach(r => { val v: Int = r.getInt(1) if (previousValue != -1) { if (previousValue > v) throw new SparkException("Partition is not ordered.") - if (v - 1 != previousValue) globallyOrdered = false + if (v - 1 != previousValue) allSequential = false } previousValue = v }) - if (!globallyOrdered) throw new SparkException("Partition should be globally ordered") + if (!allSequential) throw new SparkException("Partition should contain all sequential values") }) } } From 98c05ae08281b14dd67151bf8edd55cb884d1061 Mon Sep 17 00:00:00 2001 From: Nong Li Date: Fri, 30 Oct 2015 15:46:10 -0700 Subject: [PATCH 3/3] More review comments. 
--- .../sql/catalyst/plans/logical/partitioning.scala | 13 ++++++++++--- .../scala/org/apache/spark/sql/DataFrameSuite.scala | 9 +++++---- 2 files changed, 15 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala index ae1c5678be571..5474c3cfd964f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/partitioning.scala @@ -35,7 +35,14 @@ case class SortPartitions(sortExpressions: Seq[SortOrder], child: LogicalPlan) * information about the number of partitions during execution. Used when a specific ordering or * distribution is expected by the consumer of the query result. Use [[Repartition]] for RDD-like * `coalesce` and `repartition`. - * If `numPartitions` is not specified, the partition in `child` is preserved. + * If `numPartitions` is not specified, the partitioning of `child` is preserved. */ -case class RepartitionByExpression(partitionExpressions: Seq[Expression], - child: LogicalPlan, numPartitions: Option[Int] = None) extends RedistributeData +case class RepartitionByExpression( + partitionExpressions: Seq[Expression], + child: LogicalPlan, + numPartitions: Option[Int] = None) extends RedistributeData { + numPartitions match { + case Some(n) => require(n > 0, "numPartitions must be greater than 0.") + case None => // Ok + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala index 3f66ef56f9076..6b86c5951b413 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala @@ -19,6 +19,11 @@ package org.apache.spark.sql import java.io.File +import scala.language.postfixOps +import scala.util.Random + +import org.scalatest.Matchers._ + import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.plans.logical.OneRowRelation import org.apache.spark.sql.execution.Exchange @@ -27,10 +32,6 @@ import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.SQLTestData.TestData2 import org.apache.spark.sql.test.{ExamplePoint, ExamplePointUDT, SharedSQLContext} import org.apache.spark.sql.types._ -import org.scalatest.Matchers._ - -import scala.language.postfixOps -import scala.util.Random class DataFrameSuite extends QueryTest with SharedSQLContext { import testImplicits._
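After the three patches, the public surface is the two distributeBy overloads plus localSort, with the optional partition count carried through the logical plan as `Option[Int]` rather than a `-1` sentinel. The following sketch of the resulting behaviour is illustrative only, again assuming a `sqlContext` in scope and made-up column names.

```scala
import org.apache.spark.sql.functions.col

val df = sqlContext.range(0, 100).selectExpr("id % 10 AS a", "id AS b")

// Explicit count: hash partition by `a` into 2 partitions, sort within each by `b`.
val explicit = df.distributeBy(col("a") :: Nil, 2).localSort(col("b").asc)

// No explicit count: numPartitions is None in RepartitionByExpression and the
// planner falls back to its default via nPartitions.getOrElse(numPartitions).
val preserved = df.distributeBy(col("a") :: Nil).localSort(col("b").asc)

// After PATCH 3/3 the count is validated when the plan is constructed, so a
// non-positive value fails immediately with
// "requirement failed: numPartitions must be greater than 0."
// df.distributeBy(col("a") :: Nil, 0)
```

Carrying the count as `Option[Int]` with a `require` keeps the sentinel out of the public API and rejects invalid partition counts at plan-construction time rather than during execution.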