Conversation

@jerryshao (Contributor)

This patch proposes to lazily start the scheduler for dynamic allocation, to avoid ramping the executor count down too quickly when the load is light.

This implementation will:

  1. immediately start the scheduler if numExecutorsTarget is 0; this is the expected behavior.
  2. if numExecutorsTarget is not zero, do not start the scheduler until that number is satisfied; if the load is light, the initially started executors will last for at least 60 seconds, giving the user a window to submit a job without the executors being torn down and re-requested.
  3. if numExecutorsTarget is still not satisfied when the timeout elapses, resources are insufficient, so the scheduler starts at the timeout rather than waiting indefinitely.

Please help to review, thanks a lot.
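
To make the intent concrete, here is a small, self-contained Scala model of the gating behavior this thread converges on. It is illustrative only: the names echo ExecutorAllocationManager (numExecutorsTarget, initializing), but this is a sketch under assumed simplifications, not the merged implementation.

object AllocationModel {

  // true until a stage is submitted or an executor hits the idle timeout
  @volatile private var initializing: Boolean = true

  private val initialExecutors = 3
  private var numExecutorsTarget = initialExecutors

  /** Returns the delta sent to the cluster manager; 0 means "no new request". */
  def updateTarget(maxNeeded: Int): Int = {
    if (initializing) {
      0 // never shrink the target while waiting for the initial executors
    } else if (maxNeeded < numExecutorsTarget) {
      val old = numExecutorsTarget
      numExecutorsTarget = math.max(maxNeeded, 0)
      numExecutorsTarget - old // negative: outstanding requests were cancelled
    } else if (maxNeeded > numExecutorsTarget) {
      val old = numExecutorsTarget
      numExecutorsTarget = maxNeeded
      numExecutorsTarget - old // positive: more executors requested
    } else {
      0
    }
  }

  def onStageSubmitted(): Unit = { initializing = false }
  def onExecutorIdleTimeout(): Unit = { initializing = false }

  def main(args: Array[String]): Unit = {
    println(updateTarget(maxNeeded = 0)) // 0: still initializing, no ramp down
    onStageSubmitted()
    println(updateTarget(maxNeeded = 5)) // 2: ramp up after the first stage
  }
}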

@SparkQA commented May 27, 2015

Test build #33571 timed out for PR 6430 at commit 25af47b after a configured wait of 150m.

@jerryshao (Contributor, Author)

Jenkins, retest this please.

@sryza (Contributor) commented May 28, 2015

If my understanding is correct, this affects the add behavior, making it so that targetNumExecutors can't increase until 60 seconds have passed or we've received all of the initially requested executors.

Can we change the logic so that we don't reduce targetNumExecutors below initialExecutors until either:

  • a stage has been submitted
  • we've waited for spark.dynamicAllocation.executorIdleTimeout seconds
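
For reference, the idle timeout mentioned here is the standard dynamic-allocation setting. A hypothetical spark-shell style configuration (the values below are illustrative examples, not recommendations or defaults) might look like:

import org.apache.spark.SparkConf

// Illustrative only: example values, not recommendations or the defaults.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.initialExecutors", "3")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")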

@jerryshao (Contributor, Author)

Hey @sryza, so you mean we only need to handle the case where targetNumExecutors would drop below initialExecutors, and if the load is heavy we immediately increase targetNumExecutors, is that right?

@sryza (Contributor) commented May 28, 2015

exactly

@jerryshao (Contributor, Author)

OK, let me change the logic to handle this.

@SparkQA commented May 28, 2015

Test build #33633 has finished for PR 6430 at commit 25af47b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 28, 2015

Test build #33650 has finished for PR 6430 at commit 84d069c.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented May 28, 2015

Test build #33652 has finished for PR 6430 at commit 5bb8cd8.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Inline review comment (Member)

These compareAndSet calls can just be set, I think. In fact, a @volatile boolean is OK too. I don't know which is simpler.

Inline review comment (Contributor)

+1 to using a @volatile boolean
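
A minimal illustration of the two options being compared; the field names here are made up for the sketch, not taken from the patch.

import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical illustration of the two options discussed above; field names are made up.
class InitFlagExamples {
  // Option 1: an AtomicBoolean. compareAndSet is stronger than needed here, since the
  // flag only ever makes a single true -> false transition; a plain set(false) suffices.
  private val initializingAtomic = new AtomicBoolean(true)
  def finishInitAtomic(): Unit = initializingAtomic.set(false)

  // Option 2: a @volatile boolean, which is simpler and sufficient for a flag written
  // by one thread and read by another.
  @volatile private var initializing: Boolean = true
  def finishInit(): Unit = { initializing = false }
}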

@srowen (Member) commented May 28, 2015

This looks like a pretty reasonable change, to attempt to defer evaluating a change in executors until something happens. I suppose this still means that something like spark-shell will never ramp down until you execute a job, but maybe that's fine?

@jerryshao (Contributor, Author)

spark-shell will also ramp down once either the first job is submitted or the initial executors hit the idle timeout; there is no need to wait for the first job, since by default numExecutorsTarget can be re-calculated after 60 seconds.

Inline review comment (Contributor)

Can we rename this to inInitialExecutorsPeriod and add a comment explaining the criteria for being in this period?

Inline review comment (Contributor)

I would just call this @volatile private val initializing

Inline review comment (Contributor)

Then could you add a comment to explain what this means, maybe something like:

// Whether we are still waiting for the initial set of executors to be allocated.
// While this is true, we will not cancel outstanding executor requests. This is
// set to false when:
//   (1) the initial executors have all registered,
//   (2) a stage is submitted, or
//   (3) an executor idle timeout has elapsed.

Inline review comment (Contributor, Author)

Hi @andrewor14, do we need to consider condition (1), the initial executors have all registered, in order to flip initializing to false?

Previously in this patch we set initializing to false when:

  1. a stage is submitted.
  2. an executor times out due to being idle (default is 60 secs).

If we add this other condition, we may hit the following scenario: all of the initial executors have registered, but no stage has been submitted and the executors have not yet hit the idle timeout. The executors then have a chance to ramp down in this window, which I think will force extra requests when a stage is submitted later. Is this the intended behavior?

@andrewor14 (Contributor)

Hi all, I looked at the code and I think it works. However, I think there is a slightly simpler alternative.

// in updateAndSyncNumExecutorsTarget
val maxNeeded =
  if (initializing) {
    initialExecutors
  } else {
    maxNumExecutorsNeeded 
  }

Then the rest of the code in updateAndSyncNumExecutorsTarget can remain the same, so the developer doesn't have to reason about two different branches down there.

Inline review comment (Contributor)

style:

private def createSparkContext(
    minExecutors: Int = 1,
    maxExecutors: Int = 5,
    initialExecutors: Int = 1): SparkContext = {
  ...
}

@sryza (Contributor) commented May 28, 2015

@andrewor14 it's the other way around. We're ok with going above initialExecutors during the initialization period, but don't want to go below it.

@andrewor14 (Contributor)

Hm, that seems to be so. I'm just wondering if there's a better way to simplify the cases so we don't have nested ifs. Previously it looked like this (pseudocode):

if (needed < target) {
  // cancel
} else if (need more executors) {
  // add
}

This is fairly easy to reason about. For the new initialization logic, it would be good to add it as a new separate case instead of nesting it in an existing case, something like:

if (initializing) {
  // resync with cluster manager
} else if (needed < target) {
  // cancel
} else if (need more executors) {
  // add
}

Then in onExecutorAdded, we will set initializing = false if executorIds.size >= initialExecutors.
This way the cancel case will remain the cancel case, and we'll never go into it during initialization.
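
A hypothetical Scala sketch of the onExecutorAdded hook described above; the names mirror the discussion, not the exact merged code.

import scala.collection.mutable

// Hypothetical sketch of the onExecutorAdded idea above; not the merged
// ExecutorAllocationManager code.
class AllocationListenerSketch(initialExecutors: Int) {
  @volatile private var initializing: Boolean = true
  private val executorIds = mutable.Set.empty[String]

  def onExecutorAdded(executorId: String): Unit = {
    executorIds += executorId
    // Leave the initialization window once the initial request has been fulfilled,
    // so the cancel branch can never fire while we are still waiting for executors.
    if (initializing && executorIds.size >= initialExecutors) {
      initializing = false
    }
  }

  def isInitializing: Boolean = initializing
}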

@andrewor14 (Contributor)

Another related problem, with or without my suggested change: we actually call updateAndSync... quite often (every 100ms). While we're initializing, we'll end up flooding the AM with requestTotalExecutors messages. Maybe this is not a huge problem, but it's something to look out for.

Inline review comment (Contributor)

Actually, does it make sense to do this inside the case maxNeeded < numExecutorsTarget? This condition implies that we are targeting more executors than are needed, and so we should cancel outstanding requests. That is not really true during initialization.

I think a better indication that your application has initialized is if the number of executors is >= your initial executors. I discussed this in more detail on the main thread.

@sryza (Contributor) commented May 31, 2015

To avoid the problem @andrewor14 brought up, can we avoid calling requestTotalExecutors when numExecutorsTarget hasn't changed?
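
A minimal hypothetical illustration of that guard; the delta check is the idea, not the exact merged code, and the class name is made up.

// Hypothetical sketch: only talk to the cluster manager when the target actually changes.
class SyncGuardSketch(requestTotalExecutors: Int => Boolean) {
  private var numExecutorsTarget = 0

  /** Returns the delta actually requested; 0 means no RPC was sent. */
  def sync(newTarget: Int): Int = {
    val delta = newTarget - numExecutorsTarget
    if (delta == 0) {
      0 // target unchanged: skip the message entirely
    } else {
      numExecutorsTarget = newTarget
      if (requestTotalExecutors(newTarget)) delta else 0
    }
  }
}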

@jerryshao (Contributor, Author)

Thanks a lot @andrewor14 and @sryza for your comments, and sorry for the late response. I will try to understand your comments and change the code accordingly :).

Inline review comment (Contributor, Author)

@sryza and @andrewor14, do we also need to avoid requesting executors here and in addExecutors when oldNumExecutorsTarget == numExecutorsTarget and we are not in the initializing state?

Inline review comment (Contributor)

this was fixed in 51898b5

@jerryshao (Contributor, Author)

Hi all, I just updated the code according to your comments; please help to review, thanks a lot. There are two open questions I haven't addressed yet:

  1. Do we need to set the initializing flag to false when the initial executors have all registered? As I mentioned before, there is a window for YARN to ramp down the executors once all the initial executors have registered but the user's job has not yet been submitted and the executors have not hit the idle timeout. I'm not sure whether this is the intended behavior.
  2. Do we need to change the code so that it does not send a request when the old numExecutorsTarget equals the newly calculated one in all situations (once initializing has finished)?

Please help to address these concerns; thanks a lot, and I appreciate your time.

Inline review comment (Contributor, Author)

This is a little verbose, so I changed it to logDebug.

Inline review comment (Contributor)

ok, but this conflicts with a patch I just merged

@SparkQA commented Jun 1, 2015

Test build #33878 has finished for PR 6430 at commit f03c250.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA commented Jun 1, 2015

Test build #33881 has finished for PR 6430 at commit 8211be0.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Inline review comment (Contributor)

This is pretty long-winded. I think we can just do

if (initializing) {
} else if (maxNeeded < numExecutorsTarget) {
} ...

@andrewor14 (Contributor)

@jerryshao Thanks for the update. The latest changes look fairly reasonable.

(1) I think you're correct. We don't actually need to wait for the initial set of executors to register to set initializing = false. Before the first stage is submitted, we can't actually ever ramp up our target. We can't cancel either, however, because all executors have already registered. In other words, setting initializing = false in this case does nothing.

(2) I was thinking we don't actually need to sync with the cluster manager to get our executors while we're initializing. We don't do that currently, and I don't see why we need to add that. This initializing flag is just a way for us to delay canceling. Then we don't need to worry about spamming the AM with sync messages while we initialize. @sryza thoughts?

@jerryshao (Contributor, Author)

Hi @andrewor14, I just updated the code according to your comments; please help to review, thanks a lot.

@SparkQA commented Jun 4, 2015

Test build #34151 has finished for PR 6430 at commit 02cac8e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14 (Contributor)

lgtm. @jerryshao have you had a chance to test this on a real cluster?

@jerryshao (Contributor, Author)

I've only tried it in my local pseudo-cluster for functionality; I haven't tried it on a real cluster for performance yet. I will test it.

@andrewor14 (Contributor)

@sryza I'm planning to pull this in. Any other comments?

@sryza (Contributor) commented Jun 5, 2015

This looks right to me. Merging.

asfgit closed this in 3f80bc8 on Jun 5, 2015
jeanlyn pushed a commit to jeanlyn/spark that referenced this pull request Jun 12, 2015
This patch proposes to lazily start the scheduler for dynamic allocation, to avoid ramping the executor count down too quickly when the load is light.

This implementation will:
1. immediately start the scheduler if `numExecutorsTarget` is 0; this is the expected behavior.
2. if `numExecutorsTarget` is not zero, do not start the scheduler until that number is satisfied; if the load is light, the initially started executors will last for at least 60 seconds, giving the user a window to submit a job without the executors being torn down and re-requested.
3. if `numExecutorsTarget` is still not satisfied when the timeout elapses, resources are insufficient, so the scheduler starts at the timeout rather than waiting indefinitely.

Please help to review, thanks a lot.

Author: jerryshao <saisai.shao@intel.com>

Closes apache#6430 from jerryshao/SPARK-7699 and squashes the following commits:

02cac8e [jerryshao] Address the comments
7242450 [jerryshao] Remove the useless import
ecc0b00 [jerryshao] Address the comments
6f75f00 [jerryshao] Style changes
8b8decc [jerryshao] change the test name
fb822ca [jerryshao] Change the solution according to comments
1cc74e5 [jerryshao] Lazy start the scheduler for dynamic allocation
nemccarthy pushed a commit to nemccarthy/spark that referenced this pull request Jun 19, 2015
XuTingjun added a commit to XuTingjun/spark that referenced this pull request Aug 21, 2015