
Conversation

@miaoever

What changes were proposed in this pull request?

We found a race condition in our production jobs: an executor finishes registration, but when it starts to heartbeat, the driver reports that the executor is still unknown. We propose to sync the executor registration status directly between the CoarseGrainedSchedulerBackend and the HeartbeatReceiver to prevent the race condition.
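
A rough sketch of the idea (the names below are illustrative stand-ins, not necessarily the exact identifiers in the patch): the scheduler backend notifies the HeartbeatReceiver synchronously at registration time, so the receiver's view can never lag behind the backend's.

// Sketch only, with hypothetical stand-in types (not the actual Spark classes).
import scala.collection.concurrent.TrieMap

class SketchHeartbeatReceiver {
  private val executorLastSeen = TrieMap.empty[String, Long]

  // Called synchronously by the scheduler backend during executor registration.
  def addExecutor(executorId: String): Unit =
    executorLastSeen.update(executorId, System.currentTimeMillis())

  def knows(executorId: String): Boolean = executorLastSeen.contains(executorId)
}

class SketchSchedulerBackend(receiver: SketchHeartbeatReceiver) {
  def registerExecutor(executorId: String): Unit = {
    // ... backend-side registration bookkeeping would happen here ...
    receiver.addExecutor(executorId) // direct call instead of a listener-bus event
  }
}

object SketchDemo extends App {
  val receiver = new SketchHeartbeatReceiver
  val backend = new SketchSchedulerBackend(receiver)
  backend.registerExecutor("exec-1")
  assert(receiver.knows("exec-1")) // no window in which the executor is "unknown"
}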

Why are the changes needed?

The race is caused by the fact that the HeartbeatReceiver only learns about registered executors from the CoarseGrainedSchedulerBackend through the listener bus, and because message delivery on the bus is asynchronous, there is no guarantee on how long it takes between a message being posted and being received. In our failed production job, that delay was longer than 2 minutes (the interval between the executor finishing registration and sending its first heartbeat), which left the HeartbeatReceiver and the CoarseGrainedSchedulerBackend with inconsistent views of which executors were registered.
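
To make the failure mode concrete, the driver-side handling has roughly the following shape (simplified sketch with stand-in types, not the exact Spark code): a heartbeat from an executor the HeartbeatReceiver has not yet heard about is treated as coming from an unknown executor, which is exactly what we observed when the listener-bus event arrived too late.

// Simplified sketch of the symptom, not Spark's own classes.
case class Heartbeat(executorId: String)
case class HeartbeatResponse(reregisterBlockManager: Boolean)

class SketchReceiver {
  private val executorLastSeen = scala.collection.mutable.Map.empty[String, Long]

  // In the current code this update only happens once the listener-bus event arrives.
  def onExecutorAdded(executorId: String): Unit =
    executorLastSeen(executorId) = System.currentTimeMillis()

  def receive(hb: Heartbeat): HeartbeatResponse =
    if (executorLastSeen.contains(hb.executorId)) {
      executorLastSeen(hb.executorId) = System.currentTimeMillis()
      HeartbeatResponse(reregisterBlockManager = false)
    } else {
      // The registration event is still queued on the listener bus, so the executor
      // looks unknown here even though the scheduler backend already registered it.
      HeartbeatResponse(reregisterBlockManager = true)
    }
}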

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit tests. We have also been running this fix in production for a while, and no failures from this race condition have occurred since.

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Jul 17, 2024
@LuciferYang
Contributor

Can you update the PR title? The current title looks more like a Jira ticket name. For a PR title, it's better to clearly reflect what this PR does.

Contributor

@mridulm mridulm left a comment


Effectively, this change converts HeartbeatReceiver from a Spark listener into something that the scheduler invokes directly, and ties it to CoarseGrainedSchedulerBackend.

This is a nontrivial change, and I am not sure whether it will cause side effects - not just within Spark, but for external integrations as well.

+CC @JoshRosen for thoughts.

override def onExecutorAdded(executorAdded: SparkListenerExecutorAdded): Unit = {
  addExecutor(executorAdded.executorId)
}

Contributor


It looks inconsistent to have the scheduler call into the heartbeat receiver for executor addition but not removal.
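
For reference, the removal side would presumably keep looking roughly like the existing listener callback (paraphrased, not necessarily the exact code in this PR), which is the asymmetry being pointed out:

override def onExecutorRemoved(executorRemoved: SparkListenerExecutorRemoved): Unit = {
  removeExecutor(executorRemoved.executorId)
}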

Contributor

@JoshRosen JoshRosen left a comment


From

sc.listenerBus.addToManagementQueue(this)
I see that the HeartbeatReceiver is registered on the management queue. Based on code search it looks like ExecutorAllocationManager should be the only other component sharing a queue with it (unless a user has managed to add their own listener to that queue, that is).

... there is no guarantee on how long it takes between a message being posted and being received. In our failed production job, that delay was longer than 2 minutes (the interval between the executor finishing registration and sending its first heartbeat) ...

In your real-world case, was your driver / SparkContext-hosting process experiencing long GC pauses or general unhealthiness due to some other reason? Or do you think the ExecutorAllocationManager was slow / blocking for some reason?

I'm curious whether the issue that you saw is perhaps a symptom of some other issue.


That being said, it looks like the HeartbeatReceiver only uses the event listener for the purposes of receiving executor addition / removal events, and the immediate processing of those is simply to enqueue a task on its own event loop (which itself is cheap).

Given this, I wouldn't be too concerned about performance problems from replacing the async. listener with a synchronous callback.
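
For reference, the listener callback does little more than forward to the receiver's own RPC endpoint, roughly like this (paraphrasing from HeartbeatReceiver, so treat the exact signature as approximate):

def addExecutor(executorId: String): Option[Future[Boolean]] = {
  Option(self).map(_.ask[Boolean](ExecutorRegistered(executorId)))
}

So a synchronous callback would do essentially the same small amount of work on the caller's thread.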

From what I can tell, SparkListenerExecutorAdded and SparkListenerExecutorRemoved are currently only published from two places:

  • CoarseGrainedSchedulerBackend
  • LocalSchedulerBackend

and I don't think that we support user-pluggable scheduler backends.

As an aside, I'm surprised that we perform heartbeats at all in local mode: this is something I recently stumbled across during a flaky unit test investigation, where I saw surprising heartbeat timeout issues in local-mode unit tests (under extremely high CI system load and GC issues).


So, given all of this, I think this change is directionally reasonable. But if we want to pursue this, I think that we should go all the way and have this not be a listener at all and use direct publication for both executor addition and removal.
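
Concretely, the end state could look something like the sketch below (illustrative only, not actual Spark code): HeartbeatReceiver stops extending SparkListener and instead exposes a narrow interface that both scheduler backends invoke directly.

// Illustrative sketch: a minimal interface the scheduler backends would call at the
// same points where they currently post SparkListenerExecutorAdded /
// SparkListenerExecutorRemoved to the listener bus.
trait ExecutorLifecycleSink {
  def addExecutor(executorId: String): Unit
  def removeExecutor(executorId: String): Unit
}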

@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Dec 14, 2024
@github-actions github-actions bot closed this Dec 15, 2024