From 0665a69752998206d8bc0d35a4f0cfc390492ab9 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 12 Feb 2016 14:09:45 -0800 Subject: [PATCH 1/9] uncorrelated scalar subquery --- .../sql/catalyst/parser/ExpressionParser.g | 2 ++ .../spark/sql/catalyst/CatalystQl.scala | 2 ++ .../sql/catalyst/analysis/Analyzer.scala | 29 +++++++++++++++ .../spark/sql/catalyst/CatalystQlSuite.scala | 8 +++++ .../spark/sql/execution/SparkPlan.scala | 36 ++++++++++++++++++- .../org/apache/spark/sql/SQLQuerySuite.scala | 12 +++++++ 6 files changed, 88 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g index c162c1a0c5789..10f2e2416bb64 100644 --- a/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g +++ b/sql/catalyst/src/main/antlr3/org/apache/spark/sql/catalyst/parser/ExpressionParser.g @@ -205,6 +205,8 @@ atomExpression | whenExpression | (functionName LPAREN) => function | tableOrColumn + | (LPAREN KW_SELECT) => subQueryExpression + -> ^(TOK_SUBQUERY_EXPR ^(TOK_SUBQUERY_OP) subQueryExpression) | LPAREN! expression RPAREN! ; diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala index 8099751900a42..b16025a17e2e8 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystQl.scala @@ -667,6 +667,8 @@ https://cwiki.apache.org/confluence/display/Hive/Enhanced+Aggregation%2C+Cube%2C UnresolvedAttribute(nameParts :+ cleanIdentifier(attr)) case other => UnresolvedExtractValue(other, Literal(cleanIdentifier(attr))) } + case Token("TOK_SUBQUERY_EXPR", Token("TOK_SUBQUERY_OP", Nil) :: subquery :: Nil) => + ScalarSubquery(nodeToPlan(subquery)) /* Stars (*) */ case Token("TOK_ALLCOLREF", Nil) => UnresolvedStar(None) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 26c3d286b19fa..fc6a041731a2e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -80,6 +80,7 @@ class Analyzer( ResolveGenerate :: ResolveFunctions :: ResolveAliases :: + ResolveSubquery :: ResolveWindowOrder :: ResolveWindowFrame :: ResolveNaturalJoin :: @@ -120,7 +121,13 @@ class Analyzer( withAlias.getOrElse(relation) } substituted.getOrElse(u) + case other => + other transformExpressions { + case e: SubqueryExpression => + e.withNewPlan(substituteCTE(e.query, cteRelations)) + } } + } } @@ -693,6 +700,28 @@ class Analyzer( } } + /** + * This rule resolve subqueries inside expressions. 
+ */ + object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper { + + private def hasSubquery(e: Expression): Boolean = { + e.find(_.isInstanceOf[SubqueryExpression]).isDefined + } + + private def hasSubquery(q: LogicalPlan): Boolean = { + q.expressions.exists(hasSubquery) + } + + def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { + case q: LogicalPlan if q.childrenResolved && hasSubquery(q) => + q transformExpressions { + case e: SubqueryExpression if !e.query.resolved => + e.withNewPlan(execute(e.query)) + } + } + } + /** * Turns projections that contain aggregate expressions into aggregations. */ diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala index 8d7d6b5bf52ea..d729cab4446c8 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala @@ -201,4 +201,12 @@ class CatalystQlSuite extends PlanTest { parser.parsePlan("select sum(product + 1) over (partition by (product + (1)) order by 2) " + "from windowData") } + + test("subquery") { + parser.parsePlan("select (select max(b) from s) ss from t") + parser.parsePlan("select * from t where a = (select b from s)") + parser.parsePlan("select * from t where a > (select b from s)") + parser.parsePlan("select * from t group by g having a = (select b from s)") + parser.parsePlan("select * from t group by g having a > (select b from s)") + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index c72b8dc70708f..12df648e68f3b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -20,10 +20,12 @@ package org.apache.spark.sql.execution import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.ArrayBuffer +import scala.concurrent.{Await, ExecutionContext, Future} +import scala.concurrent.duration.Duration import org.apache.spark.Logging import org.apache.spark.rdd.{RDD, RDDOperationScope} -import org.apache.spark.sql.{Row, SQLContext} +import org.apache.spark.sql.{DataFrame, Row, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -31,6 +33,7 @@ import org.apache.spark.sql.catalyst.plans.QueryPlan import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric} import org.apache.spark.sql.types.DataType +import org.apache.spark.util.ThreadUtils /** * The base class for physical operators. 
@@ -122,7 +125,33 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ final def prepare(): Unit = { if (prepareCalled.compareAndSet(false, true)) { doPrepare() + + // collect all the subqueries and submit jobs to execute them in background + val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]() + val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) + allSubqueries.foreach { e => + val futureResult = scala.concurrent.future { + val df = DataFrame(sqlContext, e.query) + df.queryExecution.toRdd.collect() + }(SparkPlan.subqueryExecutionContext) + queryResults += e -> futureResult + } + children.foreach(_.prepare()) + + // fill in the result of subqueries + queryResults.foreach { + case (e, futureResult) => + val rows = Await.result(futureResult, Duration.Inf) + if (rows.length > 1) { + sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " + + s"${e.query.treeString}") + } + // Analyzer will make sure that it only return on column + if (rows.length > 0) { + e.updateResult(rows(0).get(0, e.dataType)) + } + } } } @@ -231,6 +260,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } } +object SparkPlan { + private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService( + ThreadUtils.newDaemonCachedThreadPool("subquery", 16)) +} + private[sql] trait LeafNode extends SparkPlan { override def children: Seq[SparkPlan] = Nil override def producedAttributes: AttributeSet = outputSet diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index f665a1c87bd78..bd1f89beaf192 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2105,6 +2105,18 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { assert(error.getMessage contains "grouping_id() can only be used with GroupingSets/Cube/Rollup") } + test("uncorrelated scalar subquery") { + assertResult(Array(Row(1))) { + sql("select (select 1 as b) as b").collect() + } + + assertResult(Array(Row(1))) { + sql("with t2 as (select 1 as b, 2 as c) " + + "select a from (select 1 as a union all select 2 as a) t " + + "where a = (select max(b) from t2) ").collect() + } + } + test("SPARK-13056: Null in map value causes NPE") { val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value") withTempTable("maptest") { From 236ac88306e1d5613295eb862b2819b331fc4002 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 12 Feb 2016 15:48:41 -0800 Subject: [PATCH 2/9] add missing file --- .../sql/catalyst/expressions/subquery.scala | 66 +++++++++++++++++++ 1 file changed, 66 insertions(+) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala new file mode 100644 index 0000000000000..909e2dc999561 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.expressions + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.types.DataType + +/** + * A interface for subquery that is used in expressions. + */ +trait SubqueryExpression extends LeafExpression { + def query: LogicalPlan + def withNewPlan(plan: LogicalPlan): SubqueryExpression +} + +/** + * A subquery that will return only one row and one column. + */ +case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with CodegenFallback { + + override lazy val resolved: Boolean = query.resolved + + override def dataType: DataType = query.schema.fields.head.dataType + + override def checkInputDataTypes(): TypeCheckResult = { + if (query.schema.length != 1) { + TypeCheckResult.TypeCheckFailure("Scalar subquery can only have 1 column, but got " + + query.schema.length.toString) + } else { + TypeCheckResult.TypeCheckSuccess + } + } + + // It can not be evaluated by optimizer. + override def foldable: Boolean = false + override def nullable: Boolean = true + + override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan) + + // the first column in first row from `query`. 
+ private var result: Any = null + + def updateResult(v: Any): Unit = { + result = v + } + + override def eval(input: InternalRow): Any = result +} From 016c36c316e0fe2a544646291ba3e4573aa4bb4c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 12 Feb 2016 15:52:59 -0800 Subject: [PATCH 3/9] use broadcastTimeout --- .../org/apache/spark/sql/execution/SparkPlan.scala | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 12df648e68f3b..f17838c08001d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicBoolean import scala.collection.mutable.ArrayBuffer import scala.concurrent.{Await, ExecutionContext, Future} -import scala.concurrent.duration.Duration +import scala.concurrent.duration._ import org.apache.spark.Logging import org.apache.spark.rdd.{RDD, RDDOperationScope} @@ -139,10 +139,19 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ children.foreach(_.prepare()) + val timeout: Duration = { + val timeoutValue = sqlContext.conf.broadcastTimeout + if (timeoutValue < 0) { + Duration.Inf + } else { + timeoutValue.seconds + } + } + // fill in the result of subqueries queryResults.foreach { case (e, futureResult) => - val rows = Await.result(futureResult, Duration.Inf) + val rows = Await.result(futureResult, timeout) if (rows.length > 1) { sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " + s"${e.query.treeString}") From a4bae33c7f42660d13f5f8cf98d34bae0325f6ad Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 12 Feb 2016 22:07:03 -0800 Subject: [PATCH 4/9] address comments --- .../sql/catalyst/expressions/subquery.scala | 2 + .../spark/sql/catalyst/CatalystQlSuite.scala | 48 +++++++++++++++++-- .../org/apache/spark/sql/SQLQuerySuite.scala | 4 ++ 3 files changed, 49 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index 909e2dc999561..f6f307fcb99b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -55,6 +55,8 @@ case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with Co override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan) + // TODO: support sql() + // the first column in first row from `query`. 
private var result: Any = null diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala index d729cab4446c8..9d80d4b2ce5b3 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala @@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.PlanTest import org.apache.spark.sql.catalyst.plans.logical._ +import org.apache.spark.sql.types.BooleanType import org.apache.spark.unsafe.types.CalendarInterval class CatalystQlSuite extends PlanTest { @@ -203,10 +204,47 @@ class CatalystQlSuite extends PlanTest { } test("subquery") { - parser.parsePlan("select (select max(b) from s) ss from t") - parser.parsePlan("select * from t where a = (select b from s)") - parser.parsePlan("select * from t where a > (select b from s)") - parser.parsePlan("select * from t group by g having a = (select b from s)") - parser.parsePlan("select * from t group by g having a > (select b from s)") + comparePlans( + parser.parsePlan("select (select max(b) from s) ss from t"), + Project( + UnresolvedAlias( + Alias( + ScalarSubquery( + Project( + UnresolvedAlias( + UnresolvedFunction("max", UnresolvedAttribute("b") :: Nil, false)) :: Nil, + UnresolvedRelation(TableIdentifier("s")))), + "ss")(ExprId(0))) :: Nil, + UnresolvedRelation(TableIdentifier("t")))) + comparePlans( + parser.parsePlan("select * from t where a = (select b from s)"), + Project( + UnresolvedAlias( + UnresolvedStar(None)) :: Nil, + Filter( + EqualTo( + UnresolvedAttribute("a"), + ScalarSubquery( + Project( + UnresolvedAlias( + UnresolvedAttribute("b")) :: Nil, + UnresolvedRelation(TableIdentifier("s"))))), + UnresolvedRelation(TableIdentifier("t"))))) + comparePlans( + parser.parsePlan("select * from t group by g having a > (select b from s)"), + Filter( + Cast( + GreaterThan( + UnresolvedAttribute("a"), + ScalarSubquery( + Project( + UnresolvedAlias( + UnresolvedAttribute("b")) :: Nil, + UnresolvedRelation(TableIdentifier("s"))))), + BooleanType), + Aggregate( + UnresolvedAttribute("g") :: Nil, + UnresolvedAlias(UnresolvedStar(None)) :: Nil, + UnresolvedRelation(TableIdentifier("t"))))) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index bd1f89beaf192..63bb015642f56 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -2115,6 +2115,10 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { "select a from (select 1 as a union all select 2 as a) t " + "where a = (select max(b) from t2) ").collect() } + + assertResult(Array(Row(3))) { + sql("select (select (select 1) + 1) + 1").collect() + } } test("SPARK-13056: Null in map value causes NPE") { From d0974cf2fc50f81e8663273c7b9e60be6913c5c7 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 19 Feb 2016 17:28:54 -0800 Subject: [PATCH 5/9] improve explain on subquery --- .../sql/catalyst/analysis/Analyzer.scala | 7 +- .../sql/catalyst/expressions/subquery.scala | 53 ++++++++------ .../sql/catalyst/optimizer/Optimizer.scala | 14 +++- .../spark/sql/catalyst/plans/QueryPlan.scala | 26 +++++++ .../analysis/AnalysisErrorSuite.scala | 6 ++ 
.../org/apache/spark/sql/SQLContext.scala | 1 + .../spark/sql/execution/SparkPlan.scala | 22 ++---- .../spark/sql/execution/basicOperators.scala | 15 ++++ .../apache/spark/sql/execution/subquery.scala | 71 +++++++++++++++++++ 9 files changed, 176 insertions(+), 39 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fc6a041731a2e..04e56a8fdaf7d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -122,6 +122,7 @@ class Analyzer( } substituted.getOrElse(u) case other => + // This can't be done in ResolveSubquery because that does not know the CTE. other transformExpressions { case e: SubqueryExpression => e.withNewPlan(substituteCTE(e.query, cteRelations)) @@ -701,8 +702,10 @@ class Analyzer( } /** - * This rule resolve subqueries inside expressions. - */ + * This rule resolve subqueries inside expressions. + * + * Note: CTE are handled in CTESubstitution. + */ object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper { private def hasSubquery(e: Expression): Boolean = { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index f6f307fcb99b1..2482fdf2de5ea 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -17,24 +17,45 @@ package org.apache.spark.sql.catalyst.expressions -import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.analysis.TypeCheckResult -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Subquery} import org.apache.spark.sql.types.DataType /** - * A interface for subquery that is used in expressions. - */ -trait SubqueryExpression extends LeafExpression { + * An interface for subquery that is used in expressions. + */ +abstract class SubqueryExpression extends LeafExpression{ + + /** + * The logical plan of the query. + */ def query: LogicalPlan + + /** + * The underline plan for the query, could be logical plan or physical plan. + * + * This is used to generate tree string. + */ + def plan: QueryPlan[_] + + /** + * Updates the query with new logical plan. + */ def withNewPlan(plan: LogicalPlan): SubqueryExpression } /** - * A subquery that will return only one row and one column. - */ -case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with CodegenFallback { + * A subquery that will return only one row and one column. + * + * This is not evaluable, it should be converted into SparkScalaSubquery. 
+ */ +case class ScalarSubquery( + query: LogicalPlan, + exprId: ExprId = NamedExpression.newExprId) + extends SubqueryExpression with Unevaluable { + + override def plan: LogicalPlan = Subquery(toString, query) override lazy val resolved: Boolean = query.resolved @@ -49,20 +70,12 @@ case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with Co } } - // It can not be evaluated by optimizer. override def foldable: Boolean = false override def nullable: Boolean = true - override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan) - - // TODO: support sql() - - // the first column in first row from `query`. - private var result: Any = null + override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan, exprId) - def updateResult(v: Any): Unit = { - result = v - } + override def toString: String = s"subquery#${exprId.id}" - override def eval(input: InternalRow): Any = result + // TODO: support sql() } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 902e18081bddf..1f61aac2b1381 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -88,7 +88,19 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { Batch("Decimal Optimizations", FixedPoint(100), DecimalAggregates) :: Batch("LocalRelation", FixedPoint(100), - ConvertToLocalRelation) :: Nil + ConvertToLocalRelation) :: + Batch("Subquery", Once, + Subquery) :: Nil + } + + /** + * Optimize all the subqueries inside expression. + */ + object Subquery extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { + case subquery: SubqueryExpression => + subquery.withNewPlan(execute(subquery.query)) + } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index 05f5bdbfc0769..a59681a1152b6 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.plans import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.plans.logical.Subquery import org.apache.spark.sql.catalyst.trees.TreeNode import org.apache.spark.sql.types.{DataType, StructType} @@ -226,4 +227,29 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy protected def statePrefix = if (missingInput.nonEmpty && children.nonEmpty) "!" 
else "" override def simpleString: String = statePrefix + super.simpleString + + override def generateTreeString( + depth: Int, lastChildren: Seq[Boolean], builder: StringBuilder): StringBuilder = { + if (depth > 0) { + lastChildren.init.foreach { isLast => + val prefixFragment = if (isLast) " " else ": " + builder.append(prefixFragment) + } + + val branch = if (lastChildren.last) "+- " else ":- " + builder.append(branch) + } + + builder.append(simpleString) + builder.append("\n") + + val allSubqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e}) + val allChildren = children ++ allSubqueries.map(e => e.plan) + if (allChildren.nonEmpty) { + allChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder)) + allChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder) + } + + builder + } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index e0cec09742eba..de10ba9c91372 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -113,6 +113,12 @@ class AnalysisErrorSuite extends AnalysisTest { val dateLit = Literal.create(null, DateType) + errorTest( + "invalid scalar subquery", + testRelation.select( + (ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)), + "Scalar subquery can only have 1 column, but got 2" :: Nil) + errorTest( "single invalid type, single arg", testRelation.select(TestFunction(dateLit :: Nil, IntegerType :: Nil).as('a)), diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d58b99655c1eb..a4ce0eb592a18 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -884,6 +884,7 @@ class SQLContext private[sql]( @transient protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { val batches = Seq( + Batch("Subquery", Once, ConvertSubquery(self)), Batch("Add exchange", Once, EnsureRequirements(self)), Batch("Whole stage codegen", Once, CollapseCodegenStages(self)) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index f17838c08001d..ef08365a583a8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -25,7 +25,7 @@ import scala.concurrent.duration._ import org.apache.spark.Logging import org.apache.spark.rdd.{RDD, RDDOperationScope} -import org.apache.spark.sql.{DataFrame, Row, SQLContext} +import org.apache.spark.sql.{Row, SQLContext} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.codegen._ @@ -127,31 +127,21 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ doPrepare() // collect all the subqueries and submit jobs to execute them in background - val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]() - val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) + val queryResults = 
ArrayBuffer[(SparkScalarSubquery, Future[Array[InternalRow]])]() + val allSubqueries = expressions.flatMap(_.collect {case e: SparkScalarSubquery => e}) allSubqueries.foreach { e => - val futureResult = scala.concurrent.future { - val df = DataFrame(sqlContext, e.query) - df.queryExecution.toRdd.collect() + val futureResult = Future { + e.plan.executeCollect() }(SparkPlan.subqueryExecutionContext) queryResults += e -> futureResult } children.foreach(_.prepare()) - val timeout: Duration = { - val timeoutValue = sqlContext.conf.broadcastTimeout - if (timeoutValue < 0) { - Duration.Inf - } else { - timeoutValue.seconds - } - } - // fill in the result of subqueries queryResults.foreach { case (e, futureResult) => - val rows = Await.result(futureResult, timeout) + val rows = Await.result(futureResult, Duration.Inf) if (rows.length > 1) { sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " + s"${e.query.treeString}") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 4b82d5563460b..5f065eb173f12 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -343,3 +343,18 @@ case class OutputFaker(output: Seq[Attribute], child: SparkPlan) extends SparkPl protected override def doExecute(): RDD[InternalRow] = child.execute() } + +/** + * A plan as subquery. + * + * This is used to generate tree string for SparkScalarSubquery. + */ +case class Subquery(name: String, child: SparkPlan) extends UnaryNode { + override def output: Seq[Attribute] = child.output + override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputOrdering: Seq[SortOrder] = child.outputOrdering + + protected override def doExecute(): RDD[InternalRow] = { + child.execute() + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala new file mode 100644 index 0000000000000..78fab20e8b652 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{ExprId, ScalarSubquery, SubqueryExpression} +import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.types.DataType + +/** + * A subquery that will return only one row and one column. + * + * This is the physical copy of ScalarSubquery to be used inside SparkPlan. + */ +case class SparkScalarSubquery( + @transient executedPlan: SparkPlan, + exprId: ExprId) + extends SubqueryExpression with CodegenFallback { + + override def query: LogicalPlan = throw new UnsupportedOperationException + override def withNewPlan(plan: LogicalPlan): SubqueryExpression = { + throw new UnsupportedOperationException + } + override def plan: SparkPlan = Subquery(simpleString, executedPlan) + + override def dataType: DataType = executedPlan.schema.fields.head.dataType + override def nullable: Boolean = true + override def toString: String = s"subquery#${exprId.id}" + + // the first column in first row from `query`. + private var result: Any = null + + def updateResult(v: Any): Unit = { + result = v + } + + override def eval(input: InternalRow): Any = result +} + +/** + * Convert the subquery from logical plan into executed plan. + */ +private[sql] case class ConvertSubquery(sqlContext: SQLContext) extends Rule[SparkPlan] { + def apply(plan: SparkPlan): SparkPlan = { + plan.transformAllExpressions { + // Only scalar subquery will be executed separately, all others will be written as join. + case subquery: ScalarSubquery => + val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next() + val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan) + SparkScalarSubquery(executedPlan, subquery.exprId) + } + } +} From 3a8f08d286a70ecf00055c51c41baf567c8bcb7a Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 19 Feb 2016 23:51:25 -0800 Subject: [PATCH 6/9] address comments --- .../sql/catalyst/expressions/subquery.scala | 13 +++--- .../sql/catalyst/optimizer/Optimizer.scala | 6 +-- .../spark/sql/catalyst/CatalystQlSuite.scala | 45 ++----------------- .../analysis/AnalysisErrorSuite.scala | 9 +++- .../org/apache/spark/sql/SQLContext.scala | 2 +- .../spark/sql/execution/SparkPlan.scala | 9 ++-- .../apache/spark/sql/execution/subquery.scala | 13 +++--- 7 files changed, 33 insertions(+), 64 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala index 2482fdf2de5ea..a8f5e1f63d4c7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/subquery.scala @@ -25,7 +25,7 @@ import org.apache.spark.sql.types.DataType /** * An interface for subquery that is used in expressions. */ -abstract class SubqueryExpression extends LeafExpression{ +abstract class SubqueryExpression extends LeafExpression { /** * The logical plan of the query. @@ -33,9 +33,8 @@ abstract class SubqueryExpression extends LeafExpression{ def query: LogicalPlan /** - * The underline plan for the query, could be logical plan or physical plan. - * - * This is used to generate tree string. 
+ * Either a logical plan or a physical plan. The generated tree string (explain output) uses this + * field to explain the subquery. */ def plan: QueryPlan[_] @@ -48,7 +47,9 @@ abstract class SubqueryExpression extends LeafExpression{ /** * A subquery that will return only one row and one column. * - * This is not evaluable, it should be converted into SparkScalaSubquery. + * This will be converted into [[execution.ScalarSubquery]] during physical planning. + * + * Note: `exprId` is used to have unique name in explain string output. */ case class ScalarSubquery( query: LogicalPlan, @@ -63,7 +64,7 @@ case class ScalarSubquery( override def checkInputDataTypes(): TypeCheckResult = { if (query.schema.length != 1) { - TypeCheckResult.TypeCheckFailure("Scalar subquery can only have 1 column, but got " + + TypeCheckResult.TypeCheckFailure("Scalar subquery must return only one column, but got " + query.schema.length.toString) } else { TypeCheckResult.TypeCheckSuccess diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 1f61aac2b1381..f1f438075164e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -90,16 +90,16 @@ abstract class Optimizer extends RuleExecutor[LogicalPlan] { Batch("LocalRelation", FixedPoint(100), ConvertToLocalRelation) :: Batch("Subquery", Once, - Subquery) :: Nil + OptimizeSubqueries) :: Nil } /** * Optimize all the subqueries inside expression. */ - object Subquery extends Rule[LogicalPlan] { + object OptimizeSubqueries extends Rule[LogicalPlan] { def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { case subquery: SubqueryExpression => - subquery.withNewPlan(execute(subquery.query)) + subquery.withNewPlan(Optimizer.this.execute(subquery.query)) } } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala index 9d80d4b2ce5b3..ed7121831ac29 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/CatalystQlSuite.scala @@ -204,47 +204,8 @@ class CatalystQlSuite extends PlanTest { } test("subquery") { - comparePlans( - parser.parsePlan("select (select max(b) from s) ss from t"), - Project( - UnresolvedAlias( - Alias( - ScalarSubquery( - Project( - UnresolvedAlias( - UnresolvedFunction("max", UnresolvedAttribute("b") :: Nil, false)) :: Nil, - UnresolvedRelation(TableIdentifier("s")))), - "ss")(ExprId(0))) :: Nil, - UnresolvedRelation(TableIdentifier("t")))) - comparePlans( - parser.parsePlan("select * from t where a = (select b from s)"), - Project( - UnresolvedAlias( - UnresolvedStar(None)) :: Nil, - Filter( - EqualTo( - UnresolvedAttribute("a"), - ScalarSubquery( - Project( - UnresolvedAlias( - UnresolvedAttribute("b")) :: Nil, - UnresolvedRelation(TableIdentifier("s"))))), - UnresolvedRelation(TableIdentifier("t"))))) - comparePlans( - parser.parsePlan("select * from t group by g having a > (select b from s)"), - Filter( - Cast( - GreaterThan( - UnresolvedAttribute("a"), - ScalarSubquery( - Project( - UnresolvedAlias( - UnresolvedAttribute("b")) :: Nil, - UnresolvedRelation(TableIdentifier("s"))))), - BooleanType), - Aggregate( - UnresolvedAttribute("g") :: Nil, - 
UnresolvedAlias(UnresolvedStar(None)) :: Nil, - UnresolvedRelation(TableIdentifier("t"))))) + parser.parsePlan("select (select max(b) from s) ss from t") + parser.parsePlan("select * from t where a = (select b from s)") + parser.parsePlan("select * from t group by g having a > (select b from s)") } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala index de10ba9c91372..ca6dcd8bdfb84 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala @@ -114,10 +114,15 @@ class AnalysisErrorSuite extends AnalysisTest { val dateLit = Literal.create(null, DateType) errorTest( - "invalid scalar subquery", + "scalar subquery with 2 columns", testRelation.select( (ScalarSubquery(testRelation.select('a, dateLit.as('b))) + Literal(1)).as('a)), - "Scalar subquery can only have 1 column, but got 2" :: Nil) + "Scalar subquery must return only one column, but got 2" :: Nil) + + errorTest( + "scalar subquery with no column", + testRelation.select(ScalarSubquery(LocalRelation()).as('a)), + "Scalar subquery must return only one column, but got 0" :: Nil) errorTest( "single invalid type, single arg", diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a4ce0eb592a18..55325c1662e2d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -884,7 +884,7 @@ class SQLContext private[sql]( @transient protected[sql] val prepareForExecution = new RuleExecutor[SparkPlan] { val batches = Seq( - Batch("Subquery", Once, ConvertSubquery(self)), + Batch("Subquery", Once, PlanSubqueries(self)), Batch("Add exchange", Once, EnsureRequirements(self)), Batch("Whole stage codegen", Once, CollapseCodegenStages(self)) ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ef08365a583a8..ad29942420899 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -127,8 +127,8 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ doPrepare() // collect all the subqueries and submit jobs to execute them in background - val queryResults = ArrayBuffer[(SparkScalarSubquery, Future[Array[InternalRow]])]() - val allSubqueries = expressions.flatMap(_.collect {case e: SparkScalarSubquery => e}) + val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]() + val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) allSubqueries.foreach { e => val futureResult = Future { e.plan.executeCollect() @@ -146,9 +146,12 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " + s"${e.query.treeString}") } - // Analyzer will make sure that it only return on column if (rows.length > 0) { + assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column") e.updateResult(rows(0).get(0, e.dataType)) + } else { + // the result should be null, since the expression already have null as 
default value, + // we don't need to update that. } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 78fab20e8b652..67426dde6b5cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{ExprId, ScalarSubquery, SubqueryExpression} +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{ExprId, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule @@ -30,7 +30,7 @@ import org.apache.spark.sql.types.DataType * * This is the physical copy of ScalarSubquery to be used inside SparkPlan. */ -case class SparkScalarSubquery( +case class ScalarSubquery( @transient executedPlan: SparkPlan, exprId: ExprId) extends SubqueryExpression with CodegenFallback { @@ -58,14 +58,13 @@ case class SparkScalarSubquery( /** * Convert the subquery from logical plan into executed plan. */ -private[sql] case class ConvertSubquery(sqlContext: SQLContext) extends Rule[SparkPlan] { +private[sql] case class PlanSubqueries(sqlContext: SQLContext) extends Rule[SparkPlan] { def apply(plan: SparkPlan): SparkPlan = { plan.transformAllExpressions { - // Only scalar subquery will be executed separately, all others will be written as join. - case subquery: ScalarSubquery => + case subquery: expressions.ScalarSubquery => val sparkPlan = sqlContext.planner.plan(ReturnAnswer(subquery.query)).next() val executedPlan = sqlContext.prepareForExecution.execute(sparkPlan) - SparkScalarSubquery(executedPlan, subquery.exprId) + ScalarSubquery(executedPlan, subquery.exprId) } } } From 75961739d8c83a1591eaeb93a74598a7f1861a88 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 20 Feb 2016 00:38:26 -0800 Subject: [PATCH 7/9] add tests and codegen --- .../spark/sql/execution/SparkPlan.scala | 11 ++- .../apache/spark/sql/execution/subquery.scala | 18 +++- .../org/apache/spark/sql/SQLQuerySuite.scala | 23 +---- .../org/apache/spark/sql/SubquerySuite.scala | 84 +++++++++++++++++++ 4 files changed, 105 insertions(+), 31 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index ad29942420899..dcdfc4cf29e7a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -131,7 +131,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) allSubqueries.foreach { e => val futureResult = Future { - e.plan.executeCollect() + e.plan.executeTake(2) }(SparkPlan.subqueryExecutionContext) queryResults += e -> futureResult } @@ -143,15 +143,14 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ case (e, futureResult) => val rows = Await.result(futureResult, Duration.Inf) if (rows.length > 1) { 
- sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " + - s"${e.query.treeString}") + sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") } - if (rows.length > 0) { + if (rows.length == 1) { assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column") e.updateResult(rows(0).get(0, e.dataType)) } else { - // the result should be null, since the expression already have null as default value, - // we don't need to update that. + // There is no rows returned, the result should be null. + e.updateResult(null) } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 67426dde6b5cb..9f456442380f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.SQLContext -import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} import org.apache.spark.sql.catalyst.expressions.{ExprId, SubqueryExpression} -import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.types.DataType /** @@ -33,7 +33,7 @@ import org.apache.spark.sql.types.DataType case class ScalarSubquery( @transient executedPlan: SparkPlan, exprId: ExprId) - extends SubqueryExpression with CodegenFallback { + extends SubqueryExpression { override def query: LogicalPlan = throw new UnsupportedOperationException override def withNewPlan(plan: LogicalPlan): SubqueryExpression = { @@ -53,6 +53,18 @@ case class ScalarSubquery( } override def eval(input: InternalRow): Any = result + + override def genCode(ctx: CodegenContext, ev: ExprCode): String = { + val thisTerm = ctx.addReferenceObj("subquery", this) + val isNull = ctx.freshName("isNull") + ctx.addMutableState("boolean", isNull, s"$isNull = $thisTerm.eval(null) == null;") + val value = ctx.freshName("value") + ctx.addMutableState(ctx.javaType(dataType), value, + s"$value = (${ctx.boxedType(dataType)}) $thisTerm.eval(null);") + ev.isNull = isNull + ev.value = value + "" + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala index 63bb015642f56..d2ddf0af4f041 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala @@ -21,18 +21,13 @@ import java.math.MathContext import java.sql.Timestamp import org.apache.spark.AccumulatorSuite -import org.apache.spark.sql.catalyst.CatalystQl -import org.apache.spark.sql.catalyst.analysis.FunctionRegistry -import org.apache.spark.sql.catalyst.parser.ParserConf -import org.apache.spark.sql.execution.{aggregate, SparkQl} +import org.apache.spark.sql.execution.aggregate import org.apache.spark.sql.execution.joins.{CartesianProduct, SortMergeJoin} -import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction} import org.apache.spark.sql.functions._ import org.apache.spark.sql.test.{SharedSQLContext, TestSQLContext} import 
org.apache.spark.sql.test.SQLTestData._ import org.apache.spark.sql.types._ - class SQLQuerySuite extends QueryTest with SharedSQLContext { import testImplicits._ @@ -2105,22 +2100,6 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext { assert(error.getMessage contains "grouping_id() can only be used with GroupingSets/Cube/Rollup") } - test("uncorrelated scalar subquery") { - assertResult(Array(Row(1))) { - sql("select (select 1 as b) as b").collect() - } - - assertResult(Array(Row(1))) { - sql("with t2 as (select 1 as b, 2 as c) " + - "select a from (select 1 as a union all select 2 as a) t " + - "where a = (select max(b) from t2) ").collect() - } - - assertResult(Array(Row(3))) { - sql("select (select (select 1) + 1) + 1").collect() - } - } - test("SPARK-13056: Null in map value causes NPE") { val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value") withTempTable("maptest") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala new file mode 100644 index 0000000000000..6b4cae425e4ec --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql + +import org.apache.spark.sql.test.SharedSQLContext + +class SubquerySuite extends QueryTest with SharedSQLContext { + import testImplicits._ + + setupTestData() + + test("simple uncorrelated scalar subquery") { + assertResult(Array(Row(1))) { + sql("select (select 1 as b) as b").collect() + } + + assertResult(Array(Row(1))) { + sql("with t2 as (select 1 as b, 2 as c) " + + "select a from (select 1 as a union all select 2 as a) t " + + "where a = (select max(b) from t2) ").collect() + } + + assertResult(Array(Row(3))) { + sql("select (select (select 1) + 1) + 1").collect() + } + + // more than one columns + val error = intercept[AnalysisException] { + sql("select (select 1, 2) as b").collect() + } + assert(error.message contains "Scalar subquery must return only one column, but got 2") + + // more than one rows + val error2 = intercept[RuntimeException] { + sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect() + } + assert(error2.getMessage contains + "more than one row returned by a subquery used as an expression") + + // string type + assertResult(Array(Row("s"))) { + sql("select (select 's' as s) as b").collect() + } + + // zero rows + assertResult(Array(Row(null))) { + sql("select (select 's' as s limit 0) as b").collect() + } + } + + test("uncorrelated scalar subquery on testData") { + assertResult(Array(Row(5))) { + sql("select (select key from testData where key > 3 limit 1) + 1").collect() + } + + assertResult(Array(Row(-100))) { + sql("select -(select max(key) from testData)").collect() + } + + assertResult(Array(Row(null))) { + sql("select (select value from testData limit 0)").collect() + } + + assertResult(Array(Row("99"))) { + sql("select (select min(value) from testData" + + " where key = (select max(key) from testData) - 1)").collect() + } + } +} From 00341725928c60b1cd0c0785a2c3217d311849b1 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 20 Feb 2016 01:32:26 -0800 Subject: [PATCH 8/9] move wait subqueries into execute()/produce() --- .../spark/sql/execution/SparkPlan.scala | 69 +++++++++++-------- .../sql/execution/WholeStageCodegen.scala | 5 +- .../spark/sql/execution/basicOperators.scala | 2 +- .../apache/spark/sql/execution/subquery.scala | 14 +--- 4 files changed, 49 insertions(+), 41 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index dcdfc4cf29e7a..258ca2df2f3cd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -115,44 +115,59 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ final def execute(): RDD[InternalRow] = { RDDOperationScope.withScope(sparkContext, nodeName, false, true) { prepare() + waitForSubqueries() doExecute() } } + // All the subquries and their Future of results. + @transient private val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]() + + /** + * Collects all the subqueries and create a Future to take the first two rows of them. + */ + protected def prepareSubqueries(): Unit = { + val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) + allSubqueries.foreach { e => + val futureResult = Future { + // We only need the first row, try to take two rows so we can throw an exception if there + // are more than one rows returned. 
+ e.executedPlan.executeTake(2) + }(SparkPlan.subqueryExecutionContext) + queryResults += e -> futureResult + } + } + + /** + * Waits for all the subquires to finish and updates the results. + */ + protected def waitForSubqueries(): Unit = { + // fill in the result of subqueries + queryResults.foreach { + case (e, futureResult) => + val rows = Await.result(futureResult, Duration.Inf) + if (rows.length > 1) { + sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") + } + if (rows.length == 1) { + assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column") + e.updateResult(rows(0).get(0, e.dataType)) + } else { + // There is no rows returned, the result should be null. + e.updateResult(null) + } + } + queryResults.clear() + } + /** * Prepare a SparkPlan for execution. It's idempotent. */ final def prepare(): Unit = { if (prepareCalled.compareAndSet(false, true)) { doPrepare() - - // collect all the subqueries and submit jobs to execute them in background - val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]() - val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) - allSubqueries.foreach { e => - val futureResult = Future { - e.plan.executeTake(2) - }(SparkPlan.subqueryExecutionContext) - queryResults += e -> futureResult - } - + prepareSubqueries() children.foreach(_.prepare()) - - // fill in the result of subqueries - queryResults.foreach { - case (e, futureResult) => - val rows = Await.result(futureResult, Duration.Inf) - if (rows.length > 1) { - sys.error(s"more than one row returned by a subquery used as an expression:\n${e.plan}") - } - if (rows.length == 1) { - assert(rows(0).numFields == 1, "Analyzer should make sure this only returns one column") - e.updateResult(rows(0).get(0, e.dataType)) - } else { - // There is no rows returned, the result should be null. - e.updateResult(null) - } - } } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala index f35efb5b24b1f..116013f307782 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegen.scala @@ -73,9 +73,10 @@ trait CodegenSupport extends SparkPlan { /** * Returns Java source code to process the rows from upstream. */ - def produce(ctx: CodegenContext, parent: CodegenSupport): String = { + final def produce(ctx: CodegenContext, parent: CodegenSupport): String = { this.parent = parent ctx.freshNamePrefix = variablePrefix + waitForSubqueries() doProduce(ctx) } @@ -101,7 +102,7 @@ trait CodegenSupport extends SparkPlan { /** * Consume the columns generated from current SparkPlan, call it's parent. 
*/ - def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = { + final def consume(ctx: CodegenContext, input: Seq[ExprCode], row: String = null): String = { if (input != null) { assert(input.length == output.length) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala index 5f065eb173f12..55bddd196ec46 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicOperators.scala @@ -355,6 +355,6 @@ case class Subquery(name: String, child: SparkPlan) extends UnaryNode { override def outputOrdering: Seq[SortOrder] = child.outputOrdering protected override def doExecute(): RDD[InternalRow] = { - child.execute() + throw new UnsupportedOperationException } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala index 9f456442380f6..9c645c78e8732 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala @@ -18,11 +18,11 @@ package org.apache.spark.sql.execution import org.apache.spark.sql.SQLContext +import org.apache.spark.sql.catalyst.{expressions, InternalRow} +import org.apache.spark.sql.catalyst.expressions.{ExprId, Literal, SubqueryExpression} import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode} -import org.apache.spark.sql.catalyst.expressions.{ExprId, SubqueryExpression} import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, ReturnAnswer} import org.apache.spark.sql.catalyst.rules.Rule -import org.apache.spark.sql.catalyst.{InternalRow, expressions} import org.apache.spark.sql.types.DataType /** @@ -55,15 +55,7 @@ case class ScalarSubquery( override def eval(input: InternalRow): Any = result override def genCode(ctx: CodegenContext, ev: ExprCode): String = { - val thisTerm = ctx.addReferenceObj("subquery", this) - val isNull = ctx.freshName("isNull") - ctx.addMutableState("boolean", isNull, s"$isNull = $thisTerm.eval(null) == null;") - val value = ctx.freshName("value") - ctx.addMutableState(ctx.javaType(dataType), value, - s"$value = (${ctx.boxedType(dataType)}) $thisTerm.eval(null);") - ev.isNull = isNull - ev.value = value - "" + Literal.create(result, dataType).genCode(ctx, ev) } } From e08284557fff7345165a6e3bc6317e90a8f3393d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Sat, 20 Feb 2016 19:12:31 -0800 Subject: [PATCH 9/9] address comments --- .../spark/sql/catalyst/plans/QueryPlan.scala | 26 +++---------------- .../spark/sql/catalyst/trees/TreeNode.scala | 11 +++++--- .../spark/sql/execution/SparkPlan.scala | 6 ++--- .../org/apache/spark/sql/SubquerySuite.scala | 6 ++--- 4 files changed, 17 insertions(+), 32 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala index a59681a1152b6..86bd33f526209 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala @@ -228,28 +228,8 @@ abstract class QueryPlan[PlanType <: TreeNode[PlanType]] extends TreeNode[PlanTy override def simpleString: String = statePrefix + super.simpleString - override def generateTreeString( - 
depth: Int, lastChildren: Seq[Boolean], builder: StringBuilder): StringBuilder = { - if (depth > 0) { - lastChildren.init.foreach { isLast => - val prefixFragment = if (isLast) " " else ": " - builder.append(prefixFragment) - } - - val branch = if (lastChildren.last) "+- " else ":- " - builder.append(branch) - } - - builder.append(simpleString) - builder.append("\n") - - val allSubqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e}) - val allChildren = children ++ allSubqueries.map(e => e.plan) - if (allChildren.nonEmpty) { - allChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder)) - allChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder) - } - - builder + override def treeChildren: Seq[PlanType] = { + val subqueries = expressions.flatMap(_.collect {case e: SubqueryExpression => e}) + children ++ subqueries.map(e => e.plan.asInstanceOf[PlanType]) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 30df2a84f62c4..e46ce1cee7c6c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -448,6 +448,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { } } + /** + * All the nodes that will be used to generate tree string. + */ + protected def treeChildren: Seq[BaseType] = children + /** * Appends the string represent of this node and its children to the given StringBuilder. * @@ -470,9 +475,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { builder.append(simpleString) builder.append("\n") - if (children.nonEmpty) { - children.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder)) - children.last.generateTreeString(depth + 1, lastChildren :+ true, builder) + if (treeChildren.nonEmpty) { + treeChildren.init.foreach(_.generateTreeString(depth + 1, lastChildren :+ false, builder)) + treeChildren.last.generateTreeString(depth + 1, lastChildren :+ true, builder) } builder diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala index 258ca2df2f3cd..872ccde883060 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala @@ -120,7 +120,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } } - // All the subquries and their Future of results. + // All the subqueries and their Future of results. @transient private val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]() /** @@ -128,7 +128,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ */ protected def prepareSubqueries(): Unit = { val allSubqueries = expressions.flatMap(_.collect {case e: ScalarSubquery => e}) - allSubqueries.foreach { e => + allSubqueries.asInstanceOf[Seq[ScalarSubquery]].foreach { e => val futureResult = Future { // We only need the first row, try to take two rows so we can throw an exception if there // are more than one rows returned. @@ -139,7 +139,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ } /** - * Waits for all the subquires to finish and updates the results. 
+ * Waits for all the subqueries to finish and updates the results. */ protected def waitForSubqueries(): Unit = { // fill in the result of subqueries diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala index 6b4cae425e4ec..e851eb02f01b3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala @@ -20,9 +20,6 @@ package org.apache.spark.sql import org.apache.spark.sql.test.SharedSQLContext class SubquerySuite extends QueryTest with SharedSQLContext { - import testImplicits._ - - setupTestData() test("simple uncorrelated scalar subquery") { assertResult(Array(Row(1))) { @@ -64,6 +61,9 @@ class SubquerySuite extends QueryTest with SharedSQLContext { } test("uncorrelated scalar subquery on testData") { + // initialize test Data + testData + assertResult(Array(Row(5))) { sql("select (select key from testData where key > 3 limit 1) + 1").collect() }
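The treeChildren refactor in this last patch replaces the copy of generateTreeString that QueryPlan carried with a single overridable hook on TreeNode, so subquery plans still show up in the printed tree without duplicating the printing logic. Below is a minimal, self-contained sketch of that pattern; the names Node, NodeWithSubqueries and the plain two-space indentation are illustrative stand-ins for TreeNode/QueryPlan and the real ":- "/"+- " prefixes, not Spark code.

// Default tree printing lives in one place; subclasses only widen the set of printed nodes.
abstract class Node(val name: String) {
  def children: Seq[Node] = Nil

  // Hook in the spirit of TreeNode.treeChildren: the nodes used to build the tree string.
  protected def treeChildren: Seq[Node] = children

  final def treeString(depth: Int = 0): String = {
    val self = ("  " * depth) + name + "\n"
    self + treeChildren.map(_.treeString(depth + 1)).mkString
  }
}

// Mirrors the QueryPlan override: subquery plans are appended to the ordinary children.
class NodeWithSubqueries(name: String,
    override val children: Seq[Node],
    subqueryPlans: Seq[Node]) extends Node(name) {
  override protected def treeChildren: Seq[Node] = children ++ subqueryPlans
}

object TreeChildrenDemo extends App {
  val scan = new Node("Scan t") {}
  val subquery = new Node("Subquery [max(b) from s]") {}
  val project = new NodeWithSubqueries("Project [scalar-subquery AS ss]", Seq(scan), Seq(subquery))
  print(project.treeString())
  // Project [scalar-subquery AS ss]
  //   Scan t
  //   Subquery [max(b) from s]
}

The point of the hook is that the printing code stays in one place: a subclass only declares which extra plans belong in the picture.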
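prepareSubqueries and waitForSubqueries split subquery execution into an asynchronous launch and a blocking collect, so several uncorrelated scalar subqueries can run concurrently while the parent operator finishes preparing. A rough sketch of that flow, using a hypothetical PreparedScalar wrapper and the default global ExecutionContext instead of Spark's ScalarSubquery and ThreadUtils pool:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Illustrative stand-in for a planned uncorrelated scalar subquery; not Spark's ScalarSubquery.
final class PreparedScalar(run: () => Seq[Any]) {
  @volatile private var result: Any = null

  // Prepare step: launch the subquery on another thread right away.
  private val future: Future[Seq[Any]] = Future { run() }

  // Wait step: block for the rows and enforce the at-most-one-row contract.
  def await(): Unit = {
    val rows = Await.result(future, Duration.Inf)
    if (rows.length > 1) {
      sys.error("more than one row returned by a subquery used as an expression")
    }
    result = if (rows.isEmpty) null else rows.head
  }

  // After await(), evaluating the expression is just returning the cached constant.
  def eval(): Any = result
}

object ScalarSubqueryDemo extends App {
  // Pretend this closure executes `select max(b) from s` and collects the rows.
  val subquery = new PreparedScalar(() => Seq(42))
  subquery.await()
  println(subquery.eval())   // 42, usable anywhere a literal would be
}

Once the result has been filled in, the expression is effectively a constant for the rest of the query, which is why the earlier patch can generate code for it simply as Literal.create(result, dataType).genCode(ctx, ev).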