add an individual config for skewed partition threshold
cloud-fan committed Mar 23, 2020
1 parent f1cc867 commit 1976bf7
Showing 4 changed files with 20 additions and 4 deletions.
docs/sql-performance-tuning.md (9 changes: 8 additions & 1 deletion)
@@ -242,7 +242,14 @@ Data skew can severely downgrade the performance of join queries. This feature d
<td><code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code></td>
<td>10</td>
<td>
- A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than <code>spark.sql.adaptive.advisoryPartitionSizeInBytes</code>.
+ A partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code>.
</td>
</tr>
+ <tr>
+ <td><code>spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes</code></td>
+ <td>256MB</td>
+ <td>
+ A partition is considered as skewed if its size in bytes is larger than this threshold and also larger than <code>spark.sql.adaptive.skewJoin.skewedPartitionFactor</code> multiplying the median partition size.
+ </td>
+ </tr>
</table>
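As a usage sketch (the session setup, app name, and values below are illustrative assumptions, not part of this commit), the factor and the new threshold can be set together before running a join with adaptive execution enabled:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("skew-join-threshold-sketch")
  .master("local[*]")
  .getOrCreate()

// Turn on adaptive execution and its skew-join handling.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true")

// A partition is treated as skewed only if it is larger than BOTH
// factor * median partition size AND the absolute byte threshold.
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionFactor", "10")
spark.conf.set("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes", "256MB")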
SQLConf.scala
@@ -490,12 +490,21 @@ object SQLConf {
buildConf("spark.sql.adaptive.skewJoin.skewedPartitionFactor")
.doc("A partition is considered as skewed if its size is larger than this factor " +
"multiplying the median partition size and also larger than " +
s"'${ADVISORY_PARTITION_SIZE_IN_BYTES.key}'")
s"'spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes'")
.version("3.0.0")
.intConf
.checkValue(_ > 0, "The skew factor must be positive.")
.createWithDefault(10)

+ val SKEW_JOIN_SKEWED_PARTITION_THRESHOLD =
+ buildConf("spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes")
+ .doc("A partition is considered as skewed if its size in bytes is larger than this " +
+ s"threshold and also larger than 'spark.sql.adaptive.skewJoin.skewedPartitionFactor' " +
+ "multiplying the median partition size.")
+ .version("3.0.0")
+ .bytesConf(ByteUnit.BYTE)
+ .createWithDefaultString("256MB")

val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN =
buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin")
.internal()
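A side note on the default: the new entry is declared with bytesConf(ByteUnit.BYTE) and a default string of "256MB". A minimal sketch of how such a size string resolves to a byte count, assuming Spark's standard byte-string parser (this snippet is not part of the commit):

import org.apache.spark.network.util.JavaUtils

// "256MB" is parsed as mebibytes: 256 * 1024 * 1024 bytes.
val defaultThresholdBytes: Long = JavaUtils.byteStringAsBytes("256MB")
assert(defaultThresholdBytes == 256L * 1024 * 1024) // 268435456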
OptimizeSkewedJoin.scala
@@ -67,7 +67,7 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
*/
private def isSkewed(size: Long, medianSize: Long): Boolean = {
size > medianSize * conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_FACTOR) &&
- size > conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
+ size > conf.getConf(SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD)
}

private def medianSize(stats: MapOutputStatistics): Long = {
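To make the combined condition above concrete, here is a small worked sketch of the same predicate with hard-coded example values (isSkewedExample and the numbers are illustrative assumptions, not the rule's code):

// Mirrors the isSkewed condition: both the relative and the absolute bound must be exceeded.
def isSkewedExample(size: Long, medianSize: Long): Boolean = {
  val factor = 10                          // skewedPartitionFactor default
  val thresholdBytes = 256L * 1024 * 1024  // skewedPartitionThresholdInBytes default (256MB)
  size > medianSize * factor && size > thresholdBytes
}

// With a median partition size of 20MB:
// a 300MB partition exceeds both 10 * 20MB and 256MB, so it is skewed;
// a 200MB partition exceeds neither bound strictly, so it is not.
assert(isSkewedExample(300L * 1024 * 1024, 20L * 1024 * 1024))
assert(!isSkewedExample(200L * 1024 * 1024, 20L * 1024 * 1024))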
AdaptiveQueryExecSuite.scala
@@ -615,6 +615,7 @@ class AdaptiveQueryExecSuite
withSQLConf(
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+ SQLConf.SKEW_JOIN_SKEWED_PARTITION_THRESHOLD.key -> "2000",
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "2000") {
withTempView("skewData1", "skewData2") {
spark
@@ -781,4 +782,3 @@ class AdaptiveQueryExecSuite
}
}
}
