Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-32616][SQL] Window operators should be added determinedly #29432

Closed
wants to merge 1 commit into from

Conversation

Ngone51
Copy link
Member

@Ngone51 Ngone51 commented Aug 14, 2020

What changes were proposed in this pull request?

Use the LinkedHashMap instead of immutable.Map to hold the Window expressions in ExtractWindowExpressions.addWindow.

Why are the changes needed?

This is a bug fix for #29270. In that PR, the generated plan(especially for the queries q47, q49, q57) on Jenkins always can not match the golden plan generated on my laptop.

It happens because ExtractWindowExpressions.addWindow now uses immutable.Map to hold the Window expressions by the key (spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr)) and converts the map to Seq at the end. Then, the Seq is used to add Window operators on top of the child plan. However, for the same query, the order of Windows expression inside the Seq could be undetermined when the expression id changes(which can affect the key). As a result, the same query could have different plans because of the undetermined order of Window operators.

Therefore, we use LinkedHashMap, which records the insertion order of entries, to make the adding order determined.

Does this PR introduce any user-facing change?

Maybe yes, users now always see the same plan for the same queries with multiple Window operators.

How was this patch tested?

It's really hard to make a reproduce demo. I just tested manually with #29270 and it looks good.

@Ngone51
Copy link
Member Author

Ngone51 commented Aug 14, 2020

cc @cloud-fan @maropu

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch!

@maropu
Copy link
Member

maropu commented Aug 14, 2020

oh.. good catch! LGTM, pending Jenkins.

@cloud-fan
Copy link
Contributor

github action passes, merging to master!

@cloud-fan cloud-fan closed this in c6be207 Aug 14, 2020
@Ngone51
Copy link
Member Author

Ngone51 commented Aug 14, 2020

thanks all!

@maropu
Copy link
Member

maropu commented Aug 14, 2020

We don't need to backport this into branch-3.0?

// Second, we group extractedWindowExprBuffer based on their Partition and Order Specs.
val groupedWindowExpressions = extractedWindowExprBuffer.groupBy { expr =>
val distinctWindowSpec = expr.collect {
case window: WindowExpression => window.windowSpec
}.distinct
// We do a final check and see if we only have a single Window Spec defined in an
// expressions.
if (distinctWindowSpec.isEmpty) {
failAnalysis(s"$expr does not have any WindowExpression.")
} else if (distinctWindowSpec.length > 1) {
// newExpressionsWithWindowFunctions only have expressions with a single
// WindowExpression. If we reach here, we have a bug.
failAnalysis(s"$expr has multiple Window Specifications ($distinctWindowSpec)." +
s"Please file a bug report with this error message, stack trace, and the query.")
} else {
val spec = distinctWindowSpec.head
(spec.partitionSpec, spec.orderSpec, WindowFunctionType.functionType(expr))
}
}.toSeq

@cloud-fan
Copy link
Contributor

It's not a bug. The query can still run and return correct result.

@maropu
Copy link
Member

maropu commented Aug 14, 2020

okay, I updated the type in the jira (bug -> improvement).

@SparkQA
Copy link

SparkQA commented Aug 14, 2020

Test build #127453 has finished for PR 29432 at commit 42e5a8e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@cxzl25
Copy link
Contributor

cxzl25 commented Jul 13, 2021

Recently encountered a similar problem in the production environment, the calculation order of the window is random, sometimes it only takes 1-2 minutes to run, sometimes it takes 1-2 hours to run, or it may fail due to OOM.

If COUNT(vid) OVER is calculated first, and then SIZE(COLLECT_SET(vid) OVER is calculated, it executes very quickly.
If COLLECT_SET(vid) OVER is calculated first, then COUNT(vid) OVER, and then size(), it will take 1-2 hours to execute.
Use SIZE(COLLECT_SET(vid) OVER to support COUNT(DISTINCT) OVER.
COLLECT_SET will generate a lot of data, and then sort, it is easy to fail at this time.

Using this Pacth, I can ensure the order of the windows and ensure the speed of calculation.

CREATE TABLE XXX (
  `d` STRING ,
  `indexs` STRING,
  `vid` STRING );

select distinct indexs, d, dd, pd, diffday
, count(vid) over(PARTITION BY indexs, d, diffday) as c2
, size(collect_set(vid) OVER (PARTITION BY indexs, d)) AS c1
from
(
    select indexs, vid, d, a.dd, p.pd, datediff(a.d, p.pd) as diffday
    from
    (
        select indexs, vid, d
            , collect_set(d) over(partition by indexs, vid order by d asc rows between unbounded preceding and 1 preceding) as dd
        from XXX
    ) a
    lateral view outer explode(dd) p as pd
) a;

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
5 participants