[SPARK-4226][SQL]Add subquery (not) in/exists support #9055

Closed
wants to merge 5 commits into from
@@ -17,10 +17,12 @@

package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException

import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions.aggregate.{Complete, AggregateExpression2, AggregateFunction2}
import org.apache.spark.sql.catalyst.plans.{LeftSemi, LeftAnti}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
@@ -69,6 +71,7 @@ class Analyzer(
WindowsSubstitution ::
Nil : _*),
Batch("Resolution", fixedPoint,
RewriteFilterSubQuery ::
Contributor

This rule does not need to go first, right?

Contributor Author

I haven't tested placing it last, but that should not be a problem. Is there a concern about the ordering?

ResolveRelations ::
ResolveReferences ::
ResolveGroupingAnalytics ::
@@ -269,6 +272,271 @@ class Analyzer(
}
}

/**
* Rewrite [[Exists]] / [[In]] subquery predicates as a left semi join or left anti join.
*
* 1. Some of the key concepts:
* Correlated:
* The subquery references attributes of the parent query.
* e.g. We reference the "a.value", which is the attribute in parent query, in the subquery.
*
* SELECT a.value FROM src a
* WHERE a.key in (
* SELECT b.key FROM src1 b
* WHERE a.value > b.value)
*
* Uncorrelated:
* The subquery does not reference any attribute of its parent query.
* e.g.
* SELECT a.value FROM src a WHERE a.key IN (SELECT key FROM src WHERE key > 100);
*
* 2. Basic Logic for the Transformation
* EXISTS / IN => LEFT SEMI JOIN
* NOT EXISTS / NOT IN => LEFT ANTI JOIN
*
* Expressed as logical plans, the following cases are supported:
*
* e.g. EXISTS / NOT EXISTS
* SELECT value FROM src a WHERE (NOT) EXISTS (SELECT 1 FROM src1 b WHERE a.key < b.key)
* ==>
* SELECT a.value FROM src a LEFT (ANTI) SEMI JOIN src1 b ON a.key < b.key
*
* e.g. IN / NOT IN
* SELECT value FROM src a WHERE key (NOT) IN (SELECT key FROM src1 b WHERE a.value < b.value)
* ==>
* SELECT value FROM src a LEFT (ANTI) SEMI JOIN src1 b ON a.key = b.key AND a.value < b.value
*
* e.g. IN / NOT IN with other conjunctions
* SELECT value FROM src a
* WHERE key (NOT) IN (
* SELECT key FROM src1 b WHERE a.value < b.value
* ) AND a.key > 10
* ==>
* SELECT value
* (FROM src a WHERE a.key > 10)
* LEFT (ANTI) SEMI JOIN src1 b ON a.key = b.key AND a.value < b.value
*
* 3. There are also some limitations:
* a. IN/NOT IN subqueries may only select a single column.
* e.g.(bad example)
* SELECT value FROM src a WHERE key IN (SELECT key, value FROM src1 WHERE key > 10)
* b. EXISTS/NOT EXISTS must have one or more correlated predicates.
* e.g.(bad example)
* SELECT value FROM src a WHERE EXISTS (SELECT 1 FROM src1 b WHERE b.key > 10)
* c. References to the parent query are only supported in the WHERE clause of the subquery.
* e.g.(bad example)
* SELECT value FROM src a WHERE key IN (SELECT a.key + b.key FROM src1 b)
* d. Only a single IN/EXISTS subquery predicate is supported.
* e.g.(bad example)
* SELECT value FROM src WHERE key IN (SELECT xx1 FROM xxx1) AND key in (SELECT xx2 FROM xxx2)
* e. Disjunction is not supported at the top level.
* e.g.(bad example)
* SELECT value FROM src WHERE key > 10 OR key IN (SELECT xx1 FROM xxx1)
* f. Implicit reference expression substitution to the parent query is not supported.
* e.g.(bad example)
* SELECT min(key) FROM src a HAVING EXISTS (SELECT 1 FROM src1 b WHERE b.key = min(a.key))
*
* 4. TODOs
* a. Clearer error messages telling the user why the rewrite failed.
* b. Support multiple IN / EXISTS clauses in the predicates.
* c. Implicit reference expression substitution to the parent query
* d. ..
*/
Contributor

For the scaladoc, can we add details on how this rule works?

Contributor Author

Yes, I will add a more detailed description of this rule.
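
As a rough illustration of what the scaladoc describes, the semantics of the two target joins can be sketched on plain Scala collections. This is a toy model under stated assumptions, not the Catalyst implementation: `Row`, `leftSemi`, and `leftAnti` are hypothetical names, and the model contains no NULL values, so it says nothing about how NOT IN behaves when NULLs are present.

```scala
// Toy model only: relations are Seqs of a hypothetical Row type, not Spark plans.
object SemiAntiJoinSketch {
  final case class Row(key: Int, value: String)

  // LEFT SEMI JOIN: keep each left row that has at least one matching right row.
  // This is the shape EXISTS / IN is rewritten to.
  def leftSemi(left: Seq[Row], right: Seq[Row])(cond: (Row, Row) => Boolean): Seq[Row] =
    left.filter(l => right.exists(r => cond(l, r)))

  // LEFT ANTI JOIN: keep each left row that has no matching right row.
  // This is the shape NOT EXISTS / NOT IN is rewritten to.
  def leftAnti(left: Seq[Row], right: Seq[Row])(cond: (Row, Row) => Boolean): Seq[Row] =
    left.filterNot(l => right.exists(r => cond(l, r)))

  def main(args: Array[String]): Unit = {
    val src  = Seq(Row(1, "a"), Row(5, "b"), Row(9, "c"))
    val src1 = Seq(Row(5, "x"), Row(6, "y"))

    // SELECT value FROM src a WHERE EXISTS (SELECT 1 FROM src1 b WHERE a.key < b.key)
    println(leftSemi(src, src1)((a, b) => a.key < b.key).map(_.value))

    // SELECT value FROM src a WHERE NOT EXISTS (SELECT 1 FROM src1 b WHERE a.key < b.key)
    println(leftAnti(src, src1)((a, b) => a.key < b.key).map(_.value))
  }
}
```

Each left row appears at most once in either result, no matter how many right rows match, which is why a DISTINCT inside the subquery makes no difference to the outcome.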

object RewriteFilterSubQuery extends Rule[LogicalPlan] with PredicateHelper {
// This is to extract the SubQuery expression and other conjunction expressions.
def unapply(condition: Expression): Option[(Expression, Seq[Expression])] = {
if (!condition.resolved) {
return None
}

val conjunctions = splitConjunctivePredicates(condition).map(_ transformDown {
// Remove the Cast expression for SubQueryExpression.
case Cast(f: SubQueryExpression, BooleanType) => f
}
)

val (subqueries, others) = conjunctions.partition(c => c.isInstanceOf[SubQueryExpression])
if (subqueries.isEmpty) {
None
} else if (subqueries.length > 1) {
// We don't yet support multiple subqueries in the predicates, e.g.:
// SELECT value FROM src
// WHERE
// key IN (SELECT key xxx) AND
// key IN (SELECT key xxx)
// TODO support this case in the future since it's part of the `standard SQL`
throw new AnalysisException(
s"Only 1 SubQuery expression is supported in predicates, but we got $subqueries")
} else {
val subQueryExpr = subqueries(0).asInstanceOf[SubQueryExpression]
// try to resolve the subquery

val subquery = Analyzer.this.execute(subQueryExpr.subquery) match {
case Distinct(child) =>
// DISTINCT is redundant for a semi join, so ignore it.
// e.g. SELECT value FROM src WHERE key IN (SELECT DISTINCT key FROM src b)
// which is equivalent to
// SELECT value FROM src WHERE key IN (SELECT key FROM src b)
// We discard the DISTINCT keyword so that `def apply(..)` does not need
// an additional case for the Distinct operator.
child
case other => other
}
Some((subQueryExpr.withNewSubQuery(subquery), others))
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f if !f.childrenResolved => f

case f @ Filter(RewriteFilterSubQuery(subquery, others), left) =>
subquery match {
case Exists(Project(_, Filter(condition, right)), positive) =>
Contributor

Is the pattern Project(_, Filter(condition, right)) too restrictive?

Contributor Author

This is the only case we currently support for (NOT) EXISTS.

(This is the correlated case: the correlated reference appears ONLY in the Filter condition.)
SELECT value FROM src a WHERE EXISTS (SELECT key FROM src b WHERE a.key=b.key AND a.key> 100).

(These are the uncorrelated cases.)
SELECT value FROM src a WHERE EXISTS (SELECT key FROM src b). // without Filter
SELECT value FROM src a WHERE EXISTS (SELECT key FROM src b WHERE b.key> 100). // the Filter condition resolves within the subquery, so the subquery is definitely uncorrelated.
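
The distinction drawn here can be sketched with a small stand-alone model: a filter condition counts as correlated when at least one of its attributes cannot be found among the relations visible inside the subquery. Everything below (`Attr`, `Cmp`, `AndE`, `isCorrelated`, and the use of alias sets) is a hypothetical simplification for illustration. The rule in this PR checks `condition.resolved` instead, and unqualified column names are not modelled.

```scala
// Hypothetical mini expression tree; every attribute carries its relation alias.
object CorrelationSketch {
  sealed trait Expr
  final case class Attr(qualifier: String, name: String) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class Cmp(op: String, left: Expr, right: Expr) extends Expr
  final case class AndE(left: Expr, right: Expr) extends Expr

  // Collect every attribute referenced anywhere in the expression.
  def attrs(e: Expr): Seq[Attr] = e match {
    case a: Attr      => Seq(a)
    case _: Lit       => Nil
    case Cmp(_, l, r) => attrs(l) ++ attrs(r)
    case AndE(l, r)   => attrs(l) ++ attrs(r)
  }

  // Correlated: some attribute's alias is not one of the subquery's own relations,
  // so it can only be resolved against the parent query.
  def isCorrelated(cond: Expr, subqueryAliases: Set[String]): Boolean =
    attrs(cond).exists(a => !subqueryAliases.contains(a.qualifier))

  def main(args: Array[String]): Unit = {
    // ... EXISTS (SELECT key FROM src b WHERE a.key = b.key AND a.key > 100)
    val correlated = AndE(
      Cmp("=", Attr("a", "key"), Attr("b", "key")),
      Cmp(">", Attr("a", "key"), Lit(100)))
    // ... EXISTS (SELECT key FROM src b WHERE b.key > 100)
    val uncorrelated = Cmp(">", Attr("b", "key"), Lit(100))

    println(isCorrelated(correlated, Set("b")))
    println(isCorrelated(uncorrelated, Set("b")))
  }
}
```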

checkAnalysis(right)
if (condition.resolved) {
// The condition should be unresolved here, because an EXISTS subquery must be correlated.
Contributor

Why? An EXISTS subquery can be uncorrelated, right?

Contributor Author

Actually, an EXISTS subquery should ONLY be correlated; otherwise it is probably meaningless.

This is what I get from Hive:

hive> select value from src a where exists (select key from src);
FAILED: SemanticException Line 1:54 Invalid SubQuery expression 'key' in definition of SubQuery sq_1 [
exists (select key from src)
] used as sq_1 at Line 1:30: For Exists/Not Exists operator SubQuery must be Correlated.

hive> select value from src a where exists (select key from src where key > 10000);
FAILED: SemanticException Line 1:54 Invalid SubQuery expression '10000' in definition of SubQuery sq_1 [
exists (select key from src where key > 10000)
] used as sq_1 at Line 1:30: For Exists/Not Exists operator SubQuery must be Correlated.

// and it fails even for
hive> select value from src a where exists (select key+a.key from src where key > 10000);
FAILED: SemanticException Line 1:60 Invalid SubQuery expression '10000' in definition of SubQuery sq_1 [
exists (select key+a.key from src where key > 10000)
] used as sq_1 at Line 1:30: For Exists/Not Exists operator SubQuery must be Correlated.

throw new AnalysisException(
s"Exists/Not Exists operator SubQuery must be correlated, but we got $condition")
}
val newLeft = others.reduceOption(And).map(Filter(_, left)).getOrElse(left)
Join(newLeft, right,
if (positive) LeftSemi else LeftAnti,
Some(ResolveReferences.tryResolveAttributes(condition, right)))
Contributor

Can we split this statement to multiple ones for better readability?


case Exists(right, positive) =>
throw new AnalysisException("Exists/Not Exists operator SubQuery must be Correlated, " +
s"but we got $right")

case InSubquery(key, Project(projectList, Filter(condition, right)), positive) =>
Contributor

Is this pattern too restrictive?

// we don't support nested correlation yet, so the `right` must be resolved.
checkAnalysis(right)
Contributor

I think we need a comment here to explain why we need this check.

if (projectList.length != 1) {
throw new AnalysisException(
s"Expect only 1 projection in In SubQuery Expression, but we got $projectList")
} else {
// This is a workaround to solve the ambiguous references issue like:
// SELECT 'value FROM src WHERE 'key IN (SELECT 'key FROM src b WHERE 'key > 100)
//
// Literally, we will transform the SQL into:
//
// SELECT 'value FROM src
// LEFT SEMI JOIN src b
// ON 'key = 'key and 'key > 100 -- 'key is an ambiguous reference here!
//
// The ResolveReferences.tryResolveAttributes will partially resolve the project
// list and filter condition of the subquery, and then what we got looks like:
//
// SELECT 'value FROM src
// LEFT SEMI JOIN src b
// ON 'key = key#123 and key#123 > 100
//
// And then we will leave the remaining unresolved attributes for the other rules
// in Analyzer.
val rightKey = ResolveReferences.tryResolveAttributes(projectList(0), right)
Contributor

Why do we need to manually resolve attributes here?

Contributor Author

This is a good question. It is a workaround for the ambiguous-reference issue in queries like:
SELECT 'value FROM src WHERE 'key IN (SELECT 'key FROM src b WHERE 'key > 100)
Taken literally, we would transform the SQL into:

SELECT 'value FROM src LEFT SEMI JOIN src b ON 'key = 'key and 'key > 100, where 'key is an ambiguous reference!

ResolveReferences.tryResolveAttributes partially resolves the project list and filter condition of the subquery, so what we get looks like:
SELECT 'value FROM src LEFT SEMI JOIN src b ON 'key = key#123 and key#123 > 100

We then leave the remaining unresolved attributes for the other rules.

Another workable solution is to first complete the aliases for the attributes / relations:
SELECT 'value FROM src WHERE 'key IN (SELECT 'key FROM src b WHERE 'key > 100) =>
SELECT 'a.value FROM src a WHERE 'a.key IN (SELECT 'b.key FROM src b WHERE 'b.key > 100) =>
SELECT 'a.value FROM src a LEFT SEMI JOIN src b ON 'a.key = 'b.key and 'b.key > 100
But this probably requires more code changes, and will probably confuse people when they check the generated logical plan.
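
The partial-resolution idea can be sketched outside Catalyst as follows. This is a minimal model under stated assumptions: `Unresolved`, `Resolved`, `tryResolve`, and the `Map` standing in for the subquery's output are all hypothetical, and the real `tryResolveAttributes` works on Catalyst expressions with the analyzer's resolver. The point shown is only that the subquery's own names are bound to the right side first, so that when they are later combined with the outer `'key` in one join condition, nothing is ambiguous.

```scala
// Toy expression tree: names start Unresolved and become Resolved(name, id).
object PartialResolveSketch {
  sealed trait Expr
  final case class Unresolved(name: String) extends Expr
  final case class Resolved(name: String, id: Int) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Resolve names using only `output`; anything not found there is left unchanged
  // for a later pass to handle.
  def tryResolve(e: Expr, output: Map[String, Int]): Expr = e match {
    case Unresolved(n)     => output.get(n).map(Resolved(n, _)).getOrElse(e)
    case EqualTo(l, r)     => EqualTo(tryResolve(l, output), tryResolve(r, output))
    case GreaterThan(l, r) => GreaterThan(tryResolve(l, output), tryResolve(r, output))
    case And(l, r)         => And(tryResolve(l, output), tryResolve(r, output))
    case other             => other
  }

  def main(args: Array[String]): Unit = {
    val rightOutput = Map("key" -> 123) // stands in for key#123 produced by `src b`

    val outerKey     = Unresolved("key") // the outer 'key, deliberately not resolved here
    val subqueryKey  = tryResolve(Unresolved("key"), rightOutput)
    val subqueryCond = tryResolve(GreaterThan(Unresolved("key"), Lit(100)), rightOutput)

    // ON 'key = key#123 AND key#123 > 100
    println(And(EqualTo(outerKey, subqueryKey), subqueryCond))
  }
}
```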


if (!rightKey.resolved) {
throw new AnalysisException(s"Cannot resolve the projection for SubQuery $rightKey")
}

// This is for the SQL with conjunction like:
//
// SELECT value FROM src a
// WHERE key > 5 AND key IN (SELECT key FROM src1 b WHERE key > 7) AND key < 10
//
// ==>
// SELECT value FROM (src a
// WHERE key > 5 AND key < 10)
// LEFT SEMI JOIN src1 b ON a.key = b.key AND b.key > 7
//
// Ideally, we should transform the original plan into
// SELECT value FROM src a
// LEFT SEMI JOIN src1 b
// ON a.key = b.key AND b.key > 7 AND a.key > 5 AND a.key < 10
//
// However, the former needs only a small code change to support
// multiple IN subqueries later, and it means less overhead for the Optimizer
val newLeft = others.reduceOption(And).map(Filter(_, left)).getOrElse(left)
val newCondition = Some(
And(
ResolveReferences.tryResolveAttributes(condition, right),
EqualTo(rightKey, key)))

Join(newLeft, right, if (positive) LeftSemi else LeftAnti, newCondition)
}

case InSubquery(key, Project(projectList, right), positive) =>
// we don't support nested correlation yet, so the `right` must be resolved.
checkAnalysis(right)
if (projectList.length != 1) {
throw new AnalysisException(
s"Expect only 1 projection in In SubQuery Expression, but we got $projectList")
} else {
if (!projectList(0).resolved) {
// We don't support references to outer columns in the subquery projection list.
// e.g. SELECT value FROM src a WHERE key in (SELECT b.key + a.key FROM src b)
// That means the project list of the subquery MUST already be resolved; otherwise
// we throw an exception.
throw new AnalysisException(s"Cannot resolve the projection ${projectList(0)}")
}
val newLeft = others.reduceOption(And).map(Filter(_, left)).getOrElse(left)
Join(newLeft, right,
if (positive) LeftSemi else LeftAnti,
Some(EqualTo(projectList(0), key)))
}

case InSubquery(key, right @ Aggregate(grouping, aggregations, child), positive) =>
if (aggregations.length != 1) {
throw new AnalysisException(
s"Expect only 1 projection in In SubQuery Expression, but we got $aggregations")
} else {
// we don't support nested correlation yet, so the `child` must be resolved.
checkAnalysis(child)
val rightKey = ResolveReferences.tryResolveAttributes(aggregations(0), child) match {
case e if !e.resolved =>
throw new AnalysisException(
s"Cannot resolve the aggregation $e")
case e: NamedExpression => e
case other =>
// place a space before `in_subquery_key`, hopefully end user
// will not take that as the field name or alias.
Alias(other, " in_subquery_key")()
}

val newLeft = others.reduceOption(And).map(Filter(_, left)).getOrElse(left)
val newRight = Aggregate(grouping, rightKey :: Nil, child)
val newCondition = Some(EqualTo(rightKey.toAttribute, key))

Join(newLeft, newRight, if (positive) LeftSemi else LeftAnti, newCondition)
}

case InSubquery(key,
f @ Filter(condition, right @ Aggregate(grouping, aggregations, child)), positive) =>
if (aggregations.length != 1) {
throw new AnalysisException(
s"Expect only 1 projection in In Subquery Expression, but we got $aggregations")
} else {
// we don't support nested correlation yet, so the `child` must be resolved.
checkAnalysis(child)
val rightKey = ResolveReferences.tryResolveAttributes(aggregations(0), child) match {
case e if !e.resolved =>
throw new AnalysisException(
s"Cannot resolve the aggregation $e")
case e: NamedExpression => e
case other => Alias(other, " in_subquery_key")()
}

val newLeft =
Filter(others.foldLeft(
ResolveReferences.tryResolveAttributes(condition(0), child))(And(_, _)),
left)
val newRight = Aggregate(grouping, rightKey :: Nil, child)
val newCondition = Some(EqualTo(rightKey.toAttribute, key))
Join(newLeft, newRight, if (positive) LeftSemi else LeftAnti, newCondition)
}
}
}
}

/**
* Replaces [[UnresolvedAttribute]]s with concrete [[AttributeReference]]s from
* a logical plan node's children.
@@ -397,6 +665,7 @@ class Analyzer(
case q: LogicalPlan =>
logTrace(s"Attempting to resolve ${q.simpleString}")
q transformExpressionsUp {
case u @ UnresolvedAlias(expr: NamedExpression) if expr.resolved => expr
case u @ UnresolvedAttribute(nameParts) =>
// Leave unchanged if resolution fails. Hopefully will be resolved next round.
val result =
@@ -408,6 +677,26 @@ class Analyzer(
}
}

// Try to resolve the attributes from the given logical plan
// TODO share the code with above rules? How?
def tryResolveAttributes(expr: Expression, q: LogicalPlan): Expression = {
checkAnalysis(q)
val projection = Project(q.output, q)

logTrace(s"Attempting to resolve ${expr.simpleString}")
expr transformUp {
case u @ UnresolvedAlias(expr) => expr
case u @ UnresolvedAttribute(nameParts) =>
// Leave unchanged if resolution fails. Hopefully will be resolved next round.
val result =
withPosition(u) { projection.resolveChildren(nameParts, resolver).getOrElse(u) }
logDebug(s"Resolving $u to $result")
result
case UnresolvedExtractValue(child, fieldExpr) if child.resolved =>
ExtractValue(child, fieldExpr, resolver)
}
}

Contributor

Should we just use the resolve method of a logical plan?

Contributor Author

I copied this rule from ResolveReferences with slight changes. Maybe we should keep it alongside ResolveReferences; otherwise people will probably forget to update it when the attribute-resolution logic changes.

I was planning to move this code into LogicalPlan, but I need to think more about how to share the code with the ResolveReferences rule. How about leaving that for a further improvement?

def newAliases(expressions: Seq[NamedExpression]): Seq[NamedExpression] = {
expressions.map {
case a: Alias => Alias(a.child, a.name)()
@@ -19,6 +19,8 @@ package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.{LeftSemiJoin, LeftSemi, LeftAnti}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.types._

@@ -51,6 +53,12 @@ trait CheckAnalysis {

case operator: LogicalPlan =>
operator transformExpressionsUp {
case u: UnresolvedAlias =>
failAnalysis(s"Couldn't resolve the UnresolvedAlias ${u.prettyString}")

case s: SubQueryExpression =>
failAnalysis(s"Couldn't resolve the subquery expression ${s.prettyString}")

case a: Attribute if !a.resolved =>
val from = operator.inputSet.map(_.name).mkString(", ")
a.failAnalysis(s"cannot resolve '${a.prettyString}' given input columns $from")