Skip to content

Commit

Permalink
[KYUUBI #5580] Support generate and window operators for InferRebalanceAndSortOrders
Browse files Browse the repository at this point in the history

### _Why are the changes needed?_

Support `generate` and `window` operators for `InferRebalanceAndSortOrders`.

### _How was this patch tested?_
- [X] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/master/contributing/code/testing.html#running-tests) locally before making a pull request

### _Was this patch authored or co-authored using generative AI tooling?_

No

Closes #5580 from wForget/dev.

Closes #5580

1a3bfad [wforget] fix
be74aac [wforget] fix
587c493 [wforget] Support generate and window operators for InferRebalanceAndSortOrders

Authored-by: wforget <643348094@qq.com>
Signed-off-by: ulyssesyou <ulyssesyou@apache.org>
  • Loading branch information
wForget authored and ulysses-you committed Nov 6, 2023
1 parent 97fd5b7 commit fdacd23
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import scala.annotation.tailrec
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeSet, Expression, NamedExpression, UnaryExpression}
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner, LeftAnti, LeftOuter, LeftSemi, RightOuter}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan, Project, Sort, SubqueryAlias, View}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, Generate, LogicalPlan, Project, Sort, SubqueryAlias, View, Window}

/**
* Infer the columns for Rebalance and Sort to improve the compression ratio.
Expand Down Expand Up @@ -96,6 +96,12 @@ object InferRebalanceAndSortOrders {
case f: Filter => candidateKeys(f.child, output)
case s: SubqueryAlias => candidateKeys(s.child, output)
case v: View => candidateKeys(v.child, output)
case g: Generate => candidateKeys(g.child, AttributeSet(g.requiredChildOutput))
case w: Window =>
val aliasMap = getAliasMap(w.windowExpressions)
Some((
w.partitionSpec.map(p => aliasMap.getOrElse(p.canonicalized, p)),
w.orderSpec.map(_.child).map(o => aliasMap.getOrElse(o.canonicalized, o))))

case _ => None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,9 +199,10 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
}

withView("v") {
withTable("t", "input1", "input2") {
withTable("t", "t2", "input1", "input2") {
withSQLConf(KyuubiSQLConf.INFER_REBALANCE_AND_SORT_ORDERS.key -> "true") {
sql(s"CREATE TABLE t (c1 int, c2 long) USING PARQUET PARTITIONED BY (p string)")
sql(s"CREATE TABLE t2 (c1 int, c2 long, c3 long) USING PARQUET PARTITIONED BY (p string)")
sql(s"CREATE TABLE input1 USING PARQUET AS SELECT * FROM VALUES(1,2),(1,3)")
sql(s"CREATE TABLE input2 USING PARQUET AS SELECT * FROM VALUES(1,3),(1,3)")
sql(s"CREATE VIEW v as SELECT col1, count(*) as col2 FROM input1 GROUP BY col1")
Expand Down Expand Up @@ -264,6 +265,30 @@ class RebalanceBeforeWritingSuite extends KyuubiSparkSQLExtensionTest {
|SELECT * FROM v
|""".stripMargin)
checkShuffleAndSort(df5.queryExecution.analyzed, 1, 1)

// generate
val df6 = sql(
s"""
|INSERT INTO TABLE t2 PARTITION(p='a')
|SELECT /*+ broadcast(input2) */ input1.col1, input2.col1, cast(cc.action1 as bigint)
|FROM input1
|JOIN input2
|ON input1.col1 = input2.col1
| lateral view explode(ARRAY(input1.col1, input1.col2)) cc as action1
|""".stripMargin)
checkShuffleAndSort(df6.queryExecution.analyzed, 1, 1)

// window
val df7 = sql(
s"""
|INSERT INTO TABLE t2 PARTITION(p='a')
|SELECT /*+ broadcast(input2) */ input1.col1, input2.col2,
| RANK() OVER (PARTITION BY input2.col2 ORDER BY input1.col1) AS rank
|FROM input1
|JOIN input2
|ON input1.col1 = input2.col1
|""".stripMargin)
checkShuffleAndSort(df7.queryExecution.analyzed, 1, 1)
}
}
}
Expand Down

0 comments on commit fdacd23

Please sign in to comment.