From e7554f005b92b7cf57fb40e32b6fa7419a93b9d2 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 17 May 2016 18:20:40 +0900 Subject: [PATCH 01/19] Move planLater method into GenericStrategy. --- .../sql/catalyst/planning/QueryPlanner.scala | 17 ++++++---- .../spark/sql/ExperimentalMethods.scala | 3 +- .../spark/sql/execution/SparkPlanner.scala | 8 ++--- .../spark/sql/execution/SparkStrategies.scala | 18 ++++++++--- .../datasources/DataSourceStrategy.scala | 32 +++++++++++-------- .../datasources/FileSourceStrategy.scala | 5 ++- .../streaming/IncrementalExecution.scala | 3 +- .../spark/sql/ExtraStrategiesSuite.scala | 4 +-- .../spark/sql/hive/HiveSessionState.scala | 6 ++-- .../spark/sql/hive/HiveStrategies.scala | 9 ++++-- 10 files changed, 62 insertions(+), 43 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 8b1a34f79c42a..5d65fab7a6b91 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,6 +27,16 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + def planner: QueryPlanner[PhysicalPlan] + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan = planner.plan(plan).next() + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -47,13 +57,6 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() - def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... 
val iter = strategies.view.flatMap(_(plan)).toIterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala index a49da6dc2b8b2..96f252a03cef9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.SparkPlanner /** * :: Experimental :: @@ -42,7 +43,7 @@ class ExperimentalMethods private[sql]() { * @since 1.3.0 */ @Experimental - @volatile var extraStrategies: Seq[Strategy] = Nil + @volatile var extraStrategies: Seq[SparkPlanner => Strategy] = Nil @Experimental @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index de832ec70b4df..61897b029595b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -26,15 +26,15 @@ import org.apache.spark.sql.internal.SQLConf class SparkPlanner( val sparkContext: SparkContext, val conf: SQLConf, - val extraStrategies: Seq[Strategy]) + val extraStrategies: Seq[SparkPlanner => Strategy]) extends SparkStrategies { def numPartitions: Int = conf.numShufflePartitions def strategies: Seq[Strategy] = - extraStrategies ++ ( - FileSourceStrategy :: - DataSourceStrategy :: + extraStrategies.map(_(this)) ++ ( + FileSourceStrategy(this) :: + DataSourceStrategy(this) :: DDLStrategy :: SpecialLimits :: Aggregation :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index faf359f54838e..b713b69a1f26f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -41,6 +41,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Plans special cases of limit operators. 
*/ object SpecialLimits extends Strategy { + override def planner: SparkPlanner = self override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ReturnAnswer(rootPlan) => rootPlan match { case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => @@ -127,7 +128,9 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => false } - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // --- BroadcastHashJoin -------------------------------------------------------------------- @@ -203,6 +206,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { + override def planner: SparkPlanner = self override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalAggregation( namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) => @@ -221,7 +225,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface. */ object Aggregation extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalAggregation( groupingExpressions, aggregateExpressions, resultExpressions, child) => @@ -270,7 +275,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { protected lazy val singleRowRdd = sparkContext.parallelize(Seq(InternalRow()), 1) object InMemoryScans extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, filters, mem: InMemoryRelation) => pruneFilterProject( projectList, @@ -285,7 +291,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object BasicOperators extends Strategy { def numPartitions: Int = self.numPartitions - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => @@ -371,7 +378,8 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } object DDLStrategy extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case c: CreateTableUsing if c.temporary && !c.allowExisting => ExecutedCommandExec( CreateTempTableUsing( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 0494fafb0e424..5e0490c01da90 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -34,6 +34,7 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS +import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -123,7 +124,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ /** * A Strategy for planning scans over data sources defined using the sources API. */ -private[sql] object DataSourceStrategy extends Strategy with Logging { +private[sql] case class DataSourceStrategy(planner: SparkPlanner) extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) => pruneFilterProjectRaw( @@ -158,18 +159,6 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { case _ => Nil } - // Get the bucket ID based on the bucketing values. - // Restriction: Bucket pruning works iff the bucketing column has one and only one column. - def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = { - val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType)) - mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null) - val bucketIdGeneration = UnsafeProjection.create( - HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil, - bucketColumn :: Nil) - - bucketIdGeneration(mutableRow).getInt(0) - } - // Based on Public API. protected def pruneFilterProject( relation: LogicalRelation, @@ -213,7 +202,7 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { }} val (unhandledPredicates, pushedFilters) = - selectFilters(relation.relation, candidatePredicates) + DataSourceStrategy.selectFilters(relation.relation, candidatePredicates) // A set of column attributes that are only referenced by pushed down filters. We can eliminate // them from requested columns. @@ -291,6 +280,21 @@ private[sql] object DataSourceStrategy extends Strategy with Logging { private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = { toCatalystRDD(relation, relation.output, rdd) } +} + +object DataSourceStrategy { + + // Get the bucket ID based on the bucketing values. + // Restriction: Bucket pruning works iff the bucketing column has one and only one column. + def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = { + val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType)) + mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null) + val bucketIdGeneration = UnsafeProjection.create( + HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil, + bucketColumn :: Nil) + + bucketIdGeneration(mutableRow).getInt(0) + } /** * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index 350508c1d9f4c..b3e83334347a0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -27,9 +27,8 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.DataSourceScanExec +import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan, SparkPlanner} import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS} -import org.apache.spark.sql.execution.SparkPlan /** * A strategy for planning scans over collections of files that might be partitioned or bucketed @@ -54,7 +53,7 @@ import org.apache.spark.sql.execution.SparkPlan * is under the threshold with the addition of the next file, add it. If not, open a new bucket * and add it. Proceed to the next file. */ -private[sql] object FileSourceStrategy extends Strategy with Logging { +private[sql] case class FileSourceStrategy(planner: SparkPlanner) extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, table)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index e9052a309595a..9140c1873fc12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -36,7 +36,8 @@ class IncrementalExecution( extends QueryExecution(sparkSession, logicalPlan) { // TODO: make this always part of planning. - val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil + val stateStrategy = + ((_: SparkPlanner) => sparkSession.sessionState.planner.StatefulAggregationStrategy) :: Nil // Modified planner with stateful operations. 
override def planner: SparkPlanner = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala index a41b465548622..e2d61a35a6a96 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.execution.{SparkPlan, SparkPlanner} import org.apache.spark.sql.test.SharedSQLContext case class FastOperator(output: Seq[Attribute]) extends SparkPlan { @@ -38,7 +38,7 @@ case class FastOperator(output: Seq[Attribute]) extends SparkPlan { override def children: Seq[SparkPlan] = Nil } -object TestStrategy extends Strategy { +case class TestStrategy(planner: SparkPlanner) extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case Project(Seq(attr), _) if attr.name == "a" => FastOperator(attr.toAttribute :: Nil) :: Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 46579ecd85caa..1a05d0ffce445 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -83,9 +83,9 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val sparkSession: SparkSession = self.sparkSession override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies ++ Seq( - FileSourceStrategy, - DataSourceStrategy, + experimentalMethods.extraStrategies.map(_(this)) ++ Seq( + FileSourceStrategy(this), + DataSourceStrategy(this), DDLStrategy, SpecialLimits, InMemoryScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 71b180e55b58c..022e94e1c9a69 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -32,7 +32,8 @@ private[hive] trait HiveStrategies { val sparkSession: SparkSession object Scripts extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child, ioschema) => val hiveIoSchema = HiveScriptIOSchema(ioschema) ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil @@ -41,7 +42,8 @@ private[hive] trait HiveStrategies { } object DataSinks extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.InsertIntoTable( table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( @@ -59,7 +61,8 @@ private[hive] trait HiveStrategies { * applied. 
*/ object HiveTableScans extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + override def planner: SparkPlanner = self + override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. From 8c6b1bf2015313b78f999f659aefab502f8cddd5 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 23 May 2016 15:41:44 +0900 Subject: [PATCH 02/19] Revert "Move planLater method into GenericStrategy." This reverts commit e7554f005b92b7cf57fb40e32b6fa7419a93b9d2. --- .../sql/catalyst/planning/QueryPlanner.scala | 17 ++++------ .../spark/sql/ExperimentalMethods.scala | 3 +- .../spark/sql/execution/SparkPlanner.scala | 8 ++--- .../spark/sql/execution/SparkStrategies.scala | 18 +++-------- .../datasources/DataSourceStrategy.scala | 32 ++++++++----------- .../datasources/FileSourceStrategy.scala | 5 +-- .../streaming/IncrementalExecution.scala | 3 +- .../spark/sql/ExtraStrategiesSuite.scala | 4 +-- .../spark/sql/hive/HiveSessionState.scala | 6 ++-- .../spark/sql/hive/HiveStrategies.scala | 9 ++---- 10 files changed, 43 insertions(+), 62 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 5d65fab7a6b91..8b1a34f79c42a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,16 +27,6 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { - - def planner: QueryPlanner[PhysicalPlan] - - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = planner.plan(plan).next() - def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -57,6 +47,13 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() + def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... 
val iter = strategies.view.flatMap(_(plan)).toIterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala index 96f252a03cef9..a49da6dc2b8b2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/ExperimentalMethods.scala @@ -20,7 +20,6 @@ package org.apache.spark.sql import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.execution.SparkPlanner /** * :: Experimental :: @@ -43,7 +42,7 @@ class ExperimentalMethods private[sql]() { * @since 1.3.0 */ @Experimental - @volatile var extraStrategies: Seq[SparkPlanner => Strategy] = Nil + @volatile var extraStrategies: Seq[Strategy] = Nil @Experimental @volatile var extraOptimizations: Seq[Rule[LogicalPlan]] = Nil diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 61897b029595b..de832ec70b4df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -26,15 +26,15 @@ import org.apache.spark.sql.internal.SQLConf class SparkPlanner( val sparkContext: SparkContext, val conf: SQLConf, - val extraStrategies: Seq[SparkPlanner => Strategy]) + val extraStrategies: Seq[Strategy]) extends SparkStrategies { def numPartitions: Int = conf.numShufflePartitions def strategies: Seq[Strategy] = - extraStrategies.map(_(this)) ++ ( - FileSourceStrategy(this) :: - DataSourceStrategy(this) :: + extraStrategies ++ ( + FileSourceStrategy :: + DataSourceStrategy :: DDLStrategy :: SpecialLimits :: Aggregation :: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index b713b69a1f26f..faf359f54838e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -41,7 +41,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Plans special cases of limit operators. 
*/ object SpecialLimits extends Strategy { - override def planner: SparkPlanner = self override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ReturnAnswer(rootPlan) => rootPlan match { case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) => @@ -128,9 +127,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { case _ => false } - override def planner: SparkPlanner = self - - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { // --- BroadcastHashJoin -------------------------------------------------------------------- @@ -206,7 +203,6 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * on-demand, only when planning in a [[org.apache.spark.sql.execution.streaming.StreamExecution]] */ object StatefulAggregationStrategy extends Strategy { - override def planner: SparkPlanner = self override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalAggregation( namedGroupingExpressions, aggregateExpressions, rewrittenResultExpressions, child) => @@ -225,8 +221,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { * Used to plan the aggregate operator for expressions based on the AggregateFunction2 interface. */ object Aggregation extends Strategy { - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalAggregation( groupingExpressions, aggregateExpressions, resultExpressions, child) => @@ -275,8 +270,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { protected lazy val singleRowRdd = sparkContext.parallelize(Seq(InternalRow()), 1) object InMemoryScans extends Strategy { - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, filters, mem: InMemoryRelation) => pruneFilterProject( projectList, @@ -291,8 +285,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object BasicOperators extends Strategy { def numPartitions: Int = self.numPartitions - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case r: RunnableCommand => ExecutedCommandExec(r) :: Nil case MemoryPlan(sink, output) => @@ -378,8 +371,7 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { } object DDLStrategy extends Strategy { - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case c: CreateTableUsing if c.temporary && !c.allowExisting => ExecutedCommandExec( CreateTempTableUsing( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala index 5e0490c01da90..0494fafb0e424 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala @@ -34,7 +34,6 @@ import 
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.execution.DataSourceScanExec.PUSHED_FILTERS -import org.apache.spark.sql.execution.SparkPlanner import org.apache.spark.sql.execution.command.{CreateDataSourceTableUtils, DDLUtils, ExecutedCommandExec} import org.apache.spark.sql.sources._ import org.apache.spark.sql.types._ @@ -124,7 +123,7 @@ private[sql] class FindDataSourceTable(sparkSession: SparkSession) extends Rule[ /** * A Strategy for planning scans over data sources defined using the sources API. */ -private[sql] case class DataSourceStrategy(planner: SparkPlanner) extends Strategy with Logging { +private[sql] object DataSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[execution.SparkPlan] = plan match { case PhysicalOperation(projects, filters, l @ LogicalRelation(t: CatalystScan, _, _)) => pruneFilterProjectRaw( @@ -159,6 +158,18 @@ private[sql] case class DataSourceStrategy(planner: SparkPlanner) extends Strate case _ => Nil } + // Get the bucket ID based on the bucketing values. + // Restriction: Bucket pruning works iff the bucketing column has one and only one column. + def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = { + val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType)) + mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null) + val bucketIdGeneration = UnsafeProjection.create( + HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil, + bucketColumn :: Nil) + + bucketIdGeneration(mutableRow).getInt(0) + } + // Based on Public API. protected def pruneFilterProject( relation: LogicalRelation, @@ -202,7 +213,7 @@ private[sql] case class DataSourceStrategy(planner: SparkPlanner) extends Strate }} val (unhandledPredicates, pushedFilters) = - DataSourceStrategy.selectFilters(relation.relation, candidatePredicates) + selectFilters(relation.relation, candidatePredicates) // A set of column attributes that are only referenced by pushed down filters. We can eliminate // them from requested columns. @@ -280,21 +291,6 @@ private[sql] case class DataSourceStrategy(planner: SparkPlanner) extends Strate private[this] def toCatalystRDD(relation: LogicalRelation, rdd: RDD[Row]): RDD[InternalRow] = { toCatalystRDD(relation, relation.output, rdd) } -} - -object DataSourceStrategy { - - // Get the bucket ID based on the bucketing values. - // Restriction: Bucket pruning works iff the bucketing column has one and only one column. - def getBucketId(bucketColumn: Attribute, numBuckets: Int, value: Any): Int = { - val mutableRow = new SpecificMutableRow(Seq(bucketColumn.dataType)) - mutableRow(0) = Cast(Literal(value), bucketColumn.dataType).eval(null) - val bucketIdGeneration = UnsafeProjection.create( - HashPartitioning(bucketColumn :: Nil, numBuckets).partitionIdExpression :: Nil, - bucketColumn :: Nil) - - bucketIdGeneration(mutableRow).getInt(0) - } /** * Tries to translate a Catalyst [[Expression]] into data source [[Filter]]. 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala index b3e83334347a0..350508c1d9f4c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala @@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.planning.PhysicalOperation import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.execution.{DataSourceScanExec, SparkPlan, SparkPlanner} +import org.apache.spark.sql.execution.DataSourceScanExec import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FILTERS} +import org.apache.spark.sql.execution.SparkPlan /** * A strategy for planning scans over collections of files that might be partitioned or bucketed @@ -53,7 +54,7 @@ import org.apache.spark.sql.execution.DataSourceScanExec.{INPUT_PATHS, PUSHED_FI * is under the threshold with the addition of the next file, add it. If not, open a new bucket * and add it. Proceed to the next file. */ -private[sql] case class FileSourceStrategy(planner: SparkPlanner) extends Strategy with Logging { +private[sql] object FileSourceStrategy extends Strategy with Logging { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projects, filters, l @ LogicalRelation(files: HadoopFsRelation, _, table)) => diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala index 9140c1873fc12..e9052a309595a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala @@ -36,8 +36,7 @@ class IncrementalExecution( extends QueryExecution(sparkSession, logicalPlan) { // TODO: make this always part of planning. - val stateStrategy = - ((_: SparkPlanner) => sparkSession.sessionState.planner.StatefulAggregationStrategy) :: Nil + val stateStrategy = sparkSession.sessionState.planner.StatefulAggregationStrategy :: Nil // Modified planner with stateful operations. 
override def planner: SparkPlanner = diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala index e2d61a35a6a96..a41b465548622 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ExtraStrategiesSuite.scala @@ -21,7 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project} -import org.apache.spark.sql.execution.{SparkPlan, SparkPlanner} +import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.test.SharedSQLContext case class FastOperator(output: Seq[Attribute]) extends SparkPlan { @@ -38,7 +38,7 @@ case class FastOperator(output: Seq[Attribute]) extends SparkPlan { override def children: Seq[SparkPlan] = Nil } -case class TestStrategy(planner: SparkPlanner) extends Strategy { +object TestStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case Project(Seq(attr), _) if attr.name == "a" => FastOperator(attr.toAttribute :: Nil) :: Nil diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala index 1a05d0ffce445..46579ecd85caa 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala @@ -83,9 +83,9 @@ private[hive] class HiveSessionState(sparkSession: SparkSession) override val sparkSession: SparkSession = self.sparkSession override def strategies: Seq[Strategy] = { - experimentalMethods.extraStrategies.map(_(this)) ++ Seq( - FileSourceStrategy(this), - DataSourceStrategy(this), + experimentalMethods.extraStrategies ++ Seq( + FileSourceStrategy, + DataSourceStrategy, DDLStrategy, SpecialLimits, InMemoryScans, diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala index 022e94e1c9a69..71b180e55b58c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala @@ -32,8 +32,7 @@ private[hive] trait HiveStrategies { val sparkSession: SparkSession object Scripts extends Strategy { - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.ScriptTransformation(input, script, output, child, ioschema) => val hiveIoSchema = HiveScriptIOSchema(ioschema) ScriptTransformation(input, script, output, planLater(child), hiveIoSchema) :: Nil @@ -42,8 +41,7 @@ private[hive] trait HiveStrategies { } object DataSinks extends Strategy { - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case logical.InsertIntoTable( table: MetastoreRelation, partition, child, overwrite, ifNotExists) => execution.InsertIntoHiveTable( @@ -61,8 +59,7 @@ private[hive] trait HiveStrategies { * applied. 
*/ object HiveTableScans extends Strategy { - override def planner: SparkPlanner = self - override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case PhysicalOperation(projectList, predicates, relation: MetastoreRelation) => // Filter out all predicates that only deal with partition keys, these are given to the // hive table scan operator to be used for partition pruning. From 85d412aa2431333ecf0ba8de270777b0a994fcb3 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Fri, 27 May 2016 15:59:36 +0900 Subject: [PATCH 03/19] Move planLater method into GenericStrategy using PlanLater placeholder. --- .../sql/catalyst/planning/QueryPlanner.scala | 15 ++++++------ .../spark/sql/execution/SparkPlanner.scala | 11 +++++++++ .../spark/sql/execution/SparkStrategies.scala | 23 +++++++++++++++++++ .../scala/org/apache/spark/sql/package.scala | 4 ++-- 4 files changed, 44 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 8b1a34f79c42a..327a0481acf7e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -27,6 +27,14 @@ import org.apache.spark.sql.catalyst.trees.TreeNode * empty list should be returned. */ abstract class GenericStrategy[PhysicalPlan <: TreeNode[PhysicalPlan]] extends Logging { + + /** + * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be + * filled in automatically by the QueryPlanner using the other execution strategies that are + * available. + */ + protected def planLater(plan: LogicalPlan): PhysicalPlan + def apply(plan: LogicalPlan): Seq[PhysicalPlan] } @@ -47,13 +55,6 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** A list of execution strategies that can be used by the planner */ def strategies: Seq[GenericStrategy[PhysicalPlan]] - /** - * Returns a placeholder for a physical plan that executes `plan`. This placeholder will be - * filled in automatically by the QueryPlanner using the other execution strategies that are - * available. - */ - protected def planLater(plan: LogicalPlan): PhysicalPlan = this.plan(plan).next() - def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... 
val iter = strategies.view.flatMap(_(plan)).toIterator diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index de832ec70b4df..4bcc3aa7b55f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.execution import org.apache.spark.SparkContext import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.datasources.{DataSourceStrategy, FileSourceStrategy} import org.apache.spark.sql.internal.SQLConf @@ -42,6 +43,16 @@ class SparkPlanner( InMemoryScans :: BasicOperators :: Nil) + override def plan(plan: LogicalPlan): Iterator[SparkPlan] = { + super.plan(plan).map { + _.transform { + case PlanLater(p) => + // TODO: use the first plan for now, but we will implement plan space exploaration later. + this.plan(p).next() + } + } + } + /** * Used to build table scan operators where complex projection and filtering are done using * separate physical operators. This function returns the given scan operator with Project and diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index e40525287a0a1..f70242b5750ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.execution +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.rdd.RDD import org.apache.spark.sql.{AnalysisException, Strategy} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.encoders.RowEncoder @@ -34,6 +36,27 @@ import org.apache.spark.sql.execution.joins.{BuildLeft, BuildRight} import org.apache.spark.sql.execution.streaming.MemoryPlan import org.apache.spark.sql.internal.SQLConf +/** + * Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting + * with the query planner and is not designed to be stable across spark releases. 
Developers
+ * writing libraries should instead consider using the stable APIs provided in
+ * [[org.apache.spark.sql.sources]]
+ */
+@DeveloperApi
+abstract class SparkStrategy extends GenericStrategy[SparkPlan] {
+
+  override protected def planLater(plan: LogicalPlan): SparkPlan = PlanLater(plan)
+}
+
+private[sql] case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
+
+  override def output: Seq[Attribute] = plan.output
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    throw new UnsupportedOperationException()
+  }
+}
+
 private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
   self: SparkPlanner =>
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/package.scala b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
index 97e35bb10407e..28d8bc3de68b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/package.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/package.scala
@@ -18,7 +18,7 @@
 package org.apache.spark
 
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 
 /**
  * Allows the execution of relational queries, including those expressed in SQL using Spark.
@@ -40,7 +40,7 @@ package object sql {
    * [[org.apache.spark.sql.sources]]
    */
   @DeveloperApi
-  type Strategy = org.apache.spark.sql.catalyst.planning.GenericStrategy[SparkPlan]
+  type Strategy = SparkStrategy
 
   type DataFrame = Dataset[Row]
 }

From 0dccae8fe0c07d085880ea5feec6fafd9848dbc4 Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 27 May 2016 19:07:39 +0900
Subject: [PATCH 04/19] Use transformUp instead of transform.

---
 .../scala/org/apache/spark/sql/execution/SparkPlanner.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
index 4bcc3aa7b55f0..319fff1f7c122 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala
@@ -45,7 +45,7 @@ class SparkPlanner(
 
   override def plan(plan: LogicalPlan): Iterator[SparkPlan] = {
     super.plan(plan).map {
-      _.transform {
+      _.transformUp {
         case PlanLater(p) =>
           // TODO: use the first plan for now, but we will implement plan space exploaration later.
           this.plan(p).next()

From 42c18ab847ab8b1e112a3a3129bd7483b21a486c Mon Sep 17 00:00:00 2001
From: Takuya UESHIN
Date: Fri, 27 May 2016 22:06:23 +0900
Subject: [PATCH 05/19] Modify planners to be able to explore plan space.

---
 .../sql/catalyst/planning/QueryPlanner.scala  | 19 ++++++++++++++++++-
 .../spark/sql/execution/QueryExecution.scala  |  1 +
 .../spark/sql/execution/SparkPlanner.scala    | 10 +++-------
 3 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
index 327a0481acf7e..0061f0c873396 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala
@@ -57,8 +57,25 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] {
 
   def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = {
     // Obviously a lot to do here still...
- val iter = strategies.view.flatMap(_(plan)).toIterator + val iter = strategies.view.flatMap(_(plan)).toIterator.flatMap { physicalPlan => + val placeholders = collectPlaceholders(physicalPlan) + + (Iterator(physicalPlan) /: placeholders.toIterator) { + case (physicalPlans, (placeholder, logicalPlan)) => + val children = this.plan(logicalPlan) + physicalPlans.flatMap { physicalPlan => + children.map { child => + physicalPlan.transformUp { + case `placeholder` => child + } + } + } + } + } + // TODO: We will need to prune bad plans to prevent from combinatorial explosion. assert(iter.hasNext, s"No plan for $plan") iter } + + protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 330459c11ea98..98587629fec4a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -74,6 +74,7 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) + // TODO: use next() here for now, but we will need to choice the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 319fff1f7c122..bbf49783d500f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -43,13 +43,9 @@ class SparkPlanner( InMemoryScans :: BasicOperators :: Nil) - override def plan(plan: LogicalPlan): Iterator[SparkPlan] = { - super.plan(plan).map { - _.transformUp { - case PlanLater(p) => - // TODO: use the first plan for now, but we will implement plan space exploaration later. - this.plan(p).next() - } + override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { + plan.collect { case placeholder @ PlanLater(logicalPlan) => + placeholder -> logicalPlan } } From b3c8298961a194e4bcc89800b35ec9e3a0f0aa2c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 28 May 2016 09:30:20 +0900 Subject: [PATCH 06/19] Use `foldLeft` instead of `/:`. 
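Note: `/:` is the symbolic alias for `foldLeft` in Scala's standard collections, so this change is purely cosmetic; the named form reads left to right and keeps the seed value next to the method name. A minimal sketch of the equivalence (standalone values, not planner code):

    val xs = Seq(1, 2, 3)
    val symbolic = (0 /: xs)(_ + _)      // symbolic form: (zero /: coll)(op)
    val named = xs.foldLeft(0)(_ + _)    // named form: coll.foldLeft(zero)(op)
    assert(symbolic == 6 && named == 6)  // identical results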
--- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 0061f0c873396..7a69267c617dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -60,7 +60,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { val iter = strategies.view.flatMap(_(plan)).toIterator.flatMap { physicalPlan => val placeholders = collectPlaceholders(physicalPlan) - (Iterator(physicalPlan) /: placeholders.toIterator) { + placeholders.toIterator.foldLeft(Iterator(physicalPlan)) { case (physicalPlans, (placeholder, logicalPlan)) => val children = this.plan(logicalPlan) physicalPlans.flatMap { physicalPlan => From a3524cd220f840839a7f40c570fe2c5da4c7705b Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 28 May 2016 09:31:29 +0900 Subject: [PATCH 07/19] Spell out explicitly. --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 7a69267c617dd..4c3fb64184fcf 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -66,7 +66,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { physicalPlans.flatMap { physicalPlan => children.map { child => physicalPlan.transformUp { - case `placeholder` => child + case p if p == placeholder => child } } } From d203a757e3bfce7fe6b346331e147463c955370c Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 28 May 2016 13:20:04 +0900 Subject: [PATCH 08/19] Introduce `prunePlans` method as an entry point to prune bad plans. --- .../apache/spark/sql/catalyst/planning/QueryPlanner.scala | 8 +++++--- .../org/apache/spark/sql/execution/SparkPlanner.scala | 6 ++++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 4c3fb64184fcf..181dc25c320e0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -72,10 +72,12 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { } } } - // TODO: We will need to prune bad plans to prevent from combinatorial explosion. 
- assert(iter.hasNext, s"No plan for $plan") - iter + val pruned = prunePlans(iter) + assert(pruned.hasNext, s"No plan for $plan") + pruned } protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] + + protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index bbf49783d500f..97b02b7b53b60 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -49,6 +49,12 @@ class SparkPlanner( } } + override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = { + // TODO: We will need to prune bad plans when we improve plan space exploration + // to prevent from combinatorial explosion. + plans + } + /** * Used to build table scan operators where complex projection and filtering are done using * separate physical operators. This function returns the given scan operator with Project and From 674aaf6e731c0f96ee835b8a2a8b886fd9512da6 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 28 May 2016 13:43:11 +0900 Subject: [PATCH 09/19] Revise and add comments. --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 ++ .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 181dc25c320e0..4bbf72a192da6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -77,7 +77,9 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { pruned } + /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */ protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] + /** Prunes bad plans to prevent from combinatorial explosion. */ protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 98587629fec4a..9301bd286cc9c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -74,7 +74,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) - // TODO: use next() here for now, but we will need to choice the best plan. + // TODO: We use next(), i.e. take the first one of plans returned by the planner, here for now, + // but we will need to choice the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } From e5ee2c2b4c7351df42d9096b95f4ed3759f540d7 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sat, 28 May 2016 14:42:52 +0900 Subject: [PATCH 10/19] Add SparkPlannerSuite. 
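The suite added here pins down a subtle property of the new `plan()`: because it returns a lazy Iterator, alternatives past the first are never expanded unless a caller actually requests them. The property in isolation (a standalone sketch, not the suite itself):

    var expanded = 0
    def alternatives: Iterator[String] = Iterator("first", "second").map { alt =>
      expanded += 1  // count how many alternatives are actually materialized
      alt
    }
    // Taking only the head, as QueryExecution.sparkPlan does with next(),
    // leaves the remaining alternatives unevaluated.
    assert(alternatives.next() == "first")
    assert(expanded == 1)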
--- .../sql/execution/SparkPlannerSuite.scala | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala new file mode 100644 index 0000000000000..a04d6287a2227 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.Strategy +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, ReturnAnswer, Union} +import org.apache.spark.sql.test.SharedSQLContext + +class SparkPlannerSuite extends SharedSQLContext { + import testImplicits._ + + private var planned = 0 + + private object MayPlanRecursively extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReturnAnswer(child) => + planned += 1 + planLater(child) :: planLater(plan) :: Nil + case Union(children) => + planned += 1 + UnionExec(children.map(planLater)) :: planLater(plan) :: Nil + case LocalRelation(output, data) => + planned += 1 + LocalTableScanExec(output, data) :: planLater(plan) :: Nil + case _ => Nil + } + } + + test("Ensure to go down only the first branch, not any other possible branches") { + try { + spark.experimental.extraStrategies = MayPlanRecursively :: Nil + + val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS()) + + planned = 0 + assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f")) + assert(planned === 4) + } finally { + spark.experimental.extraStrategies = Nil + } + } +} From 7b3f4a1ac99a3e4ab2648288e3a0b71a7e2defd5 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 29 May 2016 10:00:44 +0900 Subject: [PATCH 11/19] Modify a test to clarify the illegal case. 
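In the first version of the test, every extra alternative was `planLater` on a real node, so a planner that wrongly explored a second branch would recurse rather than fail with a clear message. The revision below swaps those branches for a `NeverPlanned` sentinel whose strategy case calls `fail(...)` outright, making the illegal path explicit. The expected count stays at 4: one strategy hit each for ReturnAnswer, Union, and the two LocalRelations. A back-of-the-envelope sketch (assuming the two-alternatives-per-node shape of this test) shows how fast the space would grow if every branch were expanded:

    val nodesPlanned = 4  // ReturnAnswer, Union, and two LocalRelation leaves
    val altsPerNode = 2   // the real physical plan plus one planLater placeholder
    // An exhaustive planner would enumerate up to altsPerNode^nodesPlanned candidates.
    val exhaustive = math.pow(altsPerNode, nodesPlanned).toInt
    assert(exhaustive == 16)  // versus the 4 strategy invocations the test asserts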
--- .../sql/execution/SparkPlannerSuite.scala | 44 +++++++++++-------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala index a04d6287a2227..aecfd3062147c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkPlannerSuite.scala @@ -18,36 +18,42 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.Strategy -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, ReturnAnswer, Union} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, ReturnAnswer, Union} import org.apache.spark.sql.test.SharedSQLContext class SparkPlannerSuite extends SharedSQLContext { import testImplicits._ - private var planned = 0 - - private object MayPlanRecursively extends Strategy { - def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { - case ReturnAnswer(child) => - planned += 1 - planLater(child) :: planLater(plan) :: Nil - case Union(children) => - planned += 1 - UnionExec(children.map(planLater)) :: planLater(plan) :: Nil - case LocalRelation(output, data) => - planned += 1 - LocalTableScanExec(output, data) :: planLater(plan) :: Nil - case _ => Nil + test("Ensure to go down only the first branch, not any other possible branches") { + + case object NeverPlanned extends LeafNode { + override def output: Seq[Attribute] = Nil + } + + var planned = 0 + object TestStrategy extends Strategy { + def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ReturnAnswer(child) => + planned += 1 + planLater(child) :: planLater(NeverPlanned) :: Nil + case Union(children) => + planned += 1 + UnionExec(children.map(planLater)) :: planLater(NeverPlanned) :: Nil + case LocalRelation(output, data) => + planned += 1 + LocalTableScanExec(output, data) :: planLater(NeverPlanned) :: Nil + case NeverPlanned => + fail("QueryPlanner should not go down to this branch.") + case _ => Nil + } } - } - test("Ensure to go down only the first branch, not any other possible branches") { try { - spark.experimental.extraStrategies = MayPlanRecursively :: Nil + spark.experimental.extraStrategies = TestStrategy :: Nil val ds = Seq("a", "b", "c").toDS().union(Seq("d", "e", "f").toDS()) - planned = 0 assert(ds.collect().toSeq === Seq("a", "b", "c", "d", "e", "f")) assert(planned === 4) } finally { From 7ce53ef91a4410d97a06728eba47b89f991c4945 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 29 May 2016 12:27:31 +0900 Subject: [PATCH 12/19] Fix a comment. --- .../scala/org/apache/spark/sql/execution/QueryExecution.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala index 9301bd286cc9c..3c74e74e58014 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala @@ -74,8 +74,8 @@ class QueryExecution(val sparkSession: SparkSession, val logical: LogicalPlan) { lazy val sparkPlan: SparkPlan = { SparkSession.setActiveSession(sparkSession) - // TODO: We use next(), i.e. 
take the first one of plans returned by the planner, here for now, - // but we will need to choice the best plan. + // TODO: We use next(), i.e. take the first plan returned by the planner, here for now, + // but we will eventually need to choose the best plan. planner.plan(ReturnAnswer(optimizedPlan)).next() } From 98c8c86772a76b313309b7d73b8ab65bd2919286 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 29 May 2016 12:28:36 +0900 Subject: [PATCH 13/19] Make an Iterator first instead of using a view. --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 4bbf72a192da6..7782597bd9399 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -57,7 +57,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... - val iter = strategies.view.flatMap(_(plan)).toIterator.flatMap { physicalPlan => + val iter = strategies.iterator.flatMap(_(plan)).flatMap { physicalPlan => val placeholders = collectPlaceholders(physicalPlan) placeholders.toIterator.foldLeft(Iterator(physicalPlan)) { From fbc9a9f08b65e1210e9db9eb92a9769bcfb693a9 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Sun, 29 May 2016 12:46:09 +0900 Subject: [PATCH 14/19] Use `iterator` instead of `toIterator`. --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 7782597bd9399..62e87f5787c39 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -60,7 +60,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { val iter = strategies.iterator.flatMap(_(plan)).flatMap { physicalPlan => val placeholders = collectPlaceholders(physicalPlan) - placeholders.toIterator.foldLeft(Iterator(physicalPlan)) { + placeholders.iterator.foldLeft(Iterator(physicalPlan)) { case (physicalPlans, (placeholder, logicalPlan)) => val children = this.plan(logicalPlan) physicalPlans.flatMap { physicalPlan => From a61584b9dfb8360ee4af66ca7a6edda9bbe22a78 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Mon, 30 May 2016 12:10:20 +0900 Subject: [PATCH 15/19] Add comments.
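The new comments describe the expansion step: each (placeholder, logicalPlan) pair multiplies the current candidate set by the plans produced for that logical subtree, and iterators keep the whole cross product lazy. A self-contained toy model of the same fold (plain Scala with string stand-ins for plans, purely illustrative):

    object PlaceholderExpansion extends App {
      type Plan = String

      // Substitute one placeholder occurrence with a concrete child plan.
      def substitute(candidate: Plan, placeholder: Plan, child: Plan): Plan =
        candidate.replace(placeholder, child)

      // Fold each placeholder into the candidate set: the result is the
      // lazy cross product of the child plans over all placeholders.
      def expand(candidate: Plan, placeholders: Seq[(Plan, Seq[Plan])]): Iterator[Plan] =
        placeholders.iterator.foldLeft(Iterator(candidate)) {
          case (candidates, (placeholder, childPlans)) =>
            candidates.flatMap { c =>
              childPlans.iterator.map(child => substitute(c, placeholder, child))
            }
        }

      // Two placeholders with two child plans each yield four candidates.
      expand("Union(?1, ?2)", Seq("?1" -> Seq("ScanA", "ScanB"), "?2" -> Seq("ScanC", "ScanD")))
        .foreach(println)
    }

Note the toy rebuilds childPlans.iterator for every candidate, while the real code shares a single children iterator across candidates; that sharing looks safe only because QueryExecution consumes just one plan via next().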
--- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 62e87f5787c39..1ae4543896e55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -60,10 +60,13 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { val iter = strategies.iterator.flatMap(_(plan)).flatMap { physicalPlan => val placeholders = collectPlaceholders(physicalPlan) + // Plan the logical plans marked as [[planLater]] and replace the placeholders placeholders.iterator.foldLeft(Iterator(physicalPlan)) { case (physicalPlans, (placeholder, logicalPlan)) => + // Plan the logical plan for the placeholder val children = this.plan(logicalPlan) physicalPlans.flatMap { physicalPlan => + // Replace the placeholder with the child plans children.map { child => physicalPlan.transformUp { case p if p == placeholder => child From 2de01c3be6df2391e27b85cbf734953017847ecf Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 31 May 2016 07:50:44 +0900 Subject: [PATCH 16/19] Fix comments. --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 2 +- .../scala/org/apache/spark/sql/execution/SparkPlanner.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 1ae4543896e55..42ca1fc355709 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -83,6 +83,6 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { /** Collects placeholders marked as [[planLater]] by strategy and its [[LogicalPlan]]s */ protected def collectPlaceholders(plan: PhysicalPlan): Seq[(PhysicalPlan, LogicalPlan)] - /** Prunes bad plans to prevent from combinatorial explosion. */ + /** Prunes bad plans to prevent combinatorial explosion. */ protected def prunePlans(plans: Iterator[PhysicalPlan]): Iterator[PhysicalPlan] } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 97b02b7b53b60..41ab8f6c78b87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -51,7 +51,7 @@ class SparkPlanner( override protected def prunePlans(plans: Iterator[SparkPlan]): Iterator[SparkPlan] = { // TODO: We will need to prune bad plans when we improve plan space exploration - // to prevent from combinatorial explosion. + // to prevent combinatorial explosion. plans } From 61001517868fd5ac149346ea791fc8d6408bceec Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 31 May 2016 07:55:23 +0900 Subject: [PATCH 17/19] Fix style.
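Style-only change: when a partial-function literal spans multiple lines, the surrounding code puts each case arm on its own line instead of on the opening brace. A generic, runnable illustration of the same rule (not tied to the planner types):

    object CollectStyle extends App {
      val xs: Seq[Any] = Seq(1, "two", 3)

      // Before: the case clause shares a line with the opening brace.
      val before = xs.collect { case i: Int =>
        i * 2
      }

      // After: the case clause sits on its own line, as in this patch.
      val after = xs.collect {
        case i: Int => i * 2
      }

      println(before == after) // true; only the formatting differs
    }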
--- .../scala/org/apache/spark/sql/execution/SparkPlanner.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala index 41ab8f6c78b87..73e2ffdf007d3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanner.scala @@ -44,8 +44,8 @@ class SparkPlanner( BasicOperators :: Nil) override protected def collectPlaceholders(plan: SparkPlan): Seq[(SparkPlan, LogicalPlan)] = { - plan.collect { case placeholder @ PlanLater(logicalPlan) => - placeholder -> logicalPlan + plan.collect { + case placeholder @ PlanLater(logicalPlan) => placeholder -> logicalPlan } } From d35d55069d592f1d315607e2dc43a5425d436067 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 31 May 2016 08:54:22 +0900 Subject: [PATCH 18/19] Modify `QueryPlanner.plan` for readability. --- .../sql/catalyst/planning/QueryPlanner.scala | 40 ++++++++++++------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index 42ca1fc355709..ccb00d2f4df14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -57,25 +57,37 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { def plan(plan: LogicalPlan): Iterator[PhysicalPlan] = { // Obviously a lot to do here still... - val iter = strategies.iterator.flatMap(_(plan)).flatMap { physicalPlan => - val placeholders = collectPlaceholders(physicalPlan) - // Plan the logical plans marked as [[planLater]] and replace the placeholders - placeholders.iterator.foldLeft(Iterator(physicalPlan)) { - case (physicalPlans, (placeholder, logicalPlan)) => - // Plan the logical plan for the placeholder - val children = this.plan(logicalPlan) - physicalPlans.flatMap { physicalPlan => - // Replace the placeholder with the child plans - children.map { child => - physicalPlan.transformUp { - case p if p == placeholder => child + // Collect physical plan candidates. + val candidates = strategies.iterator.flatMap(_(plan)) + + // The candidates may contain placeholders marked as [[planLater]], + // so try to replace them with their child plans. + val plans = candidates.flatMap { candidate => + val placeholders = collectPlaceholders(candidate) + + if (placeholders.isEmpty) { + // Return the candidate as is because it does not contain placeholders. + Iterator(candidate) + } else { + // Plan the logical plan marked as [[planLater]] and replace the placeholders. + placeholders.iterator.foldLeft(Iterator(candidate)) { + case (candidatesWithPlaceholders, (placeholder, logicalPlan)) => + // Plan the logical plan for the placeholder.
+ val childPlans = this.plan(logicalPlan) + + candidatesWithPlaceholders.flatMap { candidateWithPlaceholders => + childPlans.map { childPlan => + // Replace the placeholder with the child plan + candidateWithPlaceholders.transformUp { + case p if p == placeholder => childPlan + } } } - } } } - val pruned = prunePlans(iter) + val pruned = prunePlans(plans) assert(pruned.hasNext, s"No plan for $plan") pruned } From 254381d245cabf3cbad57f7ab06eec155ae79d96 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Tue, 31 May 2016 11:10:57 +0900 Subject: [PATCH 19/19] Polish a comment and spacing. --- .../org/apache/spark/sql/catalyst/planning/QueryPlanner.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala index ccb00d2f4df14..5f694f44b6e8a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/QueryPlanner.scala @@ -67,7 +67,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { val placeholders = collectPlaceholders(candidate) if (placeholders.isEmpty) { - // Return the candidate as is because it does not contain placeholders. + // Take the candidate as is because it does not contain placeholders. Iterator(candidate) } else { // Plan the logical plan marked as [[planLater]] and replace the placeholders. @@ -87,6 +87,7 @@ abstract class QueryPlanner[PhysicalPlan <: TreeNode[PhysicalPlan]] { } } } + val pruned = prunePlans(plans) assert(pruned.hasNext, s"No plan for $plan") pruned
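For reviewers: the placeholder node that collectPlaceholders matches on earlier in the series is the PlanLater physical leaf. Roughly, as a sketch of its definition in SparkStrategies at this point in the series (the exact body may differ slightly):

    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.LeafExecNode

    // A stand-in physical node that only remembers the logical subtree it
    // replaces; QueryPlanner.plan swaps it for a real child plan, so it
    // must never survive to execution.
    case class PlanLater(plan: LogicalPlan) extends LeafExecNode {
      override def output: Seq[Attribute] = plan.output

      protected override def doExecute(): RDD[InternalRow] =
        throw new UnsupportedOperationException()
    }

Combined with the lazy candidate iterators from patches 13 through 18, QueryExecution pulling a single plan via next() guarantees that only the first candidate's placeholders are ever expanded, which is exactly what the NeverPlanned test asserts.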