[SPARK-41805][SQL] Reuse expressions in WindowSpecDefinition #39333

Closed

Conversation

wankunde
Contributor

@wankunde wankunde commented Jan 1, 2023

What changes were proposed in this pull request?

For complex expressions in a window spec definition, we extract them and replace them with an AttributeReference (with an internal column name, e.g. "_w0").
We can reuse the extracted expressions to avoid generating unnecessary Window nodes.

For example, query:

```
SELECT max(value) over(partition by (key) order by cast(ts as timestamp)) as w1,
       min(value) over(partition by (key) order by cast(ts as timestamp)) as w2
FROM t1
```

before this PR:

```
Project [w1#229L, w2#230L]
+- Project [value#232L, key#231L, _w2#236, _w3#237, w1#229L, w2#230L, w1#229L, w2#230L]
   +- Window [min(value#232L) windowspecdefinition(key#231L, _w3#237 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS w2#230L], [key#231L], [_w3#237 ASC NULLS FIRST]
      +- Window [max(value#232L) windowspecdefinition(key#231L, _w2#236 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS w1#229L], [key#231L], [_w2#236 ASC NULLS FIRST]
         +- Project [value#232L, key#231L, cast(ts#233L as timestamp) AS _w2#236, cast(ts#233L as timestamp) AS _w3#237]
            +- SubqueryAlias spark_catalog.default.t1
               +- Relation spark_catalog.default.t1[key#231L,value#232L,ts#233L] parquet
```

after this PR:

```
Project [w1#229L, w2#230L]
+- Project [value#232L, key#231L, _w2#236, w1#229L, w2#230L, w1#229L, w2#230L]
   +- Window [max(value#232L) windowspecdefinition(key#231L, _w2#236 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS w1#229L, min(value#232L) windowspecdefinition(key#231L, _w2#236 ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS w2#230L], [key#231L], [_w2#236 ASC NULLS FIRST]
      +- Project [value#232L, key#231L, cast(ts#233L as timestamp) AS _w2#236]
         +- SubqueryAlias spark_catalog.default.t1
            +- Relation spark_catalog.default.t1[key#231L,value#232L,ts#233L] parquet
```
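
For reference, one way to reproduce the plans above from spark-shell; the table name, schema, and query are taken from the example (a scratch Parquet table standing in for a real one):

```
// Scratch table matching the example schema.
spark.sql("CREATE TABLE t1 (key BIGINT, value BIGINT, ts BIGINT) USING parquet")

// explain(true) prints the parsed/analyzed/optimized plans; after this PR the
// analyzed plan contains a single Window node over one extracted _w* column.
spark.sql("""
  SELECT max(value) OVER (PARTITION BY key ORDER BY CAST(ts AS TIMESTAMP)) AS w1,
         min(value) OVER (PARTITION BY key ORDER BY CAST(ts AS TIMESTAMP)) AS w2
  FROM t1
""").explain(true)
```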

Why are the changes needed?

Optimize window expressions: reusing the extracted expressions avoids redundant projected columns and lets the two window functions share a single Window node.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Added UT

@github-actions github-actions bot added the SQL label Jan 1, 2023
@wankunde wankunde changed the title [SPARK-41805] Reuse expressions in WindowSpecDefinition [SPARK-41805][SQL] Reuse expressions in WindowSpecDefinition Jan 2, 2023
@AmplabJenkins

Can one of the admins verify this patch?

@wankunde
Contributor Author

wankunde commented Jan 3, 2023

Hi @wangyum @cloud-fan, could you help review this PR? Thanks!

@@ -3230,15 +3230,15 @@ class Analyzer(override val catalogManager: CatalogManager)
// For example, when we have col1 - Sum(col2 + col3) OVER (PARTITION BY col4 ORDER BY col5),
// we need to make sure that col1 to col5 are all projected from the child of the Window
// operator.
-val extractedExprBuffer = new ArrayBuffer[NamedExpression]()
+val extractedExprBufferMap = mutable.LinkedHashMap.empty[Expression, NamedExpression]
Contributor

Suggested change
-val extractedExprBufferMap = mutable.LinkedHashMap.empty[Expression, NamedExpression]
+val extractedExprMap = mutable.LinkedHashMap.empty[Expression, NamedExpression]

@@ -3230,15 +3230,15 @@ class Analyzer(override val catalogManager: CatalogManager)
// For example, when we have col1 - Sum(col2 + col3) OVER (PARTITION BY col4 ORDER BY col5),
// we need to make sure that col1 to col5 are all projected from the child of the Window
// operator.
-val extractedExprBuffer = new ArrayBuffer[NamedExpression]()
+val extractedExprBufferMap = mutable.LinkedHashMap.empty[Expression, NamedExpression]
def extractExpr(expr: Expression): Expression = expr match {
case ne: NamedExpression =>
// If a named expression is not in regularExpressions, add it to
// extractedExprBuffer and replace it with an AttributeReference.
Contributor

Suggested change
-// extractedExprBuffer and replace it with an AttributeReference.
+// extractedExprMap and replace it with an AttributeReference.

if (missingExpr.nonEmpty) {
-extractedExprBuffer += ne
+extractedExprBufferMap += ne -> ne
Contributor

Suggested change
-extractedExprBufferMap += ne -> ne
+extractedExprBufferMap += ne.canonicalized -> ne

Contributor

Two expressions may have the same semantics even if they look different, e.g. a + 1 and 1 + a. Let's make sure the map key is the canonicalized expression.
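
To illustrate, a minimal sketch of canonicalized equality using Catalyst's expression classes directly (the attribute and literal here are made up for the example):

```
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val e1 = Add(a, Literal(1)) // a + 1
val e2 = Add(Literal(1), a) // 1 + a

// Different trees, same semantics: canonicalization normalizes commutative
// operations, so the canonical forms (and semanticEquals) agree.
assert(e1.canonicalized == e2.canonicalized)
assert(e1.semanticEquals(e2))
```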

-val withName = Alias(e, s"_w${extractedExprBuffer.length}")()
-extractedExprBuffer += withName
-withName.toAttribute
+extractedExprBufferMap.getOrElseUpdate(e,
Contributor

Suggested change
-extractedExprBufferMap.getOrElseUpdate(e,
+extractedExprBufferMap.getOrElseUpdate(e.canonicalized,
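
Putting the two suggestions together, the extraction becomes a lookup keyed on the canonicalized form; a minimal sketch of the resulting shape (simplified from the actual Analyzer code, names as suggested above):

```
import scala.collection.mutable
import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression}

val extractedExprMap = mutable.LinkedHashMap.empty[Expression, NamedExpression]

// Keying on the canonicalized form makes semantically equal expressions
// (e.g. the same cast in two window specs) extract to a single _w* alias.
def extractExpr(e: Expression): Expression =
  extractedExprMap.getOrElseUpdate(e.canonicalized, {
    Alias(e, s"_w${extractedExprMap.size}")() // internal names _w0, _w1, ...
  }).toAttribute
```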

InputAdapter
Exchange [_w1,_w2] #1
WholeStageCodegen (5)
-HashAggregate [i_category,i_class,spark_grouping_id,sum,sum] [sum(UnscaledValue(ss_net_profit)),sum(UnscaledValue(ss_ext_sales_price)),gross_margin,lochierarchy,_w1,_w2,_w3,sum,sum]
+HashAggregate [i_category,i_class,spark_grouping_id,sum,sum] [sum(UnscaledValue(ss_net_profit)),sum(UnscaledValue(ss_ext_sales_price)),gross_margin,lochierarchy,_w0,_w1,_w2,sum,sum]
Contributor

Do you know why the names become _w0, _w1, _w2?

Contributor Author

@wankunde wankunde Jan 7, 2023

  • For the window expression in q36:
```
rank()
  OVER (
    PARTITION BY grouping(i_category) + grouping(i_class),
      CASE WHEN grouping(i_class) = 0
        THEN i_category END
    ORDER BY sum(ss_net_profit) / sum(ss_ext_sales_price) ASC) AS rank_within_parent
```

will be parsed to

```
{WindowExpression@13158} rank() windowspecdefinition((cast((shiftright(spark_grouping_id#117L, 1) & 1) as tinyint) AS grouping(i_category)#122 + cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#123), CASE WHEN (cast(cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#124 as int) = 0) THEN i_category#118 END, (sum(ss_net_profit#26) / sum(ss_ext_sales_price#19)) ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()))
 windowFunction = {Rank@9579} rank()
 windowSpec = {WindowSpecDefinition@13160} windowspecdefinition((cast((shiftright(spark_grouping_id#117L, 1) & 1) as tinyint) AS grouping(i_category)#122 + cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#123), CASE WHEN (cast(cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#124 as int) = 0) THEN i_category#118 END, (sum(ss_net_profit#26) / sum(ss_ext_sales_price#19)) ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()))
```
  • Rule ResolveWindowOrder will add the orderSpec to the rank function:
```
  object ResolveWindowOrder extends Rule[LogicalPlan] {
    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveExpressionsWithPruning(
      _.containsPattern(WINDOW_EXPRESSION), ruleId) {
      case WindowExpression(wf: WindowFunction, spec) if spec.orderSpec.isEmpty =>
        throw QueryCompilationErrors.windowFunctionWithWindowFrameNotOrderedError(wf)
      case WindowExpression(rank: RankLike, spec) if spec.resolved =>
        val order = spec.orderSpec.map(_.child)
        WindowExpression(rank.withOrder(order), spec)
    }
  }
```

so the window expression will be:

```
{WindowExpression@13219} rank((sum(ss_net_profit#26) / sum(ss_ext_sales_price#19))) windowspecdefinition((cast((shiftright(spark_grouping_id#117L, 1) & 1) as tinyint) AS grouping(i_category)#122 + cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#123), CASE WHEN (cast(cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#124 as int) = 0) THEN i_category#118 END, (sum(ss_net_profit#26) / sum(ss_ext_sales_price#19)) ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()))
 windowFunction = {Rank@13049} rank((sum(ss_net_profit#26) / sum(ss_ext_sales_price#19)))
 windowSpec = {WindowSpecDefinition@13221} windowspecdefinition((cast((shiftright(spark_grouping_id#117L, 1) & 1) as tinyint) AS grouping(i_category)#122 + cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#123), CASE WHEN (cast(cast((shiftright(spark_grouping_id#117L, 0) & 1) as tinyint) AS grouping(i_class)#124 as int) = 0) THEN i_category#118 END, (sum(ss_net_profit#26) / sum(ss_ext_sales_price#19)) ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$()))
```
  • Before this PR, rule ExtractWindowExpressions extracts the child expression (sum(ss_net_profit#26) / sum(ss_ext_sales_price#19)) of the window function rank() into _w0, and then extracts the same orderSpec expression in the WindowSpecDefinition into _w3 again.
  • After this PR, we reuse the extracted expression, so _w3 becomes _w0 in the q36 result.

I don't really understand why we need to add the orderSpec to the Rank function. Could we remove

```
case WindowExpression(rank: RankLike, spec) if spec.resolved =>
        val order = spec.orderSpec.map(_.child)
        WindowExpression(rank.withOrder(order), spec)
```

from ResolveWindowOrder in a follow-up PR?

@cloud-fan cloud-fan closed this in 6f31566 Jan 9, 2023
@cloud-fan
Contributor

thanks, merging to master!

wangyum pushed a commit that referenced this pull request May 26, 2023

Closes #39333 from wankunde/extract_window.

Authored-by: Kun Wan <wankun@apache.org>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>