Skip to content

Commit

Permalink
Add mesos role, auth and secret.
Browse files Browse the repository at this point in the history
  • Loading branch information
tnachen committed Jul 16, 2015
1 parent affbe32 commit f7fc2a9
Show file tree
Hide file tree
Showing 5 changed files with 278 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,17 +224,22 @@ private[spark] class CoarseMesosSchedulerBackend(
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
val task = MesosTaskInfo.newBuilder()
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, taskResources) =
MesosSchedulerUtils.partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", calculateTotalMemory(sc)))
.addAllResources(taskResources)
.addAllResources(
MesosSchedulerUtils.partitionResources(
remainingResources, "mem", MemoryUtils.calculateTotalMemory(sc))._2)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder())
}

// accept the offer and launch the task
Expand All @@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo("Mesos task " + taskId + " is now " + state)
logInfo(s"Mesos task $taskId is now $state")
stateLock.synchronized {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
Expand All @@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (TaskState.isFailed(TaskState.fromMesos(state))) {
failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
"is Spark installed on it?")
}
}
Expand All @@ -282,7 +287,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}

override def error(d: SchedulerDriver, message: String) {
  // Log the Mesos-reported error and propagate it to the Spark scheduler.
  logError(s"Mesos error: $message")
  scheduler.error(message)
}

Expand Down Expand Up @@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
  // A lost slave means all executors on it are gone; tear down our bookkeeping.
  logInfo(s"Mesos slave lost: ${slaveId.getValue}")
  executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ private[spark] class MesosSchedulerBackend(
with MesosSchedulerUtils {

// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
val slaveIdsWithExecutors = new HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId = new HashMap[Long, String]

// An ExecutorInfo for our tasks
Expand All @@ -71,7 +71,9 @@ private[spark] class MesosSchedulerBackend(
startScheduler(master, MesosSchedulerBackend.this, fwInfo)
}

def createExecutorInfo(execId: String): MesosExecutorInfo = {
def createExecutorInfo(
resources: JList[Resource],
execId: String): (MesosExecutorInfo, JList[Resource]) = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
Expand Down Expand Up @@ -115,32 +117,25 @@ private[spark] class MesosSchedulerBackend(
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
val cpus = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
.setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
val builder = MesosExecutorInfo.newBuilder()
val (resourcesAfterCpu, usedCpuResources) =
MesosSchedulerUtils.partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK)
builder.addAllResources(usedCpuResources)
val (resourcesAfterMem, usedMemResources) =
MesosSchedulerUtils.partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc))
builder.addAllResources(usedMemResources)
val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(cpus)
.addResources(memory)
.build()

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}

executorInfo.build()
(executorInfo, resourcesAfterMem)
}

/**
Expand Down Expand Up @@ -183,6 +178,19 @@ private[spark] class MesosSchedulerBackend(

override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}

/**
 * Builds a human-readable, multi-line summary of the given Mesos tasks
 * (task id, slave id, task resources and executor resources) for trace logging.
 *
 * @param tasks The Mesos tasks to summarize.
 * @return One "record" per task, separated by a dashed line.
 */
def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
  val builder = new StringBuilder
  tasks.foreach { t =>
    builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
      .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
      .append("Task resources: ").append(t.getResourcesList).append("\n")
      // Newline added so the separator is not glued onto the resource list.
      .append("Executor resources: ").append(t.getExecutor.getResourcesList).append("\n")
      .append("---------------------------------------------\n")
  }
  builder.toString()
}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
Expand Down Expand Up @@ -236,6 +244,10 @@ private[spark] class MesosSchedulerBackend(

val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
val slaveIdToResources = new HashMap[String, JList[Resource]]()
usableOffers.foreach { o =>
slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
}

val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]

Expand All @@ -245,13 +257,18 @@ private[spark] class MesosSchedulerBackend(
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
val (mesosTask, remainingResources) = createMesosTask(
taskDesc,
slaveIdToResources(slaveId),
slaveId)
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(mesosTask)
slaveIdToResources(slaveId) = remainingResources
}
}
}

Expand All @@ -264,6 +281,7 @@ private[spark] class MesosSchedulerBackend(
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}

Expand All @@ -277,21 +295,28 @@ private[spark] class MesosSchedulerBackend(
}

/**
 * Turn a Spark TaskDescription into a Mesos task, carving the task's cpu share
 * (and, for a newly-created executor, the executor's cpu/mem) out of the
 * offered resources.
 *
 * @param task The Spark task to launch.
 * @param resources The resources still available from the slave's offer.
 * @param slaveId The slave the task will run on.
 * @return The built Mesos task plus the resources left over after this task.
 */
def createMesosTask(
    task: TaskDescription,
    resources: JList[Resource],
    slaveId: String): (MesosTaskInfo, JList[Resource]) = {
  val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
  // Reuse the executor already registered for this slave if there is one;
  // otherwise build a new one, which consumes resources from the offer.
  val (executorInfo, remainingResources) = if (slaveIdsWithExecutors.contains(slaveId)) {
    (slaveIdsWithExecutors(slaveId), resources)
  } else {
    createExecutorInfo(resources, slaveId)
  }
  slaveIdsWithExecutors(slaveId) = executorInfo
  val (finalResources, cpuResources) =
    MesosSchedulerUtils.partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
  val taskInfo = MesosTaskInfo.newBuilder()
    .setTaskId(taskId)
    .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
    .setExecutor(executorInfo)
    .setName(task.name)
    .addAllResources(cpuResources)
    .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
    .build()
  (taskInfo, finalResources)
}

override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import java.util.{List => JList}
import java.util.concurrent.CountDownLatch

import scala.collection.JavaConversions._
import scala.util.control.NonFatal

import com.google.common.base.Splitter
import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos}
Expand All @@ -41,6 +40,36 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
// Driver for talking to Mesos
protected var mesosDriver: SchedulerDriver = null

/**
 * Creates a MesosSchedulerDriver for the given scheduler, applying the optional
 * Mesos framework settings from the Spark configuration:
 * `spark.mesos.principal`, `spark.mesos.secret` and `spark.mesos.role`.
 *
 * @throws SparkException if a secret is configured without a principal, since
 *                        Mesos credentials require both.
 */
def createSchedulerDriver(
    scheduler: Scheduler,
    sparkUser: String,
    appName: String,
    masterUrl: String,
    conf: SparkConf): SchedulerDriver = {
  val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
  val credBuilder = Credential.newBuilder()
  val principal = conf.getOption("spark.mesos.principal")
  val secret = conf.getOption("spark.mesos.secret")
  val role = conf.getOption("spark.mesos.role")
  principal.foreach { p =>
    // The principal is used both to identify the framework and to authenticate.
    fwInfoBuilder.setPrincipal(p)
    credBuilder.setPrincipal(p)
  }
  secret.foreach(s => credBuilder.setSecret(ByteString.copyFromUtf8(s)))
  if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) {
    throw new SparkException(
      "spark.mesos.principal must be configured when spark.mesos.secret is set")
  }
  role.foreach(r => fwInfoBuilder.setRole(r))
  // Only pass a Credential when authentication is actually configured.
  if (credBuilder.hasPrincipal) {
    new MesosSchedulerDriver(
      scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
  } else {
    new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
  }
}

/**
* Starts the MesosSchedulerDriver with the provided information. This method returns
* only after the scheduler has registered with Mesos.
Expand Down Expand Up @@ -82,18 +111,59 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
/**
 * Gets the total scalar amount of the named resource available in the offer.
 * A resource can have multiple entries in the offer since it can either be from
 * a specific role or the '*' wildcard role, so all matching entries are summed.
 *
 * @param res The resource entries from the offer.
 * @param name The resource name to sum, e.g. "cpus" or "mem".
 */
def getResource(res: List[Resource], name: String): Double = {
  res.filter(_.getName == name).map(_.getScalar.getValue).sum
}

/**
 * Signal that the scheduler has registered with Mesos.
 */
protected def markRegistered(): Unit = {
  registerLatch.countDown()
}

/**
 * Partition the existing set of resources into two groups, those remaining to be
 * scheduled and those requested to be used for a new task.
 * Only scalar entries matching `resourceName` are consumed; each consumed entry
 * keeps its original role so role-reserved resources are accounted correctly.
 *
 * @param resources The full list of available resources
 * @param resourceName The name of the resource to take from the available resources
 * @param count The amount of resources to take from the available resources
 * @return The remaining resources list and the used resources list.
 */
def partitionResources(
    resources: List[Resource],
    resourceName: String,
    count: Double): (List[Resource], List[Resource]) = {
  var remain = count
  val requestedResources = new ArrayBuffer[Resource]
  val remainingResources = resources.map { r =>
    if (remain > 0 &&
        r.getType == Type.SCALAR &&
        r.getScalar.getValue > 0.0 &&
        r.getName == resourceName) {
      // Take as much of this entry as is still needed, preserving its role.
      val usage = Math.min(remain, r.getScalar.getValue)
      requestedResources += Resource.newBuilder()
        .setName(resourceName)
        .setRole(r.getRole)
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(usage).build())
        .build()
      remain -= usage
      // What is left of this entry after the requested amount is carved out.
      Resource.newBuilder()
        .setName(resourceName)
        .setRole(r.getRole)
        .setType(Value.Type.SCALAR)
        .setScalar(Value.Scalar.newBuilder().setValue(r.getScalar.getValue - usage).build())
        .build()
    } else {
      r
    }
  }

  // Filter out any scalar resource that has been fully depleted.
  (remainingResources.filter(r => r.getType != Type.SCALAR || r.getScalar.getValue > 0.0).toList,
    requestedResources.toList)
}

/** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */
Expand Down
Loading

0 comments on commit f7fc2a9

Please sign in to comment.