
[SPARK-46052][CORE] Remove function TaskScheduler.killAllTaskAttempts #43954

Closed
wants to merge 20 commits

Conversation

@Ngone51 (Member) commented Nov 22, 2023

What changes were proposed in this pull request?

This PR removes the interface TaskScheduler.killAllTaskAttempts and its implementations, and replaces it with TaskScheduler.cancelTasks. This PR also removes "abort stage" from TaskScheduler.cancelTasks and moves it to after the call of TaskScheduler.cancelTasks, behind a control flag spark.legacy.scheduler.stage.abortAfterCancelTasks (true by default to keep the same behaviour for now), because "abort stage" is not necessary while canceling tasks; see the comment at #43954 (comment).

Besides, this PR fixes a bug where a stage attempt could potentially launch new tasks after all of its tasks had been killed. The fix is to mark the stage attempt as zombie (i.e., suspend()) after the killing.
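
For illustration, here is a rough Scala sketch of the consolidated cancelTasks behaviour described above (a simplified sketch based on the snippets discussed in the review below; identifiers such as taskSetsByStageIdAndAttempt, taskIdToExecutorId and the legacyAbortStageAfterCancelTasks field are illustrative and may not match the merged code exactly):

override def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit = synchronized {
  // Walk every attempt of the stage and kill its running tasks.
  taskSetsByStageIdAndAttempt.get(stageId).foreach { attempts =>
    attempts.values.foreach { tsm =>
      tsm.runningTasksSet.foreach { tid =>
        val execId = taskIdToExecutorId(tid)
        backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
      }
      // The bug fix: mark the attempt as zombie so it cannot launch new tasks.
      tsm.suspend()
      // Legacy behaviour, kept behind the control flag (true by default).
      if (legacyAbortStageAfterCancelTasks) {
        tsm.abort(s"Stage $stageId cancelled: $reason")
      }
    }
  }
}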

Why are the changes needed?

Spark has two functions to kill all tasks in a Stage:

  • cancelTasks: kills all the running tasks in all the stage attempts and also aborts all the stage attempts
  • killAllTaskAttempts: only kills all the running tasks in all the stage attempts but does not abort the attempts.

However, there is no use case in Spark where a stage would launch new tasks after all of its tasks have been killed. So I think we can replace killAllTaskAttempts with cancelTasks directly.

Does this PR introduce any user-facing change?

No. TaskScheduler is internal.

How was this patch tested?

Pass existing tests.

Was this patch authored or co-authored using generative AI tooling?

No.

github-actions bot added the CORE label Nov 22, 2023
val reason = s"Task $task from barrier stage $failedStage (${failedStage.name}) " +
"failed."
val job = jobIdToActiveJob.get(failedStage.firstJobId)
val shouldInterrupt = job.exists(j => shouldInterruptTaskThread(j))
taskScheduler.killAllTaskAttempts(stageId, shouldInterrupt, reason)
Contributor

I think that there is a subtle difference here: killAllTaskAttempts only kills tasks, whereas cancelTasks also calls tsm.abort() on the stage attempts, which might enqueue a new taskSetFailed event for each task set and I think that could have unintended side effects. Can you double-check whether we think that change is okay?

Member Author

This is really a good point. taskSetFailed will abort the stage and in turn fail the whole job, which is not the intended behaviour here. The problem with killAllTaskAttempts is that it doesn't mark the TaskSetManager as zombie after killing all the tasks, so the TaskSetManager could still launch new tasks (by retry), which is not expected.

But I'm also wondering whether we really want to abort the stages in cancelTasks. cancelTasks is currently called only inside cancelRunningIndependentStages, and cancelRunningIndependentStages is directly or indirectly called in 3 cases:

  • When a job successfully finishes: in this case, we expect all the stages in the job to release their computation resources (i.e., kill all the tasks via cleanupStateForJobAndIndependentStages) immediately. But we don't expect this "release" action to lead to stage abortion and in turn fail the job in the end. Today it doesn't fail the already-succeeded job because the succeeded job has been cleaned up (it no longer exists in the activeJobs list) by the time the taskSetFailed event arrives.

  • When a job is requested to be cancelled: this case is essentially the same as the one above, except that the job finishes in a different state.

  • When a stage aborts: in this case, we expect all the active jobs which depend on this stage to be canceled. Thus, we need to call cancelRunningIndependentStages on each active job. This eventually falls back to the first case, as the active job will be cleaned up (via cleanupStateForJobAndIndependentStages) before the taskSetFailed event arrives.

Contributor

I agree. The worst case is we manually trigger "abort stage" in the existing callers of cancelTasks to keep the old behavior.

Member Author

Updated. I removed the "abort stage" inside cancelTasks and moved it to after the callers of cancelTasks. I also added a new conf spark.scheduler.stage.abortStageAfterCancelTasks to control whether we should abort the stage after cancelTasks. By default, we don't abort.

@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit

// Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
// Kill all the tasks in all the stage attempts of the same stage Id
Contributor

Please add a comment about marking all the stage attempts as zombie.

Member Author

I considered mentioning zombie here but gave up in the end, as I think this is only an API and we don't enforce the same functionality for every implementation.

Contributor

Sounds good to me.

val tsm = taskScheduler.taskSetManagerForAttempt(0, 0).get
assert(2 === tsm.runningTasks)

taskScheduler.killAllTaskAttempts(0, false, "test")
Contributor

Shall we update the test case with taskScheduler.cancelTasks?

Member Author

Sounds good.

Member Author

I just realized that we have a test "cancelTasks shall kill all the running tasks and fail the stage" right above, so I think keeping that test should be enough.

Contributor

I see. Thank you.

@@ -296,18 +296,32 @@ private[spark] class TaskSchedulerImpl(
new TaskSetManager(this, taskSet, maxTaskFailures, healthTrackerOpt, clock)
}

// Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
// be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
// once all the tasks has been finished. The stage attempt could be aborted after the call of
Contributor

I'm trying to understand the rationale here. What's the key difference between marking as zombie and aborting the stage? Is it for the still-running tasks?

Member Author

def abort(message: String, exception: Option[Throwable] = None): Unit = sched.synchronized {
  sched.dagScheduler.taskSetFailed(taskSet, message, exception)
  isZombie = true
  maybeFinishTaskSet()
}

When there is a call to abort, the TSM must be marked as zombie. So the key difference should come from dagScheduler.taskSetFailed, which essentially cleans up the data related to this stage and fails the jobs that depend on it.

From the TSM's perspective there is no difference between zombie and abort. Tasks in the TSM can still run until they finish (whether killed or succeeded).
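
For comparison, a minimal sketch of what suspend() amounts to (per the summary later in this thread: isZombie = true plus maybeFinishTaskSet(), without the dagScheduler.taskSetFailed call; this is a sketch, not the exact merged code):

def suspend(): Unit = sched.synchronized {
  // Mark as zombie so no new tasks are launched for this task set.
  isZombie = true
  // Clean up the task set if nothing is still running.
  maybeFinishTaskSet()
}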

@@ -1894,24 +1894,8 @@ private[spark] class DAGScheduler(
job.numFinished += 1
// If the whole job has finished, remove it
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
Contributor

Why is markStageAsFinished no longer needed?

Member Author

cancelRunningIndependentStages already does that, see:

taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job), reason)
markStageAsFinished(stage, Some(reason))

Contributor

Got it.

@mridulm (Contributor), Nov 29, 2023

They are not the same @Ngone51 - markStageAsFinished is for successful stage completions (when no reason), while cancelRunningIndependentStages is aborting other stages for the job which now need to be killed due to the job successfully terminating.

One impact of this is in the errorMessage - it will be nonEmpty from cancelRunningIndependentStages and so trigger failure paths.

Member Author

@mridulm Oh I see. Thanks for catching this.

backend.killTask(tid, execId, interruptThread, s"Stage cancelled: $reason")
}
}
tsm.suspend()
Contributor

I guess you expect that the tsm should be finished. But it may not necessarily happen.

Member Author

No, I don't. Killing tasks may take some time, so I don't expect the tsm to finish immediately. suspend() calls maybeFinishTaskSet() to be safe in case the tsm can't finish normally in the end. I don't see this being an issue in prod (the tsm should finish normally when all tasks finish), but I did see a test ("cancelTasks shall kill all the running tasks") fail without maybeFinishTaskSet().

@@ -2603,4 +2603,13 @@ package object config {
.stringConf
.toSequence
.createWithDefault("org.apache.spark.sql.connect.client" :: Nil)

private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
ConfigBuilder("spark.scheduler.stage.legacyAbortStageAfterCancelTasks")
Contributor

should be spark.legacy....

// Suspends this TSM to avoid launching new tasks.
//
// Unlike `abort()`, this function intentionally to not notify DAGScheduler to avoid
// redundant operations. So the invocation to this function should assume DAGScheduler
Contributor

So how expensive is the redundant operation? We may choose to always do it if it's cheap, to simplify the code.

Member Author

It's not expensive. The operation ("abort stage") is a no-op in the end, as I mentioned at https://github.com/apache/spark/pull/43954/files#r1402869071. I want to remove "abort stage" because I think it's not the right behaviour. "abort stage" always means that any active job that depends on the stage needs to fail. So it doesn't make sense to me, for example, that when a result stage succeeds and the job succeeds, we then cancel the straggling running tasks in that result stage and abort that stage. The "abort" here will try to fail the job (which has already succeeded); it just doesn't happen today because DAGScheduler is thread-safe and the succeeded job has already been removed from the active job list.

If we want to be conservative, I'm fine with keeping it as it is.

@mridulm (Contributor) left a comment

I will need a bit more time to go over this; in the meantime, can you please check my other comments?
Changes (if any) due to those would impact how we analyze the rest of the PR.

Thanks for working on this @Ngone51 !

@@ -2860,6 +2844,11 @@ private[spark] class DAGScheduler(
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job), reason)
if (sc.getConf.get(LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS)) {
Contributor

Pull this out as a field
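
e.g., a small sketch of the suggestion (the field name is illustrative, not the merged code):

// Read the flag once when the DAGScheduler is constructed...
private val legacyAbortStageAfterCancelTasks: Boolean =
  sc.getConf.get(LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS)

// ...and reuse it at the cancellation site instead of querying the conf each time.
if (legacyAbortStageAfterCancelTasks) {
  // abort the stage (legacy behaviour)
}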

@cloud-fan (Contributor) left a comment

LGTM, but let's wait for @mridulm to do a final sign off.

@mridulm (Contributor) left a comment

I will need to look into it in more detail, but I am not sure this change is without side effects.

I am dumping my thoughts here, though it is spread out in the code.
Considering existing state:

  • killAllTaskAttempts will simply kill any running attempts and then we are done - with all additional suffix work done by the caller.
  • cancelTasks was using killAllTaskAttempts, and then additionally doing a tsm.abort.

The new behavior is:

  • killAllTaskAttempts is now cancelTasks
  • cancelTasks becomes the earlier killAllTaskAttempts + a tsm.suspend (which does isZombie = true + tsm.maybeFinishTaskSet)

This has two impacts:

  • Additional call to suspend for existing killAllTaskAttempts
  • Lack of TaskSetFailed for existing cancelTasks (which could be impacting use from abortStage, job cancellation, etc)

Note - I would love to reduce the api surface, and this might still be exactly the same as existing behavior - but it has resulted in a bit of nontrivial change already and I would like us to make sure this is safe and equivalent.

+CC @JoshRosen in case I am missing something.

@@ -2603,4 +2603,13 @@ package object config {
.stringConf
.toSequence
.createWithDefault("org.apache.spark.sql.connect.client" :: Nil)

private[spark] val LEGACY_ABORT_STAGE_AFTER_CANCEL_TASKS =
ConfigBuilder("spark.legacy.scheduler.stage.abortAfterCancelTasks")
Contributor

Namespace it below scheduler ? Something like spark.scheduler.stage.legacyAbortAfterCancelTasks ...
Also, mark it as internal ?

Btw, we should flip this switch on and off in the relevant tests to check if the behavior is preserved.

Member Author

Btw, we should flip this switch on and off in the relevant tests to check if the behavior is preserved.

It is a bit annoying to flip this conf switch on and off only for the relevant unit tests in DAGSchedulerSuite, since it uses a global SparkContext. One way to do this might be to introduce an entire new suite, e.g. DAGSchedulerWithAbortStageDisabledSuite, but I'm not sure it's worth doing that. Or we could extract the relevant unit tests into a separate suite and then flip the conf with two on/off suites. A bit complicated though.

"from TaskScheduler.cancelTasks()")
.version("4.0.0")
.booleanConf
.createWithDefault(false)
Contributor

Do we want to default to true and switch to false in a later version?

@Ngone51 (Member Author) commented Dec 4, 2023

@mridulm Thanks for the detailed comment.

Additional call to suspend for existing killAllTaskAttempts

Note that we always call markStageAsFinished after the call to killAllTaskAttempts. So I think we don't expect such a stage to launch new tasks. And suspend, especially "mark as zombie", avoids exactly that. I think this is actually a bug that we need to fix.

Lack of TaskSetFailed for existing cancelTasks (which could be impacting use from abortStage, job cancellation, etc)

TaskSetFailed leads to abortStage, and I think the essential (main) effect of abortStage is to fail the active jobs which depend on the stage. I have the analysis here explaining why I think abortStage ends up being a no-op during the call of cancelTasks. Could you take a look there?

@mridulm (Contributor) commented Dec 6, 2023

Thanks for the details @Ngone51, sorry for the delay in going over this - your explanation makes sense to me.
Can you update the PR description to mention that we are also fixing a bug through the change with suspend?

Did another pass, and I don't have additional notes beyond the comments I have already left - thanks for working on this!

@Ngone51 (Member Author) commented Dec 6, 2023

Thanks @mridulm . Will address your comments.

@Ngone51 (Member Author) commented Dec 7, 2023

Btw, we should flip this switch on and off in the relevant tests to check if the behavior is preserved.

@mridulm I tried to enable spark.scheduler.stage.legacyAbortAfterCancelTasks in DAGSchedulerSuite. It does appear to cause several test failures 😢. I'm investigating the failures right now.

@Ngone51 (Member Author) commented Dec 8, 2023

To fix the tests, I had to move (fe70ba9) the "abort stage" call back into cancelTasks() behind the control flag, rather than calling "abort stage" after cancelTasks(). After the move, the timing of the "abort stage" call strictly follows the original behavior.

The problem with calling "abort stage" after cancelTasks() is that, e.g., DAGSchedulerSuite has its own implementation of TaskScheduler which overrides cancelTasks() with no "abort stage". But our change extracted "abort stage" outside of cancelTasks(), so the intention of not calling "abort stage" in cancelTasks() is broken and the test fails.
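
For illustration, a rough sketch of the kind of test stub being described (simplified and hypothetical, not the exact suite code): the test TaskScheduler only records the cancellation and deliberately does not abort the stage, so moving the abort outside cancelTasks() changes what the test observes.

// Inside a DAGSchedulerSuite-style stub TaskScheduler:
override def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit = {
  // Record the cancellation for assertions; intentionally no "abort stage" here.
  cancelledStages += stageId
}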

@Ngone51 (Member Author) commented Dec 12, 2023

Damn! The barrier stage seems to be a special case. It called killAllTaskAttempts() to kill all the other tasks when there was a task failure, but didn't abort the stage, as it would retry later. In this PR, we replace killAllTaskAttempts() with cancelTasks() and enable stage abortion by default within cancelTasks(). This leads to the barrier stage failing instead of retrying. It would only work for all cases if we're confident enough to remove stage abortion from cancelTasks() entirely, without any control flag.

@mridulm (Contributor) commented Dec 13, 2023

Ah, interesting - I had not looked at barrier stage in as much detail; my initial observation was it worked fine, but you are right - this does break the assumption.

@@ -54,7 +54,7 @@ private[spark] trait TaskScheduler {
// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit

// Kill all the tasks in a stage and fail the stage and all the jobs that depend on the stage.
// Kill all the tasks in all the stage attempts of the same stage Id
Contributor

Looking at the comment, shall we keep killAllTaskAttempts instead of cancelTasks, as the naming of killAllTaskAttempts fits the comment better?

Member Author

Updated, thanks!

// Kill all the tasks in all the stage attempts of the same stage Id. Note stage attempts won't
// be aborted but will be marked as zombie. The stage attempt will be finished and cleaned up
// once all the tasks has been finished. The stage attempt could be aborted after the call of
// `cancelTasks` if required.
Contributor

Suggested change
// `cancelTasks` if required.
// `killAllTaskAttempts ` if required.

// Unlike `abort()`, this function intentionally to not notify DAGScheduler to avoid
// redundant operations. So the invocation to this function should assume DAGScheduler
// already knows about this TSM failure. For example, this function can be called from
// `TaskScheduler.cancelTasks` by DAGScheduler.
Contributor

ditto

// Throw UnsupportedOperationException if the backend doesn't support kill tasks.
def cancelTasks(stageId: Int, interruptThread: Boolean, reason: String): Unit
def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
Contributor

super nit: shall we put this method after def killTaskAttempt? The same for the testing TaskScheduler implementations to reduce the code diff.

Member Author

Updated, thanks!

@Ngone51 (Member Author) commented Jan 12, 2024

@mridulm @cloud-fan Could you help merge the PR if you don't have other comments? Thanks!

@cloud-fan (Contributor)
thanks, merging to master!

@cloud-fan closed this in 96f34bb on Jan 12, 2024