[SPARK-30524] [SQL] Disable OptimizeSkewedJoin rule when introducing additional shuffle #27226
@cloud-fan @hvanhovell @maryannxue Please help review if you have available time. Thanks for your help.
Test build #116807 has finished for PR 27226 at commit
private def containShuffleQueryStage(plan : SparkPlan): (Boolean, ShuffleQueryStageExec) =
plan match {
case stage: ShuffleQueryStageExec => (true, stage)
case sort: SortExec if (sort.child.isInstanceOf[ShuffleQueryStageExec]) =>
nit: case SortExec(_, _, s: ShuffleQueryStageExec, _)
private def reOptimizeChild(
skewedReader: SkewedPartitionReaderExec,
child: SparkPlan): SparkPlan = child match {
case sort: SortExec if (sort.child.isInstanceOf[ShuffleQueryStageExec]) =>
ditto
|Try to optimize skewed join.
|Left side partition size: $leftSizeInfo
|Right side partition size: $rightSizeInfo
|Try to optimize skewed join.
The previous indentation seems correct.
s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _),
s2 @ SortExec(_, _, right: ShuffleQueryStageExec, _))
if supportedJoinTypes.contains(joinType) =>
private def containShuffleQueryStage(plan : SparkPlan): (Boolean, ShuffleQueryStageExec) =
Why not just return Option[ShuffleQueryStageExec]? We can rename the method to getShuffleQueryStage.
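The suggestion above could look roughly like the following. This is a minimal sketch that uses toy stand-ins rather than Spark's classes: `Plan`, `ShuffleStage`, `Sort`, and `Scan` are hypothetical types defined here only for illustration, and the method body is an assumption about what an `Option`-returning version might do, not the code that was merged.

```scala
object OptionReturnSketch {
  // Hypothetical toy plan nodes standing in for SparkPlan, ShuffleQueryStageExec and SortExec.
  sealed trait Plan
  case class ShuffleStage(id: Int) extends Plan
  case class Sort(order: String, child: Plan) extends Plan
  case class Scan(table: String) extends Plan

  // Returns the shuffle stage if the plan is one, or is a sort directly on top of one.
  // An Option replaces the (Boolean, stage) pair, so callers cannot read a stage
  // that was never found.
  def getShuffleQueryStage(plan: Plan): Option[ShuffleStage] = plan match {
    case stage: ShuffleStage          => Some(stage)
    case Sort(_, stage: ShuffleStage) => Some(stage)
    case _                            => None
  }
}
```

A caller can then pattern match on `Some(stage)` or use `map`/`foreach` instead of checking a Boolean flag first.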
child: SparkPlan): SparkPlan = child match {
case sort @ SortExec(_, _, s: ShuffleQueryStageExec, _) =>
sort.copy(child = skewedReader)
case _ => child
Shouldn't this be: case _: ShuffleQueryStageExec => skewedReader?
}
}
}
logDebug(s"number of skewed partitions is ${skewedPartitions.size}")
if (skewedPartitions.nonEmpty) {
val visitedStages = HashSet.empty[Int]
val optimizedSmj = smj.transformDown {
How about transformUp? Then we don't need the visitedStages.
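To illustrate why a bottom-up traversal removes the need for a visited set, here is a small sketch on a toy tree. Everything in it is hypothetical (`Plan`, `Stage`, `Reader`, `Sort`, `Join`, and this simplified `transformUp` are stand-ins, not Spark's `TreeNode` API). The point is only that a bottom-up pass rewrites a node after its children, so the stage wrapped inside the freshly created reader is never visited again.

```scala
object TransformUpSketch {
  // Hypothetical toy tree; Stage plays the role of a shuffle query stage,
  // Reader the role of a partial shuffle reader wrapped around it.
  sealed trait Plan {
    def children: Seq[Plan]
    def withChildren(cs: Seq[Plan]): Plan

    // Bottom-up rewrite: transform the children first, then apply the rule to this node.
    def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
      val afterChildren = withChildren(children.map(_.transformUp(rule)))
      rule.applyOrElse(afterChildren, identity[Plan])
    }
  }
  case class Stage(id: Int) extends Plan {
    val children: Seq[Plan] = Nil
    def withChildren(cs: Seq[Plan]): Plan = this
  }
  case class Reader(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = Reader(cs.head)
  }
  case class Sort(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
    def withChildren(cs: Seq[Plan]): Plan = Sort(cs.head)
  }
  case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
    def withChildren(cs: Seq[Plan]): Plan = Join(cs(0), cs(1))
  }

  // Wrap every stage in a reader; no visited set is needed with transformUp.
  def wrapStages(plan: Plan): Plan = plan.transformUp {
    case s: Stage => Reader(s)
  }
}
```

With a top-down traversal that recurses into the result of the rule, the `Stage` inside the new `Reader` would match the rule again, which is presumably what `visitedStages` was guarding against.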
val optimizedSmj = smj.transformDown {
case sort @ SortExec(_, _, shuffleStage: ShuffleQueryStageExec, _) =>
sort.copy(child = PartialShuffleReaderExec(shuffleStage, skewedPartitions.toSet))
case shuffleStage: ShuffleQueryStageExec if !visitedStages.contains(shuffleStage.id) =>
to be safe, we should do case s: ShuffleQueryStageExec if s.id == left.id || s.id == right.id
@@ -189,6 +230,21 @@ case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] {
}
}

def handleSkewJoin(plan: SparkPlan): SparkPlan = {
This is not a long method, maybe just inline it in apply?
@@ -579,6 +579,33 @@ class AdaptiveQueryExecSuite
}
}

test("SPARK-30524: AQE should disable OptimizeSkewedJoin rule" +
nit: SPARK-30524: Do not optimize skew join if introduce additional shuffle
Test build #116810 has finished for PR 27226 at commit
Test build #116816 has finished for PR 27226 at commit
Test build #116819 has finished for PR 27226 at commit
Test build #116808 has finished for PR 27226 at commit
retest this please
Test build #116835 has finished for PR 27226 at commit
thanks, merging to master!
handleSkewJoin(plan)
// When multi table join, there will be too many complex combination to consider.
// Currently we only handle 2 table join like following two use cases.
// SMJ SMJ
Sorry that my previous comment was wrong. Once we have shuffle, there should always be a sort. So we don't need to match this.
@@ -95,8 +96,20 @@ case class SortMergeJoinExec(
s"${getClass.getSimpleName} should not take $x as the JoinType")
}

override def requiredChildDistribution: Seq[Distribution] =
We should probably make this a flag to indicate it's a partial SMJ. This whole matching is too tightly coupled with the skew join rule itself.
@JkSelf can you do a quick follow up for the comments above as well as this one:
### What changes were proposed in this pull request?
Resolve the remaining comments in [PR#27226](#27226).
### Why are the changes needed?
Resolve the comments.
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing unit tests.
Closes #27253 from JkSelf/followup-skewjoinoptimization2.
Authored-by: jiake <ke.a.jia@intel.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
What changes were proposed in this pull request?
The OptimizeSkewedJoin rule changes the outputPartitioning after inserting PartialShuffleReaderExec or SkewedPartitionReaderExec, so an additional shuffle may need to be introduced to ensure the right result. This PR disables the OptimizeSkewedJoin rule when it would introduce an additional shuffle.
Why are the changes needed?
bug fix
Does this PR introduce any user-facing change?
No
How was this patch tested?
Added a new unit test.
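The idea in the description (keep the skew optimization only when it does not cost an extra shuffle) can be sketched on a toy plan as follows. This is an illustrative assumption, not the implementation in this PR: `Plan`, `Scan`, `Shuffle`, `Join`, `countShuffles`, and `keepUnlessExtraShuffle` are all hypothetical names, and the real rule works on Spark physical plans rather than on this simplified tree.

```scala
object ExtraShuffleGuardSketch {
  // Hypothetical toy plan nodes; Shuffle stands in for a shuffle exchange.
  sealed trait Plan { def children: Seq[Plan] }
  case class Scan(table: String) extends Plan {
    val children: Seq[Plan] = Nil
  }
  case class Shuffle(child: Plan) extends Plan {
    val children: Seq[Plan] = Seq(child)
  }
  case class Join(left: Plan, right: Plan) extends Plan {
    val children: Seq[Plan] = Seq(left, right)
  }

  // Counts the shuffle nodes in the whole tree.
  def countShuffles(plan: Plan): Int = {
    val self = plan match {
      case _: Shuffle => 1
      case _          => 0
    }
    self + plan.children.map(countShuffles).sum
  }

  // Falls back to the original plan when the optimized one needs more shuffles.
  def keepUnlessExtraShuffle(original: Plan, optimized: Plan): Plan =
    if (countShuffles(optimized) > countShuffles(original)) original else optimized
}
```

In this sketch the comparison is a plain node count; any cheaper or more precise check would serve the same purpose of rejecting a rewrite that adds a shuffle.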