
[SPARK-5095][MESOS] Support capping cores and launch multiple executors in coar... #4027

Closed
wants to merge 4 commits

Conversation

tnachen
Contributor

@tnachen tnachen commented Jan 13, 2015

...se grain mode.

@tnachen
Contributor Author

tnachen commented Jan 13, 2015

@andrewor14

@SparkQA

SparkQA commented Jan 13, 2015

Test build #25488 has finished for PR 4027 at commit c45ef3f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class QualifiedTableName(database: String, name: String)
    • case class CreateMetastoreDataSource(

@@ -63,20 +64,25 @@ private[spark] class CoarseMesosSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)

Contributor

Can we reuse spark.executor.cores by promoting it from a YARN-only parameter to one shared by YARN and Mesos?
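Concretely, something like this sketch (the fallback order and the Mesos-specific key name are only for illustration, not settled API):

```scala
import org.apache.spark.SparkConf

// Sketch only: prefer the existing spark.executor.cores if the user set it,
// otherwise fall back to the Mesos-specific cap proposed in this patch.
def maxCoresPerExecutor(conf: SparkConf): Int =
  conf.getOption("spark.executor.cores").map(_.toInt)
    .getOrElse(conf.getInt("spark.mesos.coarse.cores.max", Int.MaxValue))
```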

@SparkQA

SparkQA commented Jan 14, 2015

Test build #25532 has finished for PR 4027 at commit 88cfb83.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -226,6 +226,20 @@ See the [configuration page](configuration.html) for information on Spark config
The final total amount of memory allocated is the maximum of the executor memory plus memoryOverhead, and the executor memory multiplied by the overhead fraction (1.07).
</td>
</tr>
<tr>
<td><code>spark.mesos.coarse.cpu.max</code></td>

This is called spark.mesos.coarse.cores.max in the code

Contributor Author

Good catch! Thanks

@tnachen tnachen force-pushed the coarse_multiple_executors branch 2 times, most recently from 373686b to 486d2f1 on January 15, 2015 at 00:23
@SparkQA

SparkQA commented Jan 15, 2015

Test build #25580 has finished for PR 4027 at commit 486d2f1.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tnachen
Contributor Author

tnachen commented Jan 15, 2015

Adding you folks for review: @dragos @deanw @huitseeker @skyluc

@dragos
Contributor

dragos commented Jan 20, 2015

I'm not yet very familiar with the codebase, but from a Scala point of view this PR looks good.

@SparkQA

SparkQA commented Feb 5, 2015

Test build #26843 has finished for PR 4027 at commit 996ae11.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@andrewor14
Contributor

Hey @tnachen can you add [Mesos] to the title? Also did you mean "launch multiple executors per slave"?

@tnachen tnachen changed the title [SPARK-5095] Support capping cores and launch mulitple executors in coar... [SPARK-5095][MESOS] Support capping cores and launch mulitple executors in coar... Feb 19, 2015
@tnachen
Contributor Author

tnachen commented Feb 19, 2015

Ok updated.

@@ -63,20 +63,25 @@ private[spark] class CoarseMesosSchedulerBackend(
// Maximum number of cores to acquire (TODO: we'll need more flexible controls here)
val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt

val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)
val maxCpusPerExecutor = conf.getInt("spark.mesos.coarse.cores.max", Int.MaxValue)
Contributor

the name of this value is very confusing. Say I'm a new user to Spark on Mesos and I see both spark.cores.max and spark.mesos.coarse.cores.max. Which one do I set, do I set both? Does one override the other?

I think a better name for this is probably spark.mesos.coarse.coresPerExecutor. It's a little more verbose, but there's no opportunity for ambiguity here.

Contributor

then I would also rename spark.mesos.coarse.executors.max to spark.mesos.coarse.executorsPerSlave

Contributor

(also can you call the variable maxCoresPerExecutor to be more consistent)

Contributor

by the way, I just remembered that we already have spark.executor.cores, which is currently only used in YARN but we can reuse that here as well (we'll need to update the documentation). In other words, spark.mesos.coarse.coresPerExecutor -> spark.executor.cores

Contributor Author

It's quite hard to differentiate this from spark.cores.max, since that name is already vague: it sets the total number of cores a Spark app can schedule. spark.mesos.coarse.cores.max, on the other hand, is the maximum number of cores a single coarse-grained Spark executor can take, and the scheduler will allocate it anywhere between 1 and spark.mesos.coarse.cores.max cores.

So calling it coresPerExecutor doesn't seem right, as it's not a hard value that the scheduler tries to satisfy.

How about spark.mesos.coarse.coresPerExecutor.max?
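For what it's worth, the behavior I'm describing is roughly the following (a minimal sketch; the parameter names just mirror the variables in the diff above and none of this is the final code):

```scala
// Sketch: how many cores to take from a single Mesos offer for one executor.
// The result is bounded by what the offer contains, by what the app may still
// acquire overall (spark.cores.max), and by the proposed per-executor cap.
def coresForExecutor(
    offeredCores: Int,
    coresAlreadyAcquired: Int,
    maxCores: Int,              // spark.cores.max
    maxCoresPerExecutor: Int    // spark.mesos.coarse.cores.max
  ): Int = {
  val remainingForApp = maxCores - coresAlreadyAcquired
  math.max(0, Seq(offeredCores, remainingForApp, maxCoresPerExecutor).min)
}
```

An executor would only be launched when this comes out to at least 1, which is why any value between 1 and the cap is acceptable.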

@andrewor14
Contributor

Hi @tnachen thanks for working on this. This looks like a reasonably straightforward change. My inline comments have mostly to do with naming and docs. One high level question: does it make sense to set the max executors per slave but not the max cores per executor? For instance, if I allow 3 executors per slave but I don't control the number of cores each executor uses, then the first one will just gladly grab all the cores on that slave, right? Do we need to enforce some kind of constraint between these two configs?
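For example, something along these lines (purely a sketch to illustrate the question; the config names are the ones from this patch and nothing like this exists in the code):

```scala
// Sketch: warn when several executors per slave are allowed but no per-executor
// core cap is set, since the first executor on a slave could still take every
// core that slave offers.
def unboundedExecutorsWarning(
    maxExecutorsPerSlave: Int,  // spark.mesos.coarse.executors.max
    maxCoresPerExecutor: Int    // spark.mesos.coarse.cores.max
  ): Option[String] = {
  if (maxExecutorsPerSlave > 1 && maxCoresPerExecutor == Int.MaxValue) {
    Some("spark.mesos.coarse.executors.max > 1 has no effect unless " +
      "spark.mesos.coarse.cores.max is also set")
  } else {
    None
  }
}
```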

@SleepyThread
Contributor

@andrewor14 @pwendell @tnachen @dragos: at my company we have patched Spark to specify a minimum and maximum number of cores per executor, with only one executor running on each slave. I have a patch ready for it and am waiting for my refactoring #8771 to be merged into master.

The benefits of this approach over the one above are as follows. Assume the required cores max is 30, the memory per task is 10 GB, and Mesos has 10 slaves, each with 32 cores and 64 GB of memory.

  1. When Spark receives offers of 1 CPU and 50 GB of memory on 5 different slaves (say those slaves are running CPU-heavy jobs at the moment), each offer will be accepted, so there will be 5 executors running on 5 slaves, together holding only 5 cores but 50 GB of memory from the cluster. With a minimum number of cores per executor specified, these offers would be rejected.
  2. When Spark receives an offer of 30 CPUs and 50 GB of memory, that offer will be accepted and the whole Spark job will run on a single slave. If that slave is lost, the whole Spark job goes with it. If we specify a maximum of, say, 10 cores per executor, the accepted offers will instead be distributed across the cluster rather than landing on one machine.

A max and a min give the user control to cap the amount of resources used by the Spark job, while still giving Mesos the elasticity to schedule work on different machines. This spreads the executors across the cluster and gives a distributed environment without being greedy.
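In code, the rule I'm describing is roughly this (just a sketch to make the two cases above concrete; the names are made up for illustration):

```scala
// Sketch: decide how many cores to take from one offer under a min/max cap per
// executor. Offers below the minimum are rejected outright (case 1 above); large
// offers are capped so the job spreads over several slaves instead of one (case 2).
def coresFromOffer(
    offeredCores: Int,
    minCoresPerExecutor: Int,
    maxCoresPerExecutor: Int
  ): Option[Int] = {
  if (offeredCores < minCoresPerExecutor) {
    None                                               // e.g. the 1-CPU offers in case 1
  } else {
    Some(math.min(offeredCores, maxCoresPerExecutor))  // e.g. capped at 10 in case 2
  }
}
```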

Any thoughts?

@tnachen
Contributor Author

tnachen commented Sep 18, 2015

The idea of this patch is to give the user the flexibility to provide a max and a min, and also to optionally allow multiple executors or not. So in your case you just need to set max executors per slave to 1 and you should get the same behavior you mentioned.

@SleepyThread
Contributor

There is no minimum cores-per-executor cap in this patch, only a maximum. Also, I think we will need to add more documentation around it, with examples, to make these parameters clearer (I can add them once the pull request is accepted).

@tnachen
Contributor Author

tnachen commented Sep 19, 2015

I see. I can see a minimum number of CPUs per executor being useful, and that's an easy change to make in this patch. Other than that, I think this patch can achieve what you want.

@SleepyThread
Contributor

Yes. The defaults in the given patch will achieve that configuration.

@SparkQA

SparkQA commented Oct 30, 2015

Test build #44709 has finished for PR 4027 at commit 3dc1383.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 21, 2015

Test build #46465 has finished for PR 4027 at commit 08723f8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 21, 2015

Test build #46472 has finished for PR 4027 at commit 8a7a735.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tnachen
Contributor Author

tnachen commented Nov 21, 2015

@andrewor14 I've updated the patch now. Originally you suggested I look at deploy/master.scala and try to reuse existing configurations like spark.executor.cores. But spark.executor.cores refers to a fixed number of cores used to launch each Spark executor, whereas here we're trying to specify a maximum: the Mesos scheduler will launch each coarse-grained executor/worker with anywhere between 1 and the max number of cores, and launch at most the "max executors per slave" amount on each slave.

So I think having a spark.mesos.coarse.executor.cores.max or something similar still makes sense. What do you think?

@dragos
Contributor

dragos commented Nov 21, 2015

@tnachen I think this trade-off has been discussed in this comment and the following three. Since there are so many comments, here's a summary:

  • both standalone and Yarn are using a fixed number of executor cores, so it is more user-friendly to behave in the same way
  • the downside is that some CPUs wouldn't be utilized this way (example: 10 free cores, spark.executor.cores = 3 ==> 3 executors launched, 1 core not used; see the short sketch after this list)
  • spark.executor.cores is optional, so when not set we can still grab all cores. Would a max value make sense here?
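To make the second bullet concrete, the arithmetic is simply (sketch only, using the numbers from the example):

```scala
// With a fixed spark.executor.cores, whole executors are packed into an offer
// and the remainder of the offered cores stays idle.
val offeredCores  = 10
val executorCores = 3                                  // spark.executor.cores
val executorsLaunched = offeredCores / executorCores   // 3 executors
val coresLeftIdle     = offeredCores % executorCores   // 1 core not used
```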

I tend to agree with @pwendell and @andrewor14 but I don't want to push back if you guys discussed this previously and changed your minds (I just went through the whole thread again and I didn't find anything).

Still to do:

  • decide between spark.executor.cores and a max value
  • one comment that wasn't addressed, related to config names.
  • I still need to try this on a real Mesos cluster, won't be able to do it before Monday.

@dragos
Contributor

dragos commented Dec 11, 2015

@tnachen @andrewor14 what should be the way forward? I know about a more ambitious plan to rewrite the Mesos scheduler. Should we still push this in the interim, or just close and concentrate on the larger plan?

Since this is pretty close to merging, my preference would be to have this in now, and work on the scheduler rewrite later.

@andrewor14
Contributor

@dragos Thanks for summarizing the current state of this patch. I would also like to get this merged soon since it's been lingering for a long time and people have been asking for this.

However, I still think the configs introduced in this patch expose too much to the average user. It's not straightforward even to me to think about how "max cores" and "max executors" interact. In the past we have introduced more complicated configs than necessary and now we're stuck with them due to backward compatibility. IMO it's always a good idea to first introduce the simplest set of properties possible and then augment that in the future if necessary (and 90% of the time it's sufficient).

In this particular case spark.executor.cores has well-understood semantics in other modes and has been in use for many versions now. It's not only the simplest but also the most consistent across different cluster managers in Spark.

@tnachen
Contributor Author

tnachen commented Dec 12, 2015

Yes, at this point I'll be closing this PR and moving forward with another proposal, which is to use spark.executor.cores and have a new coarse-grained scheduler.


@tnachen tnachen closed this Dec 15, 2015
@rnowling
Contributor

It's really unfortunate that this patch was closed without merging. I disagree with @andrewor14 and others that it exposes too much to the average user. It's much easier (for me) to think about the number of cores and amount of memory per NODE. I actually think the current approach used in spark (total number of cores) is more confusing because the behavior changes depending on the cluster node configuration. With settings for cores and memory per node, I can guarantee each executor gets enough memory regardless of the cluster configuration.

The current approach also has the problem mentioned by @maasg -- no control over how resources are distributed across nodes. For multi-user environments (where resources may not be consumed through a single system), per-node control makes it easier to prevent over-subscribing nodes.

(edited: grammar, clarification)

@maver1ck
Contributor

I agree.
In YARN mode we have configuration per node

YARN: The --num-executors option to the Spark YARN client controls how many executors it will allocate on the cluster, while --executor-memory and --executor-cores control the resources per executor.

@andrewor14
Contributor

It's much easier (for me) to think about the number of cores and amount of memory per NODE

You did not understand my suggestion. That's exactly what I'm suggesting: spark.executor.cores describes the number of cores each executor has.

@rnowling
Contributor

@andrewor14 I wasn't clear about which of your posts I was referring to -- my apologies. The spark.executor.cores suggestion to set the number of cores per executor is very reasonable.

At one point, you also suggested that the framework should also execute as many executors as needed to use all or nearly all the cores on each node. I would prefer that this is overridable by specifying the maximum number of executors to use per node. This makes it easier to use Spark on a cluster shared by multiple users or applications.

Earlier, you had also suggested offering an option for the amount of memory per executor. Is that still valid in your proposal?

@andrewor14
Contributor

Earlier, you had also suggested offering an option for the amount of memory per executor. Is that still valid in your proposal?

What do you mean? You can already do that through spark.executor.memory, even before this patch.

At one point, you also suggested that the framework should also execute as many executors as needed to use all or nearly all the cores on each node. I would prefer that this is overridable by specifying the maximum number of executors to use per node. This makes it easier to use Spark on a cluster shared by multiple users or applications.

I agree, though we should try to come up with a minimal set of configurations that conflict with each other as little as possible. I haven't decided exactly what those would look like, but it could come in a later patch.

It's really unfortunate that this patch was closed without merging.

Actually it will be re-opened shortly, just with a slightly different approach. I believe @tnachen is currently on vacation but once he comes back we'll move forward again. :)

asfgit pushed a commit that referenced this pull request Feb 10, 2016
…rse grained mesos mode.

This is the next iteration of tnachen's previous PR: #4027

In that PR, we resolved with andrewor14 and pwendell to implement the Mesos scheduler's support of `spark.executor.cores` to be consistent with YARN and Standalone.  This PR implements that resolution.

This PR implements two high-level features. These two features are co-dependent, so they're both implemented here:
- Mesos support for spark.executor.cores
- Multiple executors per slave

We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #10993 from mgummelt/executor_sizing.
mgummelt pushed a commit to mesosphere/spark that referenced this pull request Mar 7, 2017