[SPARK-45959][SQL] Improving performance when addition of 1 column at a time causes increase in the LogicalPlan tree depth #43854
base: master
Conversation
…ject. modified the cache manager to take care of intermediate filters
…yRelation not being used due to filter chain case not handled. added new tests
@attilapiros @peter-toth Please review; I have added details to the PR.
…orrectly. Fixed the PR related to the uncaching issues found in testing
…ache invalidation among independent and dependent dataframes
What's the target use case of this improvement? A super long SQL statement, or a super long DataFrame transformation chain?
@cloud-fan
This is a well-known issue. The suggested fix is to ask users to not chain transformations too much, and to use "batch"-like APIs such as Dataset#withColumns.
How does this PR fix the issue without the problem mentioned in 23d9822?
The caching issue is fixed in this PR; that was the complex part. It will not miss any cache. I have described the approach in the PR description. And as I mentioned, it also makes the cache lookup code much more robust, as described in the other bug I filed.
I understand the suggestion is to not use the API to add a single column, but I have come across many companies which generate DataFrames via some loop logic. In my previous work I have seen query plans containing 40 million plus Project nodes alone (not counting Filters, Joins, Windows, etc.).
Other customers are now seeing query compilation times increase from 3 minutes to 2-plus hours, due to the dedup-relation rule or plan cloning at every stage.
Oh, the idea to make cache lookup smarter looks promising. Shall we create an individual PR for it? It's useful for SQL queries as well, as we can hit the cache if one SELECT query only has a few more columns than another cached SELECT query.
Sure, but I cannot split the cache lookup code from this PR, as otherwise tests will fail left and right. The robustness of the cache lookup is just a side effect of this fix.
Regards,
Asif
Oh, I see: you mean the cache lookup PR goes in first, and then this PR.
What changes were proposed in this pull request?
This PR attempts to keep the depth of the LogicalPlan tree unchanged when columns are added, dropped, or renamed.
This is done via a special new rule called EarlyCollapseProject.
It is applied after analysis has happened, but before the analyzed plan is assigned to the holder variable in QueryExecution / DataFrame.
The EarlyCollapseProject code does the following:
If the incoming plan is Project1 -> Project2 -> X, it collapses the two Projects into one, so that the final plan looks like Project -> X.
If the incoming plan is of the form Project1 -> Filter1 -> Filter2 ... FilterN -> Project2 -> X, it collapses Project1 and Project2 as Project -> Filter1 -> Filter2 ... FilterN -> X.
Please note that in this case it is as if Project2 is pulled up for the collapse, rather than vice versa.
The reason for this is that Project1 may have some behaviour (like a UDF) which cannot handle certain data that would otherwise be removed by the filter chain. If Project1 were pushed below the filter chain, the unfiltered rows could cause issues. Existing Spark UDF tests include a test which is sensitive to this.
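The two collapse shapes above can be sketched with a toy plan model. The class names, the alias-to-string projection maps, and the naive string-based expression substitution below are illustrative only, not Spark's actual Catalyst API:

```python
from dataclasses import dataclass

@dataclass
class Leaf:
    name: str

@dataclass
class Filter:
    cond: str
    child: object

@dataclass
class Project:
    cols: dict          # alias -> expression over the child's output
    child: object

def substitute(expr, inner_cols):
    # Inline the lower Project's definitions into an expression string.
    for alias, defn in inner_cols.items():
        expr = expr.replace(alias, f"({defn})")
    return expr

def peel_filters(plan):
    # Peel consecutive Filters off the top of a plan.
    conds = []
    while isinstance(plan, Filter):
        conds.append(plan.cond)
        plan = plan.child
    return conds, plan

def early_collapse(plan):
    if not isinstance(plan, Project):
        return plan
    conds, below = peel_filters(plan.child)
    if not isinstance(below, Project):
        return plan  # no second Project underneath: nothing to collapse
    inner = below
    # Rewrite the outer columns (and the filter conditions) in terms of X's
    # output, then keep the rewritten filter chain below the merged Project:
    #   Project1 -> F1..Fn -> Project2 -> X  becomes  Project -> F1'..Fn' -> X
    merged = {a: substitute(e, inner.cols) for a, e in plan.cols.items()}
    body = inner.child
    for c in reversed(conds):
        body = Filter(substitute(c, inner.cols), body)
    return Project(merged, body)

x   = Leaf("t")
df1 = Project({"b": "a + 1"}, x)                     # Project2 -> X
df2 = Project({"c": "b * 2"}, Filter("b > 0", df1))  # Project1 -> Filter -> Project2 -> X
print(early_collapse(df2))
# Project(cols={'c': '(a + 1) * 2'}, child=Filter(cond='(a + 1) > 0', child=Leaf(name='t')))
```

Note that this sketch rewrites the intervening filter conditions in terms of X's output when the lower Project is removed; the real rule operates on resolved Catalyst expressions and ExprIds rather than strings.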
The EarlyCollapseProject rule IS NOT applied if:
a) either of the incoming Project nodes (Project1 or Project2) has the tag LogicalPlan.PLAN_ID_TAG set, which implies it is coming from a Spark client. This is not handled as of now because, for clients, subsequent resolutions are tied to a direct mapping of the tag ID associated with each Project, and removing any Project via the collapse breaks that code.
b) the incoming Project2 has any UserDefinedExpression or non-deterministic expression, or Project2's child is a Window node.
Non-deterministic expressions are excluded to ensure the functionality is not broken, as the collapse would replicate the non-deterministic expression. Similarly, for a UserDefinedExpression, or a Window node below Project2, the collapse is avoided because it could cause replication, and hence re-evaluation, of expensive code.
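A tiny illustration of the replication problem, with plain strings standing in for expressions (hypothetical, not Spark code): if the outer Project references the inner alias more than once, inlining duplicates the inner expression, which changes semantics for non-deterministic expressions and re-evaluates expensive ones.

```python
def substitute(expr, inner_cols):
    # Naive string-based inlining of the lower Project's definitions.
    for alias, defn in inner_cols.items():
        expr = expr.replace(alias, f"({defn})")
    return expr

inner = {"r": "rand()"}   # Project2 defines r := rand()
outer = "r + r"           # Project1 uses r twice
collapsed = substitute(outer, inner)
print(collapsed)          # (rand()) + (rand())  -- rand() would now run twice
assert collapsed.count("rand()") == 2
```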
DROPPED_NAMED_EXPRESSIONS tag:
Dropped attributes need to be tracked so that they can be resurrected, if the need arises, to perform resolution correctly.
The dropped attributes are stored in a Seq using the tag LogicalPlan.DROPPED_NAMED_EXPRESSIONS.
The need for this arises in the following situation:
Say we start with a DataFrame df1, with plan Project2(a, b, c) -> X.
Then we create a new DataFrame df2 = df1.select(b, c). Here, attribute a is dropped.
Because of the EarlyCollapseProject rule, the new DataFrame df2 will have the logical plan Project(b, c) -> X.
Now Spark allows a DataFrame df3 = df2.filter(a > 7), which would result in the logical plan Filter(a > 7) -> Project(b, c) -> X.
But because "a" has been dropped, its resolution is no longer possible.
To retain the existing behaviour, Project(b, c) carries the dropped NamedExpression "a", which can be revived as a last resort for resolution.
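The mechanics of the tag can be modeled as follows. All names here (select, resolve, the dropped dict) are hypothetical stand-ins for the real tag-based bookkeeping, not Spark's API:

```python
class Project:
    def __init__(self, cols, child, dropped=None):
        self.cols = cols              # alias -> expression
        self.child = child
        self.dropped = dropped or {}  # models LogicalPlan.DROPPED_NAMED_EXPRESSIONS

def select(proj, *names):
    # df.select(b, c): keep the requested columns, remember the dropped ones.
    kept = {n: proj.cols[n] for n in names}
    dropped = {n: e for n, e in proj.cols.items() if n not in names}
    return Project(kept, proj.child, dropped)

def resolve(proj, name):
    # Normal resolution first; fall back to the dropped tag as a last resort.
    if name in proj.cols:
        return proj.cols[name]
    if name in proj.dropped:
        return proj.dropped[name]
    raise ValueError(f"cannot resolve {name}")

df1 = Project({"a": "a", "b": "b", "c": "c"}, "X")
df2 = select(df1, "b", "c")      # attribute a is dropped from the projection
assert resolve(df2, "a") == "a"  # so df2.filter(a > 7) can still resolve a
```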
ColumnResolutionHelper code change:
The revival of dropped attributes for plan resolution is done via the code change in ColumnResolutionHelper.resolveExprsAndAddMissingAttrs, where the dropped attributes stored in the tag are brought back for resolution.
Code changes in CacheManager:
The major code change is in the CacheManager.
Previously, since any change in projection (addition, drop, rename, reordering) resulted in a new Project, it was straightforward to look up the cache for a matching logical plan fragment: the cached plan would always match a query subtree, as long as the subtree was used in an immutable form to build the complete query tree. But since this PR collapses a new Project into the existing Project, the subtree is no longer the same as what was cached. Apart from that, the presence of filters between two Projects also complicates the situation.
Case 1: using an InMemoryRelation in a plan resulting from the collapse of two consecutive Projects.
We start with a DataFrame df1 with plan Project2 -> X, and then cache df1, so that the CachedRepresentation holds the IMR and the logical plan Project2 -> X.
Now we create a new DataFrame df2 = df1.select(someProjection), which due to the early collapse would look like Project -> X.
Clearly Project may no longer be the same as Project2, so a direct check with the CacheManager will not find a matching IMR. But clearly the X subtrees are the same.
So the criterion is: the IMR can be used iff the incoming Project's NamedExpressions can be expressed in terms of the cached Project2's output.
To perform this check, we use the following logic: given that X is the same for both plans, their outputs are equivalent, so we remap the cached plan's Project2 in terms of the output attributes (expression IDs) of X in the incoming plan. This tells us which NamedExpressions of the incoming Project, if any, cannot be derived from the cached Project2's output. So long as those categories of NamedExpressions are all empty, the InMemoryRelation of the cached plan is usable.
This logic is coded in the CacheManager. It involves modifying the NamedExpressions in the incoming Project in terms of the Seq[Attribute] which will be forced on the IMR.
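The Case 1 check can be sketched as follows, with exact string matching of expressions standing in for Catalyst's ExprId/canonicalization-based comparison (all names hypothetical):

```python
def remap_over_cache(incoming_cols, cached_cols):
    # cached_cols: alias -> expression over X; these aliases are also the
    # output attributes of the cached IMR.
    by_expr = {expr: alias for alias, expr in cached_cols.items()}
    remapped = {}
    for alias, expr in incoming_cols.items():
        if expr in by_expr:
            remapped[alias] = by_expr[expr]  # reference the IMR's attribute
        else:
            return None                      # not derivable: IMR unusable
    return remapped                          # new Project to put on top of the IMR

cached   = {"b": "a + 1", "c": "c"}          # df1: Project2 -> X (cached)
incoming = {"c": "c", "b2": "a + 1"}         # df2's collapsed Project, over X
assert remap_over_cache(incoming, cached) == {"c": "c", "b2": "b"}
assert remap_over_cache({"d": "a * 2"}, cached) is None  # a*2 never cached
```

The real check is richer (it classifies the non-derivable NamedExpressions into several categories and requires each to be empty), but the shape is the same: either every incoming expression maps onto the cached output, or the IMR is skipped.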
Case 2: using an InMemoryRelation in a plan resulting from the collapse of Projects interspersed with Filters.
We start with a DataFrame df1 with plan Filter3 -> Filter4 -> Project2 -> X, and then cache df1, so that the CachedRepresentation holds the IMR and the logical plan Filter3 -> Filter4 -> Project2 -> X.
Now we create a new DataFrame df2 = df1.filter(f2).filter(f1).select(someProjection), which due to the early collapse would look like
Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
(this is because, in the case of filters, Project2 is pulled up for the collapse).
Clearly the cached plan chain Filter3 -> Filter4 -> Project2 -> X is no longer directly similar to Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X. But it is still possible to use the IMR, because the cached plan's LogicalPlan can serve as a subtree of the incoming plan.
The logic for this check is partly the same as for two consecutive Projects above, with some extra handling for filters. The algorithm is as follows:
For the incoming plan, we walk down to X and store the consecutive filter chain.
For the cached plan, we identify the first encountered Project, which is Project2, and its child, which is X.
So we have X from both the incoming and the cached plan, and we have identified the incoming "Project" and the cached plan's "Project2".
Now we can apply the rule of Case 1 for two consecutive Projects, and correctly modify the NamedExpressions of the incoming Project in terms of the Seq[Attribute] which will be enforced on the IMR.
But we also need to ensure that the filter chain present in the cached plan, i.e. Filter3 -> Filter4, is a subset of the filter chain in the incoming plan, which is Filter1 -> Filter2 -> Filter3 -> Filter4.
The thing to note is that in the incoming plan
Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X
the filters are expressed in terms of the output of X, whereas in the cached plan
Filter3 -> Filter4 -> Project2 -> X
the filters are expressed in terms of the output of Project2.
So for the comparison, we need to express the filter chain of the cached plan in terms of X, by pulling Project2 up above the filters, so that the cached plan becomes
Project2 -> Filter3' -> Filter4' -> X.
Now, comparing against Project -> Filter1 -> Filter2 -> Filter3 -> Filter4 -> X, we find that Filter3' -> Filter4' is a subset of Filter1 -> Filter2 -> Filter3 -> Filter4, and since Project and Project2 are already compatible (by Case 1), we can use the cached IMR with a modified Project and the partial filter chain. That is, we should be able to get a plan like
Project -> Filter1 -> Filter2 -> IMR.
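The filter-subset part of Case 2 can be sketched like this, again with string equality standing in for Catalyst's semantic expression comparison (all names hypothetical):

```python
def substitute(expr, cols):
    # Rewrite an expression over Project2's output in terms of X's output.
    for alias, defn in cols.items():
        expr = expr.replace(alias, f"({defn})")
    return expr

def match_against_cache(incoming_filters, cached_filters, cached_cols):
    # Pull Project2 above the cached filters: Filter3, Filter4 become
    # Filter3', Filter4', expressed over X like the incoming filters are.
    rewritten = {substitute(f, cached_cols) for f in cached_filters}
    if not rewritten.issubset(set(incoming_filters)):
        return None                  # cached filters not all present: no reuse
    # Filters already applied inside the IMR are dropped; the rest remain
    # between the remapped Project and the IMR.
    return [f for f in incoming_filters if f not in rewritten]

cached_cols    = {"b": "a + 1"}            # Project2
cached_filters = ["b > 0"]                 # over Project2's output
incoming       = ["a < 9", "(a + 1) > 0"]  # collapsed plan's filters, over X
assert match_against_cache(incoming, cached_filters, cached_cols) == ["a < 9"]
```

A None result means the cached plan's filters are not all present in the incoming chain, so the IMR cannot be substituted in.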
Why are the changes needed?
Due to the addition/modification of rules since Spark 3.0, clients are seeing an extremely large increase in query compilation time when client code adds one column at a time in a loop. Even though the API documentation does not recommend such a practice, it happens, and clients are reluctant to change their code. So this PR attempts to handle the situation where columns are added not in a single shot but one at a time. This helps the Analyzer/Resolver rules complete faster.
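A minimal model of the problem and of the effect of the eager collapse (a toy plan, not Spark: each added column is a literal here, so the merge can be a plain dict union with no expression substitution):

```python
from dataclasses import dataclass

@dataclass
class Leaf:
    name: str

@dataclass
class Project:
    cols: dict
    child: object

def with_column(plan, name, expr):
    # Naive behaviour: every added column wraps the plan in a new Project.
    return Project({name: expr}, plan)

def collapse(plan):
    # Eager collapse: merge two adjacent Projects into one.
    if isinstance(plan, Project) and isinstance(plan.child, Project):
        merged = {**plan.child.cols, **plan.cols}
        return Project(merged, plan.child.child)
    return plan

def depth(plan):
    d = 1
    while isinstance(plan, Project):
        d += 1
        plan = plan.child
    return d

naive = eager = Leaf("t")
for i in range(1000):
    naive = with_column(naive, f"c{i}", "1")
    eager = collapse(with_column(eager, f"c{i}", "1"))

print(depth(naive), depth(eager))  # 1001 2
```

Since most analyzer rules traverse the whole tree (and some clone it), per-column cost grows with depth, so the naive loop is quadratic overall while the collapsed plan stays flat.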
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added new tests and relied on existing tests.
Was this patch authored or co-authored using generative AI tooling?
No