[SPARK-5095][MESOS] Support launching multiple mesos executors in coarse grained mesos mode.

This is the next iteration of tnachen's previous PR: #4027

In that PR, andrewor14, pwendell, and I resolved to make the Mesos scheduler's support of `spark.executor.cores` consistent with YARN and Standalone. This PR implements that resolution.

This PR implements two high-level features. The two are co-dependent, so both are implemented here (a configuration sketch follows the list):
- Mesos support for spark.executor.cores
- Multiple executors per slave
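
For illustration, a minimal sketch (not part of this commit; the master URL and values are hypothetical) of an application that sizes its executors explicitly, so that a single Mesos slave with a large enough offer can host more than one of them:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical sizing: with 2 cores and 2g per executor and an 8-core cap,
// the coarse-grained backend may launch up to 4 executors, several of which
// can land on the same slave if its offer is large enough.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")  // placeholder Mesos master URL
  .setAppName("ExecutorSizingExample")
  .set("spark.executor.memory", "2g")
  .set("spark.executor.cores", "2")          // the setting this PR adds for Mesos
  .set("spark.cores.max", "8")

val sc = new SparkContext(conf)
```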

We at Mesosphere have been working with Typesafe on a Spark/Mesos integration test suite: https://github.com/typesafehub/mesos-spark-integration-tests, which passes for this PR.

The contribution is my original work and I license the work to the project under the project's open source license.

Author: Michael Gummelt <mgummelt@mesosphere.io>

Closes #10993 from mgummelt/executor_sizing.
Michael Gummelt authored and Andrew Or committed Feb 10, 2016
1 parent c0b71e0 commit 80cb963
Showing 9 changed files with 521 additions and 275 deletions.
```diff
@@ -240,6 +240,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       else {
         val executorData = executorDataMap(task.executorId)
         executorData.freeCores -= scheduler.CPUS_PER_TASK
+
+        logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
+          s"${executorData.executorHost}.")
+
         executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
       }
     }
```
```diff
@@ -309,7 +313,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
 
     // TODO (prashant) send conf instead of properties
-    driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
+    driverEndpoint = createDriverEndpointRef(properties)
+  }
+
+  protected def createDriverEndpointRef(
+      properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
+    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
   }
 
   protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
```
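
One plausible reason for extracting `createDriverEndpointRef` is to create a test seam: a subclass can substitute its own endpoint ref without touching the calling code. The sketch below illustrates the pattern with simplified stand-in types (none of these stubs are from the Spark codebase):

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in for RpcEndpointRef, just to show the override point.
trait EndpointRef { def name: String }

class Backend {
  protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): EndpointRef =
    new EndpointRef { val name = "real-driver-endpoint" }

  def start(): EndpointRef = createDriverEndpointRef(ArrayBuffer.empty)
}

// A test backend swaps in a stub ref instead of registering with a real RpcEnv.
class TestBackend extends Backend {
  override protected def createDriverEndpointRef(
      properties: ArrayBuffer[(String, String)]): EndpointRef =
    new EndpointRef { val name = "stub-driver-endpoint" }
}
```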
(Diffs for large files in this commit are not rendered here.)

```diff
@@ -138,7 +138,7 @@ private[spark] class MesosSchedulerBackend(
     val (resourcesAfterCpu, usedCpuResources) =
       partitionResources(availableResources, "cpus", mesosExecutorCores)
     val (resourcesAfterMem, usedMemResources) =
-      partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
+      partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))
 
     builder.addAllResources(usedCpuResources.asJava)
     builder.addAllResources(usedMemResources.asJava)
```
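
For context, `partitionResources` splits an offer's resources into the portion to consume and the remainder. A simplified standalone model of that behavior over (name, amount) pairs (my simplification; the real helper operates on Mesos `Resource` protos and also tracks roles):

```scala
// Carve `needed` units of `name` out of offered scalar resources, returning
// (remaining, consumed) -- mirroring the (resourcesAfter*, used*) pairs above.
def partitionScalar(
    offered: List[(String, Double)],
    name: String,
    needed: Double): (List[(String, Double)], List[(String, Double)]) = {
  var toTake = needed
  val remaining = List.newBuilder[(String, Double)]
  val consumed = List.newBuilder[(String, Double)]
  for ((n, amount) <- offered) {
    val take = if (n == name) math.min(amount, toTake) else 0.0
    toTake -= take
    if (take > 0) consumed += ((n, take))
    if (amount - take > 0) remaining += ((n, amount - take))
  }
  (remaining.result(), consumed.result())
}

// partitionScalar(List(("cpus", 4.0), ("mem", 2048.0)), "cpus", 1.5)
// => (List(("cpus", 2.5), ("mem", 2048.0)), List(("cpus", 1.5)))
```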
```diff
@@ -250,7 +250,7 @@ private[spark] class MesosSchedulerBackend(
     // check offers for
     //  1. Memory requirements
     //  2. CPU requirements - need at least 1 for executor, 1 for task
-    val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
+    val meetsMemoryRequirements = mem >= executorMemory(sc)
     val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
     val meetsRequirements =
       (meetsMemoryRequirements && meetsCPURequirements) ||
```
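
A worked instance of this check, with numbers I picked for illustration: a 1g executor yields `executorMemory(sc)` = 1408 MB (see the memory calculation later in this diff), so with one core reserved for the executor and `CPUS_PER_TASK = 1`, an offer needs at least 1408 MB and 2 CPUs:

```scala
// Hypothetical offer: 2 CPUs, 2048 MB.
val cpus = 2.0
val mem = 2048.0
val mesosExecutorCores = 1.0  // cores reserved for the executor process itself
val cpusPerTask = 1           // scheduler.CPUS_PER_TASK default

val meetsMemoryRequirements = mem >= 1408                              // true
val meetsCPURequirements = cpus >= (mesosExecutorCores + cpusPerTask)  // 2.0 >= 2.0, true
```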
```diff
@@ -140,15 +140,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
     }
   }
 
-  /**
-   * Signal that the scheduler has registered with Mesos.
-   */
-  protected def getResource(res: JList[Resource], name: String): Double = {
+  def getResource(res: JList[Resource], name: String): Double = {
     // A resource can have multiple values in the offer since it can either be from
     // a specific role or wildcard.
     res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum
   }
 
+  /**
+   * Signal that the scheduler has registered with Mesos.
+   */
   protected def markRegistered(): Unit = {
     registerLatch.countDown()
   }
```
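
The comment inside `getResource` deserves a concrete example: a single offer can carry the same resource name once per role, so the values must be summed. A sketch using the Mesos protobuf API of that era (the role name and amounts are made up):

```scala
import java.util.Arrays
import org.apache.mesos.Protos.{Resource, Value}

def cpus(amount: Double, role: String): Resource =
  Resource.newBuilder()
    .setName("cpus")
    .setType(Value.Type.SCALAR)
    .setScalar(Value.Scalar.newBuilder().setValue(amount))
    .setRole(role)
    .build()

// One offer, two entries for "cpus": a reserved chunk plus unreserved ("*") capacity.
val offered = Arrays.asList(cpus(1.5, "spark"), cpus(0.5, "*"))
// getResource(offered, "cpus") sums both entries: 1.5 + 0.5 = 2.0
```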
```diff
@@ -337,7 +337,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
    * @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
    *         (whichever is larger)
    */
-  def calculateTotalMemory(sc: SparkContext): Int = {
+  def executorMemory(sc: SparkContext): Int = {
     sc.conf.getInt("spark.mesos.executor.memoryOverhead",
       math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
       sc.executorMemory
```
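
(Note that the scaladoc above understates the return value: the method returns the overhead plus the executor memory itself, not just the overhead.) A worked example with numbers of my choosing, assuming the documented defaults `MEMORY_OVERHEAD_FRACTION = 0.10` and `MEMORY_OVERHEAD_MINIMUM = 384`:

```scala
// Standalone recomputation of executorMemory's arithmetic (assumed defaults).
val MEMORY_OVERHEAD_FRACTION = 0.10
val MEMORY_OVERHEAD_MINIMUM = 384

def executorMemoryMb(heapMb: Int, overheadOverride: Option[Int] = None): Int =
  overheadOverride.getOrElse(
    math.max(MEMORY_OVERHEAD_FRACTION * heapMb, MEMORY_OVERHEAD_MINIMUM).toInt) + heapMb

assert(executorMemoryMb(4096) == 4505)  // max(409.6, 384).toInt = 409; 409 + 4096
assert(executorMemoryMb(1024) == 1408)  // overhead floor of 384 applies; 384 + 1024
```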
