
[SPARK-21656][CORE] spark dynamic allocation should not idle timeout executors when tasks still to run #18874

Closed
wants to merge 2 commits

Conversation


@yoonlee95 yoonlee95 commented Aug 7, 2017

What changes were proposed in this pull request?

Right now Spark lets go of executors when they are idle for 60s (or a configurable time). I have seen Spark let them go when they are idle but still really needed; I have seen this when the scheduler was waiting for node locality and that wait takes longer than the default idle timeout. In these jobs the number of executors drops really low (fewer than 10) while there are still around 80,000 tasks to run.
We should consider not allowing executors to idle timeout if they are still needed according to the number of tasks remaining to run.
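
For context, these are the standard settings that come up throughout the discussion below, shown as a sketch with the defaults mentioned in this thread (60s idle timeout, 3s locality wait); treat the values as illustrative and check the configuration docs for your Spark version:

```
// Sketch only: the knobs discussed in this thread, with their usual defaults.
val conf = new org.apache.spark.SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "0")          // floor on the executor count
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // idle executors are released after this
  .set("spark.locality.wait", "3s")                          // how long the scheduler waits for locality
```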

How was this patch tested?

Tested by manually adding executors to the `executorIdsToBeRemoved` list and checking whether those executors were removed when there are a lot of tasks and a high `numExecutorsTarget` value.

Code used

In ExecutorAllocationManager.start()

    // Record when the allocation manager started so the test hack below can
    // trigger a couple of minutes later.
    start_time = clock.getTimeMillis()

In ExecutorAllocationManager.schedule()

    val executorIdsToBeRemoved = ArrayBuffer[String]()
    // After two minutes, force every second executor onto the removal list
    // (roughly half of them), and push start_time far into the future so
    // this only fires once.
    if (now > start_time + 1000 * 60 * 2) {
      logInfo("--- REMOVING 1/2 of the EXECUTORS ---")
      start_time += 1000 * 60 * 100
      var counter = 0
      for (x <- executorIds) {
        counter += 1
        if (counter == 2) {
          counter = 0
          executorIdsToBeRemoved += x
        }
      }
    }

@tgravescs
Contributor

ok to test

@SparkQA

SparkQA commented Aug 7, 2017

Test build #80360 has finished for PR 18874 at commit 3c2cf90.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tgravescs
Contributor

@yoonlee95 please update with unit tests

@srowen
Member

srowen commented Aug 9, 2017

Doesn't this make the 'target' effectively the minimum?
As I say on the JIRA I still do not see a behavior that needs fixing here.

@tgravescs
Contributor

The bug is that with the idle timeout the number of executors can go to a very small number, even zero, and we never look back to make sure that doesn't happen.

@srowen
Member

srowen commented Aug 9, 2017

That is correct behavior, as defined by the idle timeout and the minimum number of executors, which are already configured. I do not understand why going to the small number that the config explicitly allows can be viewed as a problem. We are going in circles. I would object to this unless you can give a concrete example of a behavior that you can't achieve with the existing configs.

@tgravescs
Contributor

Is going to 0 executors and allowing a deadlock a bug?

@srowen
Member

srowen commented Aug 9, 2017

Going to 0 executors is not a bug, if you set the min to 0. A deadlock is a bug. But, nothing in the JIRA or here suggests there's a deadlock -- what do you mean?

@tgravescs
Contributor

There is nothing in the code stopping you from idle timeouting all of your executors, so you end up with 0 executors and you deadlock. 0 executors = deadlock = definite bug. We definitely want to fix that. A user should not have to set a minimum number of executors to avoid a deadlock.

The JIRA did not explicitly state you can go to 0; it states the general problem. Going to 0 is just a very bad case of the general problem.

I believe a small number of executors is a bug as well. There are multiple reasons:

  1. Unexpected things can happen in the system that cause delays, and Spark should be resilient to them. If the driver is GC'ing, there are network delays, etc., we could idle timeout executors even though there are tasks to run on them; the scheduler just hasn't had time to start those tasks. This only slows down the user's job, and the user does not want that.

  2. Internal Spark components have opposing requirements. The scheduler has a requirement to try to get locality; dynamic allocation doesn't know about this, and by giving away executors it is hurting the scheduler's ability to do what it was designed to do.
    Ideally we have enough executors to run all the tasks. If dynamic allocation allows those to idle timeout, the scheduler cannot make proper decisions. In the end this hurts users by affecting the job. A user should not have to mess with the configs to keep this basic behavior.

So I guess our disagreement is about what the definition of the idle timeout is. If the system were in perfect sync and the scheduler could tell dynamic allocation that it will never need that executor for that stage, then sure, let it timeout. We don't have that, so the dynamic allocation manager is doing something that doesn't jibe with the logic in the scheduler. The scheduler specifically has logic to try to get good locality, and we should not be giving away executors while it is running that logic. From a user's point of view this is very bad behavior and can cause the stage to take much longer. The user only wants things to idle timeout when they could never be used, which is when the number of tasks is less than the number of executors times the cores per executor.
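
(For concreteness, that condition could be sketched roughly as below; the names are illustrative, not the code in this patch, and tasksPerExecutor is assumed to be executor cores divided by CPUs per task.)

```
// Illustrative sketch, not the actual patch: an executor is only "safe" to
// idle timeout when we already hold more executors than the outstanding
// tasks could use in parallel.
def maxExecutorsNeeded(outstandingTasks: Int, tasksPerExecutor: Int): Int =
  (outstandingTasks + tasksPerExecutor - 1) / tasksPerExecutor  // ceiling division

def safeToIdleTimeout(currentExecutors: Int,
                      outstandingTasks: Int,
                      tasksPerExecutor: Int): Boolean =
  currentExecutors > maxExecutorsNeeded(outstandingTasks, tasksPerExecutor)
```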

There are multiple ways we could fix this, but the present approach is the most straightforward. At the very least we need to make sure it doesn't go to zero, and at some point check whether we should reacquire executors and let the scheduler do its job again (i.e. after some time the tasks that did get locality will likely have finished and the scheduler falls back further). But again, that is more complex logic.

I understand you are saying that since they are idle it's best to give them back, but that directly affects the scheduler and, as I've seen in production, causes bad things to happen.

Perhaps we need others to chime in. @vanzin @squito, thoughts?

@srowen
Member

srowen commented Aug 9, 2017

Why is 0 executors a 'deadlock'? If there is no work to do, 0 executors is fine. If there is work to do, of course, at least 1 executor should not time out. Is that what you're claiming happens? That would be a bug, but it's not what this addresses, though.

I don't understand the claim that a small number of executors is somehow wrong. 0 is a fine number to have if you have no work.

What do delays have to do with timing out idle executors? They're idle because no work is sent to them, not because they can't be reached.

Yes, you may have a situation where the scheduler prefers to wait to schedule on executor A, even though B has slots (or no tasks at all) because of locality. That is by design. You are claiming this is an error because it will need it 'later' or something -- how would you even know that?

If B times out because nobody sends it work, that is fine. You're saying let B wait a little longer: fine, increase the idle timeout. Or you're saying, wait for A for a shorter time: fine, decrease the locality wait.

Or: if B is getting no work, how can it matter whether it's removed?

But this change just ignores the minimum, and I don't see how it's related to any of these ideas.

@tgravescs
Contributor

I'm saying you have a stage running that has > 0 tasks to run. If dynamic allocation has already got all the executors it originally thought it needed and they all idle timeout, then you have 0 executors and you can't run the tasks anywhere. Deadlock. We don't try to reacquire executors until the next stage. This patch does address that bug by not allowing you to go to 0 executors; it keeps enough executors to run all the tasks in parallel.

The executor idle timeout is based on time. If there are other delays in the system that keep the scheduler from scheduling tasks fast enough, the executors will idle timeout. This could be network delays, the driver GCing, etc. If the scheduler doesn't put a task on an executor fast enough, we can idle timeout it and never get a replacement back.

The problem with locality is that you don't know unless you give the scheduler time to do its logic. It starts by scheduling things node local and eventually falls back to rack local and then any. If you don't allow the executors to stay around long enough for it to fall back, it can't schedule tasks there, and things naturally go slower because it can't run the tasks in parallel.

No, this is not a problem the user should be solving by increasing the idle timeout. You want things to timeout fairly quickly when there are no tasks that could run on them, and raising the timeout hurts that. And no, I don't necessarily want to decrease the locality wait either, as that could also affect job performance. This is going to be very job dependent, and also dependent on which executors I get in this particular instance of the application. A user should not have to work around this issue by changing the configurations. The fact is we give back executors and never get more, even when the Spark job could be using them.

It matters if it's removed because it would have been used shortly, we never get another executor back, and thus I can never run all my tasks in parallel.

This does not ignore the minimum; the minimum is used when there aren't enough tasks to actually use all the executors.

@srowen
Member

srowen commented Aug 9, 2017

How do you reach 0 executors when there is still a task to schedule? That, if anything, is the bug, but it isn't what's contemplated here, so I'm confused.

I disagree; the rest of your scenarios are examples of where increasing the idle timeout is appropriate. But those also don't relate to 0 executors?

@tgravescs
Contributor

I suggest you go understand the code.

I've already explained this multiple times. You get to 0 executors when there are delays while an executor doesn't have a task scheduled on it. Say you have a stage with 10 tasks and 1 executor that can run 1 task: the task finishes, the driver GCs for 60 seconds before it can put another task on that executor, and the dynamic allocation manager idle times out the executor. We never ask for more executors. This patch does address the 0 executors case, but again that is more the edge case; the real problem is that the count drops to a few and we never get any back. A job that should take tens of minutes can take hours because of this.

Increasing the idle timeout is just a workaround for the problem, not a solution. As I've said multiple times, increasing the idle timeout has other consequences, and a user should not have to increase it just to get their job to run in a reasonable time. They should increase or decrease it to optimize things between stages or jobs. The definition of dynamic allocation is to automatically get executors when they are needed. We are not doing this! They may not be needed at this exact moment, but if we either kept them or reacquired them, they would be used and are needed.

@jerryshao
Contributor

I think the current fix is a feasible and simple solution for the scenarios mentioned above. As far as I understand from the comments, ideally this problem should not happen, but in a real cluster, especially when the driver is stuck in GC or something similar, it can easily happen and delay task scheduling, in the extreme going into a deadlock situation.

My only concern is whether this fix will bring in some edge cases due to the complexity of this part (though I cannot imagine one currently), since now idle executors will stay in the executorsPendingToRemove queue indefinitely.

@srowen
Member

srowen commented Aug 10, 2017

@tgravescs that's actually progress. You're no longer saying that the goal is to keep a few executors around just in case (https://issues.apache.org/jira/browse/SPARK-21656) or that the problem is waiting on locality (https://issues.apache.org/jira/browse/SPARK-21656?focusedCommentId=16117159&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16117159) .

I believe then you're saying the problem is what I asked about in #18874 (comment): there should be no way to go to 0 executors when there is any work to do. The scheduler should never make that decision, even if the minimum allows it. I agree.

You're actually saying something stronger: the number of executors should not go below the target, not the minimum. If so, can we update the description to state it that way? (And if so, is checking against the min redundant?)

But what about just fixing the 0 executor case as that is the scenario where no progress can be made?

This change is a heuristic with side effects, as noted just above. It means you don't remove legitimately idle executors that the scheduler won't use. It harms the common case, though probably marginally. It's a behavior change.

It only helps a case where the driver is stuck for periods longer than the executor idle timeout. I think you have bigger problems if that is the case, right? If you have 60s GC pauses you need to tune GC (or the idle timeout), but it's fair to have to tune something if you don't like the slowdown from executors having to be reallocated.

Is this a good tradeoff? I don't think so, but I don't feel strongly about it.
Is it important to address 0 executors, more narrowly? Yes.

@tgravescs
Contributor

I've updated the description in https://issues.apache.org/jira/browse/SPARK-21656 to join all my comments here together, hopefully that clarifies it.

@tgravescs
Contributor

To answer a few of your last questions.
It doesn't hurt the common case; in the common case all your executors have tasks on them as long as there are tasks to run, since normally the scheduler can fill up the executors. It will use more resources if the scheduler takes time to put tasks on them, but weighing that against the time wasted in jobs that don't have enough executors to run on is hard to quantify because it is so application dependent. Yes, it is a behavior change, but a behavior change that fixes an issue.

I would much rather see us doing as much as possible to make things work and be as fast as possible for the user. This is another reason I don't think a user should have to change configs for this.

Like I've mentioned before, the other approach would be to let them idle timeout and then go back later to get more and see if they can be used. This again is a tradeoff. The only other real way to fix this is to flip it around and have the scheduler tell us exactly which nodes it wants, when it wants them, and then we go get them. The problem is that with YARN we aren't guaranteed the exact node. That is also a much bigger architectural change, though.

For GC, yes, your job might have other issues, but other things like node slowdown or network slowness have nothing to do with your job. Again, Spark should be resilient to any weird errors and do its best to make things run well.

@tgravescs
Contributor

tgravescs commented Aug 10, 2017

Also note that I would like to investigate making the locality logic in the scheduler better, as I don't think it should take 60+ seconds to fall back to using a node for rack-local tasks. Ideally we would also have the scheduler telling the allocator it needs a node; I just see that as a bigger task. If there are real objections to this we can fix just the basic going-to-0 case and wait on the rest, but I don't think that is the right approach.

@vanzin
Contributor

vanzin commented Aug 10, 2017

I think the fix makes sense; the part that is not clear is why this is happening, since the default locality timeout is 3s and the default executor idle timeout is 60s, so they really shouldn't affect each other in the normal case. (So basically Tom's last comment.)

@tgravescs
Contributor

So I think the issue with locality is that the scheduler resets the timer (the 3s wait) whenever it schedules any task at the particular locality level (in this case node local) on any node. So it can take a lot longer than 3 seconds to fall back to rack local for any specific task; if no tasks are node local on a given node it can wait a long time to fall back.
I think it would be better to look at this on a per-task basis. I don't see a reason to have a task wait 60+ seconds, skipping over rack-local nodes. Locality doesn't matter that much for the majority of applications and you are just wasting time before starting.
I still need to look more at the scheduler logic to confirm, but either way I think this change is good to have. I'm going to file a separate JIRA for that shortly.
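
(A heavily simplified sketch of the reset behavior being described; this is not the actual TaskSetManager code, just an illustration of why one busy locality level can starve the fallback for a particular node.)

```
// Simplified illustration, not the real scheduler logic: the fallback timer
// is per locality level, not per task, and it is reset whenever any task
// launches at the current level, so a node with no node-local tasks can wait
// far longer than spark.locality.wait before rack-local tasks land on it.
object LocalityFallbackSketch {
  val localityWaitMs = 3000L                 // spark.locality.wait default
  var currentLevel = "NODE_LOCAL"
  var lastLaunchTimeMs = System.currentTimeMillis()

  def onTaskLaunched(levelUsed: String): Unit = {
    if (levelUsed == currentLevel) {
      lastLaunchTimeMs = System.currentTimeMillis()  // reset delays fallback for every node
    }
  }

  def maybeFallBack(nowMs: Long): Unit = {
    if (nowMs - lastLaunchTimeMs > localityWaitMs) {
      currentLevel = "RACK_LOCAL"            // fall back only after a quiet period
    }
  }
}
```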

@squito
Contributor

squito commented Aug 10, 2017

This change makes sense to me.

Tom's last comment about resetting that timeout every time a task is scheduled explains, I think, how you get into this situation and why you don't actually want to change those confs. A few months back I was looking at that and chatted with Kay about why that was the case -- I forget the details off the top of my head, but I remember there was some reason why it wasn't trivial to change. So I think this is the right immediate fix.

@srowen
Member

srowen commented Aug 10, 2017

Seems not-unreasonable to me given the current problem statement. It does solve the possible problem about 0 executors, and then some.

The possible impact on a normal app is something like: run a bunch of short-lived stages (think iterative ML). The target executor count stays high, but the tasks schedule on just a subset of executors because they finish quickly, and the rest wait for a data-local slot and finish on those executors too. In this scenario the extra executors can't be released, even though they will always be idle, because they have to be there to keep up the target count. Right now, they'd be released. This scenario is not unrealistic in my experience, but it's the only problem scenario I can think of.

(Am I right that the check against the minimum executor count here is now redundant? The target can't go under the minimum count, and on removal the executor count now can't go under the target.)

I guess I'm still somewhat unclear how, in the stuck-driver scenario, onExecutorBusy isn't firing to mark executors as not idle while the idle-timeout schedule() loop is still running fine. But it's imaginable. Yes, this change fixes that scenario, and it sounds like it has been observed, though it may be chalked up to dire driver states that are going to fall over anyhow. It does sound logical not to let the idle-timeout loop take the executor count below the target, even though I assumed the motivation was the scenario above.

Those are the things we're weighing, and there's clear support for maybe inconveniencing the first scenario to both help the second and fix the 0-executor risk, so I have no issue with that.

@tgravescs
Contributor

The minimum count is still needed; it's needed between stages, when the number of tasks goes below the minimum count. It's either going to keep the minimum number of executors or enough executors to run all the tasks. So if the number of tasks is, say, 2 and your minimum number of executors is 10, it will always keep 10 executors. To me the minimum is exactly for keeping things around between stages and jobs.
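
(In other words, the lower bound on the executor count behaves roughly like the following; this is an illustrative sketch, not the actual code in the patch.)

```
// Illustrative sketch: keep whichever is larger -- the configured minimum,
// or enough executors to run all outstanding tasks in parallel.
def executorFloor(minExecutors: Int, outstandingTasks: Int, tasksPerExecutor: Int): Int = {
  val neededForTasks = (outstandingTasks + tasksPerExecutor - 1) / tasksPerExecutor
  math.max(minExecutors, neededForTasks)
}

// Example from the comment above: 2 tasks with a minimum of 10 executors => keep 10.
// executorFloor(minExecutors = 10, outstandingTasks = 2, tasksPerExecutor = 1) == 10
```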

@SparkQA

SparkQA commented Aug 14, 2017

Test build #80645 has finished for PR 18874 at commit bcfa1f0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito
Contributor

squito commented Aug 15, 2017

@srowen you have a good point about a case that becomes worse after this change. Still I think this change is better on balance.

Btw, there are even more odd cases with dynamic allocation right now -- the one that I've seen most often is that if you run really short tasks, but all in sequence, you probably won't release any executors. Say you first run some really large job with 10k tasks, so you request a bunch of executors. After that you only ever run 100 tasks at a time, so you could release a bunch of resources. But 60 seconds is enough time for a bunch of short stages to execute, and each stage chooses a random set of executors to run on, so no executor is ever idle for 60 seconds.

Perhaps we could also fix that in the sort of larger change Tom was alluding to, which would more tightly integrate dynamic allocation into the scheduler.

@tgravescs
Contributor

+1

asfgit pushed a commit that referenced this pull request Aug 16, 2017
…executors when tasks still to run

Author: John Lee <jlee2@yahoo-inc.com>

Closes #18874 from yoonlee95/SPARK-21656.

(cherry picked from commit adf005d)
Signed-off-by: Tom Graves <tgraves@yahoo-inc.com>
@asfgit asfgit closed this in adf005d Aug 16, 2017
MatthewRBruce pushed a commit to Shopify/spark that referenced this pull request Jul 31, 2018
…executors when tasks still to run
