[SPARK-32019][SQL] Add spark.sql.files.minPartitionNum config #28853
Conversation
Test build #124188 has finished for PR 28853 at commit
Test build #124191 has finished for PR 28853 at commit
Test build #124195 has finished for PR 28853 at commit
retest this please
Test build #124208 has finished for PR 28853 at commit
Test build #124211 has finished for PR 28853 at commit
@maropu @cloud-fan thanks for the review again.
@@ -528,6 +528,18 @@ class FileSourceStrategySuite extends QueryTest with SharedSparkSession with Pre
    }
  }

  test("Add spark.sql.files.minPartitionNum config") {
Shall we add a `SPARK-32019: ` prefix to this test case name?
Since it's not a bug, is the prefix needed?
it's better to add it since it's a dedicated test case for this JIRA ticket.
Added it.
...core/src/test/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategySuite.scala
Test build #124241 has finished for PR 28853 at commit
Seems the error is not related.
@@ -1176,6 +1176,15 @@ object SQLConf {
    .longConf
    .createWithDefault(4 * 1024 * 1024)

  val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
    .doc("The suggested (not guaranteed) minimum number of file split partitions. If not set, " +
`file split` -> `split file`?
@@ -1176,6 +1176,15 @@ object SQLConf {
    .longConf
    .createWithDefault(4 * 1024 * 1024)

  val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
    .doc("The suggested (not guaranteed) minimum number of file split partitions. If not set, " +
      "the default value is the default parallelism of the Spark cluster. This configuration is " +
`the default parallelism of the Spark cluster` -> `spark.default.parallelism`?
Test build #124245 has finished for PR 28853 at commit
retest this please
      "file3" -> 1
    ))
    assert(table.rdd.partitions.length == 3)
  }
can we add more tests to make sure that this is really for min partition number? e.g. we can create more partitions than the min number, by making each partition 128MB.
Test build #124279 has finished for PR 28853 at commit
    val partitions = (1 to 100).map(i => s"file$i" -> 128*1024*1024)
    val table = createTable(files = partitions)
    // partition is limit by filesMaxPartitionBytes(128MB)
    assert(table.rdd.partitions.length == 100)
`100 * 128 / 16 > 128`, so max partition size is 128, partition number == file number.
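The arithmetic in the comment above follows Spark's split-size rule: spread the total bytes over the suggested minimum partition number, but never exceed `spark.sql.files.maxPartitionBytes`. Here is a minimal Python sketch of that rule — a simplified model for illustration only; `max_split_bytes` and its parameters are not Spark's actual API:

```python
def max_split_bytes(total_bytes, open_cost, min_partition_num,
                    max_partition_bytes=128 * 1024 * 1024):
    """Simplified model of the split-size choice: total bytes spread over
    the suggested minimum partition number, floored by the file open cost
    and capped by spark.sql.files.maxPartitionBytes (128MB by default)."""
    bytes_per_core = total_bytes // min_partition_num
    return min(max_partition_bytes, max(open_cost, bytes_per_core))

mb = 1024 * 1024
# 100 files of 128MB with minPartitionNum=16: 100 * 128 / 16 = 800MB per
# core, which is above the 128MB cap, so each 128MB file stays one split
# and the partition number equals the file number.
split = max_split_bytes(100 * 128 * mb, open_cost=4 * mb, min_partition_num=16)
```

With the cap winning, the suggested minimum of 16 is exceeded: 100 partitions result, which is exactly the "suggested (not guaranteed)" behavior the config doc describes.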
Test build #124269 has finished for PR 28853 at commit
Test build #124287 has finished for PR 28853 at commit
    withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "32") {
      val partitions = (1 to 800).map(i => s"file$i" -> 4*1024*1024)
      val table = createTable(files = partitions)
      assert(table.rdd.partitions.length == 50)
`800 * (4 + 4) / 32 > 128`, so 128 is used as maxSplitSize, and `800 * (4 + 4) / 128 = 50`.
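The figures in the comment above can be checked directly. A hedged Python sketch (variable names are illustrative; the 4MB open cost is added per file, as the comment does):

```python
mb = 1024 * 1024
total_bytes = 800 * (4 + 4) * mb        # 4MB per file plus 4MB open cost
bytes_per_core = total_bytes // 32      # 200MB, above the 128MB cap
max_split_size = min(128 * mb, max(4 * mb, bytes_per_core))
num_partitions = total_bytes // max_split_size  # 800 * 8 / 128 = 50
```

So even with minPartitionNum=32, the 128MB cap on split size produces 50 partitions — more than the requested minimum, matching the assertion in the test.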
Test build #124290 has finished for PR 28853 at commit
@@ -1176,6 +1176,15 @@ object SQLConf {
    .longConf
    .createWithDefault(4 * 1024 * 1024)

  val FILES_MIN_PARTITION_NUM = buildConf("spark.sql.files.minPartitionNum")
    .doc("The suggested (not guaranteed) minimum number of splitting file partitions. " +
splitting? split?
  }

  withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "16") {
    val partitions = (1 to 100).map(i => s"file$i" -> 128*1024*1024)
nit: `128*1024*1024` -> `128 * 1024 * 1024`
  withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "16") {
    val partitions = (1 to 100).map(i => s"file$i" -> 128*1024*1024)
    val table = createTable(files = partitions)
    // partition is limit by filesMaxPartitionBytes(128MB)
nit: limit -> limited
  }

  withSQLConf(SQLConf.FILES_MIN_PARTITION_NUM.key -> "32") {
    val partitions = (1 to 800).map(i => s"file$i" -> 4*1024*1024)
ditto
Test build #124311 has finished for PR 28853 at commit
retest this please
Test build #124317 has finished for PR 28853 at commit
+1, LGTM. Thank you all. Like `spark.sql.adaptive.coalescePartitions.minPartitionNum`, it seems to have a reasonable use case. Merged to master for Apache Spark 3.1.
@dongjoon-hyun thanks for merging. Thanks @maropu @cloud-fan!
### What changes were proposed in this pull request?
The UT for SPARK-32019 (#28853) tries to write about 16GB of data to the disk. We must change the value of `spark.sql.files.maxPartitionBytes` to a smaller value to check the correct behavior with less data. By default it is `128MB`. The other parameters in this UT are also changed to smaller values to keep the behavior the same.
### Why are the changes needed?
The runtime of this one UT can be over 7 minutes on Jenkins. After the change it is a few seconds.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing UT

Closes #29842 from tanelk/SPARK-32970.
Authored-by: tanel.kiis@gmail.com <tanel.kiis@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
What changes were proposed in this pull request?
Add a new config
spark.sql.files.minPartitionNum
to control file split partitions at the session level.

Why are the changes needed?
Aims to control file split partitions at the session level. For more details, see the discussion in PR-28778.
Does this PR introduce any user-facing change?
Yes, new config.
How was this patch tested?
Add UT.
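The config doc above says that when `spark.sql.files.minPartitionNum` is not set, the default parallelism of the Spark cluster is used instead. A minimal Python model of that fallback — illustrative only, not Spark's API:

```python
def effective_min_partition_num(files_min_partition_num, default_parallelism):
    """Return the suggested minimum partition number: the configured
    spark.sql.files.minPartitionNum when set, otherwise the session's
    default parallelism (model of the documented fallback)."""
    if files_min_partition_num is not None:
        return files_min_partition_num
    return default_parallelism
```

Note that, per the discussion in the review, this is only a suggested minimum: the actual partition count can be higher when the 128MB `spark.sql.files.maxPartitionBytes` cap forces smaller splits.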