[SPARK-40211][CORE][SQL] Allow customize initial partitions number in take() behavior #37661
Conversation
core/src/main/scala/org/apache/spark/internal/config/package.scala (review comment resolved)
- if (results.size == 0) {
-   numPartsToTry = partsScanned * 4L
+ if (results.isEmpty) {
+   numPartsToTry = partsScanned * scaleUpFactor
It looks like this is fixing a pre-existing bug where the RDD_LIMIT_SCALE_UP_FACTOR wasn't being applied to the AsyncRDDActions version of take(). Nice!
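For context, the scan-growth logic under discussion can be modeled as a small standalone sketch (a hypothetical simplification of Spark's take() loop; `scanSchedule`, its parameters, and the uniform-partition-size assumption are illustrative, not Spark APIs):

```scala
// Simplified, standalone model of how take() grows the number of
// partitions scanned per job (hypothetical sketch, not Spark's code).
object TakeScanSketch {
  // Returns the sequence of partition counts each incremental job would scan.
  def scanSchedule(
      totalParts: Int,
      needed: Int,
      partSize: Int,              // rows per partition (uniform, for illustration)
      initialNumPartitions: Int,  // was hardcoded to 1 before this PR
      scaleUpFactor: Int): Seq[Long] = {
    val jobs = scala.collection.mutable.ArrayBuffer.empty[Long]
    var partsScanned = 0L
    var collected = 0L
    while (collected < needed && partsScanned < totalParts) {
      var numPartsToTry = math.max(initialNumPartitions, 1).toLong
      if (partsScanned > 0) {
        // Mirrors the fixed branch: grow aggressively if nothing came back,
        // otherwise extrapolate from what has been collected so far.
        numPartsToTry =
          if (collected == 0) partsScanned * scaleUpFactor
          else math.max(1L, (1.5 * needed * partsScanned / collected).toLong - partsScanned)
      }
      val p = math.min(numPartsToTry, totalParts - partsScanned)
      jobs += p
      partsScanned += p
      collected += p * partSize
    }
    jobs.toSeq
  }
}
```

With empty partitions (worst case), the default launches jobs scanning 1, 4, 20, ... partitions; a larger initial value reaches the end of the RDD in fewer jobs.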
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala (review comments resolved)
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala (review comment resolved)
LGTM pending tests.
It looks like GitHub Actions might not be properly enabled for your fork. Can you follow the instructions linked from https://spark.apache.org/contributing.html to enable actions in your fork, then push an empty commit to re-trigger tests?
Go to the “Actions” tab on your forked repository and enable the “Build and test” and “Report test results” workflows.
Just a minor comment, looks good to me.
Nice change @liuzqt !
@@ -84,18 +87,18 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
      } else {
        // The number of partitions to try in this iteration. It is ok for this number to be
        // greater than totalParts because we actually cap it at totalParts in runJob.
-       var numPartsToTry = 1L
+       var numPartsToTry = Math.max(self.conf.get(RDD_LIMIT_INITIAL_NUM_PARTITIONS), 1)
Enforce it in the config itself and always use self.conf.get(RDD_LIMIT_INITIAL_NUM_PARTITIONS)?
For RDD_LIMIT_INITIAL_NUM_PARTITIONS:
...
.intConf
.checkValue(_ > 0, "")
...
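The suggested pattern can be illustrated with a minimal standalone sketch (a hypothetical stand-in for Spark's internal ConfigBuilder DSL; `ConfigEntrySketch` and its fields are simplified inventions, not Spark's actual classes):

```scala
// Minimal standalone sketch of the "validate in the config itself" pattern
// (hypothetical; Spark's real ConfigBuilder/ConfigEntry are richer).
final case class ConfigEntrySketch[T](
    key: String,
    default: T,
    isValid: T => Boolean,
    errMsg: String) {
  // Resolve a user-supplied value (or fall back to the default),
  // failing fast if the validity check is violated.
  def resolve(userValue: Option[T]): T = {
    val v = userValue.getOrElse(default)
    require(isValid(v), s"$key: $errMsg (got $v)")
    v
  }
}

object ConfigSketch {
  // Mirrors .intConf.checkValue(_ > 0, ...): positivity is enforced once at
  // the config boundary, so call sites no longer need Math.max(..., 1).
  val RddLimitInitialNumPartitions: ConfigEntrySketch[Int] =
    ConfigEntrySketch("spark.rdd.limit.initialNumPartitions", 1, _ > 0,
      "initial number of partitions must be positive")
}
```

Centralizing the check keeps every consumer of the config free of defensive clamping logic.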
Nice idea, modified accordingly.
LGTM. I'm going to merge this to master (Spark 3.4.0). Thanks!
…4.9f463a9

What changes were proposed in this pull request?
This PR aims to upgrade mvn-scalafmt from 1.0.4 to 1.1.1640084764.9f463a9.
PS:
1. Following #37489
2. Fix https://issues.apache.org/jira/browse/SPARK-40221

Why are the changes needed?
The last upgrade occurred 2 years ago (maven repo version: https://mvnrepository.com/artifact/org.antipathy/mvn-scalafmt).
The new version specifically addresses the following issues:
1. SimonJPegg/mvn_scalafmt@6b9e0a4 (screenshot omitted)
2. mvn_scalafmt_2.11 (Scala 2.11) is deprecated: SimonJPegg/mvn_scalafmt@9f3d109 (screenshot omitted)

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Pass GA & manual test:
1. mvn -Pscala-2.12 scalafmt:format -Dscalafmt.skip=false
2. mvn -Pscala-2.13 scalafmt:format -Dscalafmt.skip=false
3. Download the #37661 branch (SPARK-40211) and execute ./dev/scalafmt (screenshot omitted)

Closes #37727 from panbingkun/upgrade_scalafmt.
Authored-by: panbingkun <pbk1982@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… take() behavior

[SPARK-40211](https://issues.apache.org/jira/browse/SPARK-40211): add an `initialNumPartitions` config parameter to allow customizing the initial number of partitions to try in `take()`.

Currently, the initial number of partitions to try is hardcoded to `1`, which might cause unnecessary overhead. By setting this new configuration to a high value we could effectively mitigate the “run multiple jobs” overhead in take behavior. We could also set it to higher-than-1-but-still-small values (like, say, 10) to achieve a middle-ground trade-off.

No user-facing change. Tested with a unit test.

Closes apache#37661 from liuzqt/SPARK-40211.
Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Josh Rosen <joshrosen@databricks.com>
What changes were proposed in this pull request?
SPARK-40211: add an `initialNumPartitions` config parameter to allow customizing the initial number of partitions to try in `take()`.

Why are the changes needed?
Currently, the initial number of partitions to try is hardcoded to `1`, which might cause unnecessary overhead. By setting this new configuration to a high value we could effectively mitigate the “run multiple jobs” overhead in take behavior. We could also set it to higher-than-1-but-still-small values (like, say, 10) to achieve a middle-ground trade-off.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Unit test.
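A usage sketch of the new knob (hypothetical: it assumes the config keys introduced by this PR are "spark.rdd.limit.initialNumPartitions" on the core side and "spark.sql.limit.initialNumPartitions" on the SQL side; verify the exact key names against your Spark version):

```scala
// Hypothetical usage sketch for the initialNumPartitions config
// (requires a Spark 3.4.0+ dependency; key names assumed, see lead-in).
import org.apache.spark.sql.SparkSession

object TakeConfigDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("take-initial-partitions-demo")
      .master("local[*]")
      .config("spark.rdd.limit.initialNumPartitions", "10")
      .getOrCreate()

    // 200 partitions; with the config above, the first take() job scans 10
    // partitions instead of 1, reducing the chance of launching several
    // incremental follow-up jobs.
    val rdd = spark.sparkContext.parallelize(1 to 1000000, numSlices = 200)
    println(rdd.take(5).mkString(","))
    spark.stop()
  }
}
```

Leaving the config at its default of 1 preserves the pre-PR behavior, so existing workloads are unaffected.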