Skip to content

Commit

Permalink
[SPARK-7232] [SQL] Add a Substitution batch for spark sql analyzer
Browse files Browse the repository at this point in the history
  Added a new batch named `Substitution` before the `Resolution` batch. The motivation is that there are cases where we want to do some substitution on the parsed logical plan before resolving it.
Consider these two cases:
1. CTE: for a CTE we first build a raw logical plan
```
'With Map(q1 -> 'Subquery q1
                   'Project ['key]
                      'UnresolvedRelation [src], None)
 'Project [*]
  'Filter ('key = 5)
   'UnresolvedRelation [q1], None
```
In the `With` logical plan there is a map that stores the CTE definitions (`q1 -> subquery`). We first want to strip off the `With` node and substitute the `UnresolvedRelation` named `q1` with the `subquery`.

2. Another example is window functions: a user may define named windows, and we also need to substitute each window name referenced in the child plan with the concrete window definition. This should also be done in the Substitution batch.

Author: wangfei <wangfei1@huawei.com>

Closes #5776 from scwf/addbatch and squashes the following commits:

d4b962f [wangfei] added WindowsSubstitution
70f6932 [wangfei] Merge branch 'master' of https://github.com/apache/spark into addbatch
ecaeafb [wangfei] address yhuai's comments
553005a [wangfei] fix test case
0c54798 [wangfei] address comments
29aaaaf [wangfei] fix compile
1c9a092 [wangfei] added Substitution bastch
  • Loading branch information
scwf authored and yhuai committed May 8, 2015
1 parent 714db2e commit f496bf3
Showing 1 changed file with 60 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class Analyzer(
val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Nil

lazy val batches: Seq[Batch] = Seq(
Batch("Substitution", fixedPoint,
CTESubstitution ::
WindowsSubstitution ::
Nil : _*),
Batch("Resolution", fixedPoint,
ResolveRelations ::
ResolveReferences ::
Expand All @@ -71,6 +75,55 @@ class Analyzer(
extendedResolutionRules : _*)
)

/**
 * Replaces references to CTE (common table expression) names in the child of a
 * top-level [[With]] node with the plans those names were defined as, dropping
 * the [[With]] node itself. Any other plan is returned unchanged.
 */
object CTESubstitution extends Rule[LogicalPlan] {
  // TODO allow subquery to define CTE
  def apply(plan: LogicalPlan): LogicalPlan = plan match {
    case With(child, relations) => substituteCTE(child, relations)
    case _ => plan
  }

  /**
   * Rewrites every [[UnresolvedRelation]] in `plan` whose table name is a key of
   * `cteRelations` into the corresponding CTE definition.
   *
   * @param plan         the plan in which CTE references are looked for
   * @param cteRelations CTE name -> plan defined for that name
   * @return the plan with matching relations replaced; relations with no matching
   *         CTE name are left as they are
   */
  def substituteCTE(plan: LogicalPlan, cteRelations: Map[String, LogicalPlan]): LogicalPlan =
    plan transform {
      // Hive prefers a database table over a CTE definition when both share a name.
      // For the sake of reasonableness and a simpler implementation, the CTE definition
      // wins here: only the last part of the identifier (the table name) is checked and
      // any database name is ignored.
      // See https://github.com/apache/spark/pull/4929#discussion_r27186638 for more info.
      case u: UnresolvedRelation =>
        cteRelations.get(u.tableIdentifier.last) match {
          case Some(definition) =>
            // Keep the alias the query gave to the reference, if there is one.
            u.alias.fold(definition)(Subquery(_, definition))
          case None => u
        }
    }
}

/**
 * Replaces named window references in the child of a [[WithWindowDefinition]] node
 * with the window specifications defined under those names, dropping the
 * [[WithWindowDefinition]] node itself.
 */
object WindowsSubstitution extends Rule[LogicalPlan] {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    // Lookup WindowSpecDefinitions. This rule works with unresolved children.
    case WithWindowDefinition(windowDefinitions, child) =>
      child.transform {
        case node => node.transformExpressions {
          case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
            windowDefinitions.get(windowName) match {
              case Some(spec) => WindowExpression(c, spec)
              // A reference to a window name that was never defined is an analysis error.
              case None =>
                failAnalysis(
                  s"Window specification $windowName is not defined in the WINDOW clause.")
            }
        }
      }
  }
}

/**
* Removes no-op Alias expressions from the plan.
*/
Expand Down Expand Up @@ -172,36 +225,20 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the catalog.
*/
object ResolveRelations extends Rule[LogicalPlan] {
def getTable(u: UnresolvedRelation, cteRelations: Map[String, LogicalPlan]): LogicalPlan = {
def getTable(u: UnresolvedRelation): LogicalPlan = {
try {
// In hive, if there is same table name in database and CTE definition,
// hive will use the table in database, not the CTE one.
// Taking into account the reasonableness and the implementation complexity,
// here use the CTE definition first, check table name only and ignore database name
cteRelations.get(u.tableIdentifier.last)
.map(relation => u.alias.map(Subquery(_, relation)).getOrElse(relation))
.getOrElse(catalog.lookupRelation(u.tableIdentifier, u.alias))
catalog.lookupRelation(u.tableIdentifier, u.alias)
} catch {
case _: NoSuchTableException =>
u.failAnalysis(s"no such table ${u.tableName}")
}
}

def apply(plan: LogicalPlan): LogicalPlan = {
val (realPlan, cteRelations) = plan match {
// TODO allow subquery to define CTE
// Add cte table to a temp relation map,drop `with` plan and keep its child
case With(child, relations) => (child, relations)
case other => (other, Map.empty[String, LogicalPlan])
}

realPlan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(
table = EliminateSubQueries(getTable(u, cteRelations)))
case u: UnresolvedRelation =>
getTable(u, cteRelations)
}
def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case i@InsertIntoTable(u: UnresolvedRelation, _, _, _, _) =>
i.copy(table = EliminateSubQueries(getTable(u)))
case u: UnresolvedRelation =>
getTable(u)
}
}

Expand Down Expand Up @@ -664,21 +701,6 @@ class Analyzer(
// We have to use transformDown at here to make sure the rule of
// "Aggregate with Having clause" will be triggered.
def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
// Lookup WindowSpecDefinitions. This rule works with unresolved children.
case WithWindowDefinition(windowDefinitions, child) =>
child.transform {
case plan => plan.transformExpressions {
case UnresolvedWindowExpression(c, WindowSpecReference(windowName)) =>
val errorMessage =
s"Window specification $windowName is not defined in the WINDOW clause."
val windowSpecDefinition =
windowDefinitions
.get(windowName)
.getOrElse(failAnalysis(errorMessage))
WindowExpression(c, windowSpecDefinition)
}
}

// Aggregate with Having clause. This rule works with an unresolved Aggregate because
// a resolved Aggregate will not have Window Functions.
case f @ Filter(condition, a @ Aggregate(groupingExprs, aggregateExprs, child))
Expand Down

0 comments on commit f496bf3

Please sign in to comment.