improve explain on subquery
Davies Liu committed Feb 20, 2016
1 parent a4bae33 commit d0974cf
Showing 9 changed files with 176 additions and 39 deletions.
@@ -122,6 +122,7 @@ class Analyzer(
}
substituted.getOrElse(u)
case other =>
+// This can't be done in ResolveSubquery because that rule does not know about CTEs.
other transformExpressions {
case e: SubqueryExpression =>
e.withNewPlan(substituteCTE(e.query, cteRelations))
@@ -701,8 +702,10 @@ class Analyzer(
}

/**
- * This rule resolve subqueries inside expressions.
- */
+ * This rule resolves subqueries inside expressions.
+ *
+ * Note: CTEs are handled in CTESubstitution.
+ */
object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {

private def hasSubquery(e: Expression): Boolean = {
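To see why the substitution must recurse into subquery expressions, consider a query whose scalar subquery also references the CTE. A minimal illustration (assuming a SQLContext named `sqlContext`, and that scalar subqueries are supported end to end in this build):

// The scalar subquery below also refers to the CTE `t`, so CTESubstitution
// must rewrite the plan held inside the SubqueryExpression as well.
val df = sqlContext.sql(
  """WITH t AS (SELECT 1 AS a)
    |SELECT a, (SELECT max(a) FROM t) AS m FROM t
  """.stripMargin)
df.explain()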
@@ -17,24 +17,45 @@

package org.apache.spark.sql.catalyst.expressions

-import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
-import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
-import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery}
import org.apache.spark.sql.types.DataType

/**
- * A interface for subquery that is used in expressions.
- */
-trait SubqueryExpression extends LeafExpression {
+ * An interface for subquery that is used in expressions.
+ */
+abstract class SubqueryExpression extends LeafExpression {

/**
* The logical plan of the query.
*/
def query: LogicalPlan

+/**
+ * The underlying plan for the query; it could be a logical plan or a physical plan.
+ *
+ * This is used to generate the tree string.
+ */
+def plan: QueryPlan[_]

/**
* Updates the query with new logical plan.
*/
def withNewPlan(plan: LogicalPlan): SubqueryExpression
}

/**
- * A subquery that will return only one row and one column.
- */
-case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with CodegenFallback {
+ * A subquery that will return only one row and one column.
+ *
+ * This is not evaluable; it should be converted into SparkScalarSubquery.
+ */
+case class ScalarSubquery(
+    query: LogicalPlan,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends SubqueryExpression with Unevaluable {
+
+  override def plan: LogicalPlan = Subquery(toString, query)

override lazy val resolved: Boolean = query.resolved

@@ -49,20 +70,12 @@ case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with Co
}
}

-// It can not be evaluated by optimizer.
-override def foldable: Boolean = false
override def nullable: Boolean = true

-override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan)
-
-// TODO: support sql()
-
-// the first column in first row from `query`.
-private var result: Any = null
+override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan, exprId)

-def updateResult(v: Any): Unit = {
-  result = v
-}
+override def toString: String = s"subquery#${exprId.id}"

-override def eval(input: InternalRow): Any = result
+// TODO: support sql()
}
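A small sketch of what the new `exprId` buys (hypothetical; `originalPlan` and `optimizedPlan` stand in for real LogicalPlans):

// withNewPlan swaps the subquery's plan but preserves its exprId, so the
// expression keeps printing as the same subquery#<id> across rewrites.
val sub = ScalarSubquery(originalPlan)
val rewritten = sub.withNewPlan(optimizedPlan)
assert(rewritten.exprId == sub.exprId)
assert(rewritten.toString == sub.toString)  // both render as "subquery#<id>"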
@@ -88,7 +88,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] {
Batch("Decimal Optimizations", FixedPoint(100),
DecimalAggregates) ::
Batch("LocalRelation", FixedPoint(100),
-ConvertToLocalRelation) :: Nil
+ConvertToLocalRelation) ::
+Batch("Subquery", Once,
+  Subquery) :: Nil
}

+/**
+ * Optimizes all the subqueries inside expressions.
+ */
+object Subquery extends Rule[LogicalPlan] {
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {
+    case subquery: SubqueryExpression =>
+      subquery.withNewPlan(execute(subquery.query))
+  }
+}
}

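In this rule, `execute` is the enclosing Optimizer's RuleExecutor.execute, so each subquery plan is pushed through the full list of batches; since that list now includes this batch, nested subqueries get optimized as well. A rough usage sketch (assuming an Optimizer instance `optimizer` and an analyzed LogicalPlan `analyzed`):

// After optimization, every ScalarSubquery in the returned plan wraps an
// optimized copy of its query, so explain shows optimized subquery plans too.
val optimized = optimizer.execute(analyzed)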
@@ -18,6 +18,7 @@
package org.apache.spark.sql.catalyst.plans

import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans.logical.Subquery
import org.apache.spark.sql.catalyst.trees.TreeNode
import org.apache.spark.sql.types.{DataType, StructType}

@@ -226,4 +227,29 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy
protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" else ""

override def simpleString: String = statePrefix + super.simpleString

+override def generateTreeString(
+    depth: Int, lastChildren: Seq[Boolean], builder: StringBuilder): StringBuilder = {
+  if (depth > 0) {
+    lastChildren.init.foreach { isLast =>
+      val prefixFragment = if (isLast) "   " else ":  "
+      builder.append(prefixFragment)
+    }
+
+    val branch = if (lastChildren.last) "+- " else ":- "
+    builder.append(branch)
+  }
+
+  builder.append(simpleString)
+  builder.append("\n")
+
+  val allSubqueries = expressions.flatMap(_.collect { case e: SubqueryExpression => e })
+  val allChildren = children ++ allSubqueries.map(e => e.plan)
+  if (allChildren.nonEmpty) {
+    allChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder))
+    allChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder)
+  }
+
+  builder
+}
}
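The effect on explain output is that subquery plans are rendered as extra children, after the operator's real children. A hand-written illustration (node names and expression ids are made up) for a projection whose select list contains one scalar subquery:

Project [a#0, subquery#1 AS m#2]
:- LocalRelation [a#0]
+- Subquery subquery#1
   +- Aggregate [max(a#0) AS max(a)#3]
      +- LocalRelation [a#0]

Here ":- " marks a non-last child and "+- " the last one, matching the branch prefixes built above.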
@@ -113,6 +113,12 @@ class AnalysisErrorSuite extends AnalysisTest {

val dateLit = Literal.create(null, DateType)

+errorTest(
+  "invalid scalar subquery",
+  testRelation.select(
+    (ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)),
+  "Scalar subquery can only have 1 column, but got 2" :: Nil)

errorTest(
"single invalid type, single arg",
testRelation.select(TestFunction(dateLit :: Nil, IntegerType :: Nil).as('a)),
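In SQL terms, the new negative case is a scalar subquery that produces two columns, roughly (table and column names are illustrative):

// Fails analysis with: Scalar subquery can only have 1 column, but got 2
sqlContext.sql("SELECT (SELECT a, b FROM t) AS s FROM t")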
@@ -884,6 +884,7 @@ class SQLContext private[sql](
@transient
protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] {
val batches = Seq(
Batch("Subquery", Once, ConvertSubquery(self)),
Batch("Add exchange", Once, EnsureRequirements(self)),
Batch("Whole stage codegen", Once, CollapseCodegenStages(self))
)
@@ -25,7 +25,7 @@ import scala.concurrent.duration._

import org.apache.spark.Logging
import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
@@ -127,31 +127,21 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
doPrepare()

// collect all the subqueries and submit jobs to execute them in background
-val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
-val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e})
+val queryResults = ArrayBuffer[(SparkScalarSubquery, Future[Array[InternalRow]])]()
+val allSubqueries = expressions.flatMap(_.collect { case e: SparkScalarSubquery => e })
allSubqueries.foreach { e =>
-val futureResult = scala.concurrent.future {
-  val df = DataFrame(sqlContext, e.query)
-  df.queryExecution.toRdd.collect()
+val futureResult = Future {
+  e.plan.executeCollect()
}(SparkPlan.subqueryExecutionContext)
queryResults += e -> futureResult
}

children.foreach(_.prepare())

-val timeout: Duration = {
-  val timeoutValue = sqlContext.conf.broadcastTimeout
-  if (timeoutValue < 0) {
-    Duration.Inf
-  } else {
-    timeoutValue.seconds
-  }
-}

// fill in the result of subqueries
queryResults.foreach {
case (e, futureResult) =>
-val rows = Await.result(futureResult, timeout)
+val rows = Await.result(futureResult, Duration.Inf)
if (rows.length > 1) {
sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +
s"${e.query.treeString}")
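The rewritten prepare() is a fork/join pattern: submit every subquery to a dedicated execution context, keep preparing children, then block for all the results. A self-contained sketch of the same pattern in plain Scala (all names here are mine, not Spark's):

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object ForkJoinSketch extends App {
  private val pool = Executors.newFixedThreadPool(4)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)

  // 1. Fork: submit all "subqueries" up front so they run in the background.
  val futures = (1 to 3).map(i => i -> Future { i * i })

  // 2. ...other preparation work happens here (the children, in Spark's case)...

  // 3. Join: block for each result with no timeout, as with Duration.Inf above.
  futures.foreach { case (i, f) =>
    println(s"subquery $i => ${Await.result(f, Duration.Inf)}")
  }

  pool.shutdown()
}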
@@ -343,3 +343,18 @@ case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPl

protected override def doExecute(): RDD[InternalRow] = child.execute()
}

+/**
+ * A plan node that wraps a subquery.
+ *
+ * This is used to generate the tree string for SparkScalarSubquery.
+ */
+case class Subquery(name: String, child: SparkPlan) extends UnaryNode {
+  override def output: Seq[Attribute] = child.output
+  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputOrdering: Seq[SortOrder] = child.outputOrdering
+
+  protected override def doExecute(): RDD[InternalRow] = {
+    child.execute()
+  }
+}
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{ExprId, ScalarSubquery, SubqueryExpression}
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.types.DataType

/**
* A subquery that will return only one row and one column.
*
* This is the physical counterpart of ScalarSubquery, used inside SparkPlan.
*/
case class SparkScalarSubquery(
@transient executedPlan: SparkPlan,
exprId: ExprId)
extends SubqueryExpression with CodegenFallback {

override def query: LogicalPlan = throw new UnsupportedOperationException
override def withNewPlan(plan: LogicalPlan): SubqueryExpression = {
throw new UnsupportedOperationException
}
override def plan: SparkPlan = Subquery(simpleString, executedPlan)

override def dataType: DataType = executedPlan.schema.fields.head.dataType
override def nullable: Boolean = true
override def toString: String = s"subquery#${exprId.id}"

// The first column in the first row from `query`.
private var result: Any = null

def updateResult(v: Any): Unit = {
result = v
}

override def eval(input: InternalRow): Any = result
}

/**
* Converts subqueries from logical plans into executed (physical) plans.
*/
private[sql] case class ConvertSubquery(sqlContext: SQLContext) extends Rule[SparkPlan] {
def apply(plan: SparkPlan): SparkPlan = {
plan.transformAllExpressions {
// Only scalar subqueries are executed separately; all others will be rewritten as joins.
case subquery: ScalarSubquery =>
val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next()
val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan)
SparkScalarSubquery(executedPlan, subquery.exprId)
}
}
}
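Taken together, the life of a scalar subquery after this commit is roughly:

1. CTESubstitution rewrites CTE references inside SubqueryExpressions during analysis.
2. ResolveSubquery resolves the subquery's logical plan.
3. The optimizer's new "Subquery" batch recursively optimizes each subquery plan.
4. ConvertSubquery, run in prepareForExecution, plans each ScalarSubquery and swaps in a SparkScalarSubquery carrying the executed SparkPlan.
5. SparkPlan.prepare() launches every SparkScalarSubquery as a background future and caches the single-value result via updateResult.
6. At execution time eval() returns the cached value, and generateTreeString renders the subquery's physical plan as an extra child in the explain output.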
