[SPARK-46021][CORE] Support cancel future jobs belonging to a job group #43926
Conversation
Left an initial pass of review comments (will take a closer look tomorrow).
@@ -1347,7 +1347,7 @@ The following SQLSTATEs are collated from:
|XX001 |XX |Internal Error |001 |data_corrupted |PostgreSQL |N |PostgreSQL Redshift |
|XX002 |XX |Internal Error |002 |index_corrupted |PostgreSQL |N |PostgreSQL Redshift |
|XXKD0 |XX |Internal Error |KD0 |Analysis - Bad plan |Databricks |N |Databricks |
|XXKDA |XX |Internal Error |KAS |Scheduler (Aether Scheduler) |Databricks |N |Databricks |
Given that this is adding an error class for an existing OSS Apache Spark error, we should probably use a non-Databricks error code here.
I think 'Databricks' here means the 'origin' and 'used by', similar to errors that originate from other engines. Take the example of

|XXKD0 |XX |Internal Error |KD0 |Analysis - Bad plan |Databricks |N |Databricks |

— it's used in the open source error class:

"PLAN_VALIDATION_FAILED_RULE_IN_BATCH" : {
  "message" : [
    "Rule <rule> in batch <batch> generated an invalid plan: <reason>"
  ],
  "sqlState" : "XXKD0"
},
def sparkJobCancelledAsPartOfJobGroupError(jobId: Int, jobGroupId: String): SparkException = {
  sparkJobCancelled(jobId, s"part of cancelled job group $jobGroupId", null)
}
It looks like this is consistent with the message thrown in the other code path at
spark/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
Lines 1207 to 1209 in 021a5e6
val jobIds = activeInGroup.map(_.jobId)
jobIds.foreach(handleJobCancellation(_,
  Option("part of cancelled job group %s".format(groupId))))
which is important in case end-user code is matching on these exception messages (since we previously didn't provide a special exception or error class for distinguishing these errors) 👍
I'm a bit confused. Will the driver exit if a SparkException is thrown?
"message" : [ | ||
"Job <jobId> cancelled <reason>" | ||
], | ||
"sqlState" : "XXKDA" |
How can a user hit this bug using SQL?
In an engine with cancellation it's straightforward: if the user cancels the command, the error could be designed to pop to the top.
This is my last concern. We need this error class because users can trigger it using RDD APIs. But the RDD API is kind of internal now, and we don't expect users to call it directly.
To confirm, SQL users won't hit it, right? Imagine the user cancelled a query but the AQE loop is still running and submitted some more jobs; this failure will be swallowed and not propagated to users, right? Also cc @liuzqt
In theory a SQL user might see this if they are submitting a query to the ThriftServer and then the query's underlying Spark job is canceled via the Web UI (unless the ThriftServer wraps the resulting query failure into some other exception or error code).
Regardless of the likelihood of this error occurring in SQL, I think it's still beneficial to migrate it to the error classes framework so that it is easier to match on cancellation exceptions without having to do string matching.
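To illustrate that benefit, here is a minimal standalone sketch of matching on an error class instead of string-matching the message. Note that `JobCancelledException` and its `getErrorClass` method below are stand-ins modeled loosely on Spark's `SparkThrowable` pattern, not Spark's actual classes:

```java
// Sketch only: a stand-in for the SparkException/SparkThrowable pattern.
// Real Spark exposes the error class via the SparkThrowable interface.
public class ErrorClassMatchDemo {
    static class JobCancelledException extends RuntimeException {
        private final String errorClass;
        JobCancelledException(String errorClass, String message) {
            super(message);
            this.errorClass = errorClass;
        }
        String getErrorClass() { return errorClass; }
    }

    // Detects a job cancellation without parsing the message text.
    static boolean isJobCancelled(Throwable t) {
        return t instanceof JobCancelledException
            && "SPARK_JOB_CANCELLED".equals(((JobCancelledException) t).getErrorClass());
    }

    public static void main(String[] args) {
        Throwable t = new JobCancelledException(
            "SPARK_JOB_CANCELLED", "Job 3 cancelled part of cancelled job group g1");
        System.out.println(isJobCancelled(t));  // prints "true"
    }
}
```

Callers matching on the error class keep working even if the human-readable message wording changes later.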
@@ -1728,6 +1728,16 @@ package object config {
    .checkValue(v => v > 0, "The max failures should be a positive value.")
    .createWithDefault(40)

  private[spark] val CANCELLED_JOB_GROUP_SET_SIZE =
    ConfigBuilder("spark.scheduler.job.cancelledJobGroupSet.size")
maybe spark.scheduler.numJobGroupsToTrackForCancel?
ConfigBuilder("spark.scheduler.job.cancelledJobGroupSet.size")
  .doc("The size of the set to store cancelled job groups, if the job group is cancelled " +
    "with cancelFutureJobs = true. If the size of the set exceeds this value, the oldest job " +
    "group will be removed from the set.")
The maximum number of job groups to track for canceling job groups and their future jobs. If the maximum number is hit, the oldest job group will no longer be tracked and its future jobs may not be cancelled.
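The bounded, insertion-ordered set being described can be sketched with a `LinkedHashSet` that evicts its oldest entry on overflow. This is an illustration of the intended semantics only, not Spark's actual implementation; the class name and capacity-handling details here are made up for the example:

```java
import java.util.Iterator;
import java.util.LinkedHashSet;

// Sketch of the bounded FIFO set: once capacity is reached, the oldest
// tracked job group is dropped and will no longer be cancelled on submission.
public class BoundedFifoSet {
    private final int maxSize;
    private final LinkedHashSet<String> set = new LinkedHashSet<>();

    public BoundedFifoSet(int maxSize) { this.maxSize = maxSize; }

    public void add(String jobGroupId) {
        set.add(jobGroupId);
        if (set.size() > maxSize) {   // evict in insertion (FIFO) order
            Iterator<String> it = set.iterator();
            it.next();
            it.remove();
        }
    }

    public boolean contains(String jobGroupId) { return set.contains(jobGroupId); }
}
```

With `maxSize = 2`, adding groups "g1", "g2", "g3" leaves only "g2" and "g3" tracked, which is exactly the "oldest job group will no longer be tracked" behavior the doc string warns about.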
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    artifacts: JobArtifactSet,
    properties: Properties): Unit = {
  // If this job belongs to a cancelled job group, skip running it
  val jobGroupIdOpt = Option(properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
  if (jobGroupIdOpt.exists(cancelledJobGroups.contains(_))) {
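The submission-time check above can be modeled standalone. The sketch below mirrors the `Option(properties).map(...).exists(...)` shape in Java; the class is a toy stand-in, not the DAGScheduler, and the property key is shown as the one Spark's job-group mechanism uses:

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;

// Toy model of the submission-time check: a job whose properties carry a
// job-group id found in the cancelled set is skipped instead of run.
public class SubmitCheckDemo {
    static final String SPARK_JOB_GROUP_ID = "spark.jobGroup.id";
    private final Set<String> cancelledJobGroups = new HashSet<>();

    public void markGroupCancelled(String group) { cancelledJobGroups.add(group); }

    // Mirrors: Option(properties).map(_.getProperty(...)).exists(cancelledJobGroups.contains)
    public boolean shouldSkip(Properties properties) {
        return Optional.ofNullable(properties)
            .map(p -> p.getProperty(SPARK_JOB_GROUP_ID))
            .map(cancelledJobGroups::contains)
            .orElse(false);
    }

    public static void main(String[] args) {
        SubmitCheckDemo d = new SubmitCheckDemo();
        d.markGroupCancelled("g1");
        Properties p = new Properties();
        p.setProperty(SPARK_JOB_GROUP_ID, "g1");
        System.out.println(d.shouldSkip(p));  // prints "true"
    }
}
```

Note that a job with no properties, or no job-group id, is never skipped, matching the `Option(...)`-based null handling in the Scala code.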
Because the scheduler thread and the event thread are asynchronous, it seems that cancelling future jobs may not always be effective.
The handler functions are run in a synchronized block, so cancelJobGroup(cancelFutureJobs = true) => "the current active jobs will be cancelled + this job group is added to the set" runs atomically. Any future job belonging to this job group will then be cancelled on submission.
+1, I agree with @anchovYu's assessment:
- This handleJobSubmitted method and the handleJobGroupCancelled method both run in the single-threaded event thread.
- Consider both potential interleavings:
  - If handleJobGroupCancelled runs first, then the group will be marked as cancelled and handleJobSubmitted will skip the job.
  - If handleJobSubmitted runs first, then handleJobGroupCancelled will see a running job and cancel it.
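The single-threaded-event-loop argument can be demonstrated with a toy model: both handlers are posted to a single-thread executor, so whichever event is enqueued first runs to completion before the other. This is a sketch of the reasoning, not the DAGScheduler's actual event loop:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model: handleJobGroupCancelled and handleJobSubmitted never interleave
// because a single thread runs both handlers in enqueue order.
public class EventLoopDemo {
    final Set<String> cancelledGroups = new HashSet<>();
    final Set<Integer> runningJobs = new HashSet<>();

    void handleJobGroupCancelled(String group) {
        cancelledGroups.add(group);
        runningJobs.clear();   // cancel all currently active jobs in the group
    }

    void handleJobSubmitted(int jobId, String group) {
        if (cancelledGroups.contains(group)) return;  // group cancelled: skip the job
        runningJobs.add(jobId);
    }

    public static void main(String[] args) throws InterruptedException {
        EventLoopDemo d = new EventLoopDemo();
        ExecutorService eventLoop = Executors.newSingleThreadExecutor();
        // Interleaving 1: the cancellation event is enqueued before the submission.
        eventLoop.submit(() -> d.handleJobGroupCancelled("g1"));
        eventLoop.submit(() -> d.handleJobSubmitted(1, "g1"));
        eventLoop.shutdown();
        eventLoop.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(d.runningJobs.isEmpty());  // prints "true": the job was skipped
    }
}
```

Running the events in the opposite order instead leaves the job briefly running and then cancelled by the group-cancellation handler, so either interleaving ends with no surviving job, which is the point of the review comment.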
@JoshRosen Thank you for the description. I got it now.
thanks, merging to master!
What changes were proposed in this pull request?
This PR supports a new API in SparkContext, cancelJobGroupAndFutureJobs(jobGroup). It not only cancels the active jobs; future submitted jobs that belong to this job group will also be cancelled and not run.
Internally, it uses a limited-size (current size: 1000, controlled by the config CANCELLED_JOB_GROUP_SET_SIZE) FIFO set to record all job groups cancelled with this new API.
This PR also adds a new error class SPARK_JOB_CANCELLED without changing the error message at all, based on the assumption that it's a fundamental error that some downstream workloads could rely on parsing the error message.
Why are the changes needed?
Improvements.
Does this PR introduce any user-facing change?
No.
How was this patch tested?
New tests.
Was this patch authored or co-authored using generative AI tooling?
No.