[SPARK-6284] [MESOS] Add mesos role, principal and secret
Mesos allows a role and authentication credentials to be set per framework. The role identifies the framework to the allocator and affects the sharing weight it receives during resource allocation, while the principal and secret carry the optional authentication information the framework uses when connecting to the Mesos master.
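As a rough illustration (the exact configuration keys are assumed from this change rather than quoted from it), the new settings would be supplied through SparkConf, for example:

import org.apache.spark.{SparkConf, SparkContext}

// Illustrative sketch only; the key names below are assumptions based on this change.
val conf = new SparkConf()
  .setMaster("mesos://zk://zk1:2181/mesos")
  .setAppName("AuthenticatedApp")
  .set("spark.mesos.role", "spark")            // role used to weight the framework's resource share
  .set("spark.mesos.principal", "spark-prin")  // principal the framework authenticates as
  .set("spark.mesos.secret", "spark-secret")   // secret paired with the principal
val sc = new SparkContext(conf)

The same keys could equally be passed as --conf options to spark-submit.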

Author: Timothy Chen <tnachen@gmail.com>

Closes #4960 from tnachen/mesos_fw_auth and squashes the following commits:

0f9f03e [Timothy Chen] Fix review comments.
8f9488a [Timothy Chen] Fix rebase
f7fc2a9 [Timothy Chen] Add mesos role, auth and secret.
tnachen authored and Andrew Or committed Jul 17, 2015
1 parent 49351c7 commit d86bbb4
Showing 7 changed files with 358 additions and 96 deletions.
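The hunks below replace direct FrameworkInfo construction with a createSchedulerDriver helper. That helper lives in MesosSchedulerUtils.scala, one of the changed files not expanded on this page, so the following is only a plausible sketch of it: the signature is inferred from the call sites below and every configuration key is an assumption, not a quotation from the patch.

// Hypothetical sketch; the real helper is in the unexpanded MesosSchedulerUtils.scala.
import com.google.protobuf.ByteString
import org.apache.mesos.{MesosSchedulerDriver, Scheduler, SchedulerDriver}
import org.apache.mesos.Protos.{Credential, FrameworkID, FrameworkInfo}
import org.apache.spark.SparkConf

def createSchedulerDriver(
    masterUrl: String,
    scheduler: Scheduler,
    sparkUser: String,
    appName: String,
    conf: SparkConf,
    webuiUrl: Option[String] = None,
    checkpoint: Option[Boolean] = None,
    failoverTimeout: Option[Integer] = None,
    frameworkId: Option[String] = None): SchedulerDriver = {
  val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName)
  conf.getOption("spark.mesos.role").foreach(role => fwInfoBuilder.setRole(role))   // assumed key
  webuiUrl.foreach(url => fwInfoBuilder.setWebuiUrl(url))
  checkpoint.foreach(doCheckpoint => fwInfoBuilder.setCheckpoint(doCheckpoint))
  failoverTimeout.foreach(timeout => fwInfoBuilder.setFailoverTimeout(timeout.doubleValue))
  frameworkId.foreach(id => fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build()))
  conf.getOption("spark.mesos.principal") match {                                   // assumed key
    case Some(principal) =>
      fwInfoBuilder.setPrincipal(principal)
      val credBuilder = Credential.newBuilder().setPrincipal(principal)
      conf.getOption("spark.mesos.secret").foreach { secret =>                      // assumed key
        credBuilder.setSecret(ByteString.copyFromUtf8(secret))
      }
      new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build())
    case None =>
      new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl)
  }
}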
@@ -18,21 +18,20 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util.{List => JList, Collections}
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, List => JList}

import scala.collection.JavaConversions._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, _}
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.util.Utils
import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
@@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend(

/**
* The total number of executors we aim to have. Undefined when not using dynamic allocation
* and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]].
* and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]].
*/
private var executorLimitOption: Option[Int] = None

@@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend(

override def start() {
super.start()
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo)
val driver = createSchedulerDriver(
master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
startScheduler(driver)
}

def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
@@ -224,24 +224,29 @@ private[spark] class CoarseMesosSchedulerBackend(
taskIdToSlaveId(taskId) = slaveId
slaveIdsWithExecutors += slaveId
coresByTaskId(taskId) = cpusToUse
val task = MesosTaskInfo.newBuilder()
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addResources(createResource("cpus", cpusToUse))
.addResources(createResource("mem", calculateTotalMemory(sc)))
.addAllResources(cpuResourcesToUse)
.addAllResources(memResourcesToUse)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder)
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}

// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(task.build()), filters)
Collections.singleton(taskBuilder.build()), filters)
} else {
// Decline the offer
logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
@@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend(
override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val state = status.getState
logInfo("Mesos task " + taskId + " is now " + state)
logInfo(s"Mesos task $taskId is now $state")
stateLock.synchronized {
if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
@@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend(
if (TaskState.isFailed(TaskState.fromMesos(state))) {
failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1
if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) {
logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " +
logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " +
"is Spark installed on it?")
}
}
@@ -282,7 +287,7 @@
}

override def error(d: SchedulerDriver, message: String) {
logError("Mesos error: " + message)
logError(s"Mesos error: $message")
scheduler.error(message)
}

@@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend(
}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo("Mesos slave lost: " + slaveId.getValue)
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}

@@ -295,20 +295,24 @@ private[spark] class MesosClusterScheduler(
def start(): Unit = {
// TODO: Implement leader election to make sure only one framework running in the cluster.
val fwId = schedulerState.fetch[String]("frameworkId")
val builder = FrameworkInfo.newBuilder()
.setUser(Utils.getCurrentUserName())
.setName(appName)
.setWebuiUrl(frameworkUrl)
.setCheckpoint(true)
.setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash
fwId.foreach { id =>
builder.setId(FrameworkID.newBuilder().setValue(id).build())
frameworkId = id
}
recoverState()
metricsSystem.registerSource(new MesosClusterSchedulerSource(this))
metricsSystem.start()
startScheduler(master, MesosClusterScheduler.this, builder.build())
val driver = createSchedulerDriver(
master,
MesosClusterScheduler.this,
Utils.getCurrentUserName(),
appName,
conf,
Some(frameworkUrl),
Some(true),
Some(Integer.MAX_VALUE),
fwId)

startScheduler(driver)
ready = true
}

@@ -449,12 +453,8 @@ private[spark] class MesosClusterScheduler(
offer.cpu -= driverCpu
offer.mem -= driverMem
val taskId = TaskID.newBuilder().setValue(submission.submissionId).build()
val cpuResource = Resource.newBuilder()
.setName("cpus").setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build()
val memResource = Resource.newBuilder()
.setName("mem").setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build()
val cpuResource = createResource("cpus", driverCpu)
val memResource = createResource("mem", driverMem)
val commandInfo = buildDriverCommand(submission)
val appName = submission.schedulerProperties("spark.app.name")
val taskInfo = TaskInfo.newBuilder()
@@ -32,6 +32,7 @@ import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.util.Utils


/**
* A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a
* separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks
@@ -45,8 +46,8 @@ private[spark] class MesosSchedulerBackend(
with MScheduler
with MesosSchedulerUtils {

// Which slave IDs we have executors on
val slaveIdsWithExecutors = new HashSet[String]
// Stores the slave ids that have launched a Mesos executor.
val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo]
val taskIdToSlaveId = new HashMap[Long, String]

// An ExecutorInfo for our tasks
@@ -66,12 +67,21 @@
@volatile var appId: String = _

override def start() {
val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build()
classLoader = Thread.currentThread.getContextClassLoader
startScheduler(master, MesosSchedulerBackend.this, fwInfo)
val driver = createSchedulerDriver(
master, MesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf)
startScheduler(driver)
}

def createExecutorInfo(execId: String): MesosExecutorInfo = {
/**
* Creates a MesosExecutorInfo that is used to launch a Mesos executor.
* @param availableResources Available resources that are offered by Mesos
* @param execId The executor id to assign to this new executor.
* @return A tuple of the new mesos executor info and the remaining available resources.
*/
def createExecutorInfo(
availableResources: JList[Resource],
execId: String): (MesosExecutorInfo, JList[Resource]) = {
val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
.getOrElse {
@@ -115,32 +125,25 @@ private[spark] class MesosSchedulerBackend(
command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get))
}
val cpus = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder()
.setValue(mesosExecutorCores).build())
.build()
val memory = Resource.newBuilder()
.setName("mem")
.setType(Value.Type.SCALAR)
.setScalar(
Value.Scalar.newBuilder()
.setValue(calculateTotalMemory(sc)).build())
.build()
val executorInfo = MesosExecutorInfo.newBuilder()
val builder = MesosExecutorInfo.newBuilder()
val (resourcesAfterCpu, usedCpuResources) =
partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK)
val (resourcesAfterMem, usedMemResources) =
partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc))

builder.addAllResources(usedCpuResources)
builder.addAllResources(usedMemResources)
val executorInfo = builder
.setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
.setCommand(command)
.setData(ByteString.copyFrom(createExecArg()))
.addResources(cpus)
.addResources(memory)

sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image =>
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder())
}

executorInfo.build()
(executorInfo.build(), resourcesAfterMem)
}

/**
@@ -183,6 +186,18 @@ private[spark] class MesosSchedulerBackend(

override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}

private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
val builder = new StringBuilder
tasks.foreach { t =>
builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
.append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
.append("Task resources: ").append(t.getResourcesList).append("\n")
.append("Executor resources: ").append(t.getExecutor.getResourcesList)
.append("---------------------------------------------\n")
}
builder.toString()
}

/**
* Method called by Mesos to offer resources on slaves. We respond by asking our active task sets
* for tasks in order of priority. We fill each node with tasks in a round-robin manner so that
@@ -207,7 +222,7 @@

val meetsRequirements =
(meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) ||
(slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)
(slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK)

// add some debug messaging
val debugstr = if (meetsRequirements) "Accepting" else "Declining"
@@ -221,7 +236,7 @@
unUsableOffers.foreach(o => d.declineOffer(o.getId))

val workerOffers = usableOffers.map { o =>
val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) {
val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) {
getResource(o.getResourcesList, "cpus").toInt
} else {
// If the Mesos executor has not been started on this slave yet, set aside a few
@@ -236,6 +251,10 @@

val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap
val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap
val slaveIdToResources = new HashMap[String, JList[Resource]]()
usableOffers.foreach { o =>
slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList
}

val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]]

@@ -245,15 +264,19 @@
val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty)
acceptedOffers
.foreach { offer =>
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slaveIdsWithExecutors += slaveId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(createMesosTask(taskDesc, slaveId))
offer.foreach { taskDesc =>
val slaveId = taskDesc.executorId
slavesIdsOfAcceptedOffers += slaveId
taskIdToSlaveId(taskDesc.taskId) = slaveId
val (mesosTask, remainingResources) = createMesosTask(
taskDesc,
slaveIdToResources(slaveId),
slaveId)
mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo])
.add(mesosTask)
slaveIdToResources(slaveId) = remainingResources
}
}
}

// Reply to the offers
val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout?
Expand All @@ -264,6 +287,7 @@ private[spark] class MesosSchedulerBackend(
// TODO: Add support for log urls for Mesos
new ExecutorInfo(o.host, o.cores, Map.empty)))
)
logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}")
d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
}

@@ -272,26 +296,32 @@
for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) {
d.declineOffer(o.getId)
}

}
}

/** Turn a Spark TaskDescription into a Mesos task */
def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = {
/** Turn a Spark TaskDescription into a Mesos task, also returning the resources unused by the task. */
def createMesosTask(
task: TaskDescription,
resources: JList[Resource],
slaveId: String): (MesosTaskInfo, JList[Resource]) = {
val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build()
val cpuResource = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
.build()
MesosTaskInfo.newBuilder()
val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) {
(slaveIdToExecutorInfo(slaveId), resources)
} else {
createExecutorInfo(resources, slaveId)
}
slaveIdToExecutorInfo(slaveId) = executorInfo
val (finalResources, cpuResources) =
partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK)
val taskInfo = MesosTaskInfo.newBuilder()
.setTaskId(taskId)
.setSlaveId(SlaveID.newBuilder().setValue(slaveId).build())
.setExecutor(createExecutorInfo(slaveId))
.setExecutor(executorInfo)
.setName(task.name)
.addResources(cpuResource)
.addAllResources(cpuResources)
.setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString)
.build()
(taskInfo, finalResources)
}

override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
@@ -337,7 +367,7 @@ private[spark] class MesosSchedulerBackend(
private def removeExecutor(slaveId: String, reason: String) = {
synchronized {
listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
slaveIdsWithExecutors -= slaveId
slaveIdToExecutorInfo -= slaveId
}
}

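The hunks above also lean on a partitionResources helper, again defined in the unexpanded MesosSchedulerUtils.scala, which carves a requested amount of a named scalar resource out of an offer's resource list and returns both the remaining resources and the portion to use. A sketch under that assumption, with the signature inferred from the call sites:

// Hypothetical sketch; inferred from the call sites above, not quoted from the patch.
import java.util.{List => JList}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import org.apache.mesos.Protos.{Resource, Value}

def createScalarResource(name: String, amount: Double): Resource =
  Resource.newBuilder()
    .setName(name)
    .setType(Value.Type.SCALAR)
    .setScalar(Value.Scalar.newBuilder().setValue(amount))
    .build()

def partitionResources(
    resources: JList[Resource],
    resourceName: String,
    amountToUse: Double): (List[Resource], List[Resource]) = {
  var remain = amountToUse
  val requestedResources = new ArrayBuffer[Resource]
  val remainingResources = resources.map { r =>
    if (remain > 0 && r.getType == Value.Type.SCALAR && r.getName == resourceName) {
      val usage = math.min(remain, r.getScalar.getValue)
      requestedResources += createScalarResource(resourceName, usage)
      remain -= usage
      createScalarResource(resourceName, r.getScalar.getValue - usage)  // leftover on this resource
    } else {
      r
    }
  }
  (remainingResources.toList, requestedResources.toList)
}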