[SPARK-17247][SQL]: when calculating size of a relation from hdfs, the size calculation should be aborted once size is established to be > broadcast threshold. #14817
Conversation
@Parth-Brahmbhatt we are currently working on Cost Based Optimization in Spark. An important input will be the actual size of the table; having partial statistics (what you are suggesting) will not make this as good as it can be. The other issue is that the size of the table is currently used to determine whether we do a hash join or a sort merge join: in order to do a hash join, the build side must be at least 3x smaller than the stream side, and the build side should not occupy more memory per worker than the broadcast threshold. This change would break that too.
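For illustration, a minimal sketch of the two size checks described above; the names (`PlanStats`, `JoinSizing`) are hypothetical, not Spark's actual JoinSelection code, and the threshold mirrors the 10 MB default of spark.sql.autoBroadcastJoinThreshold rather than anything from this PR:

```scala
// Stand-in for a plan's size estimate.
case class PlanStats(sizeInBytes: BigInt)

object JoinSizing {
  // Mirrors the 10 MB default of spark.sql.autoBroadcastJoinThreshold.
  val broadcastThreshold: Long = 10L * 1024 * 1024

  // Broadcast hash join: every worker holds a full copy of the build side,
  // so it must fit under the broadcast threshold.
  def canBroadcast(build: PlanStats): Boolean =
    build.sizeInBytes <= broadcastThreshold

  // Hash join vs. sort merge join: the build side must also be at least
  // 3x smaller than the stream side.
  def muchSmaller(build: PlanStats, stream: PlanStats): Boolean =
    build.sizeInBytes * 3 <= stream.sizeInBytes
}
```

Both checks depend on the size estimate being trustworthy, which is why an overestimate merely disables broadcasting while an underestimate can cause an OOM.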
@hvanhovell The behavior when fallbackToHdfs is not enabled (and by default it is not enabled, for performance reasons) is to return the value specified via spark.sql.defaultSizeInBytes (default Long.MaxValue), which is also wrong, and I am fine with returning that value instead of returning the partial value. This patch is trying to reduce the perf penalty that we have to pay when the fallback is enabled; in that case, trying to get an accurate size is just too expensive when one of the two tables being joined is huge. I could also add a config to enable/disable this behavior, but not even having the option just makes this fallback useless in most cases.
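A minimal sketch of the fallback behavior described here, assuming the two config values have already been resolved; the method name is illustrative, not the actual relation code:

```scala
// With the fallback off, the table reports defaultSizeInBytes
// (Long.MaxValue by default), which effectively rules out broadcasting it.
def estimatedSizeInBytes(
    fallBackToHdfs: Boolean,   // e.g. spark.sql.statistics.fallBackToHdfs
    defaultSizeInBytes: Long,  // e.g. spark.sql.defaultSizeInBytes
    sizeFromHdfs: => Long      // the (possibly expensive) filesystem scan
  ): Long = {
  if (fallBackToHdfs) sizeFromHdfs else defaultSizeInBytes
}
```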
Code under review: `private def getSize(f: Path): Long = {`
I see what you're doing but this method no longer really does what it says. I think this is going to be error-prone.
Renamed the method, added documentation, and changed it to return the default size in bytes when size > broadcast threshold, instead of returning any value > broadcast threshold.
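A hedged sketch of the early-abort idea under discussion; the method and parameter names are illustrative, not the PR's exact code. Traversal stops as soon as the running total passes the threshold, and the default size is reported instead of a partial value:

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

import scala.collection.mutable

def sizeCappedAtThreshold(
    fs: FileSystem,
    root: Path,
    threshold: Long,
    defaultSizeInBytes: Long): Long = {
  var total = 0L
  val stack = mutable.Stack(fs.getFileStatus(root))
  while (stack.nonEmpty && total <= threshold) {
    val status = stack.pop()
    if (status.isDirectory) {
      // Defer subdirectories instead of recursing, so we can bail out early.
      stack.pushAll(fs.listStatus(status.getPath))
    } else {
      total += status.getLen
    }
  }
  // Past the threshold, the exact size no longer matters for the broadcast
  // decision, so report the default rather than a misleading partial sum.
  if (total > threshold) defaultSizeInBytes else total
}
```

With this shape, a table far over the threshold stops listing after only a handful of files, which is the perf win the description claims.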
@hvanhovell can you also point me at the design doc / discussion thread for the CBO work? Thanks.
Can one of the committers take a look at this PR?
@Parth-Brahmbhatt here is the CBO ticket: https://issues.apache.org/jira/browse/SPARK-16026 Could you explain why this is so slow? Is this because of listing the files? Or because of the amount? We might be able to speed up the first one, see AlterTableRecoverPartitionsCommand.
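For reference, a rough sketch in the spirit of AlterTableRecoverPartitionsCommand's parallel scan, not its actual code; the helper name is illustrative:

```scala
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

def listLeafFiles(fs: FileSystem, dir: Path): Seq[FileStatus] = {
  val (subDirs, files) = fs.listStatus(dir).partition(_.isDirectory)
  // .par fans the recursive listing out over the default fork-join pool,
  // so the slow (and, on S3, paginated) listStatus calls overlap.
  files.toSeq ++ subDirs.par.flatMap(d => listLeafFiles(fs, d.getPath)).seq
}
```

Parallelism hides per-call latency but still visits every file, which is the limitation raised below.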
@hvanhovell it's because of listing, and it gets worse as the number of files increases.
@Parth-Brahmbhatt would the approach taken in AlterTableRecoverPartitionsCommand work for you?
@hvanhovell I will take a look at it and update this PR.
@hvanhovell I looked at AlterTableRecoverPartitionsCommand; while the parallelism in listing could help, it will still incur a huge perf penalty. We have tables with millions of partitions, and we use S3 for storage, where listing is more expensive. I think it is much better to just stop listing once we know the stat, which is used only for join optimization, won't meet the threshold, and I don't see the downside compared to what we currently offer.
Can someone please review this PR? Thanks.
Requesting review one more time.
@Parth-Brahmbhatt I am very curious why you have millions of partitions. What is the use case? You will be in a world of hurt as soon as you do any listing. I am not going to merge this PR as-is, as it clashes with the work on CBO. I would support an ANALYZE TABLE command that allows for partial scanning.
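For context, the ANALYZE TABLE syntax Spark SQL already supports, assuming a SparkSession named `spark` and an illustrative table name; a partial-scan variant as suggested above would be a hypothetical extension of this command:

```scala
// Full scan: computes size and row count for the table.
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS")
// NOSCAN: records only the total size in bytes, skipping the full scan.
spark.sql("ANALYZE TABLE events COMPUTE STATISTICS NOSCAN")
```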
Can one of the admins verify this patch?
@hvanhovell We have tables with 5-6 partition columns and data going back 4-5 years, and given that our data is stored in S3, the listing is paginated. If you want to wait until the CBO work is done, that is fine; we can resume reviewing then.
Thanks for reporting it! After CBO, the relation size is no longer used only for deciding whether a table can be broadcast. Maybe we can close this PR now?
What changes were proposed in this pull request?
When calculating the size of a relation from HDFS, the size calculation should be aborted once the size is established to be > the broadcast threshold. This is really helpful when the fallback config is enabled and one of the tables in the query is partitioned and fairly large.
How was this patch tested?
Unit test modified.