[SPARK-47253][CORE] Allow LiveEventBus to stop without the completely draining of event queue #45367
Conversation
…etely draining of event queue
@TakawaAkirayo Doesn't master have this issue? The PR should be submitted against the master branch first.
@LuciferYang The master branch has this issue too; this PR is already submitted against the master branch. I added [3.5] in the title because I thought I must have a target version for the PR. So what's the next action? Please advise, thanks!
@TakawaAkirayo If you really want to backport this to branch-3.5 when it is merged, please change the issue type to BUG and provide some background about the bug.
@LuciferYang I believe it's an improvement rather than a bug. If it's not backported to version 3.5, will this change be included in the next release version? I'm okay with that for now, because I can change the code in our internal Spark version and use it; once this change is available in a public release, I can switch to the public Spark binary. I've updated the version to 4.0.0 in the JIRA and the start version to 4.0.0 in the config.
```diff
  // in that case.
- if (Thread.currentThread() != dispatchThread) {
+ if (waitForEventDispatchExit() && Thread.currentThread() != dispatchThread) {
```
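For context, a minimal standalone sketch of the mechanism under discussion (the class, field, and method names here are illustrative, not the actual AsyncEventQueue source): joining the dispatch thread blocks until every queued event is processed, so making the join conditional bounds shutdown time.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Illustrative queue: stop() joins the dispatch thread only when asked to
// wait for dispatch exit, mirroring the guard in the diff above.
class SketchEventQueue(waitForDispatchExit: Boolean) {
  private val queue = new LinkedBlockingQueue[String]()
  private val stopped = new AtomicBoolean(false)

  private val dispatchThread: Thread = new Thread(() => {
    // Keep draining until stop() is called and the queue is empty.
    while (!stopped.get() || !queue.isEmpty) {
      val event = queue.poll(100, TimeUnit.MILLISECONDS)
      if (event != null) process(event)
    }
  })
  dispatchThread.setDaemon(true)
  dispatchThread.start()

  private def process(event: String): Unit = {
    Thread.sleep(10) // stand-in for slow listener work
  }

  def post(event: String): Unit = if (!stopped.get()) queue.put(event)

  def stop(): Unit = {
    stopped.set(true)
    // Only block until all remaining events are dispatched when configured
    // to, and never join from the dispatch thread itself (self-deadlock).
    if (waitForDispatchExit && Thread.currentThread() != dispatchThread) {
      dispatchThread.join()
    }
  }
}
```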
Shall we remove waitForEventDispatchExit() and inline conf.get(LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP)?
@beliefer Thanks for the advice! I think using waitForEventDispatchExit is more readable here; it explains what it does. With LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP, you have to jump to the place where this conf is defined to figure out its meaning.
The name LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP has better readability, and it's not worth extracting a method that is used only once.
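For illustration, the two spellings under discussion side by side; this is a sketch only, with `conf.get` and the config constant stubbed so the snippet compiles in isolation:

```scala
object GuardStyleSketch {
  // Stubs standing in for SparkConf and the typed config entry.
  private val LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP = true
  private def confGet(entry: Boolean): Boolean = entry
  private val dispatchThread: Thread = Thread.currentThread()

  // Helper-method form (original PR): names the intent at the call site.
  private def waitForEventDispatchExit(): Boolean =
    confGet(LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP)

  // Inlined form (reviewer's suggestion): the config constant's own name
  // already documents the behavior, so the one-use helper is dropped.
  def stopGuard(): Boolean =
    confGet(LISTENER_BUS_EVENT_QUEUE_WAIT_FOR_EVENT_DISPATCH_EXIT_ON_STOP) &&
      Thread.currentThread() != dispatchThread
}
```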
@beliefer Sure, already committed, please take a look.
Hi @LuciferYang @beliefer Just a follow-up on this: what are the next action items? Do any other reviewers need to be involved?
…until thread exit. 2. wait for a specified time
…cala Co-authored-by: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com>
….scala Co-authored-by: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com>
…cala Co-authored-by: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com>
…cala Co-authored-by: Mridul Muralidharan <1591700+mridulm@users.noreply.github.com>
```scala
// let the event drained now
drainWait.acquire()
assert(drained)
```
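For readers without the full diff: `drainWait` is a semaphore the test uses to gate draining. A self-contained sketch of the pattern, assuming (based on the quoted lines) that the dispatch side releases the permit once draining finishes:

```scala
import java.util.concurrent.Semaphore

object DrainGateSketch {
  val drainWait = new Semaphore(0)
  @volatile var drained = false

  def main(args: Array[String]): Unit = {
    // Stand-in dispatch thread: processes its remaining "events", marks the
    // queue drained, and releases a permit so the test can proceed.
    val dispatch = new Thread(() => {
      // ... process remaining events ...
      drained = true
      drainWait.release()
    })
    dispatch.start()

    // let the event drained now
    drainWait.acquire() // blocks until the dispatch thread has finished
    assert(drained)     // follows directly from the release() above
  }
}
```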
Are these two lines testing any behavior? It is a consequence of onJobEnd concluding, right?
@mridulm Yes, this is less relevant to the major change; it's just a check that the dispatch thread should be unblocked. I have removed those two lines now.
Looks good to me
The test failures are unrelated to this PR.
I have updated the description and merged to master.
@mridulm @beliefer @LuciferYang Thanks for your review and guidance to improve the PR :-)
+1, late LGTM. Thank you all.
What changes were proposed in this pull request?
Add config spark.scheduler.listenerbus.exitTimeout (default 0: wait until the dispatch thread exits).
Before this PR: the event queue waits for events to drain completely on stop.
After this PR: the user can control this behavior (wait for a complete drain or not) via a Spark config.
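For reference, a hedged sketch of how such an entry is typically declared inside `package object config` in core/src/main/scala/org/apache/spark/internal/config/package.scala. Only the key spark.scheduler.listenerbus.exitTimeout, the default of 0, and the 4.0.0 version come from this thread; the constant name, doc text, and exact builder chain are assumptions:

```scala
// Inside `package object config` (java.util.concurrent.TimeUnit is already
// imported in that file). The merged declaration may differ.
private[spark] val LISTENER_BUS_EXIT_TIMEOUT =
  ConfigBuilder("spark.scheduler.listenerbus.exitTimeout")
    .doc("Maximum time the LiveEventBus waits for the dispatch thread to " +
      "exit on stop. 0 means wait until all queued events are drained.")
    .version("4.0.0")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefault(0L)
```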
Why are the changes needed?
#### Problem statement
SparkContext.stop() can hang for a long time in LiveEventBus.stop() when listeners are slow.
#### User scenarios
We have a centralized service with multiple instances that regularly executes users' scheduled tasks.
For each user task within one service instance, the process is as follows (see the sketch after this list):
1. Create a Spark session directly within the service process, with an account defined in the task.
2. Instantiate listeners by class name and register them with the SparkContext. The JARs containing the listener classes are uploaded to the service by the user.
3. Prepare resources.
4. Run user logic (Spark SQL).
5. Stop the Spark session by invoking SparkSession.stop().
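A sketch of this flow under stated assumptions (the listener class name and master URL are hypothetical, not from the PR):

```scala
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.sql.SparkSession

object ScheduledTaskFlowSketch {
  def main(args: Array[String]): Unit = {
    // Step 1: create a Spark session directly within the service process.
    val spark = SparkSession.builder()
      .appName("scheduled-user-task")
      .master("local[*]") // illustrative; the service would set its own master
      .getOrCreate()

    // Step 2: instantiate a user-provided listener by class name (the class
    // name here is hypothetical) and register it with the SparkContext.
    val listener = Class.forName("com.example.UserListener")
      .getDeclaredConstructor().newInstance().asInstanceOf[SparkListener]
    spark.sparkContext.addSparkListener(listener)

    // Steps 3-4: prepare resources and run user logic (Spark SQL).
    spark.sql("SELECT 1").show()

    // Step 5: this is the call that can hang while LiveEventBus drains.
    spark.stop()
  }
}
```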
In step 5, it waits for the LiveEventBus to stop, which requires the remaining events to be completely drained by each listener.
Since the listeners are implemented by users, we cannot prevent heavy work inside a listener on each event. There are cases where a single heavy job has over 30,000 tasks, and it can take 30 minutes for the listener to process all the remaining events, because for each event the listener takes a coarse-grained global lock and updates internal status in a remote database.
This kind of delay affects other user tasks in the queue. Therefore, from the server-side perspective, we need a guarantee that the stop operation finishes quickly.
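To make the failure mode concrete, a hypothetical listener of the kind described above, where each task-end event takes a coarse-grained lock and performs a slow remote update:

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative only: with ~30,000 queued task-end events at tens of ms
// each, draining fully on stop() takes tens of minutes.
class SlowStatusListener extends SparkListener {
  private val globalLock = new Object

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit =
    globalLock.synchronized {
      Thread.sleep(50) // stand-in for a per-event remote database update
    }
}
```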
Does this PR introduce any user-facing change?
Add config spark.scheduler.listenerbus.exitTimeout. The default is 0, which waits for the events to drain completely. If set to a positive integer, the LiveEventBus will wait for at least that duration (in ms) before it stops, irrespective of whether the events are drained or not.
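A hedged usage sketch (the 5000 ms value and app setup are arbitrary; the key and its semantics come from the description above):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

object BoundedStopExample {
  def main(args: Array[String]): Unit = {
    // Bound the event-bus shutdown wait instead of draining fully.
    val conf = new SparkConf()
      .set("spark.scheduler.listenerbus.exitTimeout", "5000")

    val spark = SparkSession.builder()
      .master("local[*]") // illustrative
      .config(conf)
      .getOrCreate()
    // ... run user logic ...
    spark.stop() // bounded rather than waiting for a full drain
  }
}
```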
How was this patch tested?
By UT, and verified the feature in our production environment.
Was this patch authored or co-authored using generative AI tooling?
No