Skip to content

Commit

Permalink
Fix review comments.
Browse files Browse the repository at this point in the history
  • Loading branch information
tnachen committed Nov 21, 2015
1 parent 7d20148 commit 8a7a735
Show file tree
Hide file tree
Showing 4 changed files with 204 additions and 241 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@
package org.apache.spark.scheduler.cluster.mesos

import java.io.File
import java.util
import java.util.concurrent.locks.ReentrantLock
import java.util.{Collections, List => JList}

import scala.collection.JavaConverters._
import scala.collection.mutable.{HashMap, HashSet}

import com.google.common.collect.HashBiMap
import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}

import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.RpcAddress
import org.apache.spark.scheduler.{SlaveLost, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.{ExecutorLossReason, TaskSchedulerImpl}
import org.apache.spark.util.Utils
import org.apache.spark.{SecurityManager, SparkContext, SparkEnv, SparkException, TaskState}


import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.{Scheduler => MScheduler, SchedulerDriver}

/**
* A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
Expand Down Expand Up @@ -64,20 +65,31 @@ private[spark] class CoarseMesosSchedulerBackend(
// This is for cleaning up shuffle files reliably.
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)

val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)
val maxCpusPerExecutor = conf.getInt("spark.mesos.coarse.cores.max", Int.MaxValue)
private val maxExecutorsPerSlave = conf.getInt("spark.mesos.coarse.executors.max", 1)

private val maxCpusPerExecutor =
conf.getOption("spark.mesos.coarse.executor.cores.max").map { m => m.toInt }

if (conf.getOption("spark.mesos.coarse.executors.max").isDefined && maxCpusPerExecutor.isEmpty) {
throw new IllegalArgumentException(
"Must configure spark.mesos.coarse.coresPerExecutor.max when " +
"spark.mesos.coarse.executors.max is set")
}

// Cores we have acquired with each Mesos task ID
val coresByTaskId = new HashMap[Int, Int]
val coresByTaskId = new HashMap[String, Int]
var totalCoresAcquired = 0

// Mapping from slave ID to hostname
private val slaveIdToHost = new HashMap[String, String]

// Contains the set of slave IDs that we have connected the shuffle service to
private val existingSlaveShuffleConnections = new HashSet[String]

// Contains a mapping of slave ids to the number of executors launched.
val slaveIdsWithExecutors = new HashMap[String, Int]

val taskIdToSlaveId: HashBiMap[Int, String] = HashBiMap.create[Int, String]
val taskIdToSlaveId: HashMap[String, String] = new HashMap[String, String]
// How many times tasks on each slave failed
val failuresBySlaveId: HashMap[String, Int] = new HashMap[String, Int]

Expand All @@ -93,8 +105,6 @@ private[spark] class CoarseMesosSchedulerBackend(
*/
private[mesos] def executorLimit: Int = executorLimitOption.getOrElse(Int.MaxValue)

private val pendingRemovedSlaveIds = new HashSet[String]

// private lock object protecting mutable state above. Using the intrinsic lock
// may lead to deadlocks since the superclass might also try to lock
private val stateLock = new ReentrantLock
Expand Down Expand Up @@ -126,10 +136,10 @@ private[spark] class CoarseMesosSchedulerBackend(

@volatile var appId: String = _

def newMesosTaskId(): Int = {
def newMesosTaskId(slaveId: String): String = {
val id = nextMesosTaskId
nextMesosTaskId += 1
id
slaveId + "/" + id
}

override def start() {
Expand All @@ -144,7 +154,7 @@ private[spark] class CoarseMesosSchedulerBackend(
startScheduler(driver)
}

def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = {
def createCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = {
val executorSparkHome = conf.getOption("spark.mesos.executor.home")
.orElse(sc.getSparkHome())
.getOrElse {
Expand Down Expand Up @@ -188,20 +198,20 @@ private[spark] class CoarseMesosSchedulerBackend(
"%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend"
.format(prefixEnv, runScript) +
s" --driver-url $driverURL" +
s" --executor-id ${offer.getSlaveId.getValue}" +
s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.get.split('/').last.split('.').head
val executorId = sparkExecutorId(offer.getSlaveId.getValue, taskId.toString)

command.setValue(
s"cd $basename*; $prefixEnv " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend" +
s" --driver-url $driverURL" +
s" --executor-id $executorId" +
s" --executor-id $taskId" +
s" --hostname ${offer.getHostname}" +
s" --cores $numCores" +
s" --app-id $appId")
Expand Down Expand Up @@ -248,42 +258,47 @@ private[spark] class CoarseMesosSchedulerBackend(
* unless we've already launched more than we wanted to.
*/
override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) {
val memoryPerExecutor = calculateTotalMemory(sc)
stateLock.synchronized {
val filters = Filters.newBuilder().setRefuseSeconds(5).build()
for (offer <- offers.asScala) {
val mem = getResource(offer.getResourcesList, "mem")
val cpus = getResource(offer.getResourcesList, "cpus").toInt
var remainingMem = mem
var remainingCores = cpus
val tasks = new util.ArrayList[MesosTaskInfo]()
val offerAttributes = toAttributeMap(offer.getAttributesList)
val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes)
val slaveId = offer.getSlaveId.getValue
val totalMem = getResource(offer.getResourcesList, "mem")
val totalCpus = getResource(offer.getResourcesList, "cpus").toInt
val id = offer.getId.getValue
var executorCount = slaveIdsWithExecutors.getOrElse(slaveId, 0)
while (taskIdToSlaveId.size < executorLimit &&
totalCoresAcquired < maxCores &&
meetsConstraints &&
mem >= calculateTotalMemory(sc) &&
cpus >= 1 &&
remainingMem >= calculateTotalMemory(sc) &&
remainingCores >= 1 &&
failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
executorCount < maxExecutorsPerSlave) {
// Launch an executor on the slave
val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
totalCoresAcquired += cpusToUse
totalCpus -= cpusToUse
totalMem -= memRequired
val taskId = newMesosTaskId()
val coresToUse =
math.min(maxCpusPerExecutor.getOrElse(Int.MaxValue),
math.min(remainingCores, maxCores - totalCoresAcquired))
totalCoresAcquired += coresToUse
remainingCores -= coresToUse
remainingMem -= memoryPerExecutor
val taskId = newMesosTaskId(slaveId)
taskIdToSlaveId(taskId) = slaveId
executorCount += 1
slaveIdsWithExecutors(slaveId) = executorCount
coresByTaskId(taskId) = cpusToUse
coresByTaskId(taskId) = coresToUse
// Gather cpu resources from the available resources and use them in the task.
val (remainingResources, cpuResourcesToUse) =
partitionResources(offer.getResourcesList, "cpus", cpusToUse)
partitionResources(offer.getResourcesList, "cpus", coresToUse)
val (_, memResourcesToUse) =
partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc))
val taskBuilder = MesosTaskInfo.newBuilder()
.setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
.setSlaveId(offer.getSlaveId)
.setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId))
.setCommand(createCommand(offer, coresToUse + extraCoresPerSlave, taskId))
.setName("Task " + taskId)
.addAllResources(cpuResourcesToUse.asJava)
.addAllResources(memResourcesToUse.asJava)
Expand All @@ -292,13 +307,14 @@ private[spark] class CoarseMesosSchedulerBackend(
MesosSchedulerBackendUtil
.setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder())
}
tasks.add(taskBuilder.build())
}

if (!tasks.isEmpty) {
// accept the offer and launch the task
logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus")
slaveIdToHost(offer.getSlaveId.getValue) = offer.getHostname
d.launchTasks(
Collections.singleton(offer.getId),
Collections.singleton(taskBuilder.build()), filters)
d.launchTasks(Collections.singleton(offer.getId()), tasks, filters)
} else {
// This offer does not meet constraints. We don't need to see it again.
// Decline the offer for a long period of time.
Expand All @@ -313,7 +329,7 @@ private[spark] class CoarseMesosSchedulerBackend(


override def statusUpdate(d: SchedulerDriver, status: TaskStatus) {
val taskId = status.getTaskId.getValue.toInt
val taskId = status.getTaskId.getValue
val state = status.getState
logInfo(s"Mesos task $taskId is now $state")
val slaveId: String = status.getSlaveId.getValue
Expand All @@ -324,7 +340,8 @@ private[spark] class CoarseMesosSchedulerBackend(
// this through Mesos, since the shuffle services are set up independently.
if (TaskState.fromMesos(state).equals(TaskState.RUNNING) &&
slaveIdToHost.contains(slaveId) &&
shuffleServiceEnabled) {
shuffleServiceEnabled &&
!existingSlaveShuffleConnections.contains(slaveId)) {
assume(mesosExternalShuffleClient.isDefined,
"External shuffle client was not instantiated even though shuffle service is enabled.")
// TODO: Remove this and allow the MesosExternalShuffleService to detect
Expand All @@ -335,12 +352,10 @@ private[spark] class CoarseMesosSchedulerBackend(
s"host $hostname, port $externalShufflePort for app ${conf.getAppId}")
mesosExternalShuffleClient.get
.registerDriverWithShuffleService(hostname, externalShufflePort)
}

if (TaskState.isFinished(TaskState.fromMesos(state))) {
existingSlaveShuffleConnections += slaveId
} else if (TaskState.isFinished(TaskState.fromMesos(state))) {
val slaveId = taskIdToSlaveId(taskId)
slaveIdsWithExecutors -= slaveId
taskIdToSlaveId.remove(taskId)

// Remove the cores we have remembered for this task, if it's in the hashmap
for (cores <- coresByTaskId.get(taskId)) {
totalCoresAcquired -= cores
Expand All @@ -354,7 +369,7 @@ private[spark] class CoarseMesosSchedulerBackend(
"is Spark installed on it?")
}
}
executorTerminated(d, slaveId, s"Executor finished with state $state")
executorTerminated(d, taskId, slaveId, s"Executor finished with state $state")
// In case we'd rejected everything before but have now lost a node
d.reviveOffers()
}
Expand All @@ -381,35 +396,39 @@ private[spark] class CoarseMesosSchedulerBackend(
* slave IDs that we might have asked to be killed. It also notifies the driver
* that an executor was removed.
*/
private def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String): Unit = {
private def executorTerminated(
d: SchedulerDriver,
executorId: String,
slaveId: String,
reason: String): Unit = {
stateLock.synchronized {
if (slaveIdsWithExecutors.contains(slaveId)) {
val slaveIdToTaskId = taskIdToSlaveId.inverse()
if (slaveIdToTaskId.containsKey(slaveId)) {
val taskId: Int = slaveIdToTaskId.get(slaveId)
taskIdToSlaveId.remove(taskId)
removeExecutor(sparkExecutorId(slaveId, taskId.toString), SlaveLost(reason))
if (slaveIdsWithExecutors.contains(slaveId) && taskIdToSlaveId.contains(executorId)) {
taskIdToSlaveId.remove(executorId)
removeExecutor(executorId, new ExecutorLossReason(reason))
val newCount = slaveIdsWithExecutors(slaveId) - 1
if (newCount == 0) {
slaveIdsWithExecutors.remove(slaveId)
} else {
slaveIdsWithExecutors(slaveId) = newCount
}
// TODO: This assumes one Spark executor per Mesos slave,
// which may no longer be true after SPARK-5095
pendingRemovedSlaveIds -= slaveId
slaveIdsWithExecutors -= slaveId
}
}
}

private def sparkExecutorId(slaveId: String, taskId: String): String = {
s"$slaveId/$taskId"
}

override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = {
logInfo(s"Mesos slave lost: ${slaveId.getValue}")
executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
// Terminate all executors in the slave
stateLock.synchronized {
val lostExecutors = taskIdToSlaveId.filter(_._2.equals(slaveId.getValue)).map(_._1)
lostExecutors.foreach { taskId =>
executorTerminated(d, taskId, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue)
}
}
}

override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int): Unit = {
logInfo("Executor lost: %s, marking slave %s as lost".format(e.getValue, s.getValue))
slaveLost(d, s)
logInfo("Executor lost: %s".format(e.getValue))
executorTerminated(d, e.getValue, s.getValue, "Mesos Executor lost: " + e.getValue)
}

override def applicationId(): String =
Expand All @@ -432,13 +451,9 @@ private[spark] class CoarseMesosSchedulerBackend(
return false
}

val slaveIdToTaskId = taskIdToSlaveId.inverse()
for (executorId <- executorIds) {
val slaveId = executorId.split("/")(0)
if (slaveIdToTaskId.containsKey(slaveId)) {
mesosDriver.killTask(
TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build())
pendingRemovedSlaveIds += slaveId
if (taskIdToSlaveId.contains(executorId)) {
mesosDriver.killTask(TaskID.newBuilder().setValue(executorId).build())
} else {
logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler")
}
Expand Down
Loading

0 comments on commit 8a7a735

Please sign in to comment.