[SPARK-22160][SQL] Make sample points per partition (in range partitioner) configurable and bump the default value up to 100 #19387

Closed · wants to merge 4 commits
core/src/main/scala/org/apache/spark/Partitioner.scala (13 additions, 2 deletions)

@@ -108,11 +108,21 @@ class HashPartitioner(partitions: Int) extends Partitioner {
class RangePartitioner[K : Ordering : ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
-  private var ascending: Boolean = true)
+  private var ascending: Boolean = true,
+  val samplePointsPerPartitionHint: Int = 20)
Contributor: > 0 precondition check

Contributor (author): done

extends Partitioner {

// A constructor declared in order to maintain backward compatibility for Java, when we add the
// 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160.
// This is added to make sure from a bytecode point of view, there is still a 3-arg ctor.
def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = {
this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20)
viirya (Member), Sep 29, 2017: The default value is 100 now in SQLConf; shall we also use 100 here as the default for samplePointsPerPartitionHint, to be consistent?

Contributor (author): That one has been there for much longer, so I'd rather change the SQL default first and see what happens.

}
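For context on the bytecode note above: Scala fills in default parameter values at the call site through synthetic companion-object methods, not through extra constructor overloads, so adding the fourth parameter would silently drop the old 3-arg constructor from the compiled class. A minimal sketch of the effect (class names are illustrative, not from the PR):

class Old(a: Int, b: Boolean)                   // compiles to <init>(IZ)V
class New(a: Int, b: Boolean, c: Int = 20)      // compiles to <init>(IZI)V only;
                                                // the default lives in New$.$lessinit$greater$default$3()
class Fixed(a: Int, b: Boolean, c: Int = 20) {  // what this PR does:
  def this(a: Int, b: Boolean) = this(a, b, 20) // restores <init>(IZ)V for old callers
}

Java code, or Scala code compiled against an older jar, that was linked against the 3-arg constructor would otherwise fail at runtime with NoSuchMethodError.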

// We allow partitions = 0, which happens when sorting an empty RDD under the default settings.
require(partitions >= 0, s"Number of partitions cannot be negative but found $partitions.")
require(samplePointsPerPartitionHint > 0,
s"Sample points per partition must be greater than 0 but found $samplePointsPerPartitionHint")

private var ordering = implicitly[Ordering[K]]

@@ -122,7 +132,8 @@ class RangePartitioner[K : Ordering : ClassTag, V](
Array.empty
} else {
// This is the sample size we need to have roughly balanced output partitions, capped at 1M.
-    val sampleSize = math.min(20.0 * partitions, 1e6)
+    // Cast to double to avoid overflowing ints or longs
+    val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// Assume the input partitions are roughly balanced and over-sample a little bit.
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt
val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition)
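To make the arithmetic above concrete, a small sketch with illustrative numbers (200 output partitions, 50 input partitions, and the new SQL default hint of 100):

val samplePointsPerPartitionHint = 100  // new SQL default
val partitions = 200                    // output partitions requested
val inputPartitions = 50                // rdd.partitions.length
val sampleSize = math.min(samplePointsPerPartitionHint.toDouble * partitions, 1e6)
// sampleSize = 20000.0, comfortably under the 1e6 cap
val sampleSizePerPartition = math.ceil(3.0 * sampleSize / inputPartitions).toInt
// sampleSizePerPartition = 1200: each input partition is over-sampled 3x
// so that skewed input partitions still yield enough sample points

With the old hard-coded hint of 20, the same job would sample only 4,000 points total, i.e. 240 per input partition.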
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

@@ -907,6 +907,14 @@ object SQLConf {
.booleanConf
.createWithDefault(false)

val RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION =
buildConf("spark.sql.execution.rangeExchange.sampleSizePerPartition")
.internal()
.doc("Number of points to sample per partition in order to determine the range boundaries" +
" for range partitioning, typically used in global sorting (without limit).")
.intConf
.createWithDefault(100)

val ARROW_EXECUTION_ENABLE =
buildConf("spark.sql.execution.arrow.enable")
.internal()
@@ -1199,6 +1207,8 @@ class SQLConf extends Serializable with Logging {

def supportQuotedRegexColumnName: Boolean = getConf(SUPPORT_QUOTED_REGEX_COLUMN_NAME)

def rangeExchangeSampleSizePerPartition: Int = getConf(RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION)

def arrowEnable: Boolean = getConf(ARROW_EXECUTION_ENABLE)

def arrowMaxRecordsPerBatch: Int = getConf(ARROW_EXECUTION_MAX_RECORDS_PER_BATCH)
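Since the conf is marked .internal() it does not appear in the documented configuration surface, but it can still be set per session like any other SQL conf. A hedged usage sketch (the output path and numbers are illustrative):

// Raise the per-partition sample size before a global sort whose input
// distribution is known to be lumpy; internal conf, so for tuning only.
spark.conf.set("spark.sql.execution.rangeExchange.sampleSizePerPartition", "500")
spark.range(0, 10000000L).sort("id").write.parquet("/tmp/sorted-output")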
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala

@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
import org.apache.spark.sql.catalyst.plans.physical._
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.util.MutablePair

/**
@@ -218,7 +219,11 @@ object ShuffleExchangeExec {
iter.map(row => mutablePair.update(row.copy(), null))
}
implicit val ordering = new LazilyGeneratedOrdering(sortingExpressions, outputAttributes)
-      new RangePartitioner(numPartitions, rddForSampling, ascending = true)
+      new RangePartitioner(
+        numPartitions,
+        rddForSampling,
+        ascending = true,
+        samplePointsPerPartitionHint = SQLConf.get.rangeExchangeSampleSizePerPartition)
case SinglePartition =>
new Partitioner {
override def numPartitions: Int = 1
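The new constructor parameter can also be exercised directly on the RDD API; a small sketch (variable names are illustrative, and sc is assumed to be an active SparkContext):

import org.apache.spark.RangePartitioner

val pairs = sc.parallelize(0 until 100000).map(i => (i, i))
val partitioner = new RangePartitioner(
  partitions = 8,
  rdd = pairs,
  ascending = true,
  samplePointsPerPartitionHint = 100)
val sorted = pairs.repartitionAndSortWithinPartitions(partitioner)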
sql/core/src/test/scala/org/apache/spark/sql/ConfigBehaviorSuite.scala (new file)

@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql

import org.apache.commons.math3.stat.inference.ChiSquareTest

import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext


class ConfigBehaviorSuite extends QueryTest with SharedSQLContext {

import testImplicits._

test("SPARK-22160 spark.sql.execution.rangeExchange.sampleSizePerPartition") {
// In this test, we run a sort and compute the histogram for partition size post shuffle.
// With a high sample count, the partition sizes should be more evenly distributed and yield a
// low chi-sq test value.
// Also the whole code path for range partitioning as implemented should be deterministic
// (it uses the partition id as the seed), so this test shouldn't be flaky.

val numPartitions = 4

def computeChiSquareTest(): Double = {
val n = 10000
// Trigger a sort
val data = spark.range(0, n, 1, 1).sort('id)
.selectExpr("SPARK_PARTITION_ID() pid", "id").as[(Int, Long)].collect()

// Compute histogram for the number of records per partition post sort
val dist = data.groupBy(_._1).map(_._2.length.toLong).toArray
assert(dist.length == 4)

new ChiSquareTest().chiSquare(
Array.fill(numPartitions) { n.toDouble / numPartitions },
dist)
}
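As a sanity check on the thresholds used below, the chi-sq statistic against a uniform expectation (2,500 rows per partition for n = 10000 and 4 partitions) is easy to work by hand; the observed counts here are illustrative, chosen to land near the ~10 the author reports for the default config:

// chiSquare(expected, observed) = sum over partitions of (obs - exp)^2 / exp
val expected = 2500.0
val observed = Seq(2400L, 2600L, 2450L, 2550L)
val chiSq = observed.map(o => math.pow(o - expected, 2) / expected).sum
// (100^2 + 100^2 + 50^2 + 50^2) / 2500 = 10.0, well under the < 100 assertion.
// A degenerate split like Seq(10000L, 0L, 0L, 0L) gives 30000, far above 1000.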

withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> numPartitions.toString) {
// The default chi-sq value should be low
assert(computeChiSquareTest() < 100)

Reviewer: This test may be flaky. It depends on the ratio of n to RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION. What is the default value of RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION?

Contributor (author): 100, which is pretty high. The actual value computed on my laptop is around 10, so 1000 is already two orders of magnitude larger.


withSQLConf(SQLConf.RANGE_EXCHANGE_SAMPLE_SIZE_PER_PARTITION.key -> "1") {
// If we only sample one point, the range boundaries will be pretty bad and the
// chi-sq value would be very high.
assert(computeChiSquareTest() > 1000)

Reviewer: This test may be flaky as well.

Contributor (author): The value I got on my laptop was 1800.

}
}
}

}