Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
val minNumPartitionsByGroup = if (coalesceGroups.length == 1) {
Seq(math.max(minNumPartitions, 1))
} else {
val sizes =
coalesceGroups.map(_.flatMap(_.shuffleStage.mapStats.map(_.bytesByPartitionId.sum)).sum)
val sizes = coalesceGroups.map(
_.shuffleStages.flatMap(_.shuffleStage.mapStats.map(_.bytesByPartitionId.sum)).sum)
val totalSize = sizes.sum
sizes.map { size =>
val num = if (totalSize > 0) {
Expand All @@ -90,8 +90,8 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe

val specsMap = mutable.HashMap.empty[Int, Seq[ShufflePartitionSpec]]
// Coalesce partitions for each coalesce group independently.
coalesceGroups.zip(minNumPartitionsByGroup).foreach { case (shuffleStages, minNumPartitions) =>
val advisoryTargetSize = advisoryPartitionSize(shuffleStages)
coalesceGroups.zip(minNumPartitionsByGroup).foreach { case (coalesceGroup, minNumPartitions) =>
val advisoryTargetSize = advisoryPartitionSize(coalesceGroup)
val minPartitionSize = if (Utils.isTesting) {
// In the tests, we usually set the target size to a very small value that is even smaller
// than the default value of the min partition size. Here we also adjust the min partition
Expand All @@ -103,14 +103,14 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
}

val newPartitionSpecs = ShufflePartitionsUtil.coalescePartitions(
shuffleStages.map(_.shuffleStage.mapStats),
shuffleStages.map(_.partitionSpecs),
coalesceGroup.shuffleStages.map(_.shuffleStage.mapStats),
coalesceGroup.shuffleStages.map(_.partitionSpecs),
advisoryTargetSize = advisoryTargetSize,
minNumPartitions = minNumPartitions,
minPartitionSize = minPartitionSize)

if (newPartitionSpecs.nonEmpty) {
shuffleStages.zip(newPartitionSpecs).map { case (stageInfo, partSpecs) =>
coalesceGroup.shuffleStages.zip(newPartitionSpecs).map { case (stageInfo, partSpecs) =>
specsMap.put(stageInfo.shuffleStage.id, partSpecs)
}
}
Expand All @@ -126,9 +126,12 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
// data sources may request a particular advisory partition size for the final write stage
// if it happens, the advisory partition size will be set in ShuffleQueryStageExec
// only one shuffle stage is expected in such cases
private def advisoryPartitionSize(shuffleStages: Seq[ShuffleStageInfo]): Long = {
private def advisoryPartitionSize(coalesceGroup: CoalesceGroup): Long = {
if (coalesceGroup.hasExplodingJoin) {
return conf.getConf(SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shall we add a new config like explodingStageAdvisorySizeFactor to make the advisory size easy to change ?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it harder to tune two configs (advisory size + advisory size factor for exploding) than just one advisory size for exploding? I didn't add a new config as I think COALESCE_PARTITIONS_MIN_PARTITION_SIZE has a reasonable default value to turn off coalesce for this case, and we do need to respect it when coalescing partitions.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought if the query contain multi-stages and there are more than one stage contain exploding operators. People may want to change this config more fine-grained. That's why I tried to support plan fragment config before.

I'm fine to use the COALESCE_PARTITIONS_MIN_PARTITION_SIZE.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then we probably need to add reduce ratio which needs more time to test and build it correctly: #45357 (review)

By having the reduce ratio, we can do coalescing differently for different coalesce groups. Maybe we can do it as a follow-up.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can respect spark.sql.adaptive.coalescePartitions.parallelismFirst here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

parallelismFirst affects the minNumPartitions, seems to be a different thing here.

}
val defaultAdvisorySize = conf.getConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES)
shuffleStages match {
coalesceGroup.shuffleStages match {
case Seq(stage) =>
stage.shuffleStage.advisoryPartitionSize.getOrElse(defaultAdvisorySize)
case _ =>
Expand All @@ -143,18 +146,18 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
* 1) all leaf nodes of this child are exchange stages; and
* 2) all these shuffle stages support coalescing.
*/
private def collectCoalesceGroups(plan: SparkPlan): Seq[Seq[ShuffleStageInfo]] = plan match {
private def collectCoalesceGroups(
plan: SparkPlan,
hasExplodingJoin: Boolean = false): Seq[CoalesceGroup] = plan match {
case r @ AQEShuffleReadExec(q: ShuffleQueryStageExec, _) if isSupported(q.shuffle) =>
Seq(collectShuffleStageInfos(r))
case unary: UnaryExecNode => collectCoalesceGroups(unary.child)
case union: UnionExec => union.children.flatMap(collectCoalesceGroups)
case join: CartesianProductExec => join.children.flatMap(collectCoalesceGroups)
// Note that, `BroadcastQueryStageExec` is a valid case:
// If a join has been optimized from shuffled join to broadcast join, then the one side is
// `BroadcastQueryStageExec` and other side is `ShuffleQueryStageExec`. It can coalesce the
// shuffle side as we do not expect broadcast exchange has same partition number.
case join: BroadcastHashJoinExec => join.children.flatMap(collectCoalesceGroups)
case join: BroadcastNestedLoopJoinExec => join.children.flatMap(collectCoalesceGroups)
Seq(CoalesceGroup(collectShuffleStageInfos(r), hasExplodingJoin))
case unary: UnaryExecNode => collectCoalesceGroups(unary.child, hasExplodingJoin)
// If a plan node does not need compatible data partitioning for its children, then each of its
// child can be an individual coalesce group and Spark will apply shuffle partitions coalescing
// for them independently,
case p if !childrenNeedCompatiblePartitioning(p) =>
val hasExplodingJoinSoFar = hasExplodingJoin || isExplodingJoin(p)
p.children.flatMap(collectCoalesceGroups(_, hasExplodingJoinSoFar))
// If not all leaf nodes are exchange query stages, it's not safe to reduce the number of
// shuffle partitions, because we may break the assumption that all children of a spark plan
// have same number of output partitions.
Expand All @@ -163,13 +166,30 @@ case class CoalesceShufflePartitions(session: SparkSession) extends AQEShuffleRe
// ShuffleExchanges introduced by repartition do not support partition number change.
// We change the number of partitions only if all the ShuffleExchanges support it.
if (shuffleStages.forall(s => isSupported(s.shuffleStage.shuffle))) {
Seq(shuffleStages)
// The recursion stops here, we need to call `p.exists(isExplodingJoin)` and find out if
// there is any exploding join in this sub-plan-tree.
Seq(CoalesceGroup(shuffleStages, hasExplodingJoin || p.exists(isExplodingJoin)))
} else {
Seq.empty
}
case _ => Seq.empty
}

private def childrenNeedCompatiblePartitioning(p: SparkPlan): Boolean = p match {
// TODO: match more plan nodes here.
case _: UnionExec => false
case _: CartesianProductExec => false
case _: BroadcastHashJoinExec => false
case _: BroadcastNestedLoopJoinExec => false
case _ => true
}

private def isExplodingJoin(p: SparkPlan): Boolean = p match {
case _: BroadcastNestedLoopJoinExec => true
case _: CartesianProductExec => true
case _ => false
}

private def collectShuffleStageInfos(plan: SparkPlan): Seq[ShuffleStageInfo] = plan match {
case ShuffleStageInfo(stage, specs) => Seq(new ShuffleStageInfo(stage, specs))
case _ => plan.children.flatMap(collectShuffleStageInfos)
Expand Down Expand Up @@ -202,3 +222,7 @@ private object ShuffleStageInfo {
case _ => None
}
}

private case class CoalesceGroup(
shuffleStages: Seq[ShuffleStageInfo],
hasExplodingJoin: Boolean)
Original file line number Diff line number Diff line change
Expand Up @@ -2861,6 +2861,26 @@ class AdaptiveQueryExecSuite
val unionDF = aggDf1.union(aggDf2)
checkAnswer(unionDF.select("id").distinct(), Seq(Row(null)))
}

test("SPARK-47247: coalesce differently for BNLJ") {
Seq(true, false).foreach { expectCoalesce =>
val minPartitionSize = if (expectCoalesce) "64MB" else "1B"
withSQLConf(
SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "64MB",
SQLConf.COALESCE_PARTITIONS_MIN_PARTITION_SIZE.key -> minPartitionSize) {
val (_, adaptivePlan) = runAdaptiveAndVerifyResult(
"SELECT /*+ broadcast(testData2) */ * " +
"FROM (SELECT value v, max(key) k from testData group by value) " +
"JOIN testData2 ON k + a > 0")
val bnlj = findTopLevelBroadcastNestedLoopJoin(adaptivePlan)
assert(bnlj.size == 1)
val coalescedReads = collect(adaptivePlan) {
case read: AQEShuffleReadExec if read.isCoalescedRead => read
}
assert(coalescedReads.nonEmpty == expectCoalesce)
}
}
}
}

/**
Expand Down