[SPARK-44021][SQL] Add spark.sql.files.maxPartitionNum #41545
Conversation
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
Thanks @wangyum, this really helps for queries that scan huge tables.
```scala
        "and ORC.")
      .version("3.5.0")
      .intConf
      .checkValue(threshold => threshold > 0,
```
Should we change it to check `threshold > FILES_MIN_PARTITION_NUM`?
According to the documentation of `FILES_MIN_PARTITION_NUM`, it only suggests (does not guarantee) the minimum number of split file partitions. Maybe we can't rely on it.
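For reference, a config entry with the check discussed above might look like the following sketch. The field name `FILES_MAX_PARTITION_NUM`, the doc string, and the use of `createOptional` are assumptions inferred from the diff context, not the merged code:

```scala
// Illustrative sketch of the new SQLConf entry; the field name, doc text,
// and createOptional are assumptions, not necessarily the PR's exact code.
val FILES_MAX_PARTITION_NUM = buildConf("spark.sql.files.maxPartitionNum")
  .doc("The suggested (not guaranteed) maximum number of split file partitions. " +
    "This configuration is effective only when using file-based sources " +
    "such as Parquet, JSON and ORC.")
  .version("3.5.0")
  .intConf
  .checkValue(threshold => threshold > 0,
    "The maximum number of partitions must be a positive integer.")
  .createOptional
```

Making it optional (rather than defaulting it) would keep the existing splitting behavior untouched unless the user opts in, which matches the PR's stated goal.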
cc @cloud-fan
Thank you, @wangyum. Could you add a description of why you couldn't increase `spark.sql.files.maxPartitionBytes` instead initially? Maybe, is this PR aiming to selectively tune large RDDs only while keeping the existing behavior for small RDDs?
Yes. This PR aims to selectively tune large RDDs only while keeping the existing behavior for small RDDs. Increasing `spark.sql.files.maxPartitionBytes` instead is not acceptable because it would affect all data sources in the query.
Thanks. Please add that into the PR description to make it a commit log.
```scala
      maxSplitBytes: Long): Seq[FilePartition] = {
    val openCostBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
    val maxPartitionNum = sparkSession.sessionState.conf.filesMaxPartitionNum
    val partitions = getFilePartitions(partitionedFiles, maxSplitBytes, openCostBytes)
```
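The capping step that follows the line above can be sketched as a pure helper: if size-based splitting produced more partitions than `maxPartitionNum`, grow the split size proportionally and re-split. This is a simplified sketch under assumed names (the helper and its rounding strategy are illustrative, not Spark's exact implementation):

```scala
// Simplified sketch of capping the partition count; the object/method names
// and the rounding strategy are assumptions, not Spark's actual code.
object PartitionCapSketch {
  // Recompute a larger split size so that re-splitting the same input
  // yields at most maxPartitionNum partitions.
  def rescaledSplitBytes(currentSplitBytes: Long,
                         currentPartitionNum: Int,
                         maxPartitionNum: Int): Long = {
    if (currentPartitionNum <= maxPartitionNum) {
      // Already within the cap: keep the maxPartitionBytes-driven behavior.
      currentSplitBytes
    } else {
      // Grow the split size by the overshoot ratio, rounding up so the
      // re-split cannot exceed the cap.
      val ratio = math.ceil(currentPartitionNum.toDouble / maxPartitionNum)
      (currentSplitBytes * ratio).toLong
    }
  }
}
```

Note the trade-off flagged in the review below this hunk: once the cap kicks in, each partition grows past `spark.sql.files.maxPartitionBytes`, so individual tasks take longer.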
After this line, we ignore `spark.sql.files.maxPartitionBytes`. Could you add a warning about this side effect? It means each task will take longer, and the job can become slower in some cases.
fixed.
+1, LGTM from my side. Thank you for updating, @wangyum .
Merged to master. Thank you, @wangyum, @pan3793, @beliefer, @LuciferYang!
### What changes were proposed in this pull request?

This PR adds a new SQL config: `spark.sql.files.maxPartitionNum`. Users can set it to avoid generating too many partitions when reading file-based sources. Too many partitions increase various overheads on the driver and can cause shuffle service OOM. The following is a GC log of the shuffle service:

```
2023-06-08T01:41:01.871-0700: 7303.965: [Full GC (Allocation Failure) 2023-06-08T01:41:01.871-0700: 7303.965: [CMS: 4194304K->4194304K(4194304K), 7.4010107 secs]2023-06-08T01:41:09.272-0700: 7311.366: [Class Histogram (after full gc):
 num     #instances         #bytes  class name
----------------------------------------------
   1:       7110660     2334927400  [C
   2:      19465810      467514416  [I
   3:       6754570      270182800  org.apache.spark.network.protocol.ChunkFetchRequest
   4:       6661155      266446200  org.sparkproject.io.netty.channel.DefaultChannelPromise
   5:       6639056      265562240  org.apache.spark.network.buffer.FileSegmentManagedBuffer
   6:       6639055      265562200  org.apache.spark.network.protocol.RequestTraceInfo
   7:       6663764      213240448  org.sparkproject.io.netty.util.Recycler$DefaultHandle
   8:       6659382      213100224  org.sparkproject.io.netty.channel.AbstractChannelHandlerContext$WriteTask
   9:       6659218      213094976  org.apache.spark.network.server.ChunkFetchRequestHandler$$Lambda$156/886274988
  10:       6640444      212494208  java.io.File
...
```

### Why are the changes needed?

1. The PR aims to selectively rescale large RDDs only, while keeping the existing behavior for small RDDs. So directly increasing `spark.sql.files.maxPartitionBytes` is not acceptable:
   1. There may be multiple data sources in one SQL query; setting `spark.sql.files.maxPartitionBytes` would affect all of them.
   2. We don't know how large `spark.sql.files.maxPartitionBytes` should be set; sometimes it may be very large (more than 20 GiB).
2. To avoid generating too many partitions for a very large partitioned and bucketed table, since bucket scan is not always used after [SPARK-32859](https://issues.apache.org/jira/browse/SPARK-32859).

   Before SPARK-32859 | After SPARK-32859
   --- | ---
   <img width="400" src="https://github.com/apache/spark/assets/5399861/5e14932b-aa3d-4b14-b80c-e3ff348958c4"> | <img width="400" src="https://github.com/apache/spark/assets/5399861/170311a0-c086-408a-9d95-17031e21e16a">
3. To avoid generating too many partitions when there are lots of small files.

### Does this PR introduce _any_ user-facing change?

No, unless the user sets `spark.sql.files.maxPartitionNum`.

### How was this patch tested?

Unit test and manual testing:

Before this PR | After this PR and `set spark.sql.files.maxPartitionNum=20000`
-- | --
<img width="400" src="https://github.com/apache/spark/assets/5399861/ffda1850-cd4a-4970-a4e5-e1e43177135a"> | <img width="330" src="https://github.com/apache/spark/assets/5399861/1df7cac7-fe82-4af3-b3ec-91aa23c79a8b">

Closes apache#41545 from wangyum/SPARK-44021.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
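As a usage sketch, the new config can be set per session before reading. The `20000` threshold comes from the PR's manual test; the input path is illustrative:

```scala
// Hypothetical spark-shell usage; the Parquet path is illustrative.
spark.conf.set("spark.sql.files.maxPartitionNum", "20000")
val df = spark.read.parquet("/path/to/large_table")
// The number of scan partitions should now stay at or below 20000,
// while reads of smaller tables keep their existing partitioning.
println(df.rdd.getNumPartitions)
```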
Limiting the number of mappers can also reduce RPC calls and improve NameNode performance. Data size: 3.8 TB, file number: 3992.
Thank you for additionally sharing that result, @wangyum.