
[SPARK-47247][SQL] Use smaller target size when coalescing partitions with exploding joins #45357

Closed

cloud-fan wants to merge 1 commit into apache:master from cloud-fan:coalesce


Conversation

@cloud-fan
Contributor

What changes were proposed in this pull request?

This PR changes the target partition size of AQE partition coalescing from spark.sql.adaptive.advisoryPartitionSizeInBytes (default 64MB) to spark.sql.adaptive.coalescePartitions.minPartitionSize (default 1MB) for non-equi joins, namely broadcast nested loop join and cartesian product join. These join operators tend to explode the data and usually work better with smaller partitions than other operators, so the smaller target size minimizes the OOM risk.
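
A minimal sketch of the idea (not the PR's exact code; the helper name targetCoalesceSize is made up here, but the SQLConf entries and join operators are the real ones):

    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.joins.{BroadcastNestedLoopJoinExec, CartesianProductExec}
    import org.apache.spark.sql.internal.SQLConf

    // Hypothetical helper: pick the coalescing target size for a shuffle that feeds
    // the given plan fragment.
    def targetCoalesceSize(plan: SparkPlan, conf: SQLConf): Long = {
      // The non-equi ("exploding") joins this PR cares about.
      val hasExplodingJoin = plan.exists {
        case _: BroadcastNestedLoopJoinExec | _: CartesianProductExec => true
        case _ => false
      }
      if (hasExplodingJoin) {
        // spark.sql.adaptive.coalescePartitions.minPartitionSize, default 1MB
        conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
      } else {
        // spark.sql.adaptive.advisoryPartitionSizeInBytes, default 64MB
        conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
      }
    }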

Why are the changes needed?

Reduce the OOM risk: after the data explodes, a 64MB input partition can become extremely large.

Does this PR introduce any user-facing change?

no

How was this patch tested?

A new test.

Was this patch authored or co-authored using generative AI tooling?

no

@github-actions bot added the SQL label on Mar 1, 2024
@cloud-fan
Contributor Author

cc @yaooqinn @ulysses-you

Member

@wangyum left a comment


Joins are not the only operators that can explode data; ExpandExec can too, for example. Can we achieve this by adjusting spark.sql.adaptive.advisoryPartitionSizeInBytes instead? E.g.:

  private[sql] def tunePostShuffleInputSize(plan: SparkPlan, conf: SQLConf): Long = {
    val postShuffleInputSize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
    var reduceRatio = 1
    plan.foreach {
      case ObjectHashAggregateExec(_, _, _, _, aggregateExpr, _, _, _, _) =>
        aggregateExpr.foreach(_.aggregateFunction match {
          case _: Percentile => reduceRatio += 1
          case _: ApproximatePercentile => reduceRatio += 1
          case _ =>
        })
      case _: BroadcastNestedLoopJoinExec => reduceRatio += 3
      case expand: ExpandExec => reduceRatio += expand.projections.size
      case SortMergeJoinExec(_, _, _, Some(condition), _, _, _)
        if containsInequalityPredicate(condition) => reduceRatio += 2
      case ShuffledHashJoinExec(_, _, _, _, Some(condition), _, _, _)
        if containsInequalityPredicate(condition) => reduceRatio += 2
      case ProjectExec(projectList, _) =>
        projectList.foreach {
          case _: GetJsonObject => reduceRatio += 1
          case Alias(_: GetJsonObject, _) => reduceRatio += 1
          case _ =>
        }

      case _ =>
    }
    postShuffleInputSize / reduceRatio
  }
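
(Note: containsInequalityPredicate in the snippet above is an assumed helper that isn't shown; a rough sketch of it could look like the following, treating any non-equality comparison in the join condition as a sign the join may explode.)

    import org.apache.spark.sql.catalyst.expressions.{Expression, GreaterThan, GreaterThanOrEqual, LessThan, LessThanOrEqual}

    // Hypothetical sketch: does the join condition contain an inequality comparison?
    private def containsInequalityPredicate(condition: Expression): Boolean = {
      condition.exists {
        case _: GreaterThan | _: GreaterThanOrEqual | _: LessThan | _: LessThanOrEqual => true
        case _ => false
      }
    }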

@cloud-fan
Contributor Author

A fine-grained reduce ratio looks nice, but it needs a lot of tuning and is more or less based on experience. I don't think fixed reduce ratios work for all cases, and we would probably need to add many configs. The approach in this PR is more like a coarse-grained fallback: if there are exploding joins, effectively do not coalesce (since the default is 1MB), but people can still tune it a bit.

ExpandExec should not have many projections and won't explode the data too much, but I don't know how people use it in real-world use cases.

    -  private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]): Long = {
    +  private def advisoryPartitionSize(coalesceGroup: CoalesceGroup): Long = {
    +    if (coalesceGroup.hasExplodingJoin) {
    +      return conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
Contributor


Shall we add a new config like explodingStageAdvisorySizeFactor to make the advisory size easier to change?

Contributor Author


Isn't it harder to tune two configs (the advisory size plus an advisory-size factor for exploding joins) than a single advisory size for exploding joins? I didn't add a new config because I think COALESCE_PARTITIONS_MIN_PARTITION_SIZE has a reasonable default value to effectively turn off coalescing for this case, and we need to respect it when coalescing partitions anyway.
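
For example (illustrative value only), a user who wants less aggressive coalescing for exploding joins would tune the one existing config:

    // Illustrative: raise the fallback target size for exploding joins from the 1MB default.
    spark.conf.set("spark.sql.adaptive.coalescePartitions.minPartitionSize", "4MB")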

Contributor


I was thinking of the case where the query contains multiple stages and more than one of them contains exploding operators. People may want to tune this config in a more fine-grained way; that's why I tried to support plan-fragment configs before.

I'm fine with using COALESCE_PARTITIONS_MIN_PARTITION_SIZE.

Contributor Author


Then we would probably need to add a reduce ratio, which needs more time to test and get right: #45357 (review)

With a reduce ratio, we could coalesce differently for different coalesce groups. Maybe we can do that as a follow-up.

Member


Maybe we can respect spark.sql.adaptive.coalescePartitions.parallelismFirst here?

Contributor Author


parallelismFirst affects minNumPartitions, which seems to be a different thing from what we're doing here.

@cloud-fan
Contributor Author

Thanks for the review, merging to master!

@cloud-fan closed this in e310e76 on Mar 6, 2024
