From dfe6ff100ae2efcb1778d93bf828f9e2e8c46e18 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Thu, 9 Jul 2015 15:27:05 -0700 Subject: [PATCH 01/16] Use Exchange in Limit (SPARK-8964) --- .../spark/sql/execution/SparkStrategies.scala | 8 +++- .../spark/sql/execution/basicOperators.scala | 45 ++++--------------- 2 files changed, 15 insertions(+), 38 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 59b9b553a7ae5..67bb1a1bbd976 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -332,8 +332,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => LocalTableScan(output, data) :: Nil - case logical.Limit(IntegerLiteral(limit), child) => - execution.Limit(limit, planLater(child)) :: Nil + case logical.Limit(IntegerLiteral(limit), child) => { + val perPartitionLimit = execution.PartitionLocalLimit(limit, planLater(child)) + val globalLimit = execution.PartitionLocalLimit( + limit, execution.Exchange(SinglePartition, Nil, perPartitionLimit)) + globalLimit :: Nil + } case Unions(unionChildren) => execution.Union(unionChildren.map(planLater)) :: Nil case logical.Except(left, right) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index de14e6ad79ad6..708107a37b664 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -18,17 +18,14 @@ package org.apache.spark.sql.execution import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.rdd.{RDD, ShuffledRDD} -import org.apache.spark.shuffle.sort.SortShuffleManager -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.CatalystTypeConverters +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.Row import org.apache.spark.sql.catalyst.plans.physical._ +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter -import org.apache.spark.util.{CompletionIterator, MutablePair} -import org.apache.spark.{HashPartitioner, SparkEnv} /** * :: DeveloperApi :: @@ -108,48 +105,24 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { /** * :: DeveloperApi :: - * Take the first limit elements. Note that the implementation is different depending on whether - * this is a terminal operator or not. If it is terminal and is invoked using executeCollect, - * this operator uses something similar to Spark's take method on the Spark driver. If it is not - * terminal or is invoked using execute, we first take the limit on each partition, and then - * repartition all the data to a single partition to compute the global limit. + * Take the first `limit` elements from each partition. 
*/ @DeveloperApi -case class Limit(limit: Int, child: SparkPlan) +case class PartitionLocalLimit(limit: Int, child: SparkPlan) extends UnaryNode { - // TODO: Implement a partition local limit, and use a strategy to generate the proper limit plan: - // partition local limit -> exchange into one partition -> partition local limit again - - /** We must copy rows when sort based shuffle is on */ - private def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager] - override def output: Seq[Attribute] = child.output - override def outputPartitioning: Partitioning = SinglePartition override def executeCollect(): Array[Row] = child.executeTake(limit) - protected override def doExecute(): RDD[InternalRow] = { - val rdd: RDD[_ <: Product2[Boolean, InternalRow]] = if (sortBasedShuffleOn) { - child.execute().mapPartitions { iter => - iter.take(limit).map(row => (false, row.copy())) - } - } else { - child.execute().mapPartitions { iter => - val mutablePair = new MutablePair[Boolean, InternalRow]() - iter.take(limit).map(row => mutablePair.update(false, row)) - } - } - val part = new HashPartitioner(1) - val shuffled = new ShuffledRDD[Boolean, InternalRow, InternalRow](rdd, part) - shuffled.setSerializer(new SparkSqlSerializer(child.sqlContext.sparkContext.getConf)) - shuffled.mapPartitions(_.take(limit).map(_._2)) + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => + iter.take(limit) } } /** * :: DeveloperApi :: * Take the first limit elements as defined by the sortOrder, and do projection if needed. - * This is logically equivalent to having a [[Limit]] operator after a [[Sort]] operator, + * This is logically equivalent to having a Limit operator after a [[Sort]] operator, * or having a [[Project]] operator between them. * This could have been named TopK, but Spark's top operator does the opposite in ordering * so we name it TakeOrdered to avoid confusion. 
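The planning change in the patch above decomposes a logical Limit into two physical steps: take `limit` rows inside every partition, shuffle the survivors into a single partition, then take `limit` rows once more for the global answer. The standalone sketch below illustrates that decomposition with plain RDD operations; it is illustrative only, using coalesce(1, shuffle = true) as a stand-in for Exchange(SinglePartition, ...) and plain integers in place of InternalRows, and the object name is invented for the example.

import org.apache.spark.{SparkConf, SparkContext}

object LimitDecompositionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("limit-demo").setMaster("local[4]"))
    val rows = sc.parallelize(1 to 1000000, numSlices = 100)
    val limit = 10

    // Partition-local limit: each of the 100 partitions keeps at most `limit` rows,
    // so at most numPartitions * limit rows ever reach the shuffle.
    val perPartition = rows.mapPartitions(_.take(limit))

    // Collapse to a single partition (the role played by Exchange(SinglePartition, ...) in the
    // patch) and apply the limit once more to obtain the global result.
    val global = perPartition.coalesce(1, shuffle = true).mapPartitions(_.take(limit))

    println(global.collect().mkString(", "))
    sc.stop()
  }
}

The point of the two-step shape is that the data crossing the shuffle is bounded by numPartitions * limit rather than by the size of the child's output.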
From c02324ca36ad7a9cf8df00246d27f6fbfa61b7c0 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 13 Jul 2015 16:31:12 -0700 Subject: [PATCH 02/16] Use requiredChildDistribution in Limit --- .../spark/sql/execution/SparkStrategies.scala | 8 +++---- .../spark/sql/execution/basicOperators.scala | 21 ++++++++++++++----- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 264a8e85bb114..bfa98edb00057 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -346,12 +346,10 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => LocalTableScan(output, data) :: Nil - case logical.Limit(IntegerLiteral(limit), child) => { - val perPartitionLimit = execution.PartitionLocalLimit(limit, planLater(child)) - val globalLimit = execution.PartitionLocalLimit( - limit, execution.Exchange(SinglePartition, Nil, perPartitionLimit)) + case logical.Limit(IntegerLiteral(limit), child) => + val perPartitionLimit = execution.Limit(global = false, limit, planLater(child)) + val globalLimit = execution.Limit(global = true, limit, perPartitionLimit) globalLimit :: Nil - } case Unions(unionChildren) => execution.Union(unionChildren.map(planLater)) :: Nil case logical.Except(left, right) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index c836416915811..f694e1fded2a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -25,11 +25,9 @@ import org.apache.spark.sql.catalyst.errors._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} -import org.apache.spark.util.CompletionIterator import org.apache.spark.util.collection.ExternalSorter import org.apache.spark.util.collection.unsafe.sort.PrefixComparator -import org.apache.spark.util.{CompletionIterator, MutablePair} -import org.apache.spark.{HashPartitioner, SparkEnv} +import org.apache.spark.util.CompletionIterator /** * :: DeveloperApi :: @@ -109,11 +107,24 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { /** * :: DeveloperApi :: - * Take the first `limit` elements from each partition. + * Take the first `limit` elements. + * + * @param global if true, then this operator will take the first `limit` elements of the entire + * input. If false, it will take the first `limit` elements of each partition. + * @param limit the number of elements to take. +* @param child the input data source. 
*/ @DeveloperApi -case class PartitionLocalLimit(limit: Int, child: SparkPlan) +case class Limit(global: Boolean, limit: Int, child: SparkPlan) extends UnaryNode { + override def requiredChildDistribution: List[Distribution] = { + if (global) { + AllTuples :: Nil + } else { + UnspecifiedDistribution :: Nil + } + } + override def output: Seq[Attribute] = child.output override def executeCollect(): Array[Row] = child.executeTake(limit) From 70f69b63cfa561e84370fe2553d3f5570ceb488f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Mon, 13 Jul 2015 16:58:21 -0700 Subject: [PATCH 03/16] Fix test compilation --- .../org/apache/spark/sql/execution/PlannerSuite.scala | 6 ++++++ .../spark/sql/execution/UnsafeExternalSortSuite.scala | 8 ++++---- 2 files changed, 10 insertions(+), 4 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 3dd24130af81a..7a17ab7d975e8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -147,4 +147,10 @@ class PlannerSuite extends SparkFunSuite { val planned = planner.TakeOrderedAndProject(query) assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) } + + test("efficient select -> project -> limit") { + val query = testData.select('value).limit(2).logicalPlan + val planned = planner.TakeLimit(query) + assert(planned.head.isInstanceOf[execution.TakeLimit]) + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala index 4f4c1f28564cb..3290f3ff2fbf9 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeExternalSortSuite.scala @@ -42,8 +42,8 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll { TestSQLContext.sparkContext.conf.set("spark.unsafe.exceptionOnMemoryLeak", "true") checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)), - (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)), + (child: SparkPlan) => Limit(true, 10, UnsafeExternalSort('a.asc :: Nil, true, child)), + (child: SparkPlan) => Limit(true, 10, Sort('a.asc :: Nil, global = true, child)), sortAnswers = false ) } @@ -53,8 +53,8 @@ class UnsafeExternalSortSuite extends SparkPlanTest with BeforeAndAfterAll { try { checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => Limit(10, UnsafeExternalSort('a.asc :: Nil, true, child)), - (child: SparkPlan) => Limit(10, Sort('a.asc :: Nil, global = true, child)), + (child: SparkPlan) => Limit(true, 10, UnsafeExternalSort('a.asc :: Nil, true, child)), + (child: SparkPlan) => Limit(true, 10, Sort('a.asc :: Nil, global = true, child)), sortAnswers = false ) } finally { From 272c349394f6c53a42c621dce68fc840bcdfac16 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 15 Jul 2015 17:02:57 -0700 Subject: [PATCH 04/16] CollectLimit --- .../plans/logical/basicOperators.scala | 11 +++++++ .../org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/SparkStrategies.scala | 3 ++ .../spark/sql/execution/basicOperators.scala | 29 ++++++++++++++++--- .../spark/sql/execution/PlannerSuite.scala | 6 ++-- 
.../apache/spark/sql/hive/HiveContext.scala | 2 +- 6 files changed, 44 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index fae339808c233..d7ca638c1063c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -22,6 +22,17 @@ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.types._ import org.apache.spark.util.collection.OpenHashSet +/** + * When planning take() or collect() operations, this special node that is inserted at the top of + * the logical plan before invoking the query planner. + * + * Rules can pattern-match on this node in order to apply transformations that only take effect + * at the top of the logical query plan. + */ +case class ReturnAnswer(child: LogicalPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output +} + case class Project(projectList: Seq[NamedExpression], child: LogicalPlan) extends UnaryNode { override def output: Seq[Attribute] = projectList.map(_.toAttribute) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 477dea9164726..437c307051b0a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.errors.DialectException import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.{DefaultOptimizer, Optimizer} -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{ReturnAnswer, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.catalyst.{InternalRow, ParserDialect, _} import org.apache.spark.sql.execution.{Filter, _} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index bfa98edb00057..a8d31daf2ba75 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -346,6 +346,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { execution.Sample(lb, ub, withReplacement, seed, planLater(child)) :: Nil case logical.LocalRelation(output, data) => LocalTableScan(output, data) :: Nil + case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), child)) => + execution.CollectLimit(limit, planLater(child)) :: Nil case logical.Limit(IntegerLiteral(limit), child) => val perPartitionLimit = execution.Limit(global = false, limit, planLater(child)) val globalLimit = execution.Limit(global = true, limit, perPartitionLimit) @@ -368,6 +370,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd) :: Nil case BroadcastHint(child) => apply(child) + case logical.ReturnAnswer(child) => apply(child) case _ => Nil } } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index f694e1fded2a8..21fa10b188cea 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -105,6 +105,21 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { sparkContext.union(children.map(_.execute())) } +/** + * :: DeveloperApi :: + * Take the first `limit` elements and return the to the driver. + */ +@DeveloperApi +case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def executeCollect(): Array[Row] = child.executeTake(limit) + // TODO(josh): Throw error on executeTake()? + protected override def doExecute(): RDD[InternalRow] = { + throw new UnsupportedOperationException( + "Should not invoke doExecute() on CollectLimit; use Limit instead.") + } +} + /** * :: DeveloperApi :: * Take the first `limit` elements. @@ -115,8 +130,7 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { * @param child the input data source. */ @DeveloperApi -case class Limit(global: Boolean, limit: Int, child: SparkPlan) - extends UnaryNode { +case class Limit(global: Boolean, limit: Int, child: SparkPlan) extends UnaryNode { override def requiredChildDistribution: List[Distribution] = { if (global) { AllTuples :: Nil @@ -124,10 +138,17 @@ case class Limit(global: Boolean, limit: Int, child: SparkPlan) UnspecifiedDistribution :: Nil } } - override def output: Seq[Attribute] = child.output - override def executeCollect(): Array[Row] = child.executeTake(limit) + override def executeTake(n: Int): Array[Row] = { + throw new UnsupportedOperationException( + "Should not invoke executeTake() on Limit; use CollectLimit instead.") + } + + override def executeCollect(): Array[Row] = { + throw new UnsupportedOperationException( + "Should not invoke executeCollect() on Limit; use CollectLimit instead.") + } protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 7a17ab7d975e8..57400c48899c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -148,9 +148,9 @@ class PlannerSuite extends SparkFunSuite { assert(planned.head.isInstanceOf[execution.TakeOrderedAndProject]) } - test("efficient select -> project -> limit") { + test("efficient physical planning of terminal limit operators") { val query = testData.select('value).limit(2).logicalPlan - val planned = planner.TakeLimit(query) - assert(planned.head.isInstanceOf[execution.TakeLimit]) + val planned = BasicOperators(query) + assert(planned.head.isInstanceOf[CollectLimit]) } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 4684d48aff889..4676633ae2bbf 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -43,7 +43,7 @@ import org.apache.spark.sql.SQLConf.SQLConfEntry._ import org.apache.spark.sql.catalyst.ParserDialect import 
org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.plans.logical._ -import org.apache.spark.sql.execution.{ExecutedCommand, ExtractPythonUDFs, SetCommand} +import org.apache.spark.sql.execution.{SparkPlan, ExecutedCommand, ExtractPythonUDFs, SetCommand} import org.apache.spark.sql.hive.client._ import org.apache.spark.sql.hive.execution.{DescribeHiveTableCommand, HiveNativeCommand} import org.apache.spark.sql.sources.DataSourceStrategy From 964838f2b5eb58da06afa18f77a056753e71defc Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 15 Jul 2015 17:55:07 -0700 Subject: [PATCH 05/16] Fix test --- .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 57400c48899c2..79d85640ddc01 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkFunSuite import org.apache.spark.sql.TestData._ import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.ReturnAnswer import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.TestSQLContext._ @@ -149,7 +150,7 @@ class PlannerSuite extends SparkFunSuite { } test("efficient physical planning of terminal limit operators") { - val query = testData.select('value).limit(2).logicalPlan + val query = ReturnAnswer(testData.select('value).limit(2).logicalPlan) val planned = BasicOperators(query) assert(planned.head.isInstanceOf[CollectLimit]) } From cc634566bdaed630c5bfff887f74defd003b4a58 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 15 Jul 2015 18:05:45 -0700 Subject: [PATCH 06/16] Insert ReturnAnswer before calling physical planner --- sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 437c307051b0a..e1fedb5067c3d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -977,9 +977,9 @@ class SQLContext(@transient val sparkContext: SparkContext) lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData) // TODO: Don't just pick the first one... - lazy val sparkPlan: SparkPlan = { + final lazy val sparkPlan: SparkPlan = { SparkPlan.currentContext.set(self) - planner.plan(optimizedPlan).next() + planner.plan(ReturnAnswer(optimizedPlan)).next() } // executedPlan should not be used to initialize any SparkPlan. It should be // only used for execution. 
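Patches 04 through 06 introduce the ReturnAnswer marker so the strategy can distinguish a terminal limit, which is answered with a driver-side executeTake via CollectLimit, from a limit that sits deeper in the plan, which is planned as a per-partition limit followed by a global limit. The sketch below models that pattern match with standalone toy case classes; it is not the real Catalyst or SparkStrategies code, and the names Scan, Project, and plan are invented for the example.

object ReturnAnswerPlanningDemo extends App {
  // Toy logical nodes that mirror only the shape of the Catalyst classes touched above.
  sealed trait LogicalNode
  case class ReturnAnswer(child: LogicalNode) extends LogicalNode // inserted only at the very top
  case class Limit(n: Int, child: LogicalNode) extends LogicalNode
  case class Project(cols: Seq[String], child: LogicalNode) extends LogicalNode
  case class Scan(table: String) extends LogicalNode

  // A Limit directly under ReturnAnswer is the final operator, so it can be collected with a
  // driver-side take; any other Limit becomes a per-partition limit plus a global limit.
  def plan(node: LogicalNode): String = node match {
    case ReturnAnswer(Limit(n, child)) => s"CollectLimit($n, ${plan(child)})"
    case ReturnAnswer(child)           => plan(child)
    case Limit(n, child)               => s"Limit(global=true, $n, Limit(global=false, $n, ${plan(child)}))"
    case Project(cols, child)          => s"Project(${cols.mkString(",")}, ${plan(child)})"
    case Scan(table)                   => s"Scan($table)"
  }

  println(plan(ReturnAnswer(Limit(2, Scan("testData")))))
  // CollectLimit(2, Scan(testData))
  println(plan(ReturnAnswer(Project(Seq("value"), Limit(2, Scan("testData"))))))
  // Project(value, Limit(global=true, 2, Limit(global=false, 2, Scan(testData))))
}

Because ReturnAnswer is added only when planning a collect-like call, a limit that feeds further operators never matches the CollectLimit case and still goes through the shuffle-based path.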
From 7dbb28e8fd49d670a3dba02b9470a6b10f8589c3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 15 Jul 2015 18:14:43 -0700 Subject: [PATCH 07/16] Fix style --- .../spark/sql/catalyst/plans/logical/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala index d7ca638c1063c..9a562b2377c65 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicOperators.scala @@ -25,7 +25,7 @@ import org.apache.spark.util.collection.OpenHashSet /** * When planning take() or collect() operations, this special node that is inserted at the top of * the logical plan before invoking the query planner. - * + * * Rules can pattern-match on this node in order to apply transformations that only take effect * at the top of the logical query plan. */ From e3caa767517e6df89a058b492d8cfd9fa820a013 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Feb 2016 12:04:58 -0800 Subject: [PATCH 08/16] Minor test cleanup. --- .../scala/org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/QueryExecution.scala | 4 ++-- .../spark/sql/execution/PlannerSuite.scala | 16 +++++++--------- 3 files changed, 10 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d227f6828ccae..13700be06828d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.encoders.encoderFor import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.optimizer.Optimizer -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range, ReturnAnswer} +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.execution._ import org.apache.spark.sql.execution.datasources._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 107570f9dbcc8..8616fe317034f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.rdd.RDD import org.apache.spark.sql.SQLContext import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} /** * The primary workflow for executing relational queries using Spark. Designed to allow easy @@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SQLContext.setActive(sqlContext) - sqlContext.planner.plan(optimizedPlan).next() + sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next() } // executedPlan should not be used to initialize any SparkPlan. 
It should be diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index d2858d55a9507..74521e7b4edef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -181,6 +181,12 @@ class PlannerSuite extends SharedSQLContext { } } + test("terminal limits use CollectLimit") { + val query = testData.select('value).limit(2) + val planned = query.queryExecution.sparkPlan + assert(planned.isInstanceOf[CollectLimit]) + } + test("PartitioningCollection") { withTempTable("normal", "small", "tiny") { testData.registerTempTable("normal") @@ -422,7 +428,7 @@ class PlannerSuite extends SharedSQLContext { fail(s"Topmost Exchange should have been eliminated:\n$outputPlan") } } - + test("EnsureRequirements does not eliminate Exchange with different partitioning") { val distribution = ClusteredDistribution(Literal(1) :: Nil) // Number of partitions differ @@ -443,14 +449,6 @@ class PlannerSuite extends SharedSQLContext { } } - test("efficient physical planning of terminal limit operators") { - val planner = sqlContext.planner - import planner._ - val query = ReturnAnswer(testData.select('value).limit(2).logicalPlan) - val planned = BasicOperators(query) - assert(planned.head.isInstanceOf[CollectLimit]) - } - // --------------------------------------------------------------------------------------------- } From cffe4daf218fd5c2a48b84ae831a6b40436a0f62 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Feb 2016 14:03:02 -0800 Subject: [PATCH 09/16] Refactoring; split classes into separate file; fix tests. --- .../spark/sql/execution/SparkStrategies.scala | 6 +- .../spark/sql/execution/basicOperators.scala | 103 +-------------- .../apache/spark/sql/execution/limit.scala | 118 ++++++++++++++++++ .../spark/sql/execution/PlannerSuite.scala | 4 +- .../spark/sql/execution/SortSuite.scala | 6 +- 5 files changed, 127 insertions(+), 110 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 081cbe2c8ed2b..2d87fb695c778 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -340,8 +340,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case logical.ReturnAnswer(logical.Limit(IntegerLiteral(limit), child)) => execution.CollectLimit(limit, planLater(child)) :: Nil case logical.Limit(IntegerLiteral(limit), child) => - val perPartitionLimit = execution.Limit(global = false, limit, planLater(child)) - val globalLimit = execution.Limit(global = true, limit, perPartitionLimit) + val perPartitionLimit = execution.LocalLimit(limit, planLater(child)) + val globalLimit = execution.GlobalLimit(limit, perPartitionLimit) globalLimit :: Nil case logical.Union(unionChildren) => execution.Union(unionChildren.map(planLater)) :: Nil @@ -361,7 +361,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { BatchPythonEvaluation(udf, e.output, planLater(child)) :: Nil case LogicalRDD(output, rdd) => PhysicalRDD(output, rdd, "ExistingRDD") :: Nil case BroadcastHint(child) => planLater(child) :: Nil - case logical.ReturnAnswer(child) 
=> apply(child) + case logical.ReturnAnswer(child) => planLater(child) :: Nil case _ => Nil } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 80c7fe6bd467c..d2adf52a18bbc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -17,16 +17,13 @@ package org.apache.spark.sql.execution -import org.apache.spark.{HashPartitioner, SparkEnv} -import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD, ShuffledRDD} -import org.apache.spark.shuffle.sort.SortShuffleManager +import org.apache.spark.rdd.{PartitionwiseSampledRDD, RDD} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode, ExpressionCanonicalizer} import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.SQLMetrics import org.apache.spark.sql.types.LongType -import org.apache.spark.util.MutablePair import org.apache.spark.util.random.PoissonSampler case class Project(projectList: Seq[NamedExpression], child: SparkPlan) @@ -37,7 +34,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) override def output: Seq[Attribute] = projectList.map(_.toAttribute) - override def upstream(): RDD[InternalRow] = { + override def upstream(): RDD[InternalRow] = { child.asInstanceOf[CodegenSupport].upstream() } @@ -299,102 +296,6 @@ case class Union(children: Seq[SparkPlan]) extends SparkPlan { sparkContext.union(children.map(_.execute())) } -/** - * Take the first `limit` elements and return the to the driver. - */ -case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - override def executeCollect(): Array[InternalRow] = child.executeTake(limit) - protected override def doExecute(): RDD[InternalRow] = { - throw new UnsupportedOperationException("Should not invoke doExecute() on CollectLimit.") - } -} - -/** - * Take the first `limit` elements. - * - * @param global if true, then this operator will take the first `limit` elements of the entire - * input. If false, it will take the first `limit` elements of each partition. - * @param limit the number of elements to take. -* @param child the input data source. - */ -case class Limit(global: Boolean, limit: Int, child: SparkPlan) extends UnaryNode { - override def requiredChildDistribution: List[Distribution] = { - if (global) { - AllTuples :: Nil - } else { - UnspecifiedDistribution :: Nil - } - } - override def output: Seq[Attribute] = child.output - - override def executeTake(n: Int): Array[InternalRow] = { - throw new UnsupportedOperationException( - "Should not invoke executeTake() on Limit; use CollectLimit instead.") - } - - override def executeCollect(): Array[InternalRow] = { - throw new UnsupportedOperationException( - "Should not invoke executeCollect() on Limit; use CollectLimit instead.") - } - - protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => - iter.take(limit) - } -} - -/** - * Take the first limit elements as defined by the sortOrder, and do projection if needed. - * This is logically equivalent to having a Limit operator after a [[Sort]] operator, - * or having a [[Project]] operator between them. 
- * This could have been named TopK, but Spark's top operator does the opposite in ordering - * so we name it TakeOrdered to avoid confusion. - */ -case class TakeOrderedAndProject( - limit: Int, - sortOrder: Seq[SortOrder], - projectList: Option[Seq[NamedExpression]], - child: SparkPlan) extends UnaryNode { - - override def output: Seq[Attribute] = { - val projectOutput = projectList.map(_.map(_.toAttribute)) - projectOutput.getOrElse(child.output) - } - - override def outputPartitioning: Partitioning = SinglePartition - - // We need to use an interpreted ordering here because generated orderings cannot be serialized - // and this ordering needs to be created on the driver in order to be passed into Spark core code. - private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output) - - private def collectData(): Array[InternalRow] = { - val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) - if (projectList.isDefined) { - val proj = UnsafeProjection.create(projectList.get, child.output) - data.map(r => proj(r).copy()) - } else { - data - } - } - - override def executeCollect(): Array[InternalRow] = { - collectData() - } - - // TODO: Terminal split should be implemented differently from non-terminal split. - // TODO: Pick num splits based on |limit|. - protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1) - - override def outputOrdering: Seq[SortOrder] = sortOrder - - override def simpleString: String = { - val orderByString = sortOrder.mkString("[", ",", "]") - val outputString = output.mkString("[", ",", "]") - - s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" - } -} - /** * Return a new RDD that has exactly `numPartitions` partitions. * Similar to coalesce defined on an [[RDD]], this operation results in a narrow dependency, e.g. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala new file mode 100644 index 0000000000000..c9120bd0bfab2 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.physical._ + + +trait BaseLimit extends UnaryNode { + val limit: Int + override def output: Seq[Attribute] = child.output + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + override def outputPartitioning: Partitioning = child.outputPartitioning + override def executeTake(n: Int): Array[InternalRow] = { + throw new UnsupportedOperationException( + s"Should not invoke executeTake() on ${getClass.getName}; use CollectLimit instead.") + } + override def executeCollect(): Array[InternalRow] = { + throw new UnsupportedOperationException( + s"Should not invoke executeCollect() on ${getClass.getName}; use CollectLimit instead.") + } + protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => + iter.take(limit) + } +} + +/** + * Take the first `limit` elements and return the to the driver. + */ +case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = SinglePartition + override def executeCollect(): Array[InternalRow] = child.executeTake(limit) + protected override def doExecute(): RDD[InternalRow] = { + sparkContext.parallelize(executeCollect()) + } +} + +/** + * Take the first `limit` elements of each partition. + */ +case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit + +/** + * Take the first `limit` elements of the child operator's output. + */ +case class GlobalLimit(limit: Int, child: SparkPlan) extends BaseLimit { + override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil +} + +/** + * Take the first limit elements as defined by the sortOrder, and do projection if needed. + * This is logically equivalent to having a Limit operator after a [[Sort]] operator, + * or having a [[Project]] operator between them. + * This could have been named TopK, but Spark's top operator does the opposite in ordering + * so we name it TakeOrdered to avoid confusion. + */ +case class TakeOrderedAndProject( + limit: Int, + sortOrder: Seq[SortOrder], + projectList: Option[Seq[NamedExpression]], + child: SparkPlan) extends UnaryNode { + + override def output: Seq[Attribute] = { + val projectOutput = projectList.map(_.map(_.toAttribute)) + projectOutput.getOrElse(child.output) + } + + override def outputPartitioning: Partitioning = SinglePartition + + // We need to use an interpreted ordering here because generated orderings cannot be serialized + // and this ordering needs to be created on the driver in order to be passed into Spark core code. + private val ord: InterpretedOrdering = new InterpretedOrdering(sortOrder, child.output) + + private def collectData(): Array[InternalRow] = { + val data = child.execute().map(_.copy()).takeOrdered(limit)(ord) + if (projectList.isDefined) { + val proj = UnsafeProjection.create(projectList.get, child.output) + data.map(r => proj(r).copy()) + } else { + data + } + } + + override def executeCollect(): Array[InternalRow] = { + collectData() + } + + // TODO: Terminal split should be implemented differently from non-terminal split. + // TODO: Pick num splits based on |limit|. 
+ protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(collectData(), 1) + + override def outputOrdering: Seq[SortOrder] = sortOrder + + override def simpleString: String = { + val orderByString = sortOrder.mkString("[", ",", "]") + val outputString = output.mkString("[", ",", "]") + + s"TakeOrderedAndProject(limit=$limit, orderBy=$orderByString, output=$outputString)" + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 74521e7b4edef..6fc966c9b1c7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -206,7 +206,7 @@ class PlannerSuite extends SharedSQLContext { ).queryExecution.executedPlan.collect { case exchange: Exchange => exchange }.length - assert(numExchanges === 3) + assert(numExchanges === 5) } { @@ -221,7 +221,7 @@ class PlannerSuite extends SharedSQLContext { ).queryExecution.executedPlan.collect { case exchange: Exchange => exchange }.length - assert(numExchanges === 3) + assert(numExchanges === 5) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala index 35ef4957c0c01..cb6d68dc3ac46 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SortSuite.scala @@ -56,10 +56,8 @@ class SortSuite extends SparkPlanTest with SharedSQLContext { test("sort followed by limit") { checkThatPlansAgree( (1 to 100).map(v => Tuple1(v)).toDF("a"), - (child: SparkPlan) => - Limit(global = false, 10, Sort('a.asc :: Nil, global = true, child = child)), - (child: SparkPlan) => - Limit(global = false, 10, ReferenceSort('a.asc :: Nil, global = true, child)), + (child: SparkPlan) => GlobalLimit(10, Sort('a.asc :: Nil, global = true, child = child)), + (child: SparkPlan) => GlobalLimit(10, ReferenceSort('a.asc :: Nil, global = true, child)), sortAnswers = false ) } From 37f66887b35af510c93021e20e30d5d1c0ac696f Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Feb 2016 14:06:35 -0800 Subject: [PATCH 10/16] Style fix. --- .../scala/org/apache/spark/sql/execution/basicOperators.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index d2adf52a18bbc..4bfac8efda1b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -34,7 +34,7 @@ case class Project(projectList: Seq[NamedExpression], child: SparkPlan) override def output: Seq[Attribute] = projectList.map(_.toAttribute) - override def upstream(): RDD[InternalRow] = { + override def upstream(): RDD[InternalRow] = { child.asInstanceOf[CodegenSupport].upstream() } From b4de46737e393f67bac114b6343f968df6bba733 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Feb 2016 14:23:27 -0800 Subject: [PATCH 11/16] Comments; order-preservation. 
--- .../apache/spark/sql/execution/limit.scala | 34 +++++++++++-------- 1 file changed, 20 insertions(+), 14 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index c9120bd0bfab2..f72fa972fd5cf 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -23,6 +23,22 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.physical._ +/** + * Take the first `limit` elements and collect them to a single partition. + * + * This operator will be used when a logical `Limit` operation is the final operator in an + * logical plan, which happens when the user is collecting results back to the driver. + */ +case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = SinglePartition + override def executeCollect(): Array[InternalRow] = child.executeTake(limit) + protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1) +} + +/** + * Helper trait which defines methods that are shared by both [[LocalLimit]] and [[GlobalLimit]]. + */ trait BaseLimit extends UnaryNode { val limit: Int override def output: Seq[Attribute] = child.output @@ -42,24 +58,14 @@ trait BaseLimit extends UnaryNode { } /** - * Take the first `limit` elements and return the to the driver. + * Take the first `limit` elements of each child partition, but do not collect or shuffle them. */ -case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode { - override def output: Seq[Attribute] = child.output - override def outputPartitioning: Partitioning = SinglePartition - override def executeCollect(): Array[InternalRow] = child.executeTake(limit) - protected override def doExecute(): RDD[InternalRow] = { - sparkContext.parallelize(executeCollect()) - } +case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit { + override def outputOrdering: Seq[SortOrder] = child.outputOrdering } /** - * Take the first `limit` elements of each partition. - */ -case class LocalLimit(limit: Int, child: SparkPlan) extends BaseLimit - -/** - * Take the first `limit` elements of the child operator's output. + * Take the first `limit` elements of the child's single output partition. 
*/ case class GlobalLimit(limit: Int, child: SparkPlan) extends BaseLimit { override def requiredChildDistribution: List[Distribution] = AllTuples :: Nil From 924925f881c78c19bc792582fb9f606ba1441422 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Feb 2016 15:22:47 -0800 Subject: [PATCH 12/16] Update PlannerSuite.scala --- .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 6fc966c9b1c7e..2ff1bfaf95153 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -21,7 +21,8 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{execution, Row, SQLConf} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.{Ascending, Attribute, Literal, SortOrder} -import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, SortMergeJoin} import org.apache.spark.sql.functions._ From 55e27af45f4a39bb1e86425be5687af3608fd4f3 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Wed, 3 Feb 2016 16:02:49 -0800 Subject: [PATCH 13/16] Remove assertions which guarded against misplanning. --- .../main/scala/org/apache/spark/sql/execution/limit.scala | 8 -------- 1 file changed, 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index f72fa972fd5cf..5acde5996b543 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -44,14 +44,6 @@ trait BaseLimit extends UnaryNode { override def output: Seq[Attribute] = child.output override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = child.outputPartitioning - override def executeTake(n: Int): Array[InternalRow] = { - throw new UnsupportedOperationException( - s"Should not invoke executeTake() on ${getClass.getName}; use CollectLimit instead.") - } - override def executeCollect(): Array[InternalRow] = { - throw new UnsupportedOperationException( - s"Should not invoke executeCollect() on ${getClass.getName}; use CollectLimit instead.") - } protected override def doExecute(): RDD[InternalRow] = child.execute().mapPartitions { iter => iter.take(limit) } From c4b0a5303b4de328e11e8bfb79e8b3e795bd1174 Mon Sep 17 00:00:00 2001 From: Josh Rosen Date: Fri, 5 Feb 2016 11:49:05 -0800 Subject: [PATCH 14/16] Move callsite of ReturnAnswer to fix caching / .rdd(). 
--- .../src/main/scala/org/apache/spark/sql/DataFrame.scala | 7 ++++--- .../org/apache/spark/sql/execution/QueryExecution.scala | 2 +- .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 ++++-- .../org/apache/spark/sql/execution/PlannerSuite.scala | 6 ------ 4 files changed, 9 insertions(+), 12 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala index 6de17e5924d04..a5102b1018a8e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala @@ -1510,12 +1510,13 @@ class DataFrame private[sql]( } private def collect(needCallback: Boolean): Array[Row] = { - def execute(): Array[Row] = withNewExecutionId { - queryExecution.executedPlan.executeCollectPublic() + val dfToExecute = withPlan(ReturnAnswer(logicalPlan)) + def execute(): Array[Row] = dfToExecute.withNewExecutionId { + dfToExecute.queryExecution.executedPlan.executeCollectPublic() } if (needCallback) { - withCallback("collect", this)(_ => execute()) + withCallback("collect", dfToExecute)(_ => execute()) } else { execute() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 8616fe317034f..8e9f1d6aaaff8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SQLContext.setActive(sqlContext) - sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next() + sqlContext.planner.plan(optimizedPlan).next() } // executedPlan should not be used to initialize any SparkPlan. It should be diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala index 5acde5996b543..615a5a94445ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala @@ -27,13 +27,15 @@ import org.apache.spark.sql.catalyst.plans.physical._ * Take the first `limit` elements and collect them to a single partition. * * This operator will be used when a logical `Limit` operation is the final operator in an - * logical plan, which happens when the user is collecting results back to the driver. + * logical plan and the user is collecting results back to the driver. 
 */
 case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
-  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException("doExecute() should not be called on CollectLimit()")
+  }
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 2ff1bfaf95153..f62f8a2df81ba 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -182,12 +182,6 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
-  test("terminal limits use CollectLimit") {
-    val query = testData.select('value).limit(2)
-    val planned = query.queryExecution.sparkPlan
-    assert(planned.isInstanceOf[CollectLimit])
-  }
-
   test("PartitioningCollection") {
     withTempTable("normal", "small", "tiny") {
       testData.registerTempTable("normal")

From d6e780203b4beb3c111246e2de1234545cd3671f Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 5 Feb 2016 15:33:46 -0800
Subject: [PATCH 15/16] Revert "Move callsite of ReturnAnswer to fix caching / .rdd()."

This reverts commit c4b0a5303b4de328e11e8bfb79e8b3e795bd1174.
---
 .../src/main/scala/org/apache/spark/sql/DataFrame.scala   | 7 +++----
 .../org/apache/spark/sql/execution/QueryExecution.scala   | 2 +-
 .../main/scala/org/apache/spark/sql/execution/limit.scala | 6 ++----
 .../org/apache/spark/sql/execution/PlannerSuite.scala     | 6 ++++++
 4 files changed, 12 insertions(+), 9 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
index a5102b1018a8e..6de17e5924d04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala
@@ -1510,13 +1510,12 @@ class DataFrame private[sql](
   }
 
   private def collect(needCallback: Boolean): Array[Row] = {
-    val dfToExecute = withPlan(ReturnAnswer(logicalPlan))
-    def execute(): Array[Row] = dfToExecute.withNewExecutionId {
-      dfToExecute.queryExecution.executedPlan.executeCollectPublic()
+    def execute(): Array[Row] = withNewExecutionId {
+      queryExecution.executedPlan.executeCollectPublic()
     }
 
     if (needCallback) {
-      withCallback("collect", dfToExecute)(_ => execute())
+      withCallback("collect", this)(_ => execute())
     } else {
       execute()
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 8e9f1d6aaaff8..8616fe317034f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -44,7 +44,7 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
 
   lazy val sparkPlan: SparkPlan = {
     SQLContext.setActive(sqlContext)
-    sqlContext.planner.plan(optimizedPlan).next()
+    sqlContext.planner.plan(ReturnAnswer(optimizedPlan)).next()
   }
 
   // executedPlan should not be used to initialize any SparkPlan. It should be
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 615a5a94445ec..5acde5996b543 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -27,15 +27,13 @@ import org.apache.spark.sql.catalyst.plans.physical._
  * Take the first `limit` elements and collect them to a single partition.
  *
  * This operator will be used when a logical `Limit` operation is the final operator in an
- * logical plan and the user is collecting results back to the driver.
+ * logical plan, which happens when the user is collecting results back to the driver.
  */
 case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
-  protected override def doExecute(): RDD[InternalRow] = {
-    throw new UnsupportedOperationException("doExecute() should not be called on CollectLimit()")
-  }
+  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
 }
 
 /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index f62f8a2df81ba..2ff1bfaf95153 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -182,6 +182,12 @@ class PlannerSuite extends SharedSQLContext {
     }
   }
 
+  test("terminal limits use CollectLimit") {
+    val query = testData.select('value).limit(2)
+    val planned = query.queryExecution.sparkPlan
+    assert(planned.isInstanceOf[CollectLimit])
+  }
+
   test("PartitioningCollection") {
     withTempTable("normal", "small", "tiny") {
       testData.registerTempTable("normal")

From b8c9e47d5b0220c1074d686fcc62b2bb547f61ec Mon Sep 17 00:00:00 2001
From: Josh Rosen
Date: Fri, 5 Feb 2016 15:52:23 -0800
Subject: [PATCH 16/16] Revert back to old limit approach, but reuse code from Exchange.

---
 .../apache/spark/sql/execution/Exchange.scala | 130 ++++++++++--------
 .../apache/spark/sql/execution/limit.scala    |   8 +-
 2 files changed, 78 insertions(+), 60 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 3770883af1e2f..97f65f18bfdcc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -57,6 +57,69 @@ case class Exchange(
 
   override def output: Seq[Attribute] = child.output
 
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+
+  override protected def doPrepare(): Unit = {
+    // If an ExchangeCoordinator is needed, we register this Exchange operator
+    // to the coordinator when we do prepare. It is important to make sure
+    // we register this operator right before the execution instead of register it
+    // in the constructor because it is possible that we create new instances of
+    // Exchange operators when we transform the physical plan
+    // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
+    // So, we should only call registerExchange just before we start to execute
+    // the plan.
+    coordinator match {
+      case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
+      case None =>
+    }
+  }
+
+  /**
+   * Returns a [[ShuffleDependency]] that will partition rows of its child based on
+   * the partitioning scheme defined in `newPartitioning`. Those partitions of
+   * the returned ShuffleDependency will be the input of shuffle.
+   */
+  private[sql] def prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] = {
+    Exchange.prepareShuffleDependency(child.execute(), child.output, newPartitioning, serializer)
+  }
+
+  /**
+   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
+   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
+   * partition start indices array. If this optional array is defined, the returned
+   * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
+   */
+  private[sql] def preparePostShuffleRDD(
+      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
+      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
+    // If an array of partition start indices is provided, we need to use this array
+    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
+    // update the number of post-shuffle partitions.
+    specifiedPartitionStartIndices.foreach { indices =>
+      assert(newPartitioning.isInstanceOf[HashPartitioning])
+      newPartitioning = UnknownPartitioning(indices.length)
+    }
+    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
+  }
+
+  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
+    coordinator match {
+      case Some(exchangeCoordinator) =>
+        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
+        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
+        shuffleRDD
+      case None =>
+        val shuffleDependency = prepareShuffleDependency()
+        preparePostShuffleRDD(shuffleDependency)
+    }
+  }
+}
+
+object Exchange {
+  def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = {
+    Exchange(newPartitioning, child, coordinator = None: Option[ExchangeCoordinator])
+  }
+
   /**
    * Determines whether records must be defensively copied before being sent to the shuffle.
    * Several of Spark's shuffle components will buffer deserialized Java objects in memory. The
@@ -82,7 +145,7 @@ case class Exchange(
     // passed instead of directly passing the number of partitions in order to guard against
     // corner-cases where a partitioner constructed with `numPartitions` partitions may output
     // fewer partitions (like RangePartitioner, for example).
-    val conf = child.sqlContext.sparkContext.conf
+    val conf = SparkEnv.get.conf
     val shuffleManager = SparkEnv.get.shuffleManager
     val sortBasedShuffleOn = shuffleManager.isInstanceOf[SortShuffleManager]
     val bypassMergeThreshold = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
@@ -117,30 +180,16 @@ case class Exchange(
     }
   }
 
-  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
-
-  override protected def doPrepare(): Unit = {
-    // If an ExchangeCoordinator is needed, we register this Exchange operator
-    // to the coordinator when we do prepare. It is important to make sure
-    // we register this operator right before the execution instead of register it
-    // in the constructor because it is possible that we create new instances of
-    // Exchange operators when we transform the physical plan
-    // (then the ExchangeCoordinator will hold references of unneeded Exchanges).
-    // So, we should only call registerExchange just before we start to execute
-    // the plan.
-    coordinator match {
-      case Some(exchangeCoordinator) => exchangeCoordinator.registerExchange(this)
-      case None =>
-    }
-  }
-
   /**
    * Returns a [[ShuffleDependency]] that will partition rows of its child based on
    * the partitioning scheme defined in `newPartitioning`. Those partitions of
   * the returned ShuffleDependency will be the input of shuffle.
    */
-  private[sql] def prepareShuffleDependency(): ShuffleDependency[Int, InternalRow, InternalRow] = {
-    val rdd = child.execute()
+  private[sql] def prepareShuffleDependency(
+      rdd: RDD[InternalRow],
+      outputAttributes: Seq[Attribute],
+      newPartitioning: Partitioning,
+      serializer: Serializer): ShuffleDependency[Int, InternalRow, InternalRow] = {
     val part: Partitioner = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) => new HashPartitioner(numPartitions)
       case HashPartitioning(_, n) =>
@@ -160,7 +209,7 @@ case class Exchange(
         // We need to use an interpreted ordering here because generated orderings cannot be
         // serialized and this ordering needs to be created on the driver in order to be passed into
         // Spark core code.
-        implicit val ordering = new InterpretedOrdering(sortingExpressions, child.output)
+        implicit val ordering = new InterpretedOrdering(sortingExpressions, outputAttributes)
         new RangePartitioner(numPartitions, rddForSampling, ascending = true)
       case SinglePartition =>
         new Partitioner {
@@ -180,7 +229,7 @@ case class Exchange(
           position
         }
       case h: HashPartitioning =>
-        val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, child.output)
+        val projection = UnsafeProjection.create(h.partitionIdExpression :: Nil, outputAttributes)
         row => projection(row).getInt(0)
       case RangePartitioning(_, _) | SinglePartition => identity
       case _ => sys.error(s"Exchange not implemented for $newPartitioning")
@@ -211,43 +260,6 @@ case class Exchange(
 
     dependency
   }
-
-  /**
-   * Returns a [[ShuffledRowRDD]] that represents the post-shuffle dataset.
-   * This [[ShuffledRowRDD]] is created based on a given [[ShuffleDependency]] and an optional
-   * partition start indices array. If this optional array is defined, the returned
-   * [[ShuffledRowRDD]] will fetch pre-shuffle partitions based on indices of this array.
-   */
-  private[sql] def preparePostShuffleRDD(
-      shuffleDependency: ShuffleDependency[Int, InternalRow, InternalRow],
-      specifiedPartitionStartIndices: Option[Array[Int]] = None): ShuffledRowRDD = {
-    // If an array of partition start indices is provided, we need to use this array
-    // to create the ShuffledRowRDD. Also, we need to update newPartitioning to
-    // update the number of post-shuffle partitions.
-    specifiedPartitionStartIndices.foreach { indices =>
-      assert(newPartitioning.isInstanceOf[HashPartitioning])
-      newPartitioning = UnknownPartitioning(indices.length)
-    }
-    new ShuffledRowRDD(shuffleDependency, specifiedPartitionStartIndices)
-  }
-
-  protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
-    coordinator match {
-      case Some(exchangeCoordinator) =>
-        val shuffleRDD = exchangeCoordinator.postShuffleRDD(this)
-        assert(shuffleRDD.partitions.length == newPartitioning.numPartitions)
-        shuffleRDD
-      case None =>
-        val shuffleDependency = prepareShuffleDependency()
-        preparePostShuffleRDD(shuffleDependency)
-    }
-  }
-}
-
-object Exchange {
-  def apply(newPartitioning: Partitioning, child: SparkPlan): Exchange = {
-    Exchange(newPartitioning, child, coordinator = None: Option[ExchangeCoordinator])
-  }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 5acde5996b543..256f4228ae99e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.serializer.Serializer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.physical._
@@ -33,7 +34,12 @@ case class CollectLimit(limit: Int, child: SparkPlan) extends UnaryNode {
   override def output: Seq[Attribute] = child.output
   override def outputPartitioning: Partitioning = SinglePartition
   override def executeCollect(): Array[InternalRow] = child.executeTake(limit)
-  protected override def doExecute(): RDD[InternalRow] = sparkContext.makeRDD(executeCollect(), 1)
+  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)
+  protected override def doExecute(): RDD[InternalRow] = {
+    val shuffled = new ShuffledRowRDD(
+      Exchange.prepareShuffleDependency(child.execute(), child.output, SinglePartition, serializer))
+    shuffled.mapPartitionsInternal(_.take(limit))
+  }
 }
 
 /**
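
Note: in the version patch 16 ends up with, CollectLimit.doExecute shuffles the child's rows into a
single partition (reusing Exchange.prepareShuffleDependency with SinglePartition and an
UnsafeRowSerializer) and then applies take(limit) inside that one post-shuffle partition, while
executeCollect still short-circuits to child.executeTake(limit) for the common collect-to-driver
case. A rough, self-contained analogue of the doExecute path against the public RDD API is sketched
below; the object name, the sample data, and the use of repartition(1) in place of the Exchange
shuffle machinery are assumptions of this sketch, not code from the patches.

    import org.apache.spark.{SparkConf, SparkContext}

    // Hypothetical illustration of the CollectLimit execution path; not Spark's operator itself.
    object CollectLimitSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("collect-limit-sketch").setMaster("local[4]"))
        val limit = 10
        val rows = sc.parallelize(1 to 10000, numSlices = 8)  // stand-in for child.execute()

        val limited = rows
          .repartition(1)                // stand-in for the SinglePartition shuffle built by Exchange
          .mapPartitions(_.take(limit))  // the limit is applied within the single post-shuffle partition

        println(limited.collect().mkString(", "))
        sc.stop()
      }
    }

Routing the shuffle through the shared Exchange.prepareShuffleDependency helper, rather than a
hand-built shuffled RDD, keeps partitioner selection and unsafe-row serialization in one place,
which is what the Exchange refactoring in this patch is for.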