feat: Preserve File Partitioning From File Scans #19124
I read this knob as primarily controlling the tradeoff between scan parallelism vs I/O overhead and eliminating shuffles, which is very useful and more expressive than a boolean. However, I think we should also consider the impact of data skew. For heavily skewed tables, preserving file partitioning can make the scan itself significantly unbalanced (one or a few partition groups doing most of the I/O), and in those cases you might actually prefer to pay the shuffle cost rather than constrain execution to the file partition layout. That’s why I think it’s fine to keep a global configuration option, but ideally we would also support passing a different value per scan. In practice you may have:
If adding per-scan configuration is too much for this PR, it’s probably enough to call it out explicitly as follow-up work, but I think this skew aspect is important to keep in mind for the feature’s overall design.
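To make the skew concern concrete, here is a minimal sketch of how a per-scan decision could be informed by partition sizes. Nothing here is part of this PR or of DataFusion: `skew_ratio`, `should_preserve`, and the `max_skew` threshold are hypothetical names used only for illustration.

```rust
/// Hypothetical helper: ratio of the largest partition group's size to the
/// mean group size. A value of 1.0 means perfectly balanced groups.
/// Returns None when there are no groups or no bytes at all.
fn skew_ratio(group_bytes: &[u64]) -> Option<f64> {
    if group_bytes.is_empty() {
        return None;
    }
    let total: u64 = group_bytes.iter().sum();
    if total == 0 {
        return None;
    }
    let max = *group_bytes.iter().max()? as f64;
    let mean = total as f64 / group_bytes.len() as f64;
    Some(max / mean)
}

/// Hypothetical per-scan policy: preserve the file partition layout only
/// when the largest group is at most `max_skew` times the mean group size.
/// Otherwise paying for the shuffle may be the better trade.
fn should_preserve(group_bytes: &[u64], max_skew: f64) -> bool {
    matches!(skew_ratio(group_bytes), Some(r) if r <= max_skew)
}
```

With four equally sized groups the ratio is 1.0 and the layout would be preserved; with one group holding 970 of 1000 bytes the ratio is 3.88, so a `max_skew` of 2.0 would favor the shuffle instead.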
This is a great callout, and I think we could see some improvements for file I/O while using this behavior. I did some poking around and it looks like one approach may be to expose a new listing option. I think this would be great follow-up work, as I am trying to keep the scope of this PR tight with good benefits 😄.
That’s a very good point. I think we can frame this as two distinct scenarios:
Overall, I believe we should support both options: let DataFusion operate as a library while giving users the flexibility to decide how they want to handle their data.
NGA-TRAN
left a comment
Clear description, excellent optimization and code, strong SQL & explain tests, and an impressive benchmark. I only have minor suggestions: adding a few more comments and a unit test.
I’m especially excited about the linear scaling performance.
            target_groups[bucket].extend(files);
        }

        target_groups.into_iter().map(FileGroup::new).collect()
Does this else condition mean that the statement in your comment, that all files with the same partition values end up in the same group, no longer holds? If the expression is simple (e.g. a column name), the round robin will preserve file partitions, but it might not in more complicated use cases. You may want to add a comment here.
No, it still holds: we are just round-robin distributing groups that are already partitioned, so each resulting group still contains all files for every partition key value it holds. We are only merging partition groups into larger ones.
Will add a comment for clarification.
Yes, it works in this case, but will it work all the time? Maybe the answer is also yes, but I think we should say it explicitly: N partitions become M (M < N), so one partition will now include files/data from many partitions.
By the way, does round robin mean that data of two or more partitions will go into the same partition, or that data of one partition may be split across different partitions?
I agree it will work, but we want to document it clearly in case others have customized partitioning that may not work with this.
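The property discussed in this thread can be sketched in isolation. The code below is not the PR's implementation: `PartFile` and `merge_partition_groups` are hypothetical stand-ins that assume each file carries a single partition value. It groups files by partition value first and then deals whole groups round-robin into the target buckets, so several partition values may share a bucket but no partition value is ever split across buckets.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for a scanned file with one hive partition value.
#[derive(Debug, Clone, PartialEq)]
struct PartFile {
    path: String,
    partition_value: String,
}

/// Groups files by partition value, then assigns whole groups round-robin
/// to at most `target` buckets. A partition value never spans two buckets.
fn merge_partition_groups(files: Vec<PartFile>, target: usize) -> Vec<Vec<PartFile>> {
    assert!(target > 0, "target must be positive");

    // BTreeMap keeps the iteration order deterministic.
    let mut by_value: BTreeMap<String, Vec<PartFile>> = BTreeMap::new();
    for file in files {
        by_value
            .entry(file.partition_value.clone())
            .or_default()
            .push(file);
    }

    // Never create more buckets than there are distinct partition values.
    let buckets = target.min(by_value.len()).max(1);
    let mut target_groups: Vec<Vec<PartFile>> = vec![Vec::new(); buckets];
    for (i, (_value, group)) in by_value.into_iter().enumerate() {
        target_groups[i % buckets].extend(group);
    }
    target_groups
}
```

For partition values A, B, and C merged into two buckets, A and C land together in the first bucket and B in the second. This is the "N partitions become M" case: data of two or more partitions can share an output partition, while data of one partition stays in one place.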
gabotechs
left a comment
Left some minor comments, but overall this looks pretty good, so +1 from me!
This is good for DataFusion, but huge for any distributed engine built on top of it, as this allows avoiding expensive shuffles at a close-to-0 cost.
Nice work!
Will leave this PR here for a day or so in case other maintainers want to comment.
…er to set threshold on when to fall back to traditional split and updated comments to reflect this
Which issue does this PR close?
Rationale for this change
DataFusion does not have an option to preserve file partitioning from file scans; instead, it always returns `UnknownPartitioning`. Some queries and datasets would benefit greatly from preserving their explicit partitioning to avoid shuffles. An example of this is the following scenario:
Say we have data partitioned by `f_dkey` and ordered by `(f_dkey, timestamp)`, which is hive-style partitioned:
Running the query:
Prior to this PR, this would produce the plan:
This can be improved. Our data is ordered on `(f_dkey, timestamp)`, but when we hash repartition by `f_dkey` we lose our sort ordering, forcing a `SortExec` to be inserted after the repartition. You could set `datafusion.optimizer.prefer_existing_sort = true;` to preserve the ordering through the repartition, but with the tradeoff of a more expensive shuffle.
Since our data is already partitioned by `f_dkey` at file scan time, we can eliminate the hash repartitioning and, in the process, the `SortExec`. This would result in a plan that looks like:
What changes are included in this PR?
I have extended the `FileScanConfig`'s implementation of `DataSource` to have the ability to preserve hive-style partitioning by declaring its `output_partitioning` as hash partitioned on the hive columns.
When the user sets the option `preserve_file_partitions > 0` (0 by default, which is disabled), DataFusion will take advantage of partitioned files. Specifically, when `preserve_file_partitions` is enabled:
- `preserve_file_partitions` > the number of distinct partitions found -> we fall back to partitioning by byte ranges
- `preserve_file_partitions` <= the number of distinct partitions found -> we keep the file partitioning
Because we can have fewer file partition groups than `target_partitions`, forcing a partition group (with possibly large amounts of data) to be read in a single partition can increase file I/O. This configuration choice was made so that users can control how much I/O overhead they are willing to accept in order to eliminate shuffles. (This was recommended by @gabotechs and is a great approach to have more granularity over this behavior rather than a boolean flag, thank you)
Reusing hash repartitioning has rippling effects throughout query plans, such as propagating through joins and windows as well as preserving order, which is great to see.
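The threshold rule described above can be restated as a small decision function. This is only a sketch of the rule as worded in this description, not the code in this PR: `ScanLayout` and `choose_layout` are hypothetical names, and `distinct_partitions` is assumed to be the number of distinct partition value combinations found among the scanned files.

```rust
/// Hypothetical outcome of the planning decision for a file scan.
#[derive(Debug, PartialEq)]
enum ScanLayout {
    /// Keep files grouped by partition values and report hash partitioning.
    PreserveFilePartitions,
    /// Fall back to splitting files by byte ranges.
    SplitByByteRange,
}

/// Applies the threshold rule: 0 disables the feature; otherwise the file
/// partitioning is kept only when at least `preserve_file_partitions`
/// distinct partitions were found.
fn choose_layout(preserve_file_partitions: usize, distinct_partitions: usize) -> ScanLayout {
    if preserve_file_partitions == 0 {
        // Feature disabled (the default).
        return ScanLayout::SplitByByteRange;
    }
    if preserve_file_partitions <= distinct_partitions {
        ScanLayout::PreserveFilePartitions
    } else {
        ScanLayout::SplitByByteRange
    }
}
```

A higher threshold therefore means the layout is preserved only for tables with enough distinct partitions to keep the scan reasonably parallel.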
Are these changes tested?
Small Data:
The optimized plan is roughly on par with the other plans for preserve_order and preserve_order_join, but it makes preserve_order_window 6.7× faster than not optimized and 4.1× faster than preserve-sort-repartition (PSR).
Medium Dataset:
The optimized plan makes preserve_order about 8.0× faster than not optimized (4.2× vs PSR), preserve_order_join about 2.2× faster than not optimized (2.35× vs PSR), and preserve_order_window a huge 19.6× faster than not optimized (11.7× vs PSR).
Large Dataset:
The optimized plan makes preserve_order about 9.3× faster than not optimized (4.3× vs PSR), preserve_order_join about 2.4× faster than not optimized (2.5× vs PSR), and preserve_order_window an extreme 53× faster than not optimized (32× vs PSR).
Are there any user-facing changes?
Yes, users can now use the `preserve_file_partitions` option to set the minimum number of distinct file partitions required to preserve file partitioning (0 disables it). If enabled and triggered, users will see repartitions on their file partition key eliminated when appropriate.
Follow-Up Work
- `Hash(a)` doesn't satisfy `Hash(a, b)` although it should. This is because `Hash(a)` guarantees that all rows with a given value of `a` are contained in a single partition. Thus, since `Hash(a, b)` is a subset of `Hash(a)`, anything that is `Hash(a)` is also `Hash(a, b)`.
- If data is ordered by `timestamp` and we then try to order by `date_bin('1 hour', timestamp)`, DataFusion will not recognize that this is implicitly satisfied. Thus, for monotonic functions such as `date_bin`, `CAST`, `FLOOR`, etc., we should maintain ordering, eliminating unnecessary sorts.
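The first follow-up item can be illustrated with a small standalone sketch. It does not use DataFusion's types: `hash_satisfies`, `partition_by_a`, and `co_located_on_a_b` are hypothetical helpers, and the subset rule shown is the one proposed above rather than current behavior.

```rust
use std::collections::hash_map::DefaultHasher;
use std::collections::{HashMap, HashSet};
use std::hash::{Hash, Hasher};

/// Proposed rule (an assumption, not current DataFusion behavior): data
/// hash-partitioned on `existing` columns satisfies a requirement on
/// `required` columns when every existing column is also required.
fn hash_satisfies(existing: &[&str], required: &[&str]) -> bool {
    !existing.is_empty() && existing.iter().all(|c| required.contains(c))
}

/// Assigns a row to one of `n` partitions using only column `a`.
fn partition_by_a(a: &str, n: usize) -> usize {
    assert!(n > 0, "n must be positive");
    let mut hasher = DefaultHasher::new();
    a.hash(&mut hasher);
    (hasher.finish() % n as u64) as usize
}

/// Checks that every distinct (a, b) pair lands in exactly one partition
/// when rows are partitioned by `a` alone.
fn co_located_on_a_b(rows: &[(&str, &str)], n: usize) -> bool {
    let mut seen: HashMap<(&str, &str), HashSet<usize>> = HashMap::new();
    for &(a, b) in rows {
        seen.entry((a, b)).or_default().insert(partition_by_a(a, n));
    }
    seen.values().all(|partitions| partitions.len() == 1)
}
```

Because the partition is a function of `a` only, two rows with equal `(a, b)` always have equal `a` and so share a partition, which is why the check in `co_located_on_a_b` holds for any input. The reverse direction does not hold: `Hash(a, b)` may scatter rows with the same `a`, so `hash_satisfies(&["a", "b"], &["a"])` is false.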