[SPARK-1946] Submit tasks after (configured ratio) executors have been registered #900
Conversation
Can one of the admins verify this patch?
Jenkins, test this please. @li-zhihui could you create a JIRA for this and add it to the title of the PR (e.g. SPARK-XXX)?
This should probably be unified with #634
This is very similar to SPARK-1453 and this may solve it generically instead of just yarn.
This is also related to SPARK-1937 (#892) -- is fixing that sufficient or is it necessary to have this sleep as well? It seems like this sleep is only necessary if the sleep time is less than the expected increase in task completion time as a result of running non-locally.
@kayousterhout @mridulm Looking briefly at PR #892, it seems that it handles locality when executors are added later, and I assume some of the locality wait configs come into effect here, but it doesn't just wait for a certain number of executors (or a period of time) before starting, does it? Are the changes good enough that waiting for a good percentage of the executors no longer matters? It seems like, depending on your resource manager and how busy a cluster you run on, it could take a while (minutes) to get a large number of executors. I think this should just be a configuration, and user code should not have to "sleep" or work around this.
This one slipped off my radar, my apologies.
Hit submit by mistake, to continue ...
Thanks @mridulm for the clarification. So it seems like this change would be useful and is similar to what we discussed in SPARK-1453. Are there any general concerns over adding configs to wait? I'm seeing more people running into this and would like to get something implemented so each user doesn't have to put a sleep in their code.
@li-zhihui it looks like the JIRA you created (https://issues.apache.org/jira/browse/SPARK-1946) describes the issue fixed by #892 and described by a redundant issue (https://issues.apache.org/jira/browse/SPARK-1937). As @mridulm explained (thanks!!), the primary set of issues addressed by this pull request center around the fact that Spark-on-YARN has various performance problems when not enough executors have registered yet. Could you update SPARK-1946 accordingly?

@tgravescs I'm a little nervous about adding more scheduler config options, because I think the average user would have a very difficult time figuring out that their performance problems could be fixed by tuning this particular set of options. The scheduler already has quite a few config options and I think we should be very cautious in adding more (cc @pwendell). On the other hand, as you pointed out, it seems like a user typically wants to wait for some number of executors to become available, and those semantics aren't available to the application -- so we're stuck with adding something to the scheduler code.

Is it possible to do this only for the YARN scheduler / do you think it's necessary in standalone too? Doing it only for YARN (and naming the config variable accordingly) could help signal to a naive user when tuning this might help. From @mridulm's description, it sounds like many of the issues here are yarn-specific.
Also, @li-zhihui, can you add a unit test for this? Second, it looks like this only works in standalone mode and not for YARN (since, as I understand the YARN code, YARN uses YarnClientSchedulerBackend and not SparkDeploySchedulerBackend)? Was that the intention?
}
while (!backend.isReady) {
  synchronized {
    this.wait(100)
Is there a reason for not using wait/notify here?
Just to keep the code simple. :)
If someone implements more backend implementations or changes backend.isReady somewhere, they don't need to call notify().
But waiting 100 milliseconds may be too long; is 10 milliseconds OK?
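The tradeoff being discussed here (polling with a timed wait versus a wait/notify handshake) can be sketched in a few lines of Scala. This is a standalone illustration, not Spark's actual code; the `Backend` class and method names are hypothetical:

```scala
// Hypothetical sketch contrasting the polled wait used in the PR with a
// wait/notify handshake. Illustrative names only, not Spark's API.
object ReadinessSketch {
  class Backend {
    @volatile private var ready = false
    private val lock = new Object

    // Called by the registration path once enough executors have arrived.
    def markReady(): Unit = lock.synchronized {
      ready = true
      lock.notifyAll()
    }

    // Polling style (as in the PR): re-check every 100 ms, so no one
    // has to remember to call notify() when the condition changes.
    def awaitReadyPolling(): Unit =
      while (!ready) lock.synchronized { lock.wait(100) }

    // wait/notify style: blocks until markReady() is invoked.
    def awaitReadyNotify(): Unit = lock.synchronized {
      while (!ready) lock.wait()
    }
  }

  def main(args: Array[String]): Unit = {
    val b = new Backend
    val t = new Thread(() => { Thread.sleep(50); b.markReady() })
    t.start()
    b.awaitReadyNotify() // returns once markReady() has run
    t.join()
    println("backend ready")
  }
}
```

The polling version trades a little latency (up to one poll interval) for not having to thread a `notify()` call through every place that can change readiness, which matches the reasoning in the comment above.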
yarn uses CoarseGrainedSchedulerBackend for standalone mode and the YarnClientSchedulerBackend (which is based on CoarseGrainedSchedulerBackend) for client mode. I don't see this PR handling that, since it's not incrementing the totalExecutors. I haven't looked at it in detail yet, but I would think you could do it in RegisterExecutor, and that would handle both, since SparkDeploySchedulerBackend is based on CoarseGrainedSchedulerBackend also.
If people think it's not useful for other deployment modes then I can look into making it YARN-specific. We already have a small sleep to help with this in YarnClusterScheduler.
@tgravescs @kayousterhout The PR only works in standalone mode now, but it provides an abstract method isReady() in SchedulerBackend.scala for all backend implementations. For yarn mode, it seems better to use the registered number as the threshold.
@tgravescs @mridulm @kayousterhout I added a commit which submits stages only after a configured number of executors have been registered. I think it will work well in yarn mode. Submit stages only after the number of successfully registered executors reaches this value; default value is 0: spark.executor.minRegisteredNum = 20
Sorry to jump in late on this, but I think spark.executor.minRegisteredNum sounds like an executor property, when this is a property of the driver.
Thanks @sryza. How about spark.scheduler.minRegisteredExecutors?
ready = true
return true
}
return false
No need to use "return" in the last statement, I think.
Thanks @CrazyJvm. But if I remove "return", the method always returns true. I don't know why. I use Scala 2.10.3.
override def isReady(): Boolean = {
if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) {
ready = true
}
ready
}
Thanks @adrian-wang, but I think it's necessary to return true quickly, because ready is true most of the time.
Thanks @CrazyJvm. I made a mistake; the last "return" is not necessary.
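For reference on the return-value confusion above: in Scala a method's result is the value of the last expression in its body, so explicit `return` statements can usually be dropped, as @adrian-wang's rewrite shows. A tiny standalone illustration (hypothetical method, not the PR's code):

```scala
// Illustration of Scala's "last expression is the result" rule, using a
// simplified, hypothetical readiness check.
object LastExpression {
  // No `return` keywords: the if/else is itself an expression, and its
  // value becomes the method's result.
  def isReady(elapsedMs: Long, maxWaitMs: Long, alreadyReady: Boolean): Boolean =
    if (alreadyReady || elapsedMs >= maxWaitMs) true else false

  def main(args: Array[String]): Unit = {
    println(isReady(31000, 30000, alreadyReady = false)) // past the max wait
    println(isReady(1000, 30000, alreadyReady = false))  // still waiting
  }
}
```

The pitfall @li-zhihui hit is mixing styles: with an early `return true` inside the `if` but no trailing expression, the block's value can silently change, so it is safest to use either all-`return` or all-expression style in one method.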
@tgravescs @sryza I added a commit supporting the feature (percentage style) for yarn mode, tested successfully on YARN 2.2.0. About the default value of minRegisteredRatio: yarn mode is 0.9, standalone mode is 0.
@@ -77,6 +77,12 @@ private[spark] class YarnClientSchedulerBackend(
logDebug("ClientArguments called with: " + argsArrayBuf)
val args = new ClientArguments(argsArrayBuf.toArray, conf)
totalExecutors.set(args.numExecutors)
Note that YARN has two modes: yarn-client and yarn-cluster. This changes it for yarn-client mode, but not yarn-cluster mode; yarn-cluster mode currently just uses CoarseGrainedSchedulerBackend directly.
Thanks @tgravescs
@tgravescs I added a commit to support yarn-cluster mode.
@tgravescs @kayousterhout
@tgravescs @kayousterhout How about moving it (waitBackendReady) to postStartHook()?
val conf = scheduler.sc.conf
private val timeout = AkkaUtils.askTimeout(conf)
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)
// Submit tasks only after (registered executors / total expected executors)
// is equal to at least this value, which is a double between 0 and 1.
var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0)
Please add the new configs to the user docs - see docs/configuration.md
@tgravescs Done.
postStartHook seems like a good place to put it. It will only be called once there, and as you said, YARN cluster mode actually waits for the SparkContext to be initialized before allocating executors. It looks like we aren't handling Mesos; we should at least file a JIRA for this. For the YARN side where you added the TODOs about the sleep, I think we can leave them here as there is another JIRA to remove them.
Thanks @tgravescs |
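As a rough sketch of what a waitBackendReady-style check discussed above might look like, here is a standalone approximation; the `FakeBackend` class, its fields, and the constants are illustrative, not copied from the Spark sources:

```scala
// Illustrative sketch of a ratio-plus-timeout readiness check of the kind
// discussed in this thread. Names approximate, but are not, the real code.
object WaitBackendSketch {
  class FakeBackend(totalExpected: Int, minRatio: Double, maxWaitMs: Long) {
    private val createTime = System.currentTimeMillis()
    @volatile var registered = 0 // incremented as executors register

    // Ready once enough executors have registered, or once the maximum
    // waiting time has elapsed regardless of the ratio.
    def isReady: Boolean = {
      val enough =
        totalExpected > 0 && registered.toDouble / totalExpected >= minRatio
      enough || (System.currentTimeMillis() - createTime) >= maxWaitMs
    }
  }

  // What a postStartHook-style wait could do: poll until the backend is ready.
  def waitBackendReady(backend: FakeBackend): Unit =
    while (!backend.isReady) Thread.sleep(10)

  def main(args: Array[String]): Unit = {
    val backend = new FakeBackend(totalExpected = 10, minRatio = 0.8, maxWaitMs = 5000)
    backend.registered = 8 // 8/10 = 0.8 meets the ratio immediately
    waitBackendReady(backend)
    println(s"ready with ${backend.registered} executors")
  }
}
```

Putting the wait in postStartHook, as suggested, means it runs exactly once per SparkContext, before any stage is submitted.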
<td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
<td>30000</td>
<td>
Whatever (registered executors / total expected executors) is reached
I think we should clarify both of these a bit, because it's really that scheduling starts when either one is hit; so adding a reference to maxRegisteredExecutorsWaitingTime in the description of minRegisteredExecutorsRatio would be good.
How about something like below? Note I'm not a doc writer so I'm fine with changing.
for spark.scheduler.minRegisteredExecutorsRatio:
The minimum ratio of registered executors (registered executors / total expected executors) to wait for before scheduling begins. Specified as a double between 0 and 1. Regardless of whether the minimum ratio of executors has been reached, the maximum amount of time it will wait before scheduling begins is controlled by config spark.scheduler.maxRegisteredExecutorsWaitingTime.
Then for spark.scheduler.maxRegisteredExecutorsWaitingTime:
Maximum amount of time to wait for executors to register before scheduling begins (in milliseconds).
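Assembled from the suggested wording above, the configuration.md rows might read as follows; this is a sketch in the HTML table markup quoted earlier in the thread, not the merged text:

```
<tr>
  <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td>
  <td>0</td>
  <td>
    The minimum ratio of registered executors (registered executors / total
    expected executors) to wait for before scheduling begins. Specified as a
    double between 0 and 1. Regardless of whether the minimum ratio of
    executors has been reached, the maximum amount of time it will wait
    before scheduling begins is controlled by config
    <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code>.
  </td>
</tr>
<tr>
  <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td>
  <td>30000</td>
  <td>
    Maximum amount of time to wait for executors to register before
    scheduling begins (in milliseconds).
  </td>
</tr>
```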
@tgravescs I added a commit according to the comments.
Looks good. Thanks @li-zhihui
Submit tasks after (configured ratio of) executors have been registered

Because submitting tasks and registering executors are asynchronous, in most situations early stages' tasks run without preferred locality. A simple workaround is to sleep a few seconds in the application so that executors have enough time to register. This PR adds two configuration properties to make the TaskScheduler submit tasks only after enough executors have been registered:

# Submit tasks only after (registered executors / total executors) reaches this ratio; default value is 0
spark.scheduler.minRegisteredExecutorsRatio = 0.8
# Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks after maxRegisteredWaitingTime (milliseconds); default value is 30000
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000

Author: li-zhihui <zhihui.li@intel.com>

Closes apache#900 from li-zhihui/master and squashes the following commits:
b9f8326 [li-zhihui] Add logs & edit docs
1ac08b1 [li-zhihui] Add new configs to user docs
22ead12 [li-zhihui] Move waitBackendReady to postStartHook
c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS
4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor
0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks
4261454 [li-zhihui] Add docs for new configs & code style
ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime
6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha
812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode
e7b6272 [li-zhihui] support yarn-cluster
37f7dc2 [li-zhihui] support yarn mode(percentage style)
3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered
Fix race condition at SchedulerBackend.isReady in standalone mode

In SPARK-1946 (PR #900), the configuration <code>spark.scheduler.minRegisteredExecutorsRatio</code> was introduced. However, in standalone mode there is a race condition where isReady() can return true because totalExpectedExecutors has not been correctly set. Because the number of expected executors is uncertain in standalone mode, this PR uses CPU cores (<code>--total-executor-cores</code>) as the expected resources to judge whether the SchedulerBackend is ready.

Author: li-zhihui <zhihui.li@intel.com>
Author: Li Zhihui <zhihui.li@intel.com>

Closes #1525 from li-zhihui/fixre4s and squashes the following commits:
e9a630b [Li Zhihui] Rename variable totalExecutors and clean codes
abf4860 [Li Zhihui] Push down variable totalExpectedResources to children classes
ca54bd9 [li-zhihui] Format log with String interpolation
88c7dc6 [li-zhihui] Few codes and docs refactor
41cf47e [li-zhihui] Fix race condition at SchedulerBackend.isReady in standalone mode

(cherry picked from commit 28dbae8)
Signed-off-by: Patrick Wendell <pwendell@gmail.com>
Because submitting tasks and registering executors are asynchronous, in most situations early stages' tasks run without preferred locality.
A simple workaround is to sleep a few seconds in the application so that executors have enough time to register.
This PR adds two configuration properties to make the TaskScheduler submit tasks only after enough executors have been registered.

Submit tasks only after (registered executors / total executors) reaches this ratio; the default value is 0:
spark.scheduler.minRegisteredExecutorsRatio = 0.8

Regardless of whether minRegisteredExecutorsRatio has been reached, submit tasks after maxRegisteredWaitingTime milliseconds; the default value is 30000:
spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000
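The two properties above form an either/or policy: scheduling starts when the registered ratio is met or when the waiting time elapses, whichever comes first. A minimal standalone sketch of that decision (names and signature are illustrative, not Spark's):

```scala
// Sketch of the two-knob submission policy described above. Scheduling
// starts when EITHER the registered ratio is met OR the max waiting time
// has elapsed. Names are illustrative, not taken from the Spark sources.
object TimeoutPathSketch {
  def shouldSubmit(registered: Int, expected: Int, minRatio: Double,
                   elapsedMs: Long, maxWaitMs: Long): Boolean = {
    val ratioMet = expected > 0 && registered.toDouble / expected >= minRatio
    ratioMet || elapsedMs >= maxWaitMs
  }

  def main(args: Array[String]): Unit = {
    // Ratio not met (2/10 < 0.8) and still inside the window: keep waiting.
    println(shouldSubmit(2, 10, 0.8, elapsedMs = 1000, maxWaitMs = 5000))
    // Ratio still not met, but the 5000 ms window has elapsed: submit anyway.
    println(shouldSubmit(2, 10, 0.8, elapsedMs = 6000, maxWaitMs = 5000))
  }
}
```

The timeout branch guarantees the job is never blocked indefinitely on a busy cluster where some executors may take minutes to arrive, which was the concern raised earlier in the thread.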