-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-54496][SQL] Fix Merge Into Schema Evolution for Dataframe API #53207
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
| } else { | ||
| m transformUpWithNewOutput { | ||
| case r @ DataSourceV2Relation(_: SupportsRowLevelOperations, _, _, _, _, _) => | ||
| val finalAttrMapping = ArrayBuffer.empty[(Attribute, Attribute)] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a bug here, as it actually hits both the sourceTable and targetTable and tries schema evolution on both, when actually schema evolution should always be performed only for target table.
I had done it this way because of the limitation of transformUpWithNewOutput that it doesn't re-map the attribues of the top level object (MergeIntoTable). See #52866 (comment) for my finding. So I transformed all children of MergeIntoTable and assumed that the match with SupportsRowLevelOperation Table would be enough to only do schema evolution on the targetTable, but I was wrong.
So the fix is to explicitly transform on the MergeIntoTable's targetTable, and I add an extra rewriteAttrs to rewrite the top level MergeIntoTable object.
| (assignment.value.resolved && sourcePaths.exists { | ||
| path => MergeIntoTable.isEqual(assignment, path) | ||
| }) | ||
| val hasStarActions = actions.exists { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The call to canEvaluateSchemaEvolution (that guards whether the schema evolution check gets evaluated) gets called with updateStar and insertStar. This is triggered in the Dataframe API, and revealed I had missed this case. So here I return false to explicitly skip until they are resolved. Else this hits an analysis error later. This was not triggered in SQL case where the stars got resolved earlier.
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you, @szehon-ho . I have a few questions in this domain.
- May I ask how you proceed the testing this
MERGE INTOfeatures? - Do you think you have more area which needs to be investigated from now?
| sql(s"DROP TABLE IF EXISTS $tableNameAsString") | ||
| } | ||
|
|
||
| test("merge with schema evolution using dataframe API: add new column and set all") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for adding 1050 lines additionally.
|
Could you fix the compilation failure, @szehon-ho ? |
|
ah, i didnt rebase on my other fix, let me do it tomorrow |
|
Got it. Thank you, @szehon-ho . |
8e2996d to
b240faf
Compare
|
Thanks I fixed it @dongjoon-hyun , sorry for delay. Re: testing the overall feature for MERGE INTO of schema evolution, @aokolnychyi and @cloud-fan are testing it. In fact @aokolnychyi found this issue as per the JIRA description. Actually unrelated to this pr, @cloud-fan had a bit of concern of #53149 , as there is some ambiguity in what the user wants by UPDATE SET * if there is struct mismatch (by column or by field). So we are discussing it and thinking potentially revert it or disable this part of it for the release |
|
Ack. Thank you for sharing the updated status, @szehon-ho . |
|
build failures look infra related (free up disk space..) |
dongjoon-hyun
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM (Pending CIs).
### What changes were proposed in this pull request? Some fixes to allow the Dataframe Merge API to support schema evolution. The DataFrame API is here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala#L7 The fixes are described inline. ### Why are the changes needed? The Dataframe Merge API is broken for schema evolution mode without these fixes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add unit tests. Will try to refactor later to combine test re-use. ### Was this patch authored or co-authored using generative AI tooling? No Closes #53207 from szehon-ho/merge_schema_evolution_bug. Authored-by: Szehon Ho <szehon.apache@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 9feb1b2) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
|
Merged to master/4.1 for Apache Spark 4.1.0. |
What changes were proposed in this pull request?
Some fixes to allow the Dataframe Merge API to support schema evolution. The DataFrame API is here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/classic/MergeIntoWriter.scala#L7
The fixes are described inline.
Why are the changes needed?
The Dataframe Merge API is broken for schema evolution mode without these fixes.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Add unit tests. Will try to refactor later to combine test re-use.
Was this patch authored or co-authored using generative AI tooling?
No