[SPARK-22160][SQL] Make sample points per partition (in range partitioner) configurable and bump the default value up to 100 #19387
Conversation
…e shuffle exchange (cherry picked from commit 8e51ae5) Signed-off-by: Reynold Xin <rxin@databricks.com>
lgtm except for a minor comment
@@ -108,9 +108,17 @@ class HashPartitioner(partitions: Int) extends Partitioner {
 class RangePartitioner[K : Ordering : ClassTag, V](
     partitions: Int,
     rdd: RDD[_ <: Product2[K, V]],
-    private var ascending: Boolean = true)
+    private var ascending: Boolean = true,
+    val samplePointsPerPartitionHint: Int = 20)
Add a precondition check that this is > 0.
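A minimal, self-contained sketch of the kind of precondition being requested — the class and message below are illustrative stand-ins, not Spark's actual `RangePartitioner` code:

```scala
// Hypothetical sketch: a stripped-down class (not Spark's) showing a
// require() guard that rejects non-positive sample-point hints.
class SketchRangePartitioner(val samplePointsPerPartitionHint: Int = 20) {
  require(samplePointsPerPartitionHint > 0,
    s"Sample points per partition must be greater than 0 but found " +
      s"$samplePointsPerPartitionHint")
}

object SketchRangePartitioner {
  def main(args: Array[String]): Unit = {
    // A positive hint constructs fine; a zero hint throws
    // IllegalArgumentException from require().
    val ok = new SketchRangePartitioner(100)
    println(ok.samplePointsPerPartitionHint)
    println(scala.util.Try(new SketchRangePartitioner(0)).isFailure)
  }
}
```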
done
wait, I need to review the test
Some rough calculation suggests the average values of the chi-squared test statistic may be on the order of ~50 and ~1200 in the two test cases respectively, so the thresholds (100 and 1000) may make the test flaky.
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
  // The default chi-sq value should be low
  assert(computeChiSquareTest() < 100)
This test may be flaky. It depends on the ratio of n / RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION. What is the default value of RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION?
100 - which is pretty high
the actual value computed on my laptop is around 10, so 1000 is already three orders of magnitude larger
withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
  // If we only sample one point, the range boundaries will be pretty bad and the
  // chi-sq value would be very high.
  assert(computeChiSquareTest() > 1000)
This test may be flaky as well.
the value i got from my laptop was 1800
I put up a comment saying this test result should be deterministic, since the sampling uses a fixed seed based on partition id.
LGTM
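For readers unfamiliar with the statistic being discussed: a chi-squared goodness-of-fit value here measures how far observed partition sizes deviate from a perfectly uniform split, so small values mean good range boundaries and large values mean skew. A self-contained sketch of that computation (this is an assumption about the shape of `computeChiSquareTest`, not the merged test code):

```scala
// Hypothetical sketch of the chi-squared statistic the test computes:
// compare each observed partition size against the uniform expectation
// n / k and sum the normalized squared deviations.
object ChiSquareSketch {
  def chiSquare(partitionSizes: Seq[Long]): Double = {
    val n = partitionSizes.sum.toDouble
    val expected = n / partitionSizes.length
    partitionSizes.map { observed =>
      val d = observed - expected
      d * d / expected
    }.sum
  }

  def main(args: Array[String]): Unit = {
    // Perfectly uniform partitions give a statistic of 0.
    println(chiSquare(Seq(25L, 25L, 25L, 25L)))
    // All rows landing in one of four partitions gives a large value:
    // (100-25)^2/25 + 3 * (0-25)^2/25 = 225 + 75 = 300.
    println(chiSquare(Seq(100L, 0L, 0L, 0L)))
  }
}
```

This also illustrates why a fixed sampling seed makes the asserted thresholds deterministic: the same partition sizes always yield the same statistic.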
// 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
// This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
  this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
The default value is 100 now in SQLConf; shall we also use 100 here as the default value for samplePointsPerPartitionHint, to be consistent?
That one has been there for much longer so I'd rather change the SQL default first and see what happens.
Test build #82296 has finished for PR 19387 at commit
Test build #82295 has finished for PR 19387 at commit
Test build #82302 has finished for PR 19387 at commit
Merging in master.
…oner) configurable and bump the default value up to 100 Spark's RangePartitioner hard codes the number of sampling points per partition to be 20. This is sometimes too low. This ticket makes it configurable, via spark.sql.execution.rangeExchange.sampleSizePerPartition, and raises the default in Spark SQL to be 100. Added a pretty sophisticated test based on chi square test ... Author: Reynold Xin <rxin@databricks.com> Closes apache#19387 from rxin/SPARK-22160. This commit contains the following squashed commits: 938326b NETFLIX-BUILD: Fixup backport of SPARK-22160.
What changes were proposed in this pull request?
Spark's RangePartitioner hard codes the number of sampling points per partition to be 20. This is sometimes too low. This ticket makes it configurable, via spark.sql.execution.rangeExchange.sampleSizePerPartition, and raises the default in Spark SQL to be 100.
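For reference, the new knob can be set per session like any other SQL conf. This is an illustrative config fragment only — it assumes an active `SparkSession` named `spark`, and the value 200 is arbitrary:

```scala
// Illustrative only: raising the per-partition sample size above the new
// default of 100 for a session where range partitions still come out skewed.
// Assumes an active SparkSession bound to `spark`.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "200")
```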
How was this patch tested?
Added a pretty sophisticated test based on chi square test ...