Skip to content

Commit

Permalink
Support multiple executors per node on Mesos.
Browse files Browse the repository at this point in the history
Support spark.executor.cores on Mesos.
  • Loading branch information
Michael Gummelt committed Feb 9, 2016
1 parent 33212cb commit ecad77a
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 275 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK

logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
s"${executorData.executorHost}.")

executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
Expand Down Expand Up @@ -309,7 +313,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
}

// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
driverEndpoint = createDriverEndpointRef(properties)
}

/**
 * Creates the driver endpoint and registers it with the RPC environment.
 *
 * @param properties Spark configuration properties forwarded to the endpoint
 * @return a reference to the newly registered driver endpoint
 */
protected def createDriverEndpointRef(
    properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  val endpoint = createDriverEndpoint(properties)
  rpcEnv.setupEndpoint(ENDPOINT_NAME, endpoint)
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ private[spark] class MesosSchedulerBackend(
val (resourcesAfterCpu, usedCpuResources) =
partitionResources(availableResources, "cpus", mesosExecutorCores)
val (resourcesAfterMem, usedMemResources) =
partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc))
partitionResources(resourcesAfterCpu.asJava, "mem", executorMemory(sc))

builder.addAllResources(usedCpuResources.asJava)
builder.addAllResources(usedMemResources.asJava)
Expand Down Expand Up @@ -250,7 +250,7 @@ private[spark] class MesosSchedulerBackend(
// check offers for
// 1. Memory requirements
// 2. CPU requirements - need at least 1 for executor, 1 for task
val meetsMemoryRequirements = mem >= calculateTotalMemory(sc)
val meetsMemoryRequirements = mem >= executorMemory(sc)
val meetsCPURequirements = cpus >= (mesosExecutorCores + scheduler.CPUS_PER_TASK)
val meetsRequirements =
(meetsMemoryRequirements && meetsCPURequirements) ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,15 +140,15 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
}
}

/**
 * Sums all values of the named scalar resource in the given resource list.
 *
 * A resource name can occur more than once in an offer, since each value may be
 * attributed either to a specific role or to the wildcard role, so every
 * matching scalar value is accumulated.
 */
def getResource(res: JList[Resource], name: String): Double = {
  res.asScala
    .collect { case resource if resource.getName == name => resource.getScalar.getValue }
    .sum
}

/**
 * Signal that the scheduler has registered with Mesos.
 *
 * Counts down the registration latch, presumably releasing any thread blocked
 * waiting on it for registration to complete — confirm against the latch's
 * await site, which is outside this view.
 */
protected def markRegistered(): Unit = {
  registerLatch.countDown()
}
Expand Down Expand Up @@ -337,7 +337,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
* @return memory requirement as (0.1 * <memoryOverhead>) or MEMORY_OVERHEAD_MINIMUM
* (whichever is larger)
*/
def calculateTotalMemory(sc: SparkContext): Int = {
def executorMemory(sc: SparkContext): Int = {
sc.conf.getInt("spark.mesos.executor.memoryOverhead",
math.max(MEMORY_OVERHEAD_FRACTION * sc.executorMemory, MEMORY_OVERHEAD_MINIMUM).toInt) +
sc.executorMemory
Expand Down
Loading

0 comments on commit ecad77a

Please sign in to comment.