From eba5669a3101f1f0a6cd2a9a68a49fdcbc374085 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 17 Mar 2015 01:52:51 -0700 Subject: [PATCH] Handle offers with multiple roles --- .../mesos/CoarseMesosSchedulerBackend.scala | 40 +++---- .../cluster/mesos/MesosSchedulerBackend.scala | 103 ++++++++++------- .../cluster/mesos/MesosSchedulerUtils.scala | 66 ++++++++++- .../mesos/MesosSchedulerBackendSuite.scala | 107 +++++++++++++++++- 4 files changed, 248 insertions(+), 68 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 1f1db42443ab3..3fabf4a3606ff 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -22,7 +22,7 @@ import java.util.{List => JList} import java.util.Collections import scala.collection.JavaConversions._ -import scala.collection.mutable.{HashMap, HashSet} +import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import org.apache.mesos.{Scheduler => MScheduler} import org.apache.mesos._ @@ -209,6 +209,7 @@ private[spark] class CoarseMesosSchedulerBackend( val filters = Filters.newBuilder().setRefuseSeconds(-1).build() for (offer <- offers) { + logTrace("Offer received from Mesos, id: " + offer.getId + ", offer: " + offer) val slaveId = offer.getSlaveId.toString val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt @@ -224,18 +225,26 @@ private[spark] class CoarseMesosSchedulerBackend( taskIdToSlaveId(taskId) = slaveId slaveIdsWithExecutors += slaveId coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() + + val (newResources, usedResources) = + partitionResources(offer.getResourcesList, "cpus", cpusToUse) + + val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " + taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", - MemoryUtils.calculateTotalMemory(sc))) - .build() + .addAllResources(usedResources) + .addAllResources( + partitionResources(newResources, "mem", MemoryUtils.calculateTotalMemory(sc))._2) + d.launchTasks( - Collections.singleton(offer.getId), Collections.singletonList(task), filters) + Collections.singleton(offer.getId), + Collections.singletonList(taskBuilder.build), + filters) + } else { + logTrace("Offer filtered: " + offer.getId) // Filter it out d.launchTasks( Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters) @@ -244,23 +253,6 @@ private[spark] class CoarseMesosSchedulerBackend( } } - /** Helper function to pull out a resource from a Mesos Resources protobuf */ - private def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue - } - 0 - } - - /** Build a Mesos resource protobuf object */ - private def createResource(resourceName: String, quantity: Double): Protos.Resource = { - Resource.newBuilder() - .setName(resourceName) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(quantity).build()) - .build() - } - /** Check whether a Mesos task state represents a finished task */ private def 
isFinished(state: MesosTaskState) = {
    state == MesosTaskState.TASK_FINISHED ||
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 2b9f2b938730f..24c011dea61fb 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -18,8 +18,7 @@
 package org.apache.spark.scheduler.cluster.mesos
 
 import java.io.File
-import java.util.{ArrayList => JArrayList, List => JList}
-import java.util.Collections
+import java.util.{ArrayList => JArrayList, List => JList, Collections}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{HashMap, HashSet}
@@ -58,7 +57,7 @@ private[spark] class MesosSchedulerBackend(
   var driver: SchedulerDriver = null
 
   // Which slave IDs we have executors on
-  val slaveIdsWithExecutors = new HashSet[String]
+  val slaveIdsWithExecutors = new HashMap[String, MesosExecutorInfo]
 
   val taskIdToSlaveId = new HashMap[Long, String]
 
   // An ExecutorInfo for our tasks
@@ -93,7 +92,9 @@ private[spark] class MesosSchedulerBackend(
     }
   }
 
-  def createExecutorInfo(execId: String): MesosExecutorInfo = {
+  def createExecutorInfo(
+      resources: JList[Resource],
+      execId: String): (MesosExecutorInfo, JList[Resource]) = {
     val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home")
       .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility
       .getOrElse {
@@ -135,26 +136,26 @@ private[spark] class MesosSchedulerBackend(
       command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName")
       command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
     }
-    val cpus = Resource.newBuilder()
-      .setName("cpus")
-      .setType(Value.Type.SCALAR)
-      .setScalar(Value.Scalar.newBuilder()
-        .setValue(scheduler.CPUS_PER_TASK).build())
-      .build()
-    val memory = Resource.newBuilder()
-      .setName("mem")
-      .setType(Value.Type.SCALAR)
-      .setScalar(
-        Value.Scalar.newBuilder()
-          .setValue(MemoryUtils.calculateTotalMemory(sc)).build())
-      .build()
-    MesosExecutorInfo.newBuilder()
+
+    val builder = MesosExecutorInfo.newBuilder()
+
+    val (resourcesAfterCpu, usedCpuResources) =
+      partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK)
+
+    builder.addAllResources(usedCpuResources)
+
+    val (resourcesAfterMem, usedMemResources) =
+      partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc))
+
+    builder.addAllResources(usedMemResources)
+
+    val executorInfo = builder
       .setExecutorId(ExecutorID.newBuilder().setValue(execId).build())
       .setCommand(command)
       .setData(ByteString.copyFrom(createExecArg()))
-      .addResources(cpus)
-      .addResources(memory)
       .build()
+
+    (executorInfo, resourcesAfterMem)
   }
 
   /**
@@ -208,6 +209,19 @@ private[spark] class MesosSchedulerBackend(
 
   override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {}
 
+  def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = {
+    val builder = new StringBuilder
+    tasks.foreach {
+      case t =>
+        builder.append("Task id: ").append(t.getTaskId.getValue).append("\n")
+          .append("Slave id: ").append(t.getSlaveId.getValue).append("\n")
+          .append("Task resources: ").append(t.getResourcesList).append("\n")
+          .append("Executor resources: ").append(t.getExecutor.getResourcesList).append("\n")
+          .append("---------------------------------------------\n")
+    }
+    builder.toString()
+  }
+
   /**
    * Method called by Mesos to 
offer resources on slaves. We respond by asking our active task sets * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that @@ -245,6 +259,10 @@ private[spark] class MesosSchedulerBackend( val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap + val slaveIdToResources = new HashMap[String, JList[Resource]]() + usableOffers.foreach { o => + slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList + } val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] @@ -256,11 +274,15 @@ private[spark] class MesosSchedulerBackend( .foreach { offer => offer.foreach { taskDesc => val slaveId = taskDesc.executorId - slaveIdsWithExecutors += slaveId slavesIdsOfAcceptedOffers += slaveId taskIdToSlaveId(taskDesc.taskId) = slaveId + val (mesosTask, remainingResources) = createMesosTask( + taskDesc, + slaveIdToResources(slaveId), + slaveId) mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(createMesosTask(taskDesc, slaveId)) + .add(mesosTask) + slaveIdToResources(slaveId) = remainingResources } } @@ -273,6 +295,7 @@ private[spark] class MesosSchedulerBackend( // TODO: Add support for log urls for Mesos new ExecutorInfo(o.host, o.cores, Map.empty))) ) + logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } @@ -287,30 +310,32 @@ private[spark] class MesosSchedulerBackend( } } - /** Helper function to pull out a resource from a Mesos Resources protobuf */ - def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue - } - 0 - } - /** Turn a Spark TaskDescription into a Mesos task */ - def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { + def createMesosTask( + task: TaskDescription, + resources: JList[Resource], + slaveId: String): (MesosTaskInfo, JList[Resource]) = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() - val cpuResource = Resource.newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) - .build() - MesosTaskInfo.newBuilder() + val (executorInfo, remainingResources) = if (slaveIdsWithExecutors.contains(slaveId)) { + (slaveIdsWithExecutors(slaveId), resources) + } else { + createExecutorInfo(resources, slaveId) + } + slaveIdsWithExecutors(slaveId) = executorInfo + + val (finalResources, cpuResources) = + partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) + + val taskInfo = MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(createExecutorInfo(slaveId)) + .setExecutor(executorInfo) .setName(task.name) - .addResources(cpuResource) + .addAllResources(cpuResources) .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) .build() + + (taskInfo, finalResources) } /** Check whether a Mesos task state represents a finished task */ diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index f76e5e4940304..ee1099990ec3e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -17,12 +17,19 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{Credential, FrameworkInfo}
+import java.util.List
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.mesos.Protos._
 import org.apache.mesos.{Scheduler, MesosSchedulerDriver}
-import org.apache.spark.SparkConf
+import org.apache.spark.{Logging, SparkConf}
 import org.apache.mesos.protobuf.ByteString
+import org.apache.mesos.Protos.Value.Type
+
 
-private[spark] trait MesosSchedulerUtils {
+private[spark] trait MesosSchedulerUtils extends Logging {
   def createSchedulerDriver(
       scheduler: Scheduler,
       sparkUser: String,
@@ -53,4 +60,57 @@ private[spark] trait MesosSchedulerUtils {
       new MesosSchedulerDriver(scheduler, fwInfoBuilder.build, masterUrl)
     }
   }
+
+  /** Helper function to pull out a resource from a Mesos Resources protobuf */
+  def getResource(res: List[Resource], name: String): Double = {
+    var resource = 0.0
+    // A resource can have multiple values in the offer since it can either be
+    // from a specific role or from the wildcard role ("*").
+    for (r <- res if r.getName == name) {
+      resource += r.getScalar.getValue
+    }
+
+    resource
+  }
+
+  /**
+   * Partition the given resources by carving out `count` units of `resourceName`.
+   * Resources are consumed in offer order, so one request may be satisfied from
+   * several roles, and every consumed portion keeps the role of the offer entry
+   * it was taken from.
+   * @return The remaining resources list and the used resources list.
+   */
+  def partitionResources(
+      resources: List[Resource],
+      resourceName: String,
+      count: Double): (List[Resource], List[Resource]) = {
+    var remain = count
+    val usedResources = new ArrayBuffer[Resource]
+    val newResources = resources.map { r =>
+      if (remain > 0 &&
+        r.getType == Type.SCALAR &&
+        r.getScalar.getValue > 0.0 &&
+        r.getName == resourceName) {
+        val usage = Math.min(remain, r.getScalar.getValue)
+        usedResources += Resource.newBuilder()
+          .setName(resourceName)
+          .setRole(r.getRole)
+          .setType(Value.Type.SCALAR)
+          .setScalar(Value.Scalar.newBuilder().setValue(usage).build())
+          .build()
+        remain -= usage
+        Resource.newBuilder()
+          .setName(resourceName)
+          .setRole(r.getRole)
+          .setType(Value.Type.SCALAR)
+          .setScalar(Value.Scalar.newBuilder().setValue(r.getScalar.getValue - usage).build())
+          .build()
+      } else {
+        r
+      }
+    }
+
+    (newResources.filter(r => r.getType != Type.SCALAR || r.getScalar.getValue > 0.0).toList,
+      usedResources.toList)
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index afbaa9ade811f..3f61914bbed4e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -21,6 +21,7 @@ import java.nio.ByteBuffer
 import java.util
 import java.util.Collections
 
+import scala.collection.JavaConversions._
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
@@ -43,6 +44,14 @@ import org.apache.spark.scheduler.cluster.mesos.{MesosSchedulerBackend, MemoryUt
 class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with MockitoSugar {
 
   test("check spark-class location correctly") {
+    def createResource(name: String, value: Double): Resource = {
+      Resource.newBuilder()
+        .setName(name)
+        .setScalar(Scalar.newBuilder().setValue(value).build())
+        .setType(Value.Type.SCALAR)
+        .build()
+    }
+
     val conf = new SparkConf
    conf.set("spark.mesos.executor.home" , "/mesos-home")
@@ -62,13 +71,14 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
     val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master")
 
+    val resources = List(createResource("cpus", 4), createResource("mem", 1024))
     // uri is null.
-    val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id")
+    val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     assert(executorInfo.getCommand.getValue ===
       s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}")
 
     // uri exists.
     conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz")
-    val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id")
+    val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id")
     assert(executorInfo1.getCommand.getValue ===
       s"cd test-app-1*;  ./bin/spark-class ${classOf[MesosExecutorBackend].getName}")
   }
@@ -166,4 +176,97 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Mo
     backend.resourceOffers(driver, mesosOffers2)
     verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId)
   }
+
+  test("can handle multiple roles") {
+    val driver = mock[SchedulerDriver]
+    val taskScheduler = mock[TaskSchedulerImpl]
+
+    val listenerBus = mock[LiveListenerBus]
+    listenerBus.post(
+      SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty)))
+
+    val sc = mock[SparkContext]
+    when(sc.executorMemory).thenReturn(100)
+    when(sc.getSparkHome()).thenReturn(Option("/path"))
+    when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String])
+    when(sc.conf).thenReturn(new SparkConf)
+    when(sc.listenerBus).thenReturn(listenerBus)
+
+    val id = 1
+    val builder = Offer.newBuilder()
+    // The offer advertises resources under two separate roles, "prod" and "dev".
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setType(Value.Type.SCALAR)
+      .setRole("prod")
+      .setScalar(Scalar.newBuilder().setValue(500))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("prod")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(1))
+    builder.addResourcesBuilder()
+      .setName("mem")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(600))
+    builder.addResourcesBuilder()
+      .setName("cpus")
+      .setRole("dev")
+      .setType(Value.Type.SCALAR)
+      .setScalar(Scalar.newBuilder().setValue(2))
+    val offer = builder.setId(OfferID.newBuilder().setValue(s"o$id").build())
+      .setFrameworkId(FrameworkID.newBuilder().setValue("f1"))
+      .setSlaveId(SlaveID.newBuilder().setValue(s"s$id"))
+      .setHostname(s"host$id").build()
+
+    val mesosOffers = new java.util.ArrayList[Offer]
+    mesosOffers.add(offer)
+
+    val backend = new MesosSchedulerBackend(taskScheduler, sc, "master")
+
+    val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1)
+    expectedWorkerOffers.append(new WorkerOffer(
+      mesosOffers.get(0).getSlaveId.getValue,
+      mesosOffers.get(0).getHostname,
+      2 // Deducting 1 for executor
+    ))
+
+    val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0)))
+    when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc)))
+    when(taskScheduler.CPUS_PER_TASK).thenReturn(1)
+
+    val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]])
+    when(
+      driver.launchTasks(
+        Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        capture.capture(),
+        any(classOf[Filters])
+      )
+    ).thenReturn(Status.valueOf(1))
+
+    backend.resourceOffers(driver, 
mesosOffers) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + + assert(capture.getValue.size() == 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + assert(taskInfo.getResourcesCount == 1) + val cpusDev = taskInfo.getResourcesList.get(0) + assert(cpusDev.getName.equals("cpus")) + assert(cpusDev.getScalar.getValue.equals(1.0)) + assert(cpusDev.getRole.equals("dev")) + val executorResources = taskInfo.getExecutor.getResourcesList + assert(executorResources.exists { r => + r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod") + }) + assert(executorResources.exists { r => + r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod") + }) + } }
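---

For illustration, a rough sketch of how the new partitionResources helper
behaves on the same two-role offer that the "can handle multiple roles" test
builds. This harness is hypothetical and not part of the patch: it assumes it
sits in the org.apache.spark.scheduler.cluster.mesos package (so the
private[spark] trait is visible) and that the Mesos protobuf classes are on
the classpath.

    package org.apache.spark.scheduler.cluster.mesos

    import scala.collection.JavaConversions._

    import org.apache.mesos.Protos.{Resource, Value}

    // Hypothetical harness for exercising partitionResources by hand.
    object PartitionResourcesSketch extends MesosSchedulerUtils {
      def cpus(role: String, amount: Double): Resource =
        Resource.newBuilder()
          .setName("cpus")
          .setRole(role)
          .setType(Value.Type.SCALAR)
          .setScalar(Value.Scalar.newBuilder().setValue(amount).build())
          .build()

      def main(args: Array[String]): Unit = {
        // Same cpu layout as the test offer: 1 cpu under "prod", 2 under "dev".
        val offered = List(cpus("prod", 1.0), cpus("dev", 2.0))

        // Request 2 cpus: "prod" is drained first, the rest comes from "dev",
        // so the used list carries one entry per role that contributed.
        val (remaining, used) = partitionResources(offered, "cpus", 2.0)

        used.foreach(r => println(s"used ${r.getScalar.getValue} cpus from role ${r.getRole}"))
        // used 1.0 cpus from role prod
        // used 1.0 cpus from role dev

        remaining.foreach(r => println(s"${r.getScalar.getValue} cpus left in role ${r.getRole}"))
        // 1.0 cpus left in role dev (the drained "prod" entry is filtered out)
      }
    }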
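The same helper is chained whenever one offer has to fund both an executor and
a task: createMesosTask carves out the executor's cpus and memory first (only
for the first task landing on a slave) and pays for the task's cpus from
whatever is left. A condensed fragment, continuing inside the sketch above and
assuming CPUS_PER_TASK = 1 as in the test:

    // Executor first: its 1.0 cpu drains the "prod" entry.
    val (afterExecutor, executorCpus) = partitionResources(offered, "cpus", 1.0)

    // Task second: its 1.0 cpu now has to come from "dev" -- the same split the
    // test asserts on ("prod" resources on the executor, "dev" cpus on the task).
    val (afterTask, taskCpus) = partitionResources(afterExecutor, "cpus", 1.0)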