[SPARK-39834][SQL][SS] Include the origin stats and constraints for LogicalRDD if it comes from DataFrame #37248
HeartSaVioR wants to merge 5 commits into apache:master
Conversation
This test is removed as we no longer carry over the logical plan.
sql/core/src/main/scala/org/apache/spark/sql/execution/ExistingRDD.scala
Maybe we can call fromDataset instead?
Unfortunately we can't, as OutputListAwareStatsTestPlan and OutputListAwareConstraintsTestPlan do not have corresponding physical plans. (The output partitioning is picked up from the physical plan, the same as we do in Dataset.checkpoint.)
viirya left a comment
The concern for checkpoint is a good point.
Can we add a comment to explain why they are in the curried constructor? It's because we don't want other rules to mistakenly transform and rewrite them.
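For illustration, here is a minimal, self-contained Scala sketch of the curried-constructor trick the comment refers to. The `Node` class below is hypothetical (it stands in for a Catalyst plan node, not the real LogicalRDD): fields placed in a second, curried parameter list of a case class are excluded from the generated product, so generic tree-rewriting rules that rebuild nodes from their product elements cannot touch them.

```scala
object CurriedConstructorDemo {
  // Hypothetical plan node. Fields in the second (curried) parameter list
  // are excluded from the generated equals/hashCode/productIterator, so
  // rules that copy a node from its product elements leave them intact.
  case class Node(name: String, children: Seq[Node])(
      val originStats: Option[String] = None,
      val originConstraints: Seq[String] = Nil)

  def main(args: Array[String]): Unit = {
    val n = Node("scan", Nil)(originStats = Some("rowCount=10"))
    // Only the first parameter list is visible to product-based machinery:
    assert(n.productIterator.toList == List("scan", Nil))
    // The extra fields are still carried on the node itself:
    assert(n.originStats.contains("rowCount=10"))
    println("ok")
  }
}
```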
Well, I see failures in another PR (#37252): it seems the build has become unstable recently.
Just rebased to the master branch. Let's see whether the failure keeps happening...
sbt.ForkMain 40811 failed with exit code 137. This seems to indicate a memory issue with the process: it was killed by the OS/container. I'm not sure whether it hits the memory limit of the GitHub Actions container, and if so, whether we can increase the memory. @HyukjinKwon Do you have any idea about this? Is it a known issue we have to tolerate for now?
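For context on why 137 reads as a memory kill: by POSIX shell convention, a process that dies from a signal is reported with exit status 128 + the signal number, so 137 = 128 + 9 means the forked JVM received SIGKILL, which is what the OS OOM killer sends. A small sketch of that decoding (the `describeExit` helper is hypothetical, not part of sbt or Spark):

```scala
object ExitCodeDemo {
  // Decode a POSIX-style exit status: values above 128 indicate the
  // process was terminated by signal (status - 128).
  def describeExit(status: Int): String =
    if (status > 128) s"killed by signal ${status - 128}"
    else s"exited with code $status"

  def main(args: Array[String]): Unit = {
    // 137 = 128 + 9 (SIGKILL): the classic OOM-killer signature.
    assert(describeExit(137) == "killed by signal 9")
    assert(describeExit(0) == "exited with code 0")
    println("ok")
  }
}
```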
I think we can ignore it, but mind retriggering one more time please? I made some fixes.
…ogicalRDD if it comes from DataFrame
…gRDD.scala Co-authored-by: Liang-Chi Hsieh <viirya@gmail.com>
CI build passed. Thanks all for reviewing! Merging to master.
- This PR fixes #527
- MERGE consists of two passes. During the first pass, it scans the target table to find all files that are affected by the MERGE operation. During the second pass it reads those files again to update/insert the rows from the source table.
- If the source changes between the two passes and contains an additional row that is in the target table, but not in one of the files identified in pass 1, MERGE will insert this row into the target table instead of updating the original row, leading to duplicate rows.
- This can happen if the source is non-deterministic. A source is classified as non-deterministic if any of the operators in the source plan is non-deterministic (i.e. depends on some mutable internal state or some other input that is not part of the outputs of the children), or if it is a non-Delta scan.
- We solve this issue by materializing the source table at the start of a MERGE operation if it is non-deterministic, removing the possibility that the table changes between the two passes. The logic of source materialization is encapsulated in ```MergeIntoMaterializeSource``` and is used by ```MergeIntoCommand```.
- The source is materialized onto the local disks of the executors using RDD local checkpoint. In case RDD blocks are lost, a retry loop is introduced. Blocks can be lost e.g. because of Spot instance kills. When using autoscaling through Spark dynamic allocation, executor decommissioning can be enabled with the following configs to gracefully migrate the blocks:
```
spark.decommission.enabled=true
spark.storage.decommission.rddBlocks.enabled=true
```
- When materializing the source table we lose the statistics and inferred constraints about the table, which can lead to regressions. We include a manual broadcast hint on the source table if the table size is small, ensuring that we choose the most efficient join when possible, and a "dummy" filter to re-introduce the constraints that can be used for further filter inference. apache/spark#37248 implements this to work out of the box in Spark 3.4, so these workarounds can be removed then.

Closes #1418

GitOrigin-RevId: f8cd57e28b52c58ed7ba0b44ae868d5ea5bd534c
Credit to @juliuszsompolski for figuring out issues and proposing the alternative.
What changes were proposed in this pull request?
This PR proposes to effectively revert SPARK-39748, but instead include the origin stats and constraints in LogicalRDD if it comes from a DataFrame, to help the optimizer figure out a better plan.
Why are the changes needed?
We figured out several issues with SPARK-39748:

- One of the major use cases for DataFrame.checkpoint is ML, especially iterative algorithms, where the purpose of calling checkpoint is to "prune" the logical plan. That works against the purpose of including the origin logical plan, and we risk ending up with nested LogicalRDDs, which grows the size of the logical plan without bound.
- We leverage the logical plan to carry over stats, but the correct stats information is in the optimized plan.
- (Not an issue, but a missed spot) Constraints are also something we can carry over.
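To make the first point concrete, here is a toy Scala model of why carrying the origin logical plan inside each checkpoint node makes the plan grow without bound in iterative workloads. The `Plan`/`Checkpoint` classes are hypothetical stand-ins, not Spark's classes:

```scala
object NestedCheckpointDemo {
  // Toy plan model: if a checkpoint node kept the full origin plan as a
  // child, plan size would accumulate across iterations instead of being
  // reset to a single leaf, which is exactly what checkpoint should do.
  sealed trait Plan { def size: Int }
  case object Leaf extends Plan { val size = 1 }
  final case class Checkpoint(origin: Plan) extends Plan {
    def size: Int = 1 + origin.size
  }

  def main(args: Array[String]): Unit = {
    // An iterative algorithm that checkpoints on every iteration:
    val after100 = (1 to 100).foldLeft[Plan](Leaf)((p, _) => Checkpoint(p))
    // The plan keeps every previous iteration nested inside it.
    assert(after100.size == 101)
    println("ok")
  }
}
```

With the approach in this PR, the checkpoint node instead carries only the origin stats and constraints, so the plan size stays constant across iterations.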
Does this PR introduce any user-facing change?
No.
How was this patch tested?
Existing and new UTs.