-
Notifications
You must be signed in to change notification settings - Fork 28.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-28560][SQL][followup] change the local shuffle reader from leaf node to unary node #26250
[SPARK-28560][SQL][followup] change the local shuffle reader from leaf node to unary node #26250
Conversation
@cloud-fan @maryannxue Please help me review. Thanks. |
Test build #112645 has finished for PR 26250 at commit
|
The failed test may be not related. Please help to re-test. Thanks. |
retest it please |
@@ -129,7 +129,8 @@ abstract class QueryStageExec extends LeafExecNode { | |||
*/ | |||
case class ShuffleQueryStageExec( | |||
override val id: Int, | |||
override val plan: ShuffleExchangeExec) extends QueryStageExec { | |||
override val plan: ShuffleExchangeExec, | |||
var isLocalShuffle: Boolean = false) extends QueryStageExec { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
let's avoid using mutable states when not necessary.
@@ -70,7 +70,8 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { | |||
} | |||
// ShuffleExchanges introduced by repartition do not support changing the number of partitions. | |||
// We change the number of partitions in the stage only if all the ShuffleExchanges support it. | |||
if (!shuffleStages.forall(_.plan.canChangeNumPartitions)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can change the logic of collecting shuffle stages:
def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match {
case _: LocalShuffleReaderExec = Nil
case stage: ShuffleQueryStageExec => Seq(stage)
case ReusedQueryStageExec(_, stage: ShuffleQueryStageExec, _) => Seq(stage)
case _ => plan.children.flatMap(collectShuffleStages)
}
val shuffleStages = collectShuffleStages(plan)
We should also remove the changes to |
@cloud-fan @maryannxue update the coments. Please help me review. Thanks. |
Test build #112748 has finished for PR 26250 at commit
|
thanks, merging to master! |
What changes were proposed in this pull request?
Why are the changes needed?
When make the
LocalShuffleReaderExec
to leaf node, there exists a potential issue: the leaf node will hide the running query stage and make the unfinished query stage as finished query stage when creating its parent query stage.This PR make the leaf node to unary node.
Does this PR introduce any user-facing change?
No
How was this patch tested?
Existing tests