[SPARK-13306] [SQL] uncorrelated scalar subquery #11190
Changes from 4 commits
File: Analyzer.scala

```diff
@@ -80,6 +80,7 @@ class Analyzer(
       ResolveGenerate ::
       ResolveFunctions ::
       ResolveAliases ::
+      ResolveSubquery ::
       ResolveWindowOrder ::
       ResolveWindowFrame ::
       ResolveNaturalJoin ::
```
```diff
@@ -120,7 +121,13 @@ class Analyzer(
           withAlias.getOrElse(relation)
         }
         substituted.getOrElse(u)
+      case other =>
+        other transformExpressions {
+          case e: SubqueryExpression =>
+            e.withNewPlan(substituteCTE(e.query, cteRelations))
+        }
     }
   }
```

On the `case other` branch:

> **Reviewer:** quick comment on why this isn't in [...]
> **Author:** done
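For intuition, the new `case other` branch matters when a CTE is referenced only from inside a subquery expression; without recursing into `e.query`, `substituteCTE` would leave the nested `UnresolvedRelation` for the CTE untouched. A minimal example query, the same shape as the test added to SQLQuerySuite further down:

```scala
// The CTE t2 appears only inside the scalar subquery, so CTE substitution
// must also rewrite the plan nested in the SubqueryExpression.
sqlContext.sql(
  "with t2 as (select 1 as b, 2 as c) " +
    "select a from (select 1 as a union all select 2 as a) t " +
    "where a = (select max(b) from t2)")
```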
```diff
@@ -693,6 +700,28 @@ class Analyzer(
     }
   }

+  /**
+   * This rule resolves subqueries inside expressions.
+   */
+  object ResolveSubquery extends Rule[LogicalPlan] with PredicateHelper {
+
+    private def hasSubquery(e: Expression): Boolean = {
+      e.find(_.isInstanceOf[SubqueryExpression]).isDefined
+    }
+
+    private def hasSubquery(q: LogicalPlan): Boolean = {
+      q.expressions.exists(hasSubquery)
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+      case q: LogicalPlan if q.childrenResolved && hasSubquery(q) =>
+        q transformExpressions {
+          case e: SubqueryExpression if !e.query.resolved =>
+            e.withNewPlan(execute(e.query))
+        }
+    }
+  }

   /**
    * Turns projections that contain aggregate expressions into aggregations.
    */
```

On the scaladoc:

> **Reviewer:** indent. Maybe add a comment that CTEs are handled elsewhere.
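As a rough illustration of what the rule does (a sketch, assuming tables `t` and `s` are registered with the sqlContext; Spark 1.6-era API): comparing a query's parsed and analyzed plans shows the subquery's inner plan going from unresolved to resolved, because `apply` invokes the full analyzer (`execute`) on it.

```scala
// Sketch only: inspect how ResolveSubquery rewrites the subquery's plan.
val qe = sqlContext.sql("select * from t where a = (select max(b) from s)").queryExecution
println(qe.logical.treeString)   // ScalarSubquery still contains UnresolvedRelation `s`
println(qe.analyzed.treeString)  // the inner plan is now fully resolved
```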
New file, @@ -0,0 +1,68 @@ (package org.apache.spark.sql.catalyst.expressions):

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenFallback
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.types.DataType
```
```scala
/**
 * An interface for subqueries that are used in expressions.
 */
trait SubqueryExpression extends LeafExpression {

  def query: LogicalPlan

  def withNewPlan(plan: LogicalPlan): SubqueryExpression
}
```

On the scaladoc:

> **Reviewer:** indent

On `def query`:

> **Reviewer:** why is this needed?
> **Author:** This is a helper used in the Analyzer and Optimizer; otherwise we would need to do type conversion.
> **Author:** This is the base class for both the logical plan and the physical plan, which is kind of weird. This is to make generateTreeString work in QueryPlan.
> **Reviewer:** The Analyzer and Optimizer only apply to logical plans, right?
> **Author:** yes

On `withNewPlan`:

> **Reviewer:** scala doc
> **Reviewer:** can't this be just in the logical plan itself?
> **Reviewer:** This should be [...]
> **Reviewer:** i think you can just remove this and move it into the logical subquery expression, since it's only used for the logical plan anyway?
> **Author:** Then should we have LogicalSubqueryExpression?
> **Reviewer:** I meant ScalarSubquery. That's already the one, isn't it?
> **Author:** We will have ExistsSubquery and InSubquery shortly (or in the next release).
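Since the author mentions ExistsSubquery and InSubquery as planned follow-ups, here is a purely hypothetical sketch of how another subquery expression could plug into the same trait; none of this is in the diff:

```scala
import org.apache.spark.sql.types.BooleanType

// Hypothetical only: a predicate subquery under the same interface.
case class ExistsSubquery(query: LogicalPlan) extends SubqueryExpression with CodegenFallback {
  override lazy val resolved: Boolean = query.resolved
  override def dataType: DataType = BooleanType
  override def nullable: Boolean = false
  override def withNewPlan(plan: LogicalPlan): ExistsSubquery = ExistsSubquery(plan)
  // The evaluation strategy is out of scope for this sketch.
  override def eval(input: InternalRow): Any =
    throw new UnsupportedOperationException("sketch only")
}
```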
```scala
/**
 * A subquery that will return only one row and one column.
 */
case class ScalarSubquery(query: LogicalPlan) extends SubqueryExpression with CodegenFallback {

  override lazy val resolved: Boolean = query.resolved

  override def dataType: DataType = query.schema.fields.head.dataType

  override def checkInputDataTypes(): TypeCheckResult = {
    if (query.schema.length != 1) {
      TypeCheckResult.TypeCheckFailure("Scalar subquery can only have 1 column, but got " +
        query.schema.length.toString)
    } else {
      TypeCheckResult.TypeCheckSuccess
    }
  }

  // It can not be evaluated by the optimizer.
  override def foldable: Boolean = false
  override def nullable: Boolean = true

  override def withNewPlan(plan: LogicalPlan): ScalarSubquery = ScalarSubquery(plan)

  // TODO: support sql()

  // The first column in the first row from `query`.
  private var result: Any = null

  def updateResult(v: Any): Unit = {
    result = v
  }

  override def eval(input: InternalRow): Any = result
}
```

On the scaladoc:

> **Reviewer:** indent

On `checkInputDataTypes`:

> **Reviewer:** needs tests. Maybe, probably [...]
> **Author:** done
> **Reviewer:** "Scalar subquery must return only one column, but got " ... (postgres')

On `updateResult`:

> **Reviewer:** scaladoc
File: CatalystQlSuite.scala

```diff
@@ -22,6 +22,7 @@ import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.types.BooleanType
 import org.apache.spark.unsafe.types.CalendarInterval

 class CatalystQlSuite extends PlanTest {
```
```diff
@@ -201,4 +202,49 @@ class CatalystQlSuite extends PlanTest {
     parser.parsePlan("select sum(product + 1) over (partition by (product + (1)) order by 2) " +
       "from windowData")
   }
+
+  test("subquery") {
+    comparePlans(
+      parser.parsePlan("select (select max(b) from s) ss from t"),
+      Project(
+        UnresolvedAlias(
+          Alias(
+            ScalarSubquery(
+              Project(
+                UnresolvedAlias(
+                  UnresolvedFunction("max", UnresolvedAttribute("b") :: Nil, false)) :: Nil,
+                UnresolvedRelation(TableIdentifier("s")))),
+            "ss")(ExprId(0))) :: Nil,
+        UnresolvedRelation(TableIdentifier("t"))))
+    comparePlans(
+      parser.parsePlan("select * from t where a = (select b from s)"),
+      Project(
+        UnresolvedAlias(
+          UnresolvedStar(None)) :: Nil,
+        Filter(
+          EqualTo(
+            UnresolvedAttribute("a"),
+            ScalarSubquery(
+              Project(
+                UnresolvedAlias(
+                  UnresolvedAttribute("b")) :: Nil,
+                UnresolvedRelation(TableIdentifier("s"))))),
+          UnresolvedRelation(TableIdentifier("t")))))
+    comparePlans(
+      parser.parsePlan("select * from t group by g having a > (select b from s)"),
+      Filter(
+        Cast(
+          GreaterThan(
+            UnresolvedAttribute("a"),
+            ScalarSubquery(
+              Project(
+                UnresolvedAlias(
+                  UnresolvedAttribute("b")) :: Nil,
+                UnresolvedRelation(TableIdentifier("s"))))),
+          BooleanType),
+        Aggregate(
+          UnresolvedAttribute("g") :: Nil,
+          UnresolvedAlias(UnresolvedStar(None)) :: Nil,
+          UnresolvedRelation(TableIdentifier("t")))))
+  }
 }
```

On the expected plans:

> **Author:** @hvanhovell I'm going to remove this plan checking; it's very easy to break. The details of the plans don't mean much, and we will have other tests to verify correctness.
File: SparkPlan.scala

```diff
@@ -20,17 +20,20 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.atomic.AtomicBoolean

 import scala.collection.mutable.ArrayBuffer
+import scala.concurrent.{Await, ExecutionContext, Future}
+import scala.concurrent.duration._

 import org.apache.spark.Logging
 import org.apache.spark.rdd.{RDD, RDDOperationScope}
-import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.metric.{LongSQLMetric, SQLMetric}
 import org.apache.spark.sql.types.DataType
+import org.apache.spark.util.ThreadUtils

 /**
  * The base class for physical operators.
```
```diff
@@ -122,7 +125,42 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   final def prepare(): Unit = {
     if (prepareCalled.compareAndSet(false, true)) {
       doPrepare()
+
+      // collect all the subqueries and submit jobs to execute them in background
+      val queryResults = ArrayBuffer[(ScalarSubquery, Future[Array[InternalRow]])]()
+      val allSubqueries = expressions.flatMap(_.collect { case e: ScalarSubquery => e })
+      allSubqueries.foreach { e =>
+        val futureResult = scala.concurrent.future {
+          val df = DataFrame(sqlContext, e.query)
+          df.queryExecution.toRdd.collect()
+        }(SparkPlan.subqueryExecutionContext)
+        queryResults += e -> futureResult
+      }
+
       children.foreach(_.prepare())
+
+      val timeout: Duration = {
+        val timeoutValue = sqlContext.conf.broadcastTimeout
+        if (timeoutValue < 0) {
+          Duration.Inf
+        } else {
+          timeoutValue.seconds
+        }
+      }
+
+      // fill in the result of subqueries
+      queryResults.foreach {
+        case (e, futureResult) =>
+          val rows = Await.result(futureResult, timeout)
+          if (rows.length > 1) {
+            sys.error(s"Scalar subquery should return at most one row, but got ${rows.length}: " +
+              s"${e.query.treeString}")
+          }
+          // Analyzer will make sure that it only returns one column
+          if (rows.length > 0) {
+            e.updateResult(rows(0).get(0, e.dataType))
+          }
+      }
     }
   }
```

On collecting and awaiting subquery results:

> **Reviewer:** we should move the blocking phase into execute, otherwise if multiple nodes have subqueries it becomes blocking.
> **Reviewer:** ah ok, you can't have a general execute. I guess this is why some query engines have init and then prepare.
> **Reviewer:** or a subquery is now blocking broadcasting ...
> **Author:** This is called after [...]
> **Reviewer:** what if there is a broadcast join after this?
> **Author:** Broadcast will be issued before this.

On the timeout:

> **Reviewer:** This timeout is kind of weird, right? Like the max timeout here is [...]
> **Author:** All the subqueries are submitted at the same time (during the beginning of prepare()), so the total time should be [...] Should we create another config for subqueries, or rename the broadcast one?

On the error message:

> **Reviewer:** we can use postgres' error message: "more than one row returned by a subquery used as an expression"
> **Author:** I never thought we should match PostgreSQL's error messages exactly; that's great.
> **Author:** The current error message has more information than postgres'. Should we change it?
> **Reviewer:** not 100% sure. Maybe it's better to just say "more than one", so we don't need to run the whole plan (e.g. I'm thinking maybe we should inject a limit of 2 into the subquery).
> **Author:** Good idea, changed to call [...]

On the column-count comment:

> **Reviewer:** "Analyzer should make sure this only returns one column", and add an assert after this.

On `rows.length > 0`:

> **Reviewer:** can rows.length ever be 0 here? if it can only be 1, why are we testing > 0 here?
> **Author:** the length could be zero; then the value is null.
> **Reviewer:** How do we write a query with 0 columns? The comment above said the analyzer would make sure there's only one column. If it is possible to have 0 columns, then I'd make it explicit here to set the value to null.
> **Reviewer:** Also, if it is possible to have 0 columns, we need to add a test case.
> **Reviewer:** ok, makes sense. Please change the check to rows.length == 1; it's pretty confusing to first check that it's greater than 1 and then check that it's greater than 0, when you are just expecting 1.
> **Reviewer:** and the same thing applies - the test coverage for this is pretty bad. Add a test case where the subquery returns 0 or more than 1 rows.
> **Reviewer:** Repeated: [...]
> **Reviewer:** Yeah, it'd be better to make it more explicit, e.g. [...]
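For readers unfamiliar with the scheduling above: all subquery futures are submitted before any of them is awaited, so the total wait is bounded by the slowest subquery rather than the sum, which is also the author's point about the timeout. A self-contained sketch of the same submit-then-await idiom, with hypothetical names and no Spark dependencies:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object SubmitThenAwait {
  // A dedicated pool, analogous to SparkPlan.subqueryExecutionContext.
  private val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

  def main(args: Array[String]): Unit = {
    // Submit everything first so the tasks run concurrently...
    val futures = (1 to 3).map(i => i -> Future { Thread.sleep(100); Array(i) }(pool))
    // ...then block on each result; the total wait is ~100ms, not ~300ms.
    futures.foreach { case (i, f) =>
      val rows = Await.result(f, 10.seconds)
      println(s"task $i returned ${rows.length} row(s)")
    }
    pool.shutdown()
  }
}
```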
```diff
@@ -231,6 +269,11 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
   }
 }

+object SparkPlan {
+  private[execution] val subqueryExecutionContext = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonCachedThreadPool("subquery", 16))
+}

 private[sql] trait LeafNode extends SparkPlan {
   override def children: Seq[SparkPlan] = Nil
   override def producedAttributes: AttributeSet = outputSet
```

On the thread pool:

> **Reviewer:** What threadpool are broadcasts done on? Should it be the same?
> **Author:** This could be refactored later to use the same thread pool for all of them.
File: SQLQuerySuite.scala

```diff
@@ -2105,6 +2105,22 @@ class SQLQuerySuite extends QueryTest with SharedSQLContext {
     assert(error.getMessage contains "grouping_id() can only be used with GroupingSets/Cube/Rollup")
   }

+  test("uncorrelated scalar subquery") {
+    assertResult(Array(Row(1))) {
+      sql("select (select 1 as b) as b").collect()
+    }
+
+    assertResult(Array(Row(1))) {
+      sql("with t2 as (select 1 as b, 2 as c) " +
+        "select a from (select 1 as a union all select 2 as a) t " +
+        "where a = (select max(b) from t2) ").collect()
+    }
+
+    assertResult(Array(Row(3))) {
+      sql("select (select (select 1) + 1) + 1").collect()
+    }
+  }
+
   test("SPARK-13056: Null in map value causes NPE") {
     val df = Seq(1 -> Map("abc" -> "somestring", "cba" -> null)).toDF("key", "value")
     withTempTable("maptest") {
```

On the new test:

> **Reviewer:** About test coverage: let's create a subquery suite and move the test cases there. It would also be great to have at least one test case that runs on a dataset that is not generated by just `select x`, because I worry that in the future we'll add some special optimizations and then all the test cases here become no-ops.
> **Reviewer:** Also, we should test the behavior when no rows are returned.

On the CTE test case:

> **Reviewer:** if we support nested subqueries, can we add a test case?
> **Author:** Added
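The reviewers asked for coverage of the zero-row and multi-row cases; a hedged sketch of what such tests could look like (not part of this diff; the expected behavior follows the sys.error in SparkPlan.prepare above and the author's note that zero rows yield null):

```scala
test("uncorrelated scalar subquery with 0 or more than 1 rows (sketch)") {
  // Zero rows: the scalar subquery evaluates to null.
  assertResult(Array(Row(null))) {
    sql("select (select a from (select 1 as a) t where a > 1) as b").collect()
  }
  // More than one row: the query fails at runtime.
  val e = intercept[RuntimeException] {
    sql("select (select a from (select 1 as a union all select 2 as a) t) as b").collect()
  }
  assert(e.getMessage.contains("Scalar subquery should return at most one row"))
}
```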
Conversation:

> **Reviewer:** This might sound exceedingly dumb, but I cannot find ScalarSubquery or SubqueryExpression. Are they already in the code base? Or did you create this branch on top of another branch?
> **Reviewer:** Never mind, I just found the other PR...
> **Author:** I missed a file, sorry.