[SPARK-28560][SQL][followup] support the build side to local shuffle reader as far as possible in BroadcastHashJoin #26289
Conversation
@cloud-fan Please help me review. Thanks.
Test build #112816 has finished for PR 26289 at commit

The failed test may not be related.
@@ -525,7 +525,7 @@ object AdaptiveSparkPlanExec {
 * Apply a list of physical operator rules on a [[SparkPlan]].
 */
def applyPhysicalRules(plan: SparkPlan, rules: Seq[Rule[SparkPlan]]): SparkPlan = {
  rules.foldLeft(plan) { case (sp, rule) => rule.apply(sp) }
nit: the space should be there.
val shuffleStages = collectShuffleStages(plan)

val optimizedPlan = if (shuffleStages.isEmpty ||
    !shuffleStages.forall(_.plan.canChangeNumPartitions)) {
This is different from ReduceNumShufflePartitions. ReduceNumShufflePartitions needs to change all the shuffles together, so as long as there is a user-added shuffle, we need to skip it. OptimizeLocalShuffleReader can add a local reader to any shuffle, so it's simpler:
private def canAddLocalReader(stage: QueryStageExec): Boolean = stage match {
  case s: ShuffleQueryStage => s.plan.canChangeNumPartitions
  case ReusedQueryStage(s: ShuffleQueryStage) => s.plan.canChangeNumPartitions
  case _ => false
}
plan.transformUp {
case stage: QueryStageExec if canAddLocalReader(stage) =>
LocalShuffleReaderExec(stage)
}
Test build #112840 has finished for PR 26289 at commit

Test build #112841 has finished for PR 26289 at commit
How about we parameterize this rule into two versions: one that adds the local shuffle reader on the probe side, which checks for extra exchanges, and the other with no such checks at all?
We always need the check. A query stage can be very complex even though it doesn't have exchanges in the middle. For example, we may have multiple BHJs and aggregates in one query stage, with no shuffle between them.
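To make the safeguard discussed here concrete, below is a minimal, self-contained sketch. All classes are toy stand-ins invented for illustration, not Spark's real API: the rule optimistically rewrites shuffles into local readers, re-runs a simplified EnsureRequirements, and reverts the whole rewrite if an extra shuffle would be introduced.

```scala
// Toy stand-ins for SparkPlan nodes; none of these are Spark's real classes.
sealed trait Plan
case class Scan(name: String) extends Plan
case class Shuffle(child: Plan) extends Plan
case class LocalReader(child: Plan) extends Plan
// `needsHashPartitioned` models an operator (e.g. an aggregate) that
// requires its children to be hash-partitioned.
case class Node(children: Seq[Plan], needsHashPartitioned: Boolean) extends Plan

// Bottom-up transform, in the spirit of SparkPlan.transformUp.
def transform(p: Plan)(f: PartialFunction[Plan, Plan]): Plan = {
  val withNewChildren = p match {
    case Shuffle(c)     => Shuffle(transform(c)(f))
    case LocalReader(c) => LocalReader(transform(c)(f))
    case Node(cs, req)  => Node(cs.map(transform(_)(f)), req)
    case leaf           => leaf
  }
  f.applyOrElse(withNewChildren, identity[Plan])
}

def countShuffles(p: Plan): Int = p match {
  case Shuffle(c)     => 1 + countShuffles(c)
  case LocalReader(c) => countShuffles(c)
  case Node(cs, _)    => cs.map(countShuffles).sum
  case _              => 0
}

// Simplified EnsureRequirements: a local reader no longer satisfies a
// parent's hash-partitioning requirement, so a new shuffle is inserted.
def ensureRequirements(p: Plan): Plan = transform(p) {
  case Node(cs, true) =>
    Node(cs.map { case lr: LocalReader => Shuffle(lr); case c => c }, true)
}

// Optimistically add local readers, then revert if a shuffle was added.
def optimizeLocalReader(p: Plan): Plan = {
  val optimized = transform(p) { case s: Shuffle => LocalReader(s) }
  val before = countShuffles(ensureRequirements(p))
  val after  = countShuffles(ensureRequirements(optimized))
  if (after > before) p else optimized
}
```

On a plan whose parent requires hash partitioning, the rewrite is reverted; where no requirement exists, the local reader is kept.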
Why do we need the check on the build side?
@maryannxue After an offline discussion with @cloud-fan, we agree on the new optimization approach in the local reader rule: add the local reader separately on the probe side and the build side, so we can ensure the local reader on the build side will not be reverted. I've already updated the PR, please help review. Thanks.
Test build #112891 has finished for PR 26289 at commit
retest this please
val optimizedPlan = plan.transformDown {
  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderRight(join) =>
    // Add local reader in probe side.
val tmpOptimizedProbeSidePlan = plan.transformDown {
nit: withProbeSideLocalReader
 * shuffle introduced. If introduced, we will revert all the local
 * reader in probe side.
 * Step2: Add the local reader in build side and will not check whether
 * additional shuffle introduced.Because the build side will not introduce
nit: space after ... introduce.
if (numExchangeAfter > numExchangeBefore) {
  logDebug("OptimizeLocalShuffleReader rule is not applied due" +
val numExchangeAfter = numExchanges(EnsureRequirements(conf).apply(tmpOptimizedProbeSidePlan))
val optimizedProbeSidePlan = if (numExchangeAfter > numExchangeBefore) {
maybe simply "optimizedPlan"
// Add the local reader in build side and will not check whether
// additional shuffle introduced.
optimizedProbeSidePlan.transformDown {
  case join: BroadcastHashJoinExec if canUseLocalShuffleReaderBuildLeft(join) =>
nit: case join: BroadcastHashJoinExec if join.buildSide == BuildLeft && isShuffle(join.left) =>
@@ -56,16 +76,25 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {
    case e: ShuffleExchangeExec => e
  }.length
}

// Check whether additional shuffle introduced. If introduced, revert the local reader.
Now this rule adds a local shuffle reader for every BroadcastHashJoinExec and then reverts all of the local shuffle readers if any one of them causes an additional shuffle.
Can we just revert the local shuffle readers that cause additional shuffles and keep those that don't?
This is the best, but I don't know if there is an easy way to do it.
We can revert all the local readers for now and re-optimize later when we find a better way.
// The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
// *(7) BroadcastHashJoin [b#24], [a#33], Inner, BuildLeft
It's a bit verbose to put the entire query plan here. How about we only put the sketch?
BroadcastHashJoin
+- BroadcastExchange
+- LocalShuffleReader*
+- ShuffleExchange
+- BroadcastHashJoin
+- BroadcastExchange
+- LocalShuffleReader*
+- ShuffleExchange
+- LocalShuffleReader*
+- ShuffleExchange
+- BroadcastHashJoin
+- LocalShuffleReader*
+- ShuffleExchange
+- BroadcastExchange
+- HashAggregate
+- CoalescedShuffleReader
+- ShuffleExchange
// The child of remaining two BroadcastHashJoin is not ShuffleQueryStage.
// So only two LocalShuffleReader.
checkNumLocalShuffleReaders(adaptivePlan, 1)
// *(6) BroadcastHashJoin [cast(value#14 as int)], [a#220], Inner, BuildLeft
ditto
@cloud-fan @viirya Updated the comments. Please help review again. Thanks.
// Add the local reader in build side and do not need to check whether
// additional shuffle introduced.
optimizedPlan.transformDown {
  case join: BroadcastHashJoinExec if (join.buildSide == BuildLeft &&
nit: we can unify the condition check somehow
object BroadcastJoinWithShuffleLeft {
  def unapply(plan: SparkPlan): Option[(QueryStageExec, BuildSide)] = plan match {
    case join: BroadcastHashJoinExec if ShuffleQueryStageExec.isShuffleQueryStageExec(join.left) =>
      Some((join.left.asInstanceOf[QueryStageExec], join.buildSide))
    case _ => None
  }
}
object BroadcastJoinWithShuffleRight ...
// add probe side local reader
case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildRight) =>
val localReader = ...
join.copy(left = localReader)
...
// add build side local reader
case join @ BroadcastJoinWithShuffleLeft(shuffleStage, BuildLeft) =>
...
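For illustration, here is a self-contained sketch of the extractor-object pattern suggested above, using simplified stand-in classes (ShuffleStage, BHJ, etc. are invented for this example, not Spark's real types): a custom `unapply` factors out the "broadcast join whose left child is a shuffle stage" condition, so the probe-side and build-side cases differ only in the guard. A symmetric BroadcastJoinWithShuffleRight extractor would cover the right child the same way.

```scala
sealed trait BuildSide
case object BuildLeft extends BuildSide
case object BuildRight extends BuildSide

// Toy physical-plan stand-ins (not Spark's classes).
sealed trait PhysPlan
case class ShuffleStage(id: Int) extends PhysPlan
case class LocalReader(stage: ShuffleStage) extends PhysPlan
case class BHJ(left: PhysPlan, right: PhysPlan, buildSide: BuildSide) extends PhysPlan

// Extractor: a broadcast hash join whose LEFT child is a shuffle stage.
object BroadcastJoinWithShuffleLeft {
  def unapply(plan: PhysPlan): Option[(BHJ, ShuffleStage)] = plan match {
    case join @ BHJ(stage: ShuffleStage, _, _) => Some((join, stage))
    case _ => None
  }
}

// The left child is the probe side exactly when the build side is the
// right child, so probe-side handling is just a guard on the extractor.
def addProbeSideReader(plan: PhysPlan): PhysPlan = plan match {
  case BroadcastJoinWithShuffleLeft(join, stage) if join.buildSide == BuildRight =>
    join.copy(left = LocalReader(stage))
  case other => other
}
```

The build-side case would use the same extractor with a `BuildLeft` guard, keeping the two rules structurally identical.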
LGTM except one minor comment
Test build #112895 has finished for PR 26289 at commit

Test build #112908 has finished for PR 26289 at commit

Test build #112993 has finished for PR 26289 at commit

Test build #112998 has finished for PR 26289 at commit
case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] {

def canUseLocalShuffleReaderLeft(join: BroadcastHashJoinExec): Boolean = {
def canUseLocalShuffleReaderProbeLeft(join: BroadcastHashJoinExec): Boolean = {
We can remove it now
  join.buildSide == BuildRight && ShuffleQueryStageExec.isShuffleQueryStageExec(join.left)
}

def canUseLocalShuffleReaderRight(join: BroadcastHashJoinExec): Boolean = {
def canUseLocalShuffleReaderProbeRight(join: BroadcastHashJoinExec): Boolean = {
ditto
Test build #113002 has finished for PR 26289 at commit
I see SparkR failures in other PRs as well, and they're totally unrelated to this PR: checking CRAN incoming feasibility ...Error in .check_package_CRAN_incoming(pkgdir)

Thanks, merging to master!
What changes were proposed in this pull request?

PR #25295 already implements the rule that converts the shuffle reader to a local reader for the BroadcastHashJoin on the probe side. This PR supports converting the shuffle reader to a local reader on the build side.

Why are the changes needed?

Improve performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing unit tests.
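The two-step approach described in the discussion can be sketched as follows. These are toy classes invented for illustration, not Spark's real ones; in the real rule, EnsureRequirements is re-run between the steps and step 1 is reverted if it added a shuffle, while step 2 needs no such check because the build side feeds a broadcast, which imposes no partitioning requirement.

```scala
sealed trait Plan
case class Stage(id: Int) extends Plan            // a shuffle query stage
case class Reader(stage: Stage) extends Plan      // a local shuffle reader
case class Join(left: Plan, right: Plan, buildLeft: Boolean) extends Plan

// Step 1: add local readers on the probe side only (the non-build child).
def addProbeReaders(p: Plan): Plan = p match {
  case Join(l, r: Stage, true)  => Join(addProbeReaders(l), Reader(r), true)
  case Join(l: Stage, r, false) => Join(Reader(l), addProbeReaders(r), false)
  case Join(l, r, b)            => Join(addProbeReaders(l), addProbeReaders(r), b)
  case other                    => other
}

// Step 2: add local readers on the build side, with no shuffle check.
def addBuildReaders(p: Plan): Plan = p match {
  case Join(l: Stage, r, true)  => Join(Reader(l), addBuildReaders(r), true)
  case Join(l, r: Stage, false) => Join(addBuildReaders(l), Reader(r), false)
  case Join(l, r, b)            => Join(addBuildReaders(l), addBuildReaders(r), b)
  case other                    => other
}

def optimize(p: Plan): Plan = addBuildReaders(addProbeReaders(p))
```

Separating the two passes is what guarantees the build-side readers are never reverted: only the probe-side pass participates in the revert check.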