
[SPARK-5095] [Mesos] Support launching multiple mesos executors in coarse grained mesos mode. #10993

Closed
wants to merge 1 commit

Conversation

mgummelt
Contributor

This is the next iteration of @tnachen's previous PR: #4027

In that PR, we resolved with @andrewor14 and @pwendell to make the Mesos scheduler's support for spark.executor.cores consistent with YARN and Standalone. This PR implements that resolution.

This PR implements two high-level features. The two are co-dependent, so both are implemented here (see the sketch after the list):

  • Mesos support for spark.executor.cores
  • Multiple executors per slave
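
For illustration, here is a minimal sketch of how an application would opt into these features once merged. The master URL and resource numbers below are placeholders, not values from this PR:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Placeholder master URL and sizes, for illustration only.
val conf = new SparkConf()
  .setMaster("mesos://zk://master.mesos:2181/mesos")
  .setAppName("MultiExecutorExample")
  .set("spark.executor.cores", "2")   // cores per executor, now honored on Mesos
  .set("spark.executor.memory", "4g")
  .set("spark.cores.max", "8")        // total cores across all executors

val sc = new SparkContext(conf)
// With cores.max = 8 and executor.cores = 2, the scheduler can launch up to
// four executors, and more than one may land on the same slave if its offer
// has enough CPU and memory.
```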

We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.

The contribution is my original work and I license the work to the project under the project's open source license.

@@ -89,13 +82,11 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)

private val pendingRemovedSlaveIds = new HashSet[String]
Contributor Author

AFAICT, this is never used, so I removed it.

Contributor

nice

@SparkQA

SparkQA commented Jan 30, 2016

Test build #50419 has finished for PR 10993 at commit f421133.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]
private val slaves = new HashMap[String, Slave]
Contributor

Not a big issue, but I wonder if slaveInfo would be a better name. Also, a one-line comment to explain what the key is.

@andrewor14
Contributor

Looks like this is failing real tests

@andrewor14
Contributor

Also, have you had a chance to test this with cluster mode and/or with dynamic allocation?

@mgummelt
Contributor Author

mgummelt commented Feb 1, 2016

I made an update that should fix the test.

I've tested in cluster mode, but not with dynamic allocation. Though I have added some unit tests that cover dynamic allocation. I'll see about setting up dynamic allocation.

@SparkQA

SparkQA commented Feb 2, 2016

Test build #50506 has finished for PR 10993 at commit 318486e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos
Contributor

dragos commented Feb 2, 2016

I didn't have time to look at this in detail; I'll do so this afternoon.

val id = offer.getId.getValue

if (tasks.contains(offer.getId)) { // accept
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
Contributor

I know this is not your code, but it would be good to document this. Why do we filter out offers for 5 seconds from the slaves whose offers we accept?

Contributor Author

I was thinking about this when I ran into it. The default is actually 5: https://github.com/apache/mesos/blob/master/include/mesos/mesos.proto#L1211

So I'll just remove it.
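
For context, a minimal sketch (with placeholder names) of the two equivalent launch calls through the standard Mesos SchedulerDriver API; the explicit Filters builder is exactly what's being dropped here:

```scala
import org.apache.mesos.Protos.{Filters, OfferID, TaskInfo}
import org.apache.mesos.SchedulerDriver
import scala.collection.JavaConverters._

def accept(driver: SchedulerDriver, offerId: OfferID, tasks: Seq[TaskInfo]): Unit = {
  // Explicit filter: refuse further offers from this slave for 5 seconds.
  val filters = Filters.newBuilder().setRefuseSeconds(5).build()
  driver.launchTasks(Seq(offerId).asJava, tasks.asJava, filters)

  // Equivalent, since refuse_seconds already defaults to 5.0 in mesos.proto:
  // driver.launchTasks(Seq(offerId).asJava, tasks.asJava)
}
```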

@dragos
Contributor

dragos commented Feb 2, 2016

@mgummelt this looks really good! I have a few comments. I still have to run this PR with dynamic allocation and see it in action!

@SparkQA

SparkQA commented Feb 3, 2016

Test build #50608 has finished for PR 10993 at commit 0a1181a.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@Astralidea

@mgummelt If I want the scheduler to limit each slave to launching only one executor, how do I do that? Should there be another config like spark.executor.maxperslave=1?
The scenario is this: I read from Kafka with Spark Streaming, and I need the Receivers deployed on separate slaves, rather than all of them on one slave saturating its bandwidth.

* @param offers Mesos offers that match attribute constraints
* @return A map from OfferID to a list of Mesos tasks to launch on that offer
*/
private def getMesosTasks(offers: Buffer[Offer]): Map[OfferID, List[MesosTaskInfo]] = {
Contributor

I find it non-productive to quibble over a name. That being said, this method doesn't just get tasks from somewhere. It produces them itself, based on a round-robin scheduling strategy over the given offers. I don't think get is the best verb to describe that action.

Contributor Author

Sure. Changed to build
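
To make the renamed method's behavior concrete, here is a simplified, self-contained sketch of round-robin task building; the Offer/Task case classes and sizes are illustrative stand-ins, not the PR's actual types:

```scala
import scala.collection.mutable

// Illustrative stand-ins for Mesos offers and launched executor tasks.
case class Offer(id: String, var cpus: Double, var mem: Double)
case class Task(offerId: String, cpus: Double, mem: Double)

def buildTasks(
    offers: Seq[Offer],
    execCpus: Double,
    execMem: Double,
    maxExecutors: Int): Map[String, List[Task]] = {
  val tasks = mutable.Map.empty[String, List[Task]].withDefaultValue(Nil)
  var launched = 0
  var placedThisPass = true
  // Sweep the offers round-robin, placing at most one executor per offer per
  // pass, until nothing fits or we hit the executor limit.
  while (placedThisPass && launched < maxExecutors) {
    placedThisPass = false
    for (o <- offers
         if launched < maxExecutors && o.cpus >= execCpus && o.mem >= execMem) {
      o.cpus -= execCpus
      o.mem -= execMem
      tasks(o.id) = Task(o.id, execCpus, execMem) :: tasks(o.id)
      launched += 1
      placedThisPass = true
    }
  }
  tasks.toMap
}
```

Spreading executors across offers this way, rather than greedily filling one offer, is what lets multiple smaller executors land on distinct slaves.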

@mgummelt
Contributor Author

mgummelt commented Feb 5, 2016

@andrewor14 Glad to be here! Flaky tests or no

I think all concerns have been addressed except for dynamic allocation testing, which seems to be broken entirely: SPARK-12583

@dragos Any other comments?

@SparkQA

SparkQA commented Feb 5, 2016

Test build #50833 has finished for PR 10993 at commit 7e3f39d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dragos
Contributor

dragos commented Feb 6, 2016

LGTM! Great work, @mgummelt!

-    val slaveId: String = status.getSlaveId.getValue
+    val taskId = status.getTaskId.getValue
+    val slaveId = status.getSlaveId.getValue
+    val slave = slaves(slaveId)
Contributor

This needs to be moved inside the stateLock, right?

Contributor Author

Moved, but I don't understand why the Mesos methods are synchronized in the first place. They should only be called by a single thread (the driver thread).

Contributor

The ExecutorAllocationManager, used when dynamic allocation is enabled, runs on a different thread. I'm not sure whether this particular method can run on different threads, but there's at least the issue of visibility.
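
A minimal sketch of the locking pattern under discussion, with placeholder class and field names: the point is that both the Mesos driver callbacks and the allocation manager's thread touch the same maps, so access goes through stateLock for mutual exclusion and cross-thread visibility.

```scala
import org.apache.mesos.Protos.TaskStatus
import scala.collection.mutable

class Slave(val hostname: String)  // placeholder; see the fuller sketch below

class BackendSketch {
  // All mutable scheduler state is read and written under this lock.
  private val stateLock = new Object
  private val slaves = new mutable.HashMap[String, Slave]

  def statusUpdate(status: TaskStatus): Unit = stateLock.synchronized {
    val slaveId = status.getSlaveId.getValue
    val slave = slaves(slaveId)  // the lookup now happens inside the lock
    // ... update task and failure bookkeeping for `slave` ...
  }
}
```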

@tnachen
Contributor

tnachen commented Feb 8, 2016

Just one comment, overall LGTM

Support spark.executor.cores on Mesos.
@andrewor14
Contributor

retest this please

@SparkQA

SparkQA commented Feb 9, 2016

Test build #50985 has finished for PR 10993 at commit ecad77a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 9, 2016

Test build #50984 has finished for PR 10993 at commit ecad77a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

// SlaveID -> Slave
// This map accumulates entries for the duration of the job. Slaves are never deleted, because
// we need to maintain e.g. failure state and connection state.
private val slaves = new HashMap[String, Slave]
Contributor

Elsewhere in Spark we would call this class SlaveInfo instead of just Slave, so we don't confuse it with the Mesos slave.
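
To make the bookkeeping concrete, a sketch of roughly what such a per-slave record might hold; the field names here are assumptions for illustration, not necessarily the PR's exact class:

```scala
import scala.collection.mutable

// Assumed shape of the record kept in the `slaves` map (illustrative).
class Slave(val hostname: String) {
  val taskIDs = new mutable.HashSet[String]  // executor tasks running on this slave
  var taskFailures = 0           // failure state: retained so flaky slaves can be avoided
  var shuffleRegistered = false  // connection state for the external shuffle service
}
```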

@andrewor14
Contributor

LGTM, merging into master. @mgummelt feel free to address the remainder of the comments in a follow-up patch.

@asfgit asfgit closed this in 80cb963 Feb 10, 2016
@mgummelt
Contributor Author

Thanks for merging. Will this go into 1.6.1, or not until 2.0?

@andrewor14
Contributor

This is a big new feature. It will not go into a maintenance release (1.6.1).

@andrewor14
Contributor

@mgummelt looks like this caused a flaky test:
https://spark-tests.appspot.com/tests/org.apache.spark.scheduler.cluster.mesos.CoarseMesosSchedulerBackendSuite/mesos%20kills%20an%20executor%20when%20told

Do you have the bandwidth to fix it quickly? If not I'll just revert this patch for now and we can resubmit it later.

@mgummelt
Contributor Author

looking into it

@Astralidea

@mgummelt Great work! I think this feature will allow more people to use Mesos.

@mgummelt
Contributor Author

@andrewor14 I haven't found the problem, but here's a PR to remove the test in the interim: #11164

It's a strange test to be flaky. It's very simple.

@mgummelt
Contributor Author

Ah, I see the issue. There's a thread causing a race.

I won't be able to fix until tomorrow, though.

@mgummelt mgummelt deleted the executor_sizing branch February 12, 2016 19:31
asfgit pushed a commit that referenced this pull request Feb 12, 2016
andrewor14 This addressed your style comments from #10993

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #11187 from mgummelt/fix_mesos_style.
sttts pushed a commit to mesosphere/spark that referenced this pull request Mar 15, 2016
…rse grained mesos mode.

This is the next iteration of tnachen's previous PR: apache#4027

In that PR, we resolved with andrewor14 and pwendell to implement the Mesos scheduler's support of `spark.executor.cores` to be consistent with YARN and Standalone.  This PR implements that resolution.

This PR implements two high-level features.  These two features are co-dependent, so they're implemented both here:
- Mesos support for spark.executor.cores
- Multiple executors per slave

We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes apache#10993 from mgummelt/executor_sizing.
mgummelt pushed a commit to mesosphere/spark that referenced this pull request Aug 2, 2016
…rse grained mesos mode.
mgummelt pushed a commit to mesosphere/spark that referenced this pull request Mar 7, 2017
…rse grained mesos mode.