Skip to content

Commit

Permalink
[SPARK-32855][SQL] Improve the cost model in pruningHasBenefit for fi…
Browse files Browse the repository at this point in the history
…ltering side can not build broadcast by join type

### What changes were proposed in this pull request?

This pr improve the cost model in `pruningHasBenefit` for filtering side can not build broadcast by join type:
1. The filtering side must be small enough to build broadcast by size.
2. The estimated size of the pruning side must be big enough: `estimatePruningSideSize * spark.sql.optimizer.dynamicPartitionPruning.pruningSideExtraFilterRatio > overhead`.

### Why are the changes needed?

Improve query performance for these cases.

This a real case from cluster. Left join and left size very small and right side can build DPP:
![image](https://user-images.githubusercontent.com/5399861/92882197-445a2a00-f442-11ea-955d-16a7724e535b.png)

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test.

Closes #29726 from wangyum/SPARK-32855.

Authored-by: Yuming Wang <yumwang@ebay.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
  • Loading branch information
wangyum authored and cloud-fan committed Mar 26, 2021
1 parent 820b465 commit aaa0d2a
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO =
buildConf("spark.sql.optimizer.dynamicPartitionPruning.pruningSideExtraFilterRatio")
.internal()
.doc("When filtering side doesn't support broadcast by join type, and doing DPP means " +
"running an extra query that may have significant overhead. This config will be used " +
"as the extra filter ratio for computing the data size of the pruning side after DPP, " +
"in order to evaluate if it is worth adding an extra subquery as the pruning filter.")
.version("3.2.0")
.doubleConf
.createWithDefault(0.04)

val COMPRESS_CACHED = buildConf("spark.sql.inMemoryColumnarStorage.compressed")
.doc("When set to true Spark SQL will automatically select a compression codec for each " +
"column based on statistics of the data.")
Expand Down Expand Up @@ -3249,6 +3260,9 @@ class SQLConf extends Serializable with Logging {
def dynamicPartitionPruningReuseBroadcastOnly: Boolean =
getConf(DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY)

def dynamicPartitionPruningPruningSideExtraFilterRatio: Double =
getConf(DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO)

def stateStoreProviderClass: String = getConf(STATE_STORE_PROVIDER_CLASS)

def isStateSchemaCheckEnabled: Boolean = getConf(STATE_SCHEMA_CHECK_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.spark.sql.execution.dynamicpruning

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical._
Expand Down Expand Up @@ -45,7 +46,7 @@ import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRela
* subquery query twice, we keep the duplicated subquery
* (3) otherwise, we drop the subquery.
*/
object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper with JoinSelectionHelper {

/**
* Search the partitioned table scan for a given partition column in a logical plan
Expand Down Expand Up @@ -84,10 +85,12 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
filteringKey: Expression,
filteringPlan: LogicalPlan,
joinKeys: Seq[Expression],
partScan: LogicalRelation): LogicalPlan = {
partScan: LogicalRelation,
canBuildBroadcast: Boolean): LogicalPlan = {
val reuseEnabled = conf.exchangeReuseEnabled
val index = joinKeys.indexOf(filteringKey)
lazy val hasBenefit = pruningHasBenefit(pruningKey, partScan, filteringKey, filteringPlan)
lazy val hasBenefit =
pruningHasBenefit(pruningKey, partScan, filteringKey, filteringPlan, canBuildBroadcast)
if (reuseEnabled || hasBenefit) {
// insert a DynamicPruning wrapper to identify the subquery during query planning
Filter(
Expand Down Expand Up @@ -115,7 +118,8 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
partExpr: Expression,
partPlan: LogicalPlan,
otherExpr: Expression,
otherPlan: LogicalPlan): Boolean = {
otherPlan: LogicalPlan,
canBuildBroadcast: Boolean): Boolean = {

// get the distinct counts of an attribute for a given table
def distinctCounts(attr: Attribute, plan: LogicalPlan): Option[BigInt] = {
Expand Down Expand Up @@ -146,10 +150,18 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
case _ => fallbackRatio
}

val estimatePruningSideSize = filterRatio * partPlan.stats.sizeInBytes.toFloat
// the pruning overhead is the total size in bytes of all scan relations
val overhead = otherPlan.collectLeaves().map(_.stats.sizeInBytes).sum.toFloat

filterRatio * partPlan.stats.sizeInBytes.toFloat > overhead.toFloat
if (canBuildBroadcast) {
estimatePruningSideSize > overhead
} else {
// We can't reuse the broadcast because the join type doesn't support broadcast,
// and doing DPP means running an extra query that may have significant overhead.
// We need to make sure the pruning side is very big so that DPP is still worthy.
canBroadcastBySize(otherPlan, conf) &&
estimatePruningSideSize * conf.dynamicPartitionPruningPruningSideExtraFilterRatio > overhead
}
}

/**
Expand Down Expand Up @@ -235,12 +247,14 @@ object PartitionPruning extends Rule[LogicalPlan] with PredicateHelper {
var partScan = getPartitionTableScan(l, left)
if (partScan.isDefined && canPruneLeft(joinType) &&
hasPartitionPruningFilter(right)) {
newLeft = insertPredicate(l, newLeft, r, right, rightKeys, partScan.get)
newLeft = insertPredicate(l, newLeft, r, right, rightKeys, partScan.get,
canBuildBroadcastRight(joinType))
} else {
partScan = getPartitionTableScan(r, right)
if (partScan.isDefined && canPruneRight(joinType) &&
hasPartitionPruningFilter(left) ) {
newRight = insertPredicate(r, newRight, l, left, leftKeys, partScan.get)
newRight = insertPredicate(r, newRight, l, left, leftKeys, partScan.get,
canBuildBroadcastLeft(joinType))
}
}
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ abstract class DynamicPartitionPruningSuiteBase
test("DPP triggers only for certain types of query",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false") {
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
Given("dynamic partition pruning disabled")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "false") {
val df = sql(
Expand Down Expand Up @@ -1429,6 +1430,39 @@ abstract class DynamicPartitionPruningSuiteBase
)
}
}

test("SPARK-32855: Filtering side can not broadcast by join type",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",
SQLConf.DYNAMIC_PARTITON_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {

val sqlStr =
"""
|SELECT s.store_id,f. product_id FROM dim_store s
|LEFT JOIN fact_sk f
|ON f.store_id = s.store_id WHERE s.country = 'NL'
""".stripMargin

// DPP will only apply if disable reuseBroadcastOnly
Seq(true, false).foreach { reuseBroadcastOnly =>
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> s"$reuseBroadcastOnly") {
val df = sql(sqlStr)
checkPartitionPruningPredicate(df, !reuseBroadcastOnly, false)
}
}

// DPP will only apply if left side can broadcast by size
Seq(1L, 100000L).foreach { threshold =>
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> s"$threshold") {
val df = sql(sqlStr)
checkPartitionPruningPredicate(df, threshold > 10L, false)
}
}
}
}
}

class DynamicPartitionPruningSuiteAEOff extends DynamicPartitionPruningSuiteBase
Expand Down

0 comments on commit aaa0d2a

Please sign in to comment.