-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-32941][SQL] Optimize UpdateFields expression chain and put the rule early in Analysis phase #29812
Conversation
8662d76
to
0217130
Compare
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
This comment has been minimized.
Test build #128904 has finished for PR 29812 at commit
|
Test build #128905 has finished for PR 29812 at commit
|
GitHub Actions was passed. |
retest this please |
Test build #128918 has finished for PR 29812 at commit
|
LGTM, cc @fqaiser94 |
val newNames = mutable.ArrayBuffer.empty[String] | ||
val newValues = mutable.ArrayBuffer.empty[Expression] | ||
names.zip(values).reverse.foreach { case (name, value) => | ||
if (!newNames.contains(name)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should use resolver
here otherwise I think we will have correct-ness issues.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok.
def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions { | ||
case WithFields(structExpr, names, values) if names.distinct.length != names.length => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
could this case
statement be after the next case
statement? So that we combine the chains first before deduplicating?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't run this rule just once, so the order should be fine.
newValues += value | ||
} | ||
} | ||
WithFields(structExpr, names = newNames.reverse.toSeq, valExprs = newValues.reverse.toSeq) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For my understanding, can you explain how we expect to benefit from this optimization?
I ask because we do this kind of deduplication inside of WithFields
already as part of the foldLeft
operation here. It will only keep the last valExpr
for each name
. So I think the optimized logical plan will be the same with or without this optimization in all scenarios? CMIIW
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You are right. It is eventually the same. But for some cases, before we extend WithFields
, the expression tree might be very complex. This is coming from improving scalability of #29587. This is applied during I fixed the scalability issue. I found this is useful to reduce the complex of WithFields
expression tree.
I will run these rules in #29587 to simplify expression tree before entering optimizer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Okay, so I took a look at the PR you linked and left a related comment there. I don't think you actually need this optimization for #29587
This optimization is only useful if someone uses WithFields
to update the same field multiple times. However, it would simply be better to not update the same field multiple times. At the very least, we should not do this when we re-use this Expression internally within Spark.
Unfortunately, "bad" end-users might still update the same field multiple times. Assuming we should optimize for such users (not sure), since this batch is only applied half-way through the optimization cycle anyway, I think we could just move up the Batch("ReplaceWithFieldsExpression", Once, ReplaceWithFieldsExpression)
to get the same benefit (which is just simplified tree). What do you reckon?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I'd like to run these rules to simplify WithFields
tree early in analysis stage. During fixing scale issue of #29587, I thought that it is very likely to write bad WithFields
tree. Once hitting that, it is very hard to debug and the analyzer/optimizer spend a lot of time traversing expression tree. So I think it is very useful keep this rule to simplify the expression tree, but I don't think we want to do ReplaceWithFieldsExpression
in analysis stage.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ahh I see, yes, in the analysis stage this would likely be helpful!
Okay in that case, could this PR wait till #29795 goes in? I'm refactoring WithFields
so this optimization would need to change anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine to wait until #29795.
Test build #128955 has finished for PR 29812 at commit
|
val newNames = mutable.ArrayBuffer.empty[String] | ||
val newValues = mutable.ArrayBuffer.empty[Expression] | ||
names.zip(values).reverse.foreach { case (name, value) => | ||
if (newNames.find(resolver(_, name)).isEmpty) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a bit inefficient. Shall we build a set with lowercased names if case sensitivity is false?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added a set for case-sensitive case.
Test build #128996 has finished for PR 29812 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
val optimizeUpdateFields: PartialFunction[Expression, Expression] = { | ||
case UpdateFields(structExpr, fieldOps) | ||
if fieldOps.forall(_.isInstanceOf[WithField]) && | ||
fieldOps.map(_.asInstanceOf[WithField].name.toLowerCase(Locale.ROOT)).distinct.length != |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case of case-sensitive
mode, this seems to allow unnecessarily computation. Can we improve this if
statement to handle both case-sensitive
and case-insensitive
together?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The if
condition should cover both case-sensitive and case-insensitive cases now. I compare names in lowercase in the condition.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, what I meant is that we don't need to execute line 39~69 at all.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For example, for case-sensitive case, fieldOps.map(_.asInstanceOf[WithField].name).distinct.length != fieldOps.length
should be used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. Updated. Thanks.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1, LGTM (except one comment). Could you consider it, @viirya ?
Kubernetes integration test starting |
Kubernetes integration test status success |
Test build #129964 has finished for PR 29812 at commit
|
Test build #129967 has finished for PR 29812 at commit
|
Kubernetes integration test starting |
Kubernetes integration test status success |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for updates.
Test build #129972 has finished for PR 29812 at commit
|
Merged to master. |
@viirya, BTW, do you mind fixing the PR description to explain what this PR specifically improves?
Seems like this PR does not describe what exactly optimizes. Is my understanding correct that this PR proposes two separate optimizations?
|
val newValues = mutable.ArrayBuffer.empty[Expression] | ||
|
||
if (caseSensitive) { | ||
names.zip(values).reverse.foreach { case (name, value) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if we could just do like: collection.immutable.ListMap(names.zip(values): _*)
which will keep the last win here and keep the order of fields to use later. But I guess it's no big deal. Just saying.
@HyukjinKwon Thanks for the suggestion. I updated this PR description. |
(late LGTM) |
Thanks @maropu |
…ined withField operations ### What changes were proposed in this pull request? Modifies the UpdateFields optimizer to fix correctness issues with certain nested and chained withField operations. Examples for recreating the issue are in the new unit tests as well as the JIRA issue. ### Why are the changes needed? Certain withField patterns can cause Exceptions or even incorrect results. It appears to be a result of the additional UpdateFields optimization added in #29812. It traverses fieldOps in reverse order to take the last one per field, but this can cause nested structs to change order which leads to mismatches between the schema and the actual data. This updates the optimization to maintain the initial ordering of nested structs to match the generated schema. ### Does this PR introduce _any_ user-facing change? It fixes exceptions and incorrect results for valid uses in the latest Spark release. ### How was this patch tested? Added new unit tests for these edge cases. Closes #32338 from Kimahriman/bug/optimize-with-fields. Authored-by: Adam Binford <adamq43@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…ined withField operations ### What changes were proposed in this pull request? Modifies the UpdateFields optimizer to fix correctness issues with certain nested and chained withField operations. Examples for recreating the issue are in the new unit tests as well as the JIRA issue. ### Why are the changes needed? Certain withField patterns can cause Exceptions or even incorrect results. It appears to be a result of the additional UpdateFields optimization added in #29812. It traverses fieldOps in reverse order to take the last one per field, but this can cause nested structs to change order which leads to mismatches between the schema and the actual data. This updates the optimization to maintain the initial ordering of nested structs to match the generated schema. ### Does this PR introduce _any_ user-facing change? It fixes exceptions and incorrect results for valid uses in the latest Spark release. ### How was this patch tested? Added new unit tests for these edge cases. Closes #32338 from Kimahriman/bug/optimize-with-fields. Authored-by: Adam Binford <adamq43@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit 74afc68) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…ined withField operations ### What changes were proposed in this pull request? Modifies the UpdateFields optimizer to fix correctness issues with certain nested and chained withField operations. Examples for recreating the issue are in the new unit tests as well as the JIRA issue. ### Why are the changes needed? Certain withField patterns can cause Exceptions or even incorrect results. It appears to be a result of the additional UpdateFields optimization added in apache#29812. It traverses fieldOps in reverse order to take the last one per field, but this can cause nested structs to change order which leads to mismatches between the schema and the actual data. This updates the optimization to maintain the initial ordering of nested structs to match the generated schema. ### Does this PR introduce _any_ user-facing change? It fixes exceptions and incorrect results for valid uses in the latest Spark release. ### How was this patch tested? Added new unit tests for these edge cases. Closes apache#32338 from Kimahriman/bug/optimize-with-fields. Authored-by: Adam Binford <adamq43@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit 74afc68) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
…ined withField operations ### What changes were proposed in this pull request? Modifies the UpdateFields optimizer to fix correctness issues with certain nested and chained withField operations. Examples for recreating the issue are in the new unit tests as well as the JIRA issue. ### Why are the changes needed? Certain withField patterns can cause Exceptions or even incorrect results. It appears to be a result of the additional UpdateFields optimization added in apache#29812. It traverses fieldOps in reverse order to take the last one per field, but this can cause nested structs to change order which leads to mismatches between the schema and the actual data. This updates the optimization to maintain the initial ordering of nested structs to match the generated schema. ### Does this PR introduce _any_ user-facing change? It fixes exceptions and incorrect results for valid uses in the latest Spark release. ### How was this patch tested? Added new unit tests for these edge cases. Closes apache#32338 from Kimahriman/bug/optimize-with-fields. Authored-by: Adam Binford <adamq43@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit 74afc68) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
What changes were proposed in this pull request?
This patch proposes to add more optimization to
UpdateFields
expression chain. And optimizeUpdateFields
early in analysis phase.In particular, this optimization includes:
WithField
atUpdateFields
SimplifyExtractValueOps
, respect nullability in input struct atGetStructField(UpdateFields(struct, ...))
, and unwrap if-else.Why are the changes needed?
UpdateFields
can manipulate complex nested data, but usingUpdateFields
can easily create inefficient expression chain. We should optimize it further.Because when manipulating deeply nested schema, the
UpdateFields
expression tree could be too complex to analyze, this change optimizesUpdateFields
early in analysis phase.Does this PR introduce any user-facing change?
No
How was this patch tested?
Unit test.