From afd75624916f0e4613845d26ee224ffe4b5a96df Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 14 Nov 2014 11:22:10 -0800 Subject: [PATCH 1/3] Add support for external sort. --- .../scala/org/apache/spark/sql/SQLConf.scala | 7 ++++ .../spark/sql/execution/SparkStrategies.scala | 5 ++- .../spark/sql/execution/basicOperators.scala | 34 ++++++++++++++++--- 3 files changed, 41 insertions(+), 5 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index cd7d78e684791..9697beb132fbb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -39,6 +39,10 @@ private[spark] object SQLConf { val COLUMN_NAME_OF_CORRUPT_RECORD = "spark.sql.columnNameOfCorruptRecord" + // Options that control which operators can be chosen by the query planner. These should be + // considered hints and may be ignored by future versions of Spark SQL. + val EXTERNAL_SORT = "spark.sql.planner.externalSort" + // This is only used for the thriftserver val THRIFTSERVER_POOL = "spark.sql.thriftserver.scheduler.pool" @@ -96,6 +100,9 @@ private[sql] trait SQLConf { private[spark] def parquetFilterPushDown = getConf(PARQUET_FILTER_PUSHDOWN_ENABLED, "false").toBoolean + /** When true the planner will use the external sort, which may spill to disk. */ + private[spark] def externalSortEnabled: Boolean = getConf(EXTERNAL_SORT, "false").toBoolean + /** * When set to true, Spark SQL will use the Scala compiler at runtime to generate custom bytecode * that evaluates expressions found in queries. In general this custom code runs much faster diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 03cd5bd6272bb..7ef1f9f2c5c02 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -263,9 +263,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.Distinct(child) => execution.Distinct(partial = false, execution.Distinct(partial = true, planLater(child))) :: Nil + + case logical.Sort(sortExprs, child) if sqlContext.externalSortEnabled => + execution.ExternalSort(sortExprs, global = true, planLater(child)):: Nil case logical.Sort(sortExprs, child) => - // This sort is a global sort. Its requiredDistribution will be an OrderedDistribution. execution.Sort(sortExprs, global = true, planLater(child)):: Nil + case logical.SortPartitions(sortExprs, child) => // This sort only sorts tuples within a partition. Its requiredDistribution will be // an UnspecifiedDistribution. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 1b8ba3ace2a82..cacf31f31ae25 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import org.apache.spark.util.collection.ExternalSorter + import scala.collection.mutable.ArrayBuffer import scala.reflect.runtime.universe.TypeTag @@ -189,6 +191,7 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) /** * :: DeveloperApi :: + * Performs a sort on-heap. */ @DeveloperApi case class Sort( @@ -199,12 +202,35 @@ case class Sort( override def requiredChildDistribution = if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil + override def execute() = attachTree(this, "sort") { + child.execute().mapPartitions( { iterator => + val ordering = newOrdering(sortOrder, child.output) + iterator.map(_.copy()).toArray.sorted(ordering).iterator + }, preservesPartitioning = true) + } + + override def output = child.output +} + +/** + * :: DeveloperApi :: + * Performs a sort, spilling to disk as needed. + */ +@DeveloperApi +case class ExternalSort( + sortOrder: Seq[SortOrder], + global: Boolean, + child: SparkPlan) + extends UnaryNode { + override def requiredChildDistribution = + if (global) OrderedDistribution(sortOrder) :: Nil else UnspecifiedDistribution :: Nil override def execute() = attachTree(this, "sort") { - child.execute() - .mapPartitions( { iterator => - val ordering = newOrdering(sortOrder, child.output) - iterator.map(_.copy()).toArray.sorted(ordering).iterator + child.execute().mapPartitions( { iterator => + val ordering = newOrdering(sortOrder, child.output) + val sorter = new ExternalSorter[Row, Null, Row](ordering = Some(ordering)) + sorter.insertAll(iterator.map(r => (r, null))) + sorter.iterator.map(_._1) }, preservesPartitioning = true) } From b98799d1271800e3b5bb1249ac16cf76cee69dac Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Fri, 14 Nov 2014 11:32:18 -0800 Subject: [PATCH 2/3] Add test --- .../org/apache/spark/sql/SQLQuerySuite.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 5dd777f1fb3b7..6c8e388252402 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -196,7 +196,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { Seq(Seq("1"))) } - test("sorting") { + def sortTest() = { checkAnswer( sql("SELECT * FROM testData2 ORDER BY a ASC, b ASC"), Seq((1,1), (1,2), (2,1), (2,2), (3,1), (3,2))) @@ -238,6 +238,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll { mapData.collect().sortBy(_.data(1)).reverse.toSeq) } + test("sorting") { + val before = externalSortEnabled + setConf(SQLConf.EXTERNAL_SORT, "false") + sortTest() + setConf(SQLConf.EXTERNAL_SORT, before.toString) + } + + test("external sorting") { + val before = externalSortEnabled + setConf(SQLConf.EXTERNAL_SORT, "true") + sortTest() + setConf(SQLConf.EXTERNAL_SORT, before.toString) + } + test("limit") { checkAnswer( sql("SELECT * FROM testData LIMIT 10"), From 48b9726cf1c0ca2d81c424382f2666aabe4c80c3 Mon Sep 17 00:00:00 2001 From: Michael Armbrust Date: Sun, 16 Nov 2014 18:07:21 -0800 Subject: [PATCH 3/3] comments --- .../org/apache/spark/sql/execution/basicOperators.scala | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index cacf31f31ae25..e53723c176569 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.execution -import org.apache.spark.util.collection.ExternalSorter - import scala.collection.mutable.ArrayBuffer import scala.reflect.runtime.universe.TypeTag @@ -31,6 +29,7 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical.{ClusteredDistribution, OrderedDistribution, SinglePartition, UnspecifiedDistribution} import org.apache.spark.util.MutablePair +import org.apache.spark.util.collection.ExternalSorter /** * :: DeveloperApi :: @@ -192,6 +191,8 @@ case class TakeOrdered(limit: Int, sortOrder: Seq[SortOrder], child: SparkPlan) /** * :: DeveloperApi :: * Performs a sort on-heap. + * @param global when true performs a global sort of all partitions by shuffling the data first + * if necessary. */ @DeveloperApi case class Sort( @@ -215,6 +216,8 @@ case class Sort( /** * :: DeveloperApi :: * Performs a sort, spilling to disk as needed. + * @param global when true performs a global sort of all partitions by shuffling the data first + * if necessary. */ @DeveloperApi case class ExternalSort(