Skip to content

Commit

Permalink
Handle offers with multiple roles
Browse files Browse the repository at this point in the history
  • Loading branch information
tnachen committed Mar 17, 2015
1 parent 495b57c commit eba5669
Show file tree
Hide file tree
Showing 4 changed files with 248 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import java.util.{List => JList}
import java.util.Collections

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}

import org.apache.mesos.{Scheduler => MScheduler}
import org.apache.mesos._
Expand Down Expand Up @@ -209,6 +209,7 @@ private[spark] class CoarseMesosSchedulerBackend(
val filters = Filters.newBuilder().setRefuseSeconds(-1).build()

for (offer <- offers) {
logTrace("Offer received from Mesos, id: " + offer.getId + ", offer: " + offer)
val slaveId = offer.getSlaveId.toString
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
Expand All @@ -224,18 +225,26 @@ private[spark] class CoarseMesosSchedulerBackend(
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
val task = MesosTaskInfo.newBuilder()

val (newResources, usedResources) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)

val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem",
MemoryUtils.calculateTotalMemory(sc)))
.build()
.addAllResources(usedResources)
.addAllResources(
partitionResources(newResources, "mem", MemoryUtils.calculateTotalMemory(sc))._2)

d.launchTasks(
Collections.singleton(offer.getId), Collections.singletonList(task), filters)
Collections.singleton(offer.getId),
Collections.singletonList(taskBuilder.build),
filters)

} else {
logTrace("Offer filtered: " + offer.getId)
// Filter it out
d.launchTasks(
Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
Expand All @@ -244,23 +253,6 @@ private[spark] class CoarseMesosSchedulerBackend(
}
}

/** Helper function to pull out a resource from a Mesos Resources protobuf */
private def getResource(res: JList[Resource], name: String): Double = {
  // Return the scalar value of the first resource with this name, or 0 when
  // the offer carries none. Avoids the original non-local `return` from
  // inside the for loop, which Scala implements via a control-flow exception.
  // NOTE(review): only the first matching entry is read — multi-role offers
  // that list the same resource name more than once are not summed here.
  res.find(_.getName == name).map(_.getScalar.getValue).getOrElse(0.0)
}

/** Build a Mesos resource protobuf object holding a single scalar quantity. */
private def createResource(resourceName: String, quantity: Double): Protos.Resource = {
  // Build the scalar payload first, then attach it to the resource.
  val scalarValue = Value.Scalar.newBuilder().setValue(quantity).build()
  Resource.newBuilder()
    .setName(resourceName)
    .setType(Value.Type.SCALAR)
    .setScalar(scalarValue)
    .build()
}

/** Check whether a Mesos task state represents a finished task */
private def isFinished(state: MesosTaskState) = {
state == MesosTaskState.TASK_FINISHED ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{ArrayList => JArrayList, List => JList}
import java.util.Collections
import java.util.{ArrayList => JArrayList, List => JList, Collections}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}
Expand Down Expand Up @@ -58,7 +57,7 @@ private[spark] class MesosSchedulerBackend(
var driver: SchedulerDriver = null

// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
val slaveIdsWithExecutors = new HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId = new HashMap[Long, String]

// An ExecutorInfo for our tasks
Expand Down Expand Up @@ -93,7 +92,9 @@ private[spark] class MesosSchedulerBackend(
}
}

def createExecutorInfo(execId: String): MesosExecutorInfo = {
def createExecutorInfo(
resources: JList[Resource],
execId: String): (MesosExecutorInfo, JList[Resource]) = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
Expand Down Expand Up @@ -135,26 +136,26 @@ private[spark] class MesosSchedulerBackend(
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
val cpus = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(scheduler.CPUS_PER_TASK).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
.setValue(MemoryUtils.calculateTotalMemory(sc)).build())
.build()
MesosExecutorInfo.newBuilder()

val builder = MesosExecutorInfo.newBuilder()

val (resourcesAfterCpu, usedCpuResources) =
partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK)

builder.addAllResources(usedCpuResources)

val (resourcesAfterMem, usedMemResources) =
partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc))

builder.addAllResources(usedMemResources)

val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(cpus)
.addResources(memory)
.build()

(executorInfo, resourcesAfterMem)
}

/**
Expand Down Expand Up @@ -208,6 +209,19 @@ private[spark] class MesosSchedulerBackend(

override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}

/**
 * Build a human-readable multi-line summary of the given Mesos tasks
 * (task id, slave id, task resources and executor resources), one
 * separator-terminated section per task, for trace logging.
 */
def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
  val builder = new StringBuilder
  tasks.foreach { t =>
    builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
      .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
      .append("Task resources: ").append(t.getResourcesList).append("\n")
      .append("Executor resources: ").append(t.getExecutor.getResourcesList)
      // Fix: terminate the executor-resources line before the separator so
      // each task's section renders on its own lines instead of gluing the
      // dashes onto the resources dump.
      .append("\n---------------------------------------------\n")
  }
  builder.toString()
}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
Expand Down Expand Up @@ -245,6 +259,10 @@ private[spark] class MesosSchedulerBackend(

val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
val slaveIdToResources = new HashMap[String, JList[Resource]]()
usableOffers.foreach { o =>
slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
}

val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]

Expand All @@ -256,11 +274,15 @@ private[spark] class MesosSchedulerBackend(
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
val (mesosTask, remainingResources) = createMesosTask(
taskDesc,
slaveIdToResources(slaveId),
slaveId)
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
.add(mesosTask)
slaveIdToResources(slaveId) = remainingResources
}
}

Expand All @@ -273,6 +295,7 @@ private[spark] class MesosSchedulerBackend(
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}

Expand All @@ -287,30 +310,32 @@ private[spark] class MesosSchedulerBackend(
}
}

/** Helper function to pull out a resource from a Mesos Resources protobuf */
def getResource(res: JList[Resource], name: String): Double = {
for (r <- res if r.getName == name) {
return r.getScalar.getValue
}
0
}

/** Turn a Spark TaskDescription into a Mesos task */
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
def createMesosTask(
task: TaskDescription,
resources: JList[Resource],
slaveId: String): (MesosTaskInfo, JList[Resource]) = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
val cpuResource = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
.build()
MesosTaskInfo.newBuilder()
val (executorInfo, remainingResources) = if (slaveIdsWithExecutors.contains(slaveId)) {
(slaveIdsWithExecutors(slaveId), resources)
} else {
createExecutorInfo(resources, slaveId)
}
slaveIdsWithExecutors(slaveId) = executorInfo

val (finalResources, cpuResources) =
partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)

val taskInfo = MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(createExecutorInfo(slaveId))
.setExecutor(executorInfo)
.setName(task.name)
.addResources(cpuResource)
.addAllResources(cpuResources)
.setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
.build()

(taskInfo, finalResources)
}

/** Check whether a Mesos task state represents a finished task */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,19 @@

package org.apache.spark.scheduler.cluster.mesos

import org.apache.mesos.Protos.{Credential, FrameworkInfo}
import java.util.List

import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer

import org.apache.mesos.Protos._
import org.apache.mesos.{Scheduler, MesosSchedulerDriver}
import org.apache.spark.SparkConf
import org.apache.spark.{Logging, SparkConf}
import org.apache.mesos.protobuf.ByteString
import org.apache.mesos.Protos.Value.Type


private[spark] trait MesosSchedulerUtils {
private[spark] trait MesosSchedulerUtils extends Logging {
def createSchedulerDriver(
scheduler: Scheduler,
sparkUser: String,
Expand Down Expand Up @@ -53,4 +60,57 @@ private[spark] trait MesosSchedulerUtils {
new MesosSchedulerDriver(scheduler, fwInfoBuilder.build, masterUrl)
}
}

/**
 * Helper function to pull out a resource from a Mesos Resources protobuf.
 * A resource can appear multiple times in an offer since it can belong to
 * a specific role or to the wildcard role, so all matching scalar values
 * are summed. Returns 0.0 when the offer carries no resource of this name.
 */
def getResource(res: List[Resource], name: String): Double = {
  // Idiomatic replacement for the original `var` accumulator loop.
  res.filter(_.getName == name).map(_.getScalar.getValue).sum
}

/**
 * Partition the existing resource list based on the resources requested and
 * the remaining resources.
 * @param resources The full list of resources from the offer.
 * @param resourceName The scalar resource name (e.g. "cpus", "mem") to consume.
 * @param count The total amount of the resource to consume.
 * @return The remaining resources list and the used resources list.
 */
def partitionResources(
    resources: List[Resource],
    resourceName: String,
    count: Double): (List[Resource], List[Resource]) = {
  var remain = count
  // `val`: the buffer is mutated in place but the binding never changes.
  val usedResources = new ArrayBuffer[Resource]
  // `map` rather than `collect { case r => ... }`: the function is total.
  // Matching scalar resources are drained in offer order — possibly across
  // several role-specific entries — until the requested amount is satisfied.
  val newResources = resources.map { r =>
    if (remain > 0 &&
        r.getType == Type.SCALAR &&
        r.getScalar.getValue > 0.0 &&
        r.getName == resourceName) {
      val usage = Math.min(remain, r.getScalar.getValue)
      usedResources += Resource.newBuilder()
        .setName(resourceName)
        .setRole(r.getRole)
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(usage).build())
        .build()
      remain -= usage
      // Entry for what is left of this resource after the usage is deducted.
      Resource.newBuilder()
        .setName(resourceName)
        .setRole(r.getRole)
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(r.getScalar.getValue - usage).build())
        .build()
    } else {
      r
    }
  }

  // Drop fully-drained scalar entries from the remaining list; non-scalar
  // resources are always kept.
  (newResources.filter(r => r.getType != Type.SCALAR || r.getScalar.getValue > 0.0).toList,
    usedResources.toList)
}
}
Loading

0 comments on commit eba5669

Please sign in to comment.