[SPARK-20079][Core][yarn] Re-registration of AM hangs Spark cluster in yarn-client mode. #17480
Conversation
Test build #75391 has finished for PR 17480 at commit
Could you please elaborate on the problem you met? That would help us better understand your scenario and the fix.
The ExecutorAllocationManager.reset method is called when the AM re-registers, and it sets the ExecutorAllocationManager.initializing field to true. While this field is true, the driver does not request new executors from the AM. Two cases can set the field back to false.
If the AM is killed and restarted after a stage has already been submitted, neither of those two cases occurs.
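For illustration, here is a minimal sketch (assumed names; not Spark's actual source) of the gate described above: while `initializing` is true, the allocation manager never grows its target, so the driver never asks the restarted AM for executors.

```scala
// Minimal model of the initializing gate discussed in this thread.
def executorsToRequest(
    initializing: Boolean,
    maxNumExecutorsNeeded: Int,
    numExecutorsTarget: Int): Int = {
  if (initializing) {
    0 // target is frozen: no RequestExecutors message is ever sent
  } else {
    math.max(maxNumExecutorsNeeded - numExecutorsTarget, 0)
  }
}

// After reset() during AM re-registration: initializing == true even though
// a stage is already running, so nothing is requested and the stage hangs.
assert(executorsToRequest(initializing = true,
  maxNumExecutorsNeeded = 10, numExecutorsTarget = 0) == 0)
```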
@witgo thanks for your explanation. But AFAIK, if the AM gets restarted, it will honor the initial executor number to launch executors, so after the executors are launched, the stage should be able to execute. Is your initial executor number set to 0?
@jerryshao Yes.
The fix seems reasonable to me. But I'm also wondering whether there is any way to handle this on the scheduler side.
@@ -249,7 +249,9 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
+    if (maxNumExecutorsNeeded() == 0) {
Could you please add some comments about the purpose of this change?
Done.
Test build #75499 has started for PR 17480 at commit
Test build #75508 has finished for PR 17480 at commit
-    initializing = true
+    /**
+     * When some tasks need to be scheduled, resetting the initializing field may cause
+     * it to not be set to false in yarn.
I think this is not a YARN-only issue; the description is not precise.
Also, can you elaborate more? I think this issue only exists in the scenario where the initial executor number is 0 and stages are running.
Currently this method will only be called in yarn-client mode when AM re-registers after a failure.
Also CC @tgravescs @vanzin to help review; they may have more thoughts :).
Test build #75601 has finished for PR 17480 at commit
+     * SPARK-20079: https://issues.apache.org/jira/browse/SPARK-20079
+     */
+    if (maxNumExecutorsNeeded() == 0) {
+      initializing = true
This kinda raises the question: is it ever correct to set this to true here?
This method is only called when the YARN client-mode AM is restarted, and at that point I'd expect initialization to have already happened (so I don't see a need to reset the field in any situation).
@jerryshao Can you explain the following comments? I do not understand.
if (initializing) {
  // Do not change our target while we are still initializing,
  // Otherwise the first job may have to ramp up unnecessarily
  0
} else if (maxNeeded < numExecutorsTarget) {
In the original design of dynamic executor allocation, this flag is set to true to avoid a sudden executor ramp-up (caused by the first job submission) during initialization. You can check the comment.
And for the AM restart scenario, because all the executors will be re-spawned, it is similar to the AM first-start scenario (if a job is submitted during the restart), so we set this flag to true.
Sorry but that doesn't really explain much. Why is it bad to ramp up quickly? At which point are things not "initializing" anymore?
Isn't the AM restarting the definition of "I should ramp up quickly because I might be in the middle of a big job being run"?
@vanzin sorry I think I didn't explain well.
If this flag initializing is set to false during initialization, updateAndSyncNumExecutorsTarget will recalculate the required executor number and ramp down the executors if there is no job at that time. Then, when the first job is submitted, it still has to ramp up executors to meet the requirement.
For the AM restart scenario, I think it is similar to initializing. One exception is the scenario mentioned here, where we should ramp up soon to meet the requirement.
One downside could be: while tasks are running, suppose the total number of executors equals spark.dynamicAllocation.maxExecutors and the AM fails. A new AM then restarts. Because the total number of executors tracked in ExecutorAllocationManager has not changed, the driver does not send RequestExecutors to the AM to ask for executors, so the AM's total becomes spark.dynamicAllocation.initialExecutors. The total number of executors in the driver and in the AM is therefore different.
Because when the AM is restarted, it resets its state to the initial state, whereas ExecutorAllocationManager's state is still the current state; the state maintained on the two sides will be out of sync, and the required executor number calculated on the AM side will be wrong.
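To make the mismatch concrete, here is an illustrative walk-through (the config keys are real Spark settings; the values and variable names below are assumptions, not Spark code):

```scala
val maxExecutors = 100    // spark.dynamicAllocation.maxExecutors
val initialExecutors = 0  // spark.dynamicAllocation.initialExecutors

val driverTargetBeforeFailure = maxExecutors  // driver's numExecutorsTarget at failure time
val driverTargetAfterRestart = maxExecutors   // unchanged: nothing recomputed it
val amTargetAfterRestart = initialExecutors   // a fresh AM falls back to its configuration

// RequestExecutors is only sent when the driver-side target changes, so the
// new AM never learns that it should be holding 100 executors.
assert(driverTargetAfterRestart == driverTargetBeforeFailure) // no change, no message
assert(driverTargetAfterRestart != amTargetAfterRestart)      // the two sides disagree
```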
You're just saying that when a new AM registers the driver needs to tell it how many executors it wants. So, basically, instead of the driver doing that, currently the driver just resets itself to the initial state, hurting any running jobs.
> when a new AM registers the driver needs to tell it how many executors it wants.
When an AM registers, it uses its configuration to decide the initial number of executors to create; the driver does not tell it how many executors it wants. That's why, on the driver side, if we don't change the executor number to match the AM side, we hit the problem mentioned above (because the driver hasn't yet told the AM the executor number).
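A sketch of how that AM-side initial executor number is derived from configuration (modeled on Spark's Utils.getDynamicAllocationInitialExecutors; treat the exact precedence as an assumption):

```scala
def dynamicAllocationInitialExecutors(conf: Map[String, String]): Int = {
  def intConf(key: String): Int = conf.get(key).map(_.toInt).getOrElse(0)
  // The largest of the three related settings wins.
  Seq(
    intConf("spark.dynamicAllocation.minExecutors"),
    intConf("spark.dynamicAllocation.initialExecutors"),
    intConf("spark.executor.instances")
  ).max
}

// With everything left at 0 (the scenario in this thread), a restarted AM
// starts no executors on its own.
assert(dynamicAllocationInitialExecutors(
  Map("spark.dynamicAllocation.initialExecutors" -> "0")) == 0)
```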
You're explaining what the code does as a justification for why a hacky fix should be applied to this issue. I'm asking why the code needs to behave like that. If there's no actual need for the code to behave like that, it should be fixed.
Basically, imagine that at t1 the AM dies, and at t2 a new AM comes up and registers. What should happen from the driver's point of view? (Note, what should happen, not what the code does.)
In my view, the answer is "nothing". The driver knows what it needs, so the new AM should start as closely as possible to the state of the previous AM. Doing that might be hard (e.g. caching the complete list of known containers somewhere, probably the driver), so some things are sub-optimal (containers will be re-started). But as far as numbers go, the new AM should basically start up with the same number of containers the previous AM was managing (ignoring the time needed to start them up).
If the AM doesn't do that currently, then why is that? It asks the driver for state related to the previous AM already (see the RetrieveLastAllocatedExecutorId call). Why can't that call return more state needed for the new AM to sync up to what the driver needs?
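A hypothetical sketch of that direction: RetrieveLastAllocatedExecutorId is a real message in Spark's CoarseGrainedClusterMessages, but the richer reply type and its field names below are invented for illustration.

```scala
// What the handshake could return so the new AM syncs to the driver's view
// instead of resetting to configured defaults.
case class DriverAllocationState(
    lastAllocatedExecutorId: Int,
    numExecutorsTarget: Int) // what the new AM should sync its target to

// On re-registration the new AM could then do roughly:
//   val state = driverRef.askSync[DriverAllocationState](RetrieveLastAllocatedExecutorId)
//   targetNumExecutors = state.numExecutorsTarget  // not the configured initial number
```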
Well, I understand your thinking.
This actually comes down to the definition of reset: should it restore the initial state, or the last state before the failure? In our previous commit we chose the former, rolling back to the initial state, but here you suggest the latter. I agree that the latter looks more reasonable and could also address the problem here. Thanks for the clarification.
@witgo are you planning to update this PR to fix the behavior of reset()? The biggest problem I have with this patch is that reading the code does not give you any insight into why initializing is handled this way. So in my view the right path here is to fix reset().
@vanzin
I probably won't have time to look at a proper fix for this anytime soon, but I don't think your current patch is the right fix.
OK, I will do the work over the weekend.
Test build #76076 has started for PR 17480 at commit
@@ -249,7 +249,6 @@ private[spark] class ExecutorAllocationManager(
    * yarn-client mode when AM re-registers after a failure.
    */
   def reset(): Unit = synchronized {
-    initializing = true
@jerryshao @vanzin I think that deleting the initializing = true line is a good idea.
Test build #76089 has finished for PR 17480 at commit
When there is a need for task scheduling, ExecutorAllocationManager instances do not reset the initializing field.

How was this patch tested?

Unit tests.
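As a rough illustration of the behavior being tested, here is a self-contained model (not Spark's ExecutorAllocationManager; all names are assumptions) of the fixed semantics: reset() no longer flips `initializing`, so a stage submitted before an AM restart can still ramp up afterwards.

```scala
final class AllocationModel(
    var initializing: Boolean,
    var numExecutorsTarget: Int,
    initialTarget: Int) {
  def reset(): Unit = synchronized {
    // Per this PR: reset() no longer sets initializing = true.
    numExecutorsTarget = initialTarget
  }
  def executorsToRequest(maxNeeded: Int): Int =
    if (initializing) 0 else math.max(maxNeeded - numExecutorsTarget, 0)
}

val m = new AllocationModel(initializing = false, numExecutorsTarget = 8, initialTarget = 0)
m.reset()
assert(m.executorsToRequest(maxNeeded = 10) == 10) // a running stage can still ramp up
```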