[SPARK-31164][SQL] Inconsistent rdd and output partitioning for bucket table when output doesn't contain all bucket columns#27924
Conversation
Test build #119861 has finished for PR 27924 at commit
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
Test build #119864 has finished for PR 27924 at commit
cc @dbtsai since this is related to table bucketing.
sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
Test build #119898 has finished for PR 27924 at commit
…t table when output doesn't contain all bucket columns

### What changes were proposed in this pull request?
For a bucketed table, when deciding output partitioning, if the output doesn't contain all bucket columns, the result is `UnknownPartitioning`. But when generating the RDD, Spark currently uses `createBucketedReadRDD` because it doesn't check whether the output contains all bucket columns. So the RDD and its output partitioning are inconsistent.

### Why are the changes needed?
To fix a bug.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Modified existing tests.

Closes #27924 from wzhfy/inconsistent_rdd_partitioning.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Zhenhua Wang <wzh_zju@163.com>
(cherry picked from commit 1369a97)
Signed-off-by: Zhenhua Wang <wzh_zju@163.com>
thanks for reviewing, merged to master/3.0
Thank you, @wzhfy and @cloud-fan |
…bucket table when output doesn't contain all bucket columns

### What changes were proposed in this pull request?
This is a backport for [pr#27924](#27924). For a bucketed table, when deciding output partitioning, if the output doesn't contain all bucket columns, the result is `UnknownPartitioning`. But when generating the RDD, Spark currently uses `createBucketedReadRDD` because it doesn't check whether the output contains all bucket columns. So the RDD and its output partitioning are inconsistent.

### Why are the changes needed?
To fix a bug.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Modified existing tests.

Closes #27934 from wzhfy/inconsistent_rdd_partitioning_2.4.

Authored-by: Zhenhua Wang <wzh_zju@163.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
@cloud-fan @wzhfy As I understand it, bucketed scan has two effects.
When 2 is not effective, we may still make use of 1. For example, we can have a bounded number of file partitions. Is there a correctness issue I've missed?
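The "bounded number of file partitions" effect mentioned above can be illustrated with a toy sketch. This is a hypothetical model, not Spark's actual implementation: in a bucketed read, all files belonging to the same bucket go into one partition, so the partition count is bounded by the number of buckets no matter how many files the table has.

```scala
// Toy sketch (hypothetical, not Spark's real code): group files by bucket id,
// producing exactly numBuckets partitions regardless of the file count.
object BucketedPartitionsSketch {
  // files are (bucketId, path) pairs
  def bucketedPartitions(files: Seq[(Int, String)], numBuckets: Int): Seq[Seq[String]] =
    (0 until numBuckets).map { b =>
      files.collect { case (id, path) if id == b => path }
    }
}
```

With five files spread over two buckets, this always yields exactly two partitions, which is the bound the comment above refers to.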
I don't think limiting the number of file partitions was the design goal of bucketed tables. We can set
Yes, but the effect has leaked into user space, and this change breaks it. As to my original question, is
I don't think so, and Spark will disable bucketed scan if it has no benefit for downstream operators; see the rule
That rule has the same issue, but it can be disabled. However, this change can't be.
That rule means this is by design. We believe a bucketed scan is more expensive than a normal scan and only want to use it when it can avoid shuffles. Maybe this does not apply in your case, but it applies in many other cases. If you had found this issue before we released it, we could have reverted it to avoid a perf regression. But reverting is not an option today, as it may cause perf regressions for more people. I'd suggest revisiting your requirement and considering
I'm not asking for a revert here, but to explore an option to disable this behavior, hence the question about partitioning and bucketed scan.
I'm fine with keeping backward compatibility for some "by-accident" features if the cost is small. Feel free to open a PR to bring back the old behavior if
@cloud-fan please help check #36733 |
### What changes were proposed in this pull request?
For a bucketed table, when deciding output partitioning, if the output doesn't contain all bucket columns, the result is `UnknownPartitioning`. But when generating the RDD, Spark currently uses `createBucketedReadRDD` because it doesn't check if the output contains all bucket columns. So the RDD and its output partitioning are inconsistent.

### Why are the changes needed?
To fix a bug.

### Does this PR introduce any user-facing change?
No.

### How was this patch tested?
Modified existing tests.
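The inconsistency this PR fixes can be sketched with a simplified model. The names below are hypothetical stand-ins, not Spark's real classes: the point is that the reported output partitioning and the choice of read path must be driven by the same condition, namely whether the scan's output still contains every bucket column.

```scala
// Simplified model (hypothetical names, not Spark's actual implementation)
// of the fix: one shared predicate drives both decisions.
object BucketedScanSketch {
  sealed trait Partitioning
  case object UnknownPartitioning extends Partitioning
  final case class HashPartitioning(cols: Seq[String], numBuckets: Int) extends Partitioning

  // True only when every bucket column survives in the scan's output.
  def bucketColumnsInOutput(output: Seq[String], bucketCols: Seq[String]): Boolean =
    bucketCols.forall(output.contains)

  // Partitioning reported to downstream operators.
  def outputPartitioning(output: Seq[String], bucketCols: Seq[String], numBuckets: Int): Partitioning =
    if (bucketColumnsInOutput(output, bucketCols)) HashPartitioning(bucketCols, numBuckets)
    else UnknownPartitioning

  // After the fix, the RDD path applies the same check, so a bucketed read is
  // only created when the reported partitioning is HashPartitioning.
  def useBucketedRead(output: Seq[String], bucketCols: Seq[String]): Boolean =
    bucketColumnsInOutput(output, bucketCols)
}
```

Before the fix, the RDD path skipped the check and always used the bucketed read, while the partitioning path reported `UnknownPartitioning` when a bucket column was projected away, which is the inconsistency described above.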