[SPARK-12718][SPARK-13720][SQL] SQL generation support for window functions #11555
Conversation
Test build #52544 has finished for PR 11555 at commit
@cloud-fan The issue is much more complex in my implementation. As you saw in the JIRA, I originally wanted to add an extra subquery alias between each Window. However, I hit a couple of problems, caused by code like the following:

```scala
private def getAllWindowExprs(
    plan: Window,
    windowExprs: ArrayBuffer[NamedExpression]): (LogicalPlan, ArrayBuffer[NamedExpression]) = {
  plan.child match {
    case w: Window =>
      getAllWindowExprs(w, windowExprs ++ plan.windowExpressions)
    case _ => (plan.child, windowExprs ++ plan.windowExpressions)
  }
}

// Replace the attributes of aliased expressions in window expressions
// by the original expressions in Project or Aggregate.
private def replaceAliasedByExpr(
    projectList: Seq[NamedExpression],
    windowExprs: Seq[NamedExpression]): Seq[Expression] = {
  val aliasMap = AttributeMap(projectList.collect {
    case a: Alias => (a.toAttribute, a.child)
  })
  windowExprs.map { expr =>
    expr.transformDown {
      case ar: AttributeReference if aliasMap.contains(ar) => aliasMap(ar)
    }
  }
}

private def buildProjectListForWindow(plan: Window): (String, String, String, LogicalPlan) = {
  // Get all the windowExpressions from all the adjacent Window operators.
  val (child, windowExpressions) = getAllWindowExprs(plan, ArrayBuffer.empty[NamedExpression])
  child match {
    case p: Project =>
      val newWindowExpr = replaceAliasedByExpr(p.projectList, windowExpressions)
      ((p.projectList ++ newWindowExpr).map(_.sql).mkString(", "), "", "", p.child)
    case _: Aggregate | Filter(_, _: Aggregate) =>
      val agg: Aggregate = child match {
        case a: Aggregate => a
        case Filter(_, a: Aggregate) => a
      }
      val newWindowExpr = replaceAliasedByExpr(agg.aggregateExpressions, windowExpressions)
      val groupingSQL = agg.groupingExpressions.map(_.sql).mkString(", ")
      val havingSQL = child match {
        case _: Aggregate => ""
        case Filter(condition, _: Aggregate) => "HAVING " + condition.sql
      }
      ((agg.aggregateExpressions ++ newWindowExpr).map(_.sql).mkString(", "),
        groupingSQL,
        havingSQL,
        agg.child)
  }
}

private def windowToSQL(plan: Window): String = {
  val (selectList, groupingSQL, havingSQL, nextPlan) = buildProjectListForWindow(plan)
  build(
    "SELECT",
    selectList,
    if (nextPlan == OneRowRelation) "" else "FROM",
    toSQL(nextPlan),
    if (groupingSQL.isEmpty) "" else "GROUP BY",
    groupingSQL,
    havingSQL
  )
}
```
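The `build` helper called in `windowToSQL` above is not shown in this thread. As a rough illustration of what it has to do, here is a minimal stand-alone sketch (an assumption, not Spark's actual `SQLBuilder` code; `SqlFragments` is an invented name): join the non-empty SQL fragments with single spaces, so absent clauses such as GROUP BY disappear cleanly.

```scala
// Minimal sketch of a clause-joining helper; not part of Spark.
object SqlFragments {
  // Drop empty segments, then join the rest with single spaces.
  def build(segments: String*): String =
    segments.map(_.trim).filter(_.nonEmpty).mkString(" ")
}
```

For example, `SqlFragments.build("SELECT", "key, value", "", "FROM", "t", "")` yields `"SELECT key, value FROM t"`.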
So far, the test cases I wrote are listed below. I think we still need to add more to cover all the cases. I am not sure if your current implementation can pass the first one.

```scala
test("window basic") {
  checkHiveQl(
    s"""
       |select key, value,
       |round(avg(value) over (), 2)
       |from parquet_t1 order by key
     """.stripMargin)
}

test("window with different window specification") {
  checkHiveQl(
    s"""
       |select key, value,
       |dense_rank() over (order by key, value) as dr,
       |sum(value) over (partition by key order by key) as sum
       |from parquet_t1
     """.stripMargin)
}

test("window with the same window specification with aggregate + having") {
  checkHiveQl(
    s"""
       |select key, value,
       |sum(value) over (partition by key % 5 order by key) as dr
       |from parquet_t1 group by key, value having key > 5
     """.stripMargin)
}

test("window with the same window specification with aggregate functions") {
  checkHiveQl(
    s"""
       |select key, value,
       |sum(value) over (partition by key % 5 order by key) as dr
       |from parquet_t1 group by key, value
     """.stripMargin)
}

test("window with the same window specification with aggregate") {
  checkHiveQl(
    s"""
       |select key, value,
       |dense_rank() over (distribute by key sort by key, value) as dr,
       |count(key)
       |from parquet_t1 group by key, value
     """.stripMargin)
}

test("window with the same window specification without aggregate and filter") {
  checkHiveQl(
    s"""
       |select key, value,
       |dense_rank() over (distribute by key sort by key, value) as dr,
       |count(key) over(distribute by key sort by key, value) as ca
       |from parquet_t1
     """.stripMargin)
}
```
Sorry, I have an early morning conference call with the patent attorneys. Will reply to your response tomorrow. Thanks!
Have a good rest, we can discuss more tomorrow :)
Thank you! Talk to you tomorrow. BTW, we also need to fix a couple of window expressions, for example,
Test build #52545 has finished for PR 11555 at commit
```scala
  results.toSeq
}
```
So this method is basically a DFS search for all the outermost SubqueryAlias operators. Maybe the following version is clearer:

```scala
def findOutermostQualifiers(input: LogicalPlan): Seq[(String, AttributeSet)] = {
  input.collectFirst {
    case SubqueryAlias(alias, child) => Seq(alias -> child.outputSet)
    case plan => plan.children.flatMap(findOutermostQualifiers)
  }.toSeq.flatten
}
```
Test build #52557 has finished for PR 11555 at commit
```scala
/**
 * Finds the outermost [[SubqueryAlias]] nodes in the input logical plan and returns their alias
 * names and outputSet.
 */
private def findOutermostQualifiers(input: LogicalPlan): Seq[(String, AttributeSet)] = {
```
I have another alternative. We face the same issue everywhere we add or remove an extra qualifier. How about adding another rule/batch below the existing `Batch("Canonicalizer")`? For example:

```scala
Batch("Replace Qualifier", Once,
  ReplaceQualifier)
```

The rule is simple: we can always get the qualifier from the inputSet if we do a bottom-up traversal. I did not do a full test last night. Below is the code draft:

```scala
object ReplaceQualifier extends Rule[LogicalPlan] {
  override def apply(tree: LogicalPlan): LogicalPlan = tree transformUp { case plan =>
    plan transformExpressions {
      case e: AttributeReference => e.withQualifiers(getQualifier(plan.inputSet, e))
    }
  }

  private def getQualifier(inputSet: AttributeSet, e: AttributeReference): Seq[String] = {
    inputSet.collectFirst {
      case a if a.semanticEquals(e) => a.qualifiers
    }.getOrElse(Seq.empty[String])
  }
}
```
Thanks, I like this one :)
This is really a good idea! Thanks, updated.
Test build #52558 has finished for PR 11555 at commit

Test build #52560 has finished for PR 11555 at commit

Test build #52566 has finished for PR 11555 at commit

Test build #52571 has finished for PR 11555 at commit

Test build #52569 has finished for PR 11555 at commit

retest this please

Test build #52616 has finished for PR 11555 at commit

Test build #52626 has finished for PR 11555 at commit

Test build #52649 has finished for PR 11555 at commit
```scala
case class NTile(buckets: Expression) extends RowNumberLike with SizeBasedWindowFunction {
```
Maybe I missed it, but where is the `sql` method for `NTile`?
Ah I see. Thanks!
Thanks @cloud-fan for working on it! Overall, it looks good. It will be great to have more test cases. Like the following
Also, maybe we are missing some window functions (like
Test build #52718 has finished for PR 11555 at commit

Test build #52720 has finished for PR 11555 at commit

Test build #52728 has finished for PR 11555 at commit
@dilipbiswal and I just had an offline discussion about this. Sorry to mention this at the last minute. Adding extra subqueries could be a big issue if the column names are the same but the original qualifiers are different. For example, we can join two tables that have the same column names. Normally, we use different qualifier names to differentiate them. Now, if we just replace them with the identical subquery name, they will lose the original qualifiers. Then, the generated SQL statement will be rejected by the Analyzer due to name ambiguity. We are facing this issue in multiple SQL generation cases. Please correct us if our understanding is wrong. Thanks! @cloud-fan @liancheng
BTW, we are having another related discussion in the JIRA: https://issues.apache.org/jira/browse/SPARK-13393. Not sure if you are interested in this. Please feel free to jump in if you have better ideas. Thanks!
I think it's not true. Every added subquery will have a unique name, so we won't have the same qualifiers from the left and right child of a
For example, given the following sub-plan:
Assuming we still have multiple operators above this sub-plan and these operators are using both
@gatorsmile, can you give a more detailed example? Where does the
For example, the following query:

```scala
sqlContext.range(10).select('id as 'key, 'id as 'value).write.saveAsTable("test1")
sqlContext.range(10).select('id as 'key, 'id as 'value).write.saveAsTable("test2")
sql("SELECT sum(a.value) over (ORDER BY a.key), sum(b.value) over (ORDER BY b.key) FROM test1 a JOIN test2 b ON a.key = b.key").explain(true)
```

The plan will be like
Ah, makes sense, thanks for the explanation!
hmmm, it's not quite related to SPARK-13720, but rather a fundamental bug in the SQL builder infrastructure. How about we merge this PR first and fix it later?
Yeah, this is a fundamental issue. I am afraid we are unable to add any extra subqueries for SQL generation. I will check whether SQL generation in traditional RDBMSs also uses subqueries, and will post the answer I get in this PR. BTW, I am fine with merging this first. Thank you!
retest this please
Test build #52813 has finished for PR 11555 at commit
When adding an extra subquery, we always detect whether duplicate names exist. If we find one, how about adding another Project with unique alias names for the columns with duplicate names? BTW, I am still waiting for input from RDBMS experts. Will keep you posted. cc @ioana-delaney Thanks!
The generated alias names will have different column names. To keep the original column names, we need another top-level Project to convert their names back.
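The two-level projection idea described above can be sketched in isolation. This is a hypothetical illustration of the renaming scheme only: the alias pattern `gen_attr_N` and the helper name are invented, and the real work would happen on Catalyst expressions rather than strings.

```scala
// Hypothetical sketch: give duplicated column names unique aliases in an inner
// projection, and restore the original names in a top-level projection.
def dedupProjections(columns: Seq[String]): (Seq[String], Seq[String]) = {
  // Names that occur more than once are the ones that would be ambiguous.
  val dup = columns.groupBy(identity).collect { case (n, vs) if vs.size > 1 => n }.toSet
  val inner = columns.zipWithIndex.map {
    case (name, i) if dup(name) => s"$name AS gen_attr_$i"
    case (name, _) => name
  }
  val outer = columns.zipWithIndex.map {
    case (name, i) if dup(name) => s"gen_attr_$i AS $name"
    case (name, _) => name
  }
  (inner, outer)
}
```

For instance, `dedupProjections(Seq("key", "value", "key"))` aliases both `key` columns in the inner projection and maps them back to `key` on top.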
Got the offline inputs from @ioana-delaney.
Basically, I think we can safely merge this PR and fix the naming ambiguity issues in a separate PR. Thanks!
I'm going to merge this PR as it blocks my next work. cc @liancheng I'll address your comments in follow-up PRs if you have any.
Fix the compilation failure introduced by apache#11555 because of a merge conflict. Author: Wenchen Fan <wenchen@databricks.com> Closes apache#11648 from cloud-fan/hotbug.
What changes were proposed in this pull request?

Add SQL generation support for window functions. The idea is simple: just treat the `Window` operator like `Project`, i.e. add a subquery to its child when necessary, generate a `SELECT ... FROM ...` SQL string, and implement the `sql` method for window-related expressions, e.g. `WindowSpecDefinition`, `WindowFrame`, etc.

This PR also fixes SPARK-13720 by improving the process of adding an extra `SubqueryAlias` (the `RecoverScopingInfo` rule). Before this PR, we updated the qualifiers in the project list while adding the subquery. However, this is incomplete, as we need to update the qualifiers in all ancestors that refer to attributes here. In this PR, we split `RecoverScopingInfo` into two rules: `AddSubQuery` and `UpdateQualifier`. `AddSubQuery` only adds a subquery when necessary, and `UpdateQualifier` re-propagates and updates qualifiers bottom up.

Ideally we should put the bug fix part in an individual PR, but this bug also blocks the window stuff, so I put them together here.

Many thanks to @gatorsmile for the initial discussion and test cases!

How was this patch tested?

New tests in `LogicalPlanToSQLSuite`.
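The bottom-up qualifier re-propagation that `UpdateQualifier` performs can be illustrated with a toy model. All names here (`Plan`, `Leaf`, `Subquery`, `Select`) are invented stand-ins for this sketch; the real rule runs over Catalyst `LogicalPlan`s and `AttributeReference`s.

```scala
// Toy model: each node derives a column -> qualifier map from its child, so
// qualifiers naturally propagate bottom up through the tree.
sealed trait Plan { def qualifiedOutput: Map[String, String] }

case class Leaf(name: String, columns: Seq[String]) extends Plan {
  // A base relation qualifies its columns with its own name.
  val qualifiedOutput: Map[String, String] = columns.map(_ -> name).toMap
}

case class Subquery(alias: String, child: Plan) extends Plan {
  // A subquery re-qualifies every column it exposes with its own alias.
  val qualifiedOutput: Map[String, String] =
    child.qualifiedOutput.keys.map(_ -> alias).toMap
}

case class Select(exprs: Seq[String], child: Plan) extends Plan {
  // A projection resolves each referenced column against the child's output,
  // picking up whatever qualifier the child has assigned.
  val qualifiedOutput: Map[String, String] =
    exprs.map(c => c -> child.qualifiedOutput(c)).toMap
}
```

Wrapping `t` in a subquery changes the qualifier seen by every ancestor: `Select(Seq("key"), Subquery("gen_sub_0", Leaf("t", Seq("key", "value"))))` resolves `key` against `gen_sub_0`, not `t`, which is exactly the update the old `RecoverScopingInfo` rule failed to propagate above the project list.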