[SPARK-41469][CORE] Avoid unnecessary task rerun on decommissioned executor lost if shuffle data migrated #39011
Conversation
cc @warrenzhu25 too
core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala (resolved review comment)
Marked as WIP for now due to the compilation error and missing unit test. Any feedback is still welcome.
Thank you for working on this improvement.
It's really the change I want. Great work.
Nice fix @Ngone51!
Had a couple of comments.
core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala (resolved review comment)
The failed test seems to be flaky:

```
- decommission workers ensure that shuffle output is regenerated even with shuffle service *** FAILED *** (18 seconds, 479 milliseconds)
[info] 5 did not equal 4 Expected 4 tasks but got List(0:0:0:0-SUCCESS, 0:0:0:0-SUCCESS, 0:0:1:0-SUCCESS, 0:0:0:1-SUCCESS, 1:0:0:0-SUCCESS) (DecommissionWorkerSuite.scala:191)
[info] org.scalatest.exceptions.TestFailedException:
[info]   at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
[info]   at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
[info]   at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
[info]   at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
[info]   at org.apache.spark.deploy.DecommissionWorkerSuite.$anonfun$new$6(DecommissionWorkerSuite.scala:191)
....
```
Could you take a look at the failure? It looks relevant.

```
[error] Failed: Total 3462, Failed 1, Errors 0, Passed 3461, Ignored 9, Canceled 2
[error] Failed tests:
[error]   org.apache.spark.deploy.DecommissionWorkerSuite
```
Force-pushed from 579cc22 to 984dea9.
The failure is not reproducible every time. I suspect it is still a flaky test. I will keep an eye on it.
Got it. If so, it's okay. Thank you for checking and confirming that.
Let me take that back; this can actually be explained, and it is an effect of this PR, since we now also check for decommissioned executors. We should fix the test to account for the change in behavior, or relook at whether this is a legitimate case we are missing? (I don't think so, but want to be sure.)
@mridulm Thanks for the help. Let me take another look. (Sorry for the delay; I was unwell last week.)
Looks good to me, thanks for fixing this @Ngone51 !
Merged to master.
Thanks @mridulm @dongjoon-hyun
…ecutor lost if shuffle data migrated

### What changes were proposed in this pull request?
This PR proposes to avoid rerunning the finished shuffle map task in `TaskSetManager.executorLost()` if the executor lost is caused by decommission and the shuffle data has been successfully migrated.

### Why are the changes needed?
To avoid unnecessary task recomputation.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Added UT

Closes apache#39011 from Ngone51/decom-executor-lost.

Authored-by: Yi Wu <yi.wu@databricks.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
```scala
val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
  (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
```
@Ngone51 @mridulm I have a question about the logic here:

Executor decommissioning does not necessarily imply worker decommissioning. If an external shuffle service is used and the executor is decommissioned without the worker also being decommissioned, then the shuffle files will continue to be available at the original host.

Prior to this PR, I don't think this `executorLost` method would have scheduled re-runs in that case, because the `&& !env.blockManager.externalShuffleServiceEnabled` condition would evaluate to `false` when the ESS was used, causing us to skip all of the resubmission logic here.

With this PR, though, I think these changes might actually cause unnecessary task re-submission in that case, because the `(reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)` condition would evaluate to `true` and `locationOpt.exists(_.host != host)` would evaluate to `false`, since the original outputs are still available and no migration is needed.
I think this could be addressed by checking whether `ExecutorDecommission.workerHost` is defined, i.e. to do

```scala
val workerIsDecommissioned = reason match {
  case e: ExecutorDecommission if e.workerHost.isDefined => true
  case _ => false
}
val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
  (workerIsDecommissioned || !env.blockManager.externalShuffleServiceEnabled)
```
That said, my argument above doesn't hold for Spark-on-Kubernetes because it never sets `workerHost` (lines 336 to 341 in a2a5299):

```scala
executorsPendingDecommission.get(id) match {
  case Some(host) =>
    // We don't pass through the host because by convention the
    // host is only populated if the entire host is going away
    // and we don't know if that's the case or just one container.
    removeExecutor(id, ExecutorDecommission(None))
```
Given this, I am wondering whether this PR's change might represent a regression when Dynamic Allocation is used alongside an external shuffle service in YARN.
WDYT? Am I interpreting the code correctly or have I overlooked something here?
Sorry for the delay - this message got lost in my inbox.
You are correct @JoshRosen, we should indeed check for `ExecutorDecommission.workerHost.isDefined` for standalone.
+CC @Ngone51 in case I am missing something.
On Sunday, April 21, 2024 at 22:36, Parth Shyara commented on this pull request:

> In core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala:
>
> ```scala
> val maybeShuffleMapOutputLoss = isShuffleMapTasks &&
>   (reason.isInstanceOf[ExecutorDecommission] || !env.blockManager.externalShuffleServiceEnabled)
> ```
>
> @Ngone51 @mridulm Is the above issue being tracked elsewhere?
What changes were proposed in this pull request?
This PR proposes to avoid rerunning the finished shuffle map task in `TaskSetManager.executorLost()` if the executor lost is caused by decommission and the shuffle data has been successfully migrated.

Why are the changes needed?
To avoid unnecessary task recomputation.

Does this PR introduce any user-facing change?
No.

How was this patch tested?
Added UT
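The decision described above can be sketched as follows. This is a minimal, hypothetical model of the check, not Spark's actual code: the names `shouldResubmit`, `migratedLocation`, and `lostHost` are illustrative, and the real `TaskSetManager.executorLost()` consults `mapOutputTracker` for the current map-output location rather than taking it as a parameter.

```scala
// Hypothetical, simplified model of the resubmission decision; the types and
// names here are illustrative stand-ins for Spark's internal API.
sealed trait ExecutorLossReason
case class ExecutorDecommission(workerHost: Option[String]) extends ExecutorLossReason
case object ExecutorProcessLost extends ExecutorLossReason

def shouldResubmit(
    isShuffleMapTask: Boolean,
    reason: ExecutorLossReason,
    externalShuffleServiceEnabled: Boolean,
    migratedLocation: Option[String], // host currently serving the map output, if tracked
    lostHost: String): Boolean = {
  // Output may be lost only when a shuffle map task ran on the lost executor and
  // either the executor was decommissioned or there is no external shuffle service.
  val maybeShuffleMapOutputLoss = isShuffleMapTask &&
    (reason.isInstanceOf[ExecutorDecommission] || !externalShuffleServiceEnabled)
  // Skip the rerun when the output has already been migrated to a different host.
  maybeShuffleMapOutputLoss && !migratedLocation.exists(_ != lostHost)
}
```

Under this model, an output that was migrated off the lost host is not resubmitted, while an output still registered on the lost host (or not tracked at all) is.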