From f7fc2a9cd5dc5ca0fe87e432d0d675b8c7c98003 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Mon, 23 Feb 2015 17:18:46 +0000 Subject: [PATCH 1/3] Add mesos role, auth and secret. --- .../mesos/CoarseMesosSchedulerBackend.scala | 21 ++-- .../cluster/mesos/MesosSchedulerBackend.scala | 95 +++++++++------ .../cluster/mesos/MesosSchedulerUtils.scala | 82 ++++++++++++- .../mesos/MesosSchedulerBackendSuite.scala | 113 +++++++++++++++++- docs/running-on-mesos.md | 22 ++++ 5 files changed, 278 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index cbade131494bc..4ad3cbdc02a9b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -224,17 +224,22 @@ private[spark] class CoarseMesosSchedulerBackend( taskIdToSlaveId(taskId) = slaveId slaveIdsWithExecutors += slaveId coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() + // Gather cpu resources from the available resources and use them in the task. + val (remainingResources, taskResources) = + MesosSchedulerUtils.partitionResources(offer.getResourcesList, "cpus", cpusToUse) + val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId)) .setName("Task " + taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", calculateTotalMemory(sc))) + .addAllResources(taskResources) + .addAllResources( + MesosSchedulerUtils.partitionResources( + remainingResources, "mem", MemoryUtils.calculateTotalMemory(sc))._2) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder) + .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder()) } // accept the offer and launch the task @@ -255,7 +260,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue.toInt val state = status.getState - logInfo("Mesos task " + taskId + " is now " + state) + logInfo(s"Mesos task $taskId is now $state") stateLock.synchronized { if (TaskState.isFinished(TaskState.fromMesos(state))) { val slaveId = taskIdToSlaveId(taskId) @@ -270,7 +275,7 @@ private[spark] class CoarseMesosSchedulerBackend( if (TaskState.isFailed(TaskState.fromMesos(state))) { failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { - logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + + logInfo(s"Blacklisting Mesos slave $slaveId due to too many failures; " + "is Spark installed on it?") } } @@ -282,7 +287,7 @@ private[spark] class CoarseMesosSchedulerBackend( } override def error(d: SchedulerDriver, message: String) { - logError("Mesos error: " + message) + logError(s"Mesos error: $message") scheduler.error(message) } @@ -323,7 +328,7 @@ private[spark] class CoarseMesosSchedulerBackend( } override def slaveLost(d: SchedulerDriver, slaveId: SlaveID): Unit = { - logInfo("Mesos slave lost: " + slaveId.getValue) + logInfo(s"Mesos slave lost: ${slaveId.getValue}") executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index d72e2af456e15..234fbb6145630 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -46,7 +46,7 @@ private[spark] class MesosSchedulerBackend( with MesosSchedulerUtils { // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashSet[String] + val slaveIdsWithExecutors = new HashMap[String, MesosExecutorInfo] val taskIdToSlaveId = new HashMap[Long, String] // An ExecutorInfo for our tasks @@ -71,7 +71,9 @@ private[spark] class MesosSchedulerBackend( startScheduler(master, MesosSchedulerBackend.this, fwInfo) } - def createExecutorInfo(execId: String): MesosExecutorInfo = { + def createExecutorInfo( + resources: JList[Resource], + execId: String): (MesosExecutorInfo, JList[Resource]) = { val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility .getOrElse { @@ -115,32 +117,25 @@ private[spark] class MesosSchedulerBackend( command.setValue(s"cd ${basename}*; $prefixEnv ./bin/spark-class $executorBackendName") command.addUris(CommandInfo.URI.newBuilder().setValue(uri.get)) } - val cpus = Resource.newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder() - .setValue(mesosExecutorCores).build()) - .build() - val memory = Resource.newBuilder() - .setName("mem") - .setType(Value.Type.SCALAR) - .setScalar( - Value.Scalar.newBuilder() - .setValue(calculateTotalMemory(sc)).build()) - .build() - val executorInfo = MesosExecutorInfo.newBuilder() + val builder = MesosExecutorInfo.newBuilder() + val (resourcesAfterCpu, usedCpuResources) = + MesosSchedulerUtils.partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK) + builder.addAllResources(usedCpuResources) + val (resourcesAfterMem, usedMemResources) = + MesosSchedulerUtils.partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc)) + builder.addAllResources(usedMemResources) + val executorInfo = builder .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) - .addResources(cpus) - .addResources(memory) + .build() sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) } - executorInfo.build() + (executorInfo, resourcesAfterMem) } /** @@ -183,6 +178,19 @@ private[spark] class MesosSchedulerBackend( override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = { + val builder = new StringBuilder + tasks.foreach { + case t => + builder.append("Task id: ").append(t.getTaskId.getValue).append("\n") + .append("Slave id: ").append(t.getSlaveId.getValue).append("\n") + .append("Task resources: ").append(t.getResourcesList).append("\n") + .append("Executor resources: ").append(t.getExecutor.getResourcesList) + .append("---------------------------------------------\n") + } + builder.toString() + } + /** * Method called by Mesos to offer resources on slaves. We respond by asking our active task sets * for tasks in order of priority. We fill each node with tasks in a round-robin manner so that @@ -236,6 +244,10 @@ private[spark] class MesosSchedulerBackend( val slaveIdToOffer = usableOffers.map(o => o.getSlaveId.getValue -> o).toMap val slaveIdToWorkerOffer = workerOffers.map(o => o.executorId -> o).toMap + val slaveIdToResources = new HashMap[String, JList[Resource]]() + usableOffers.foreach { o => + slaveIdToResources(o.getSlaveId.getValue) = o.getResourcesList + } val mesosTasks = new HashMap[String, JArrayList[MesosTaskInfo]] @@ -245,13 +257,18 @@ private[spark] class MesosSchedulerBackend( val acceptedOffers = scheduler.resourceOffers(workerOffers).filter(!_.isEmpty) acceptedOffers .foreach { offer => - offer.foreach { taskDesc => - val slaveId = taskDesc.executorId - slaveIdsWithExecutors += slaveId - slavesIdsOfAcceptedOffers += slaveId - taskIdToSlaveId(taskDesc.taskId) = slaveId - mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) - .add(createMesosTask(taskDesc, slaveId)) + offer.foreach { taskDesc => + val slaveId = taskDesc.executorId + slavesIdsOfAcceptedOffers += slaveId + taskIdToSlaveId(taskDesc.taskId) = slaveId + val (mesosTask, remainingResources) = createMesosTask( + taskDesc, + slaveIdToResources(slaveId), + slaveId) + mesosTasks.getOrElseUpdate(slaveId, new JArrayList[MesosTaskInfo]) + .add(mesosTask) + slaveIdToResources(slaveId) = remainingResources + } } } @@ -264,6 +281,7 @@ private[spark] class MesosSchedulerBackend( // TODO: Add support for log urls for Mesos new ExecutorInfo(o.host, o.cores, Map.empty))) ) + logTrace(s"Launching Mesos tasks on slave '$slaveId', tasks:\n${getTasksSummary(tasks)}") d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters) } @@ -277,21 +295,28 @@ private[spark] class MesosSchedulerBackend( } /** Turn a Spark TaskDescription into a Mesos task */ - def createMesosTask(task: TaskDescription, slaveId: String): MesosTaskInfo = { + def createMesosTask( + task: TaskDescription, + resources: JList[Resource], + slaveId: String): (MesosTaskInfo, JList[Resource]) = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() - val cpuResource = Resource.newBuilder() - .setName("cpus") - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build()) - .build() - MesosTaskInfo.newBuilder() + val (executorInfo, remainingResources) = if (slaveIdsWithExecutors.contains(slaveId)) { + (slaveIdsWithExecutors(slaveId), resources) + } else { + createExecutorInfo(resources, slaveId) + } + slaveIdsWithExecutors(slaveId) = executorInfo + val (finalResources, cpuResources) = + MesosSchedulerUtils.partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) + val taskInfo = MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) - .setExecutor(createExecutorInfo(slaveId)) + .setExecutor(executorInfo) .setName(task.name) - .addResources(cpuResource) + .addAllResources(cpuResources) .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) .build() + (taskInfo, finalResources) } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 925702e63afd3..678cce741106a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -21,7 +21,6 @@ import java.util.{List => JList} import java.util.concurrent.CountDownLatch import scala.collection.JavaConversions._ -import scala.util.control.NonFatal import com.google.common.base.Splitter import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos} @@ -41,6 +40,36 @@ private[mesos] trait MesosSchedulerUtils extends Logging { // Driver for talking to Mesos protected var mesosDriver: SchedulerDriver = null + def createSchedulerDriver( + scheduler: Scheduler, + sparkUser: String, + appName: String, + masterUrl: String, + conf: SparkConf): SchedulerDriver = { + val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName) + val credBuilder = Credential.newBuilder() + conf.getOption("spark.mesos.principal").foreach { principal => + fwInfoBuilder.setPrincipal(principal) + credBuilder.setPrincipal(principal) + } + conf.getOption("spark.mesos.secret").foreach { secret => + credBuilder.setSecret(ByteString.copyFromUtf8(secret)) + } + if (credBuilder.hasSecret && !fwInfoBuilder.hasPrincipal) { + throw new SparkException( + "spark.mesos.principal must be configured when spark.mesos.secret is set") + } + conf.getOption("spark.mesos.role").foreach { role => + fwInfoBuilder.setRole(role) + } + if (credBuilder.hasPrincipal) { + new MesosSchedulerDriver( + scheduler, fwInfoBuilder.build(), masterUrl, credBuilder.build()) + } else { + new MesosSchedulerDriver(scheduler, fwInfoBuilder.build(), masterUrl) + } + } + /** * Starts the MesosSchedulerDriver with the provided information. This method returns * only after the scheduler has registered with Mesos. @@ -82,18 +111,59 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Signal that the scheduler has registered with Mesos. */ + def getResource(res: List[Resource], name: String): Double = { + // A resource can have multiple values in the offer since it can either be from + // a specific role or wildcard. + res.filter(_.getName == name).map(_.getScalar.getValue).sum + } + protected def markRegistered(): Unit = { registerLatch.countDown() } /** - * Get the amount of resources for the specified type from the resource list + * Partition the existing set of resources into two groups, those remaining to be + * scheduled and those requested to be used for a new task. + * @param resources The full list of available resources + * @param resourceName The name of the resource to take from the available resources + * @param count The amount of resources to take from the available resources + * @return The remaining resources list and the used resources list. */ - protected def getResource(res: JList[Resource], name: String): Double = { - for (r <- res if r.getName == name) { - return r.getScalar.getValue + def partitionResources( + resources: List[Resource], + resourceName: String, + count: Double): (List[Resource], List[Resource]) = { + var remain = count + var requestedResources = new ArrayBuffer[Resource] + val remainingResources = resources.collect { + case r => { + if (remain > 0 && + r.getType == Type.SCALAR && + r.getScalar.getValue > 0.0 && + r.getName == resourceName) { + val usage = Math.min(remain, r.getScalar.getValue) + requestedResources += Resource.newBuilder() + .setName(resourceName) + .setRole(r.getRole) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(usage).build()) + .build() + remain -= usage + Resource.newBuilder() + .setName(resourceName) + .setRole(r.getRole) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(r.getScalar.getValue - usage).build()) + .build() + } else { + r + } + } } - 0.0 + + // Filter any resource that has depleted. + (remainingResources.filter(r => r.getType != Type.SCALAR || r.getScalar.getValue > 0.0).toList, + requestedResources.toList) } /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index d01837fe78957..8f84f5d7536f6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -21,6 +21,7 @@ import java.nio.ByteBuffer import java.util import java.util.Collections +import scala.collection.JavaConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -41,6 +42,14 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { test("check spark-class location correctly") { + def createResource(name: String, value: Double): Resource = { + Resource.newBuilder() + .setName(name) + .setScalar(Scalar.newBuilder().setValue(value).build()) + .setType(Value.Type.SCALAR) + .build() + } + val conf = new SparkConf conf.set("spark.mesos.executor.home" , "/mesos-home") @@ -60,16 +69,15 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") + val resources = List(createResource("cpus", 4), createResource("mem", 1024)) // uri is null. - val executorInfo = mesosSchedulerBackend.createExecutorInfo("test-id") - assert(executorInfo.getCommand.getValue === - s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") + val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") + assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") // uri exists. conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") - val executorInfo1 = mesosSchedulerBackend.createExecutorInfo("test-id") - assert(executorInfo1.getCommand.getValue === - s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") + val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") + assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") } test("spark docker properties correctly populate the DockerInfo message") { @@ -214,4 +222,97 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi backend.resourceOffers(driver, mesosOffers2) verify(driver, times(1)).declineOffer(mesosOffers2.get(0).getId) } + + test("can handle multiple roles") { + val driver = mock[SchedulerDriver] + val taskScheduler = mock[TaskSchedulerImpl] + + val listenerBus = mock[LiveListenerBus] + listenerBus.post( + SparkListenerExecutorAdded(anyLong, "s1", new ExecutorInfo("host1", 2, Map.empty))) + + val sc = mock[SparkContext] + when(sc.executorMemory).thenReturn(100) + when(sc.getSparkHome()).thenReturn(Option("/path")) + when(sc.executorEnvs).thenReturn(new mutable.HashMap[String, String]) + when(sc.conf).thenReturn(new SparkConf) + when(sc.listenerBus).thenReturn(listenerBus) + + val id = 1 + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setRole("prod") + .setScalar(Scalar.newBuilder().setValue(500)) + builder.addResourcesBuilder() + .setName("cpus") + .setRole("prod") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(1)) + builder.addResourcesBuilder() + .setName("mem") + .setRole("dev") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(600)) + builder.addResourcesBuilder() + .setName("cpus") + .setRole("dev") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(2)) + val offer = builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")) + .setHostname(s"host${id.toString}").build() + + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(offer) + + val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") + + val expectedWorkerOffers = new ArrayBuffer[WorkerOffer](1) + expectedWorkerOffers.append(new WorkerOffer( + mesosOffers.get(0).getSlaveId.getValue, + mesosOffers.get(0).getHostname, + 2 // Deducting 1 for executor + )) + + val taskDesc = new TaskDescription(1L, 0, "s1", "n1", 0, ByteBuffer.wrap(new Array[Byte](0))) + when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) + when(taskScheduler.CPUS_PER_TASK).thenReturn(1) + + val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + when( + driver.launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + ).thenReturn(Status.valueOf(1)) + + backend.resourceOffers(driver, mesosOffers) + + verify(driver, times(1)).launchTasks( + Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), + capture.capture(), + any(classOf[Filters]) + ) + + assert(capture.getValue.size() == 1) + val taskInfo = capture.getValue.iterator().next() + assert(taskInfo.getName.equals("n1")) + assert(taskInfo.getResourcesCount == 1) + val cpusDev = taskInfo.getResourcesList.get(0) + assert(cpusDev.getName.equals("cpus")) + assert(cpusDev.getScalar.getValue.equals(1.0)) + assert(cpusDev.getRole.equals("dev")) + val executorResources = taskInfo.getExecutor.getResourcesList + assert(executorResources.exists { r => + r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod") + }) + assert(executorResources.exists { r => + r.getName.equals("cpus") && r.getScalar.getValue.equals(1.0) && r.getRole.equals("prod") + }) + } } diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 1f915d8ea1d73..debdd2adf22d6 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -306,6 +306,28 @@ See the [configuration page](configuration.html) for information on Spark config the final overhead will be this value. + + spark.mesos.principal + Framework principal to authenticate to Mesos + + Set the principal with which Spark framework will use to authenticate with Mesos. + + + + spark.mesos.secret + Framework secret to authenticate to Mesos + + Set the secret with which Spark framework will use to authenticate with Mesos. + + + + spark.mesos.role + Role for the Spark framework + + Set the role of this Spark framework for Mesos. Roles are used in Mesos for reservations + and resource weight sharing. + + spark.mesos.constraints Attribute based constraints to be matched against when accepting resource offers. From 8f9488a4a4c76d60c5c2d7d4290bcdaa747b7981 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 28 Apr 2015 18:59:58 -0700 Subject: [PATCH 2/3] Fix rebase --- .../cluster/mesos/MesosClusterScheduler.scala | 20 ++++++++++------- .../cluster/mesos/MesosSchedulerBackend.scala | 12 +++++----- .../cluster/mesos/MesosSchedulerUtils.scala | 22 ++++++++++++++----- .../mesos/MesosSchedulerBackendSuite.scala | 6 +++-- 4 files changed, 39 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index d3a20f822176e..61cc6ffcfb889 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -295,20 +295,24 @@ private[spark] class MesosClusterScheduler( def start(): Unit = { // TODO: Implement leader election to make sure only one framework running in the cluster. val fwId = schedulerState.fetch[String]("frameworkId") - val builder = FrameworkInfo.newBuilder() - .setUser(Utils.getCurrentUserName()) - .setName(appName) - .setWebuiUrl(frameworkUrl) - .setCheckpoint(true) - .setFailoverTimeout(Integer.MAX_VALUE) // Setting to max so tasks keep running on crash fwId.foreach { id => - builder.setId(FrameworkID.newBuilder().setValue(id).build()) frameworkId = id } recoverState() metricsSystem.registerSource(new MesosClusterSchedulerSource(this)) metricsSystem.start() - startScheduler(master, MesosClusterScheduler.this, builder.build()) + val driver = createSchedulerDriver( + master, + MesosClusterScheduler.this, + Utils.getCurrentUserName(), + appName, + conf, + Some(frameworkUrl), + Some(true), + Some(Integer.MAX_VALUE), + fwId) + + startScheduler(driver) ready = true } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 234fbb6145630..fc2b0a384ed95 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -32,6 +32,7 @@ import org.apache.spark.scheduler._ import org.apache.spark.scheduler.cluster.ExecutorInfo import org.apache.spark.util.Utils + /** * A SchedulerBackend for running fine-grained tasks on Mesos. Each Spark task is mapped to a * separate Mesos task, allowing multiple applications to share cluster nodes both in space (tasks @@ -66,9 +67,10 @@ private[spark] class MesosSchedulerBackend( @volatile var appId: String = _ override def start() { - val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() classLoader = Thread.currentThread.getContextClassLoader - startScheduler(master, MesosSchedulerBackend.this, fwInfo) + val driver = createSchedulerDriver( + master, MesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf) + startScheduler(driver) } def createExecutorInfo( @@ -119,10 +121,10 @@ private[spark] class MesosSchedulerBackend( } val builder = MesosExecutorInfo.newBuilder() val (resourcesAfterCpu, usedCpuResources) = - MesosSchedulerUtils.partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK) + partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK) builder.addAllResources(usedCpuResources) val (resourcesAfterMem, usedMemResources) = - MesosSchedulerUtils.partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc)) + partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc)) builder.addAllResources(usedMemResources) val executorInfo = builder .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) @@ -307,7 +309,7 @@ private[spark] class MesosSchedulerBackend( } slaveIdsWithExecutors(slaveId) = executorInfo val (finalResources, cpuResources) = - MesosSchedulerUtils.partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) + partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) val taskInfo = MesosTaskInfo.newBuilder() .setTaskId(taskId) .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 678cce741106a..eabcde07d0f6a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -40,14 +40,24 @@ private[mesos] trait MesosSchedulerUtils extends Logging { // Driver for talking to Mesos protected var mesosDriver: SchedulerDriver = null - def createSchedulerDriver( + protected def createSchedulerDriver( + masterUrl: String, scheduler: Scheduler, sparkUser: String, appName: String, - masterUrl: String, - conf: SparkConf): SchedulerDriver = { + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): MesosSchedulerDriver = { val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName) val credBuilder = Credential.newBuilder() + webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) } + checkpoint.foreach { checkpoint => fwInfoBuilder.setCheckpoint(checkpoint) } + failoverTimeout.foreach { timeout => fwInfoBuilder.setFailoverTimeout(timeout) } + frameworkId.foreach { id => + fwInfoBuilder.setId(FrameworkID.newBuilder().setValue(id).build()) + } conf.getOption("spark.mesos.principal").foreach { principal => fwInfoBuilder.setPrincipal(principal) credBuilder.setPrincipal(principal) @@ -77,7 +87,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * @param scheduler Scheduler object * @param fwInfo FrameworkInfo to pass to the Mesos master */ - def startScheduler(masterUrl: String, scheduler: Scheduler, fwInfo: FrameworkInfo): Unit = { + def startScheduler(newDriver: MesosSchedulerDriver): Unit = { synchronized { if (mesosDriver != null) { registerLatch.await() @@ -88,7 +98,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { setDaemon(true) override def run() { - mesosDriver = new MesosSchedulerDriver(scheduler, fwInfo, masterUrl) + mesosDriver = newDriver try { val ret = mesosDriver.run() logInfo("driver.run() returned with code " + ret) @@ -111,7 +121,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Signal that the scheduler has registered with Mesos. */ - def getResource(res: List[Resource], name: String): Double = { + protected def getResource(res: List[Resource], name: String): Double = { // A resource can have multiple values in the offer since it can either be from // a specific role or wildcard. res.filter(_.getName == name).map(_.getScalar.getValue).sum diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index 8f84f5d7536f6..edcf563af021f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -72,12 +72,14 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val resources = List(createResource("cpus", 4), createResource("mem", 1024)) // uri is null. val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") - assert(executorInfo.getCommand.getValue === s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") + assert(executorInfo.getCommand.getValue === + s" /mesos-home/bin/spark-class ${classOf[MesosExecutorBackend].getName}") // uri exists. conf.set("spark.executor.uri", "hdfs:///test-app-1.0.0.tgz") val (executorInfo1, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") - assert(executorInfo1.getCommand.getValue === s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") + assert(executorInfo1.getCommand.getValue === + s"cd test-app-1*; ./bin/spark-class ${classOf[MesosExecutorBackend].getName}") } test("spark docker properties correctly populate the DockerInfo message") { From 0f9f03e2ccd822aaa8939b8f7d5828e72ba88f11 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 7 Jul 2015 17:30:18 -0700 Subject: [PATCH 3/3] Fix review comments. --- .../mesos/CoarseMesosSchedulerBackend.scala | 28 +++---- .../cluster/mesos/MesosClusterScheduler.scala | 8 +- .../cluster/mesos/MesosSchedulerBackend.scala | 51 ++++++------ .../cluster/mesos/MesosSchedulerUtils.scala | 78 +++++++++++-------- .../CoarseMesosSchedulerBackendSuite.scala | 19 ++++- .../mesos/MesosSchedulerBackendSuite.scala | 21 ++--- 6 files changed, 113 insertions(+), 92 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 4ad3cbdc02a9b..b7fde0d9b3265 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -18,8 +18,8 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File -import java.util.{List => JList, Collections} import java.util.concurrent.locks.ReentrantLock +import java.util.{Collections, List => JList} import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet} @@ -27,12 +27,11 @@ import scala.collection.mutable.{HashMap, HashSet} import com.google.common.collect.HashBiMap import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.{Scheduler => MScheduler, _} -import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} -import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} import org.apache.spark.rpc.RpcAddress import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils +import org.apache.spark.{SparkContext, SparkEnv, SparkException, TaskState} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -69,7 +68,7 @@ private[spark] class CoarseMesosSchedulerBackend( /** * The total number of executors we aim to have. Undefined when not using dynamic allocation - * and before the ExecutorAllocatorManager calls [[doRequesTotalExecutors]]. + * and before the ExecutorAllocatorManager calls [[doRequestTotalExecutors]]. */ private var executorLimitOption: Option[Int] = None @@ -103,8 +102,9 @@ private[spark] class CoarseMesosSchedulerBackend( override def start() { super.start() - val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() - startScheduler(master, CoarseMesosSchedulerBackend.this, fwInfo) + val driver = createSchedulerDriver( + master, CoarseMesosSchedulerBackend.this, sc.sparkUser, sc.appName, sc.conf) + startScheduler(driver) } def createCommand(offer: Offer, numCores: Int, taskId: Int): CommandInfo = { @@ -225,28 +225,28 @@ private[spark] class CoarseMesosSchedulerBackend( slaveIdsWithExecutors += slaveId coresByTaskId(taskId) = cpusToUse // Gather cpu resources from the available resources and use them in the task. - val (remainingResources, taskResources) = - MesosSchedulerUtils.partitionResources(offer.getResourcesList, "cpus", cpusToUse) + val (remainingResources, cpuResourcesToUse) = + partitionResources(offer.getResourcesList, "cpus", cpusToUse) + val (_, memResourcesToUse) = + partitionResources(remainingResources, "mem", calculateTotalMemory(sc)) val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId)) .setName("Task " + taskId) - .addAllResources(taskResources) - .addAllResources( - MesosSchedulerUtils.partitionResources( - remainingResources, "mem", MemoryUtils.calculateTotalMemory(sc))._2) + .addAllResources(cpuResourcesToUse) + .addAllResources(memResourcesToUse) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil - .setupContainerBuilderDockerInfo(image, sc.conf, task.getContainerBuilder()) + .setupContainerBuilderDockerInfo(image, sc.conf, taskBuilder.getContainerBuilder()) } // accept the offer and launch the task logDebug(s"Accepting offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") d.launchTasks( Collections.singleton(offer.getId), - Collections.singleton(task.build()), filters) + Collections.singleton(taskBuilder.build()), filters) } else { // Decline the offer logDebug(s"Declining offer: $id with attributes: $offerAttributes mem: $mem cpu: $cpus") diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 61cc6ffcfb889..f078547e71352 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -453,12 +453,8 @@ private[spark] class MesosClusterScheduler( offer.cpu -= driverCpu offer.mem -= driverMem val taskId = TaskID.newBuilder().setValue(submission.submissionId).build() - val cpuResource = Resource.newBuilder() - .setName("cpus").setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(driverCpu)).build() - val memResource = Resource.newBuilder() - .setName("mem").setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(driverMem)).build() + val cpuResource = createResource("cpus", driverCpu) + val memResource = createResource("mem", driverMem) val commandInfo = buildDriverCommand(submission) val appName = submission.schedulerProperties("spark.app.name") val taskInfo = TaskInfo.newBuilder() diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index fc2b0a384ed95..a4e1950b9248f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -46,8 +46,8 @@ private[spark] class MesosSchedulerBackend( with MScheduler with MesosSchedulerUtils { - // Which slave IDs we have executors on - val slaveIdsWithExecutors = new HashMap[String, MesosExecutorInfo] + // Stores the slave ids that has launched a Mesos executor. + val slaveIdToExecutorInfo = new HashMap[String, MesosExecutorInfo] val taskIdToSlaveId = new HashMap[Long, String] // An ExecutorInfo for our tasks @@ -73,8 +73,14 @@ private[spark] class MesosSchedulerBackend( startScheduler(driver) } + /** + * Creates a MesosExecutorInfo that is used to launch a Mesos executor. + * @param availableResources Available resources that is offered by Mesos + * @param execId The executor id to assign to this new executor. + * @return A tuple of the new mesos executor info and the remaining available resources. + */ def createExecutorInfo( - resources: JList[Resource], + availableResources: JList[Resource], execId: String): (MesosExecutorInfo, JList[Resource]) = { val executorSparkHome = sc.conf.getOption("spark.mesos.executor.home") .orElse(sc.getSparkHome()) // Fall back to driver Spark home for backward compatibility @@ -121,23 +127,23 @@ private[spark] class MesosSchedulerBackend( } val builder = MesosExecutorInfo.newBuilder() val (resourcesAfterCpu, usedCpuResources) = - partitionResources(resources, "cpus", scheduler.CPUS_PER_TASK) - builder.addAllResources(usedCpuResources) + partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK) val (resourcesAfterMem, usedMemResources) = - partitionResources(resourcesAfterCpu, "mem", MemoryUtils.calculateTotalMemory(sc)) + partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc)) + + builder.addAllResources(usedCpuResources) builder.addAllResources(usedMemResources) val executorInfo = builder .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) .setCommand(command) .setData(ByteString.copyFrom(createExecArg())) - .build() sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) } - (executorInfo, resourcesAfterMem) + (executorInfo.build(), resourcesAfterMem) } /** @@ -182,13 +188,12 @@ private[spark] class MesosSchedulerBackend( def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = { val builder = new StringBuilder - tasks.foreach { - case t => - builder.append("Task id: ").append(t.getTaskId.getValue).append("\n") - .append("Slave id: ").append(t.getSlaveId.getValue).append("\n") - .append("Task resources: ").append(t.getResourcesList).append("\n") - .append("Executor resources: ").append(t.getExecutor.getResourcesList) - .append("---------------------------------------------\n") + tasks.foreach { t => + builder.append("Task id: ").append(t.getTaskId.getValue).append("\n") + .append("Slave id: ").append(t.getSlaveId.getValue).append("\n") + .append("Task resources: ").append(t.getResourcesList).append("\n") + .append("Executor resources: ").append(t.getExecutor.getResourcesList) + .append("---------------------------------------------\n") } builder.toString() } @@ -217,7 +222,7 @@ private[spark] class MesosSchedulerBackend( val meetsRequirements = (meetsConstraints && meetsMemoryRequirements && meetsCPURequirements) || - (slaveIdsWithExecutors.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) + (slaveIdToExecutorInfo.contains(slaveId) && cpus >= scheduler.CPUS_PER_TASK) // add some debug messaging val debugstr = if (meetsRequirements) "Accepting" else "Declining" @@ -231,7 +236,7 @@ private[spark] class MesosSchedulerBackend( unUsableOffers.foreach(o => d.declineOffer(o.getId)) val workerOffers = usableOffers.map { o => - val cpus = if (slaveIdsWithExecutors.contains(o.getSlaveId.getValue)) { + val cpus = if (slaveIdToExecutorInfo.contains(o.getSlaveId.getValue)) { getResource(o.getResourcesList, "cpus").toInt } else { // If the Mesos executor has not been started on this slave yet, set aside a few @@ -272,7 +277,6 @@ private[spark] class MesosSchedulerBackend( slaveIdToResources(slaveId) = remainingResources } } - } // Reply to the offers val filters = Filters.newBuilder().setRefuseSeconds(1).build() // TODO: lower timeout? @@ -292,22 +296,21 @@ private[spark] class MesosSchedulerBackend( for (o <- usableOffers if !slavesIdsOfAcceptedOffers.contains(o.getSlaveId.getValue)) { d.declineOffer(o.getId) } - } } - /** Turn a Spark TaskDescription into a Mesos task */ + /** Turn a Spark TaskDescription into a Mesos task and also resources unused by the task */ def createMesosTask( task: TaskDescription, resources: JList[Resource], slaveId: String): (MesosTaskInfo, JList[Resource]) = { val taskId = TaskID.newBuilder().setValue(task.taskId.toString).build() - val (executorInfo, remainingResources) = if (slaveIdsWithExecutors.contains(slaveId)) { - (slaveIdsWithExecutors(slaveId), resources) + val (executorInfo, remainingResources) = if (slaveIdToExecutorInfo.contains(slaveId)) { + (slaveIdToExecutorInfo(slaveId), resources) } else { createExecutorInfo(resources, slaveId) } - slaveIdsWithExecutors(slaveId) = executorInfo + slaveIdToExecutorInfo(slaveId) = executorInfo val (finalResources, cpuResources) = partitionResources(remainingResources, "cpus", scheduler.CPUS_PER_TASK) val taskInfo = MesosTaskInfo.newBuilder() @@ -364,7 +367,7 @@ private[spark] class MesosSchedulerBackend( private def removeExecutor(slaveId: String, reason: String) = { synchronized { listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason)) - slaveIdsWithExecutors -= slaveId + slaveIdToExecutorInfo -= slaveId } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index eabcde07d0f6a..c04920e4f5873 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -21,14 +21,17 @@ import java.util.{List => JList} import java.util.concurrent.CountDownLatch import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import scala.util.control.NonFatal import com.google.common.base.Splitter import org.apache.mesos.{MesosSchedulerDriver, SchedulerDriver, Scheduler, Protos} import org.apache.mesos.Protos._ -import org.apache.mesos.protobuf.GeneratedMessage -import org.apache.spark.{Logging, SparkContext} +import org.apache.mesos.protobuf.{ByteString, GeneratedMessage} +import org.apache.spark.{SparkException, SparkConf, Logging, SparkContext} import org.apache.spark.util.Utils + /** * Shared trait for implementing a Mesos Scheduler. This holds common state and helper * methods and Mesos scheduler will use. @@ -40,6 +43,18 @@ private[mesos] trait MesosSchedulerUtils extends Logging { // Driver for talking to Mesos protected var mesosDriver: SchedulerDriver = null + /** + * Creates a new MesosSchedulerDriver that communicates to the Mesos master. + * @param masterUrl The url to connect to Mesos master + * @param scheduler the scheduler class to receive scheduler callbacks + * @param sparkUser User to impersonate with when running tasks + * @param appName The framework name to display on the Mesos UI + * @param conf Spark configuration + * @param webuiUrl The WebUI url to link from Mesos UI + * @param checkpoint Option to checkpoint tasks for failover + * @param failoverTimeout Duration Mesos master expect scheduler to reconnect on disconnect + * @param frameworkId The id of the new framework + */ protected def createSchedulerDriver( masterUrl: String, scheduler: Scheduler, @@ -49,7 +64,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { webuiUrl: Option[String] = None, checkpoint: Option[Boolean] = None, failoverTimeout: Option[Double] = None, - frameworkId: Option[String] = None): MesosSchedulerDriver = { + frameworkId: Option[String] = None): SchedulerDriver = { val fwInfoBuilder = FrameworkInfo.newBuilder().setUser(sparkUser).setName(appName) val credBuilder = Credential.newBuilder() webuiUrl.foreach { url => fwInfoBuilder.setWebuiUrl(url) } @@ -81,13 +96,11 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } /** - * Starts the MesosSchedulerDriver with the provided information. This method returns - * only after the scheduler has registered with Mesos. - * @param masterUrl Mesos master connection URL - * @param scheduler Scheduler object - * @param fwInfo FrameworkInfo to pass to the Mesos master + * Starts the MesosSchedulerDriver and stores the current running driver to this new instance. + * This driver is expected to not be running. + * This method returns only after the scheduler has registered with Mesos. */ - def startScheduler(newDriver: MesosSchedulerDriver): Unit = { + def startScheduler(newDriver: SchedulerDriver): Unit = { synchronized { if (mesosDriver != null) { registerLatch.await() @@ -102,7 +115,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { try { val ret = mesosDriver.run() logInfo("driver.run() returned with code " + ret) - if (ret.equals(Status.DRIVER_ABORTED)) { + if (ret != null && ret.equals(Status.DRIVER_ABORTED)) { System.exit(1) } } catch { @@ -121,7 +134,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { /** * Signal that the scheduler has registered with Mesos. */ - protected def getResource(res: List[Resource], name: String): Double = { + protected def getResource(res: JList[Resource], name: String): Double = { // A resource can have multiple values in the offer since it can either be from // a specific role or wildcard. res.filter(_.getName == name).map(_.getScalar.getValue).sum @@ -131,40 +144,41 @@ private[mesos] trait MesosSchedulerUtils extends Logging { registerLatch.countDown() } + def createResource(name: String, amount: Double, role: Option[String] = None): Resource = { + val builder = Resource.newBuilder() + .setName(name) + .setType(Value.Type.SCALAR) + .setScalar(Value.Scalar.newBuilder().setValue(amount).build()) + + role.foreach { r => builder.setRole(r) } + + builder.build() + } + /** * Partition the existing set of resources into two groups, those remaining to be * scheduled and those requested to be used for a new task. * @param resources The full list of available resources * @param resourceName The name of the resource to take from the available resources - * @param count The amount of resources to take from the available resources + * @param amountToUse The amount of resources to take from the available resources * @return The remaining resources list and the used resources list. */ def partitionResources( - resources: List[Resource], + resources: JList[Resource], resourceName: String, - count: Double): (List[Resource], List[Resource]) = { - var remain = count + amountToUse: Double): (List[Resource], List[Resource]) = { + var remain = amountToUse var requestedResources = new ArrayBuffer[Resource] - val remainingResources = resources.collect { + val remainingResources = resources.map { case r => { if (remain > 0 && - r.getType == Type.SCALAR && + r.getType == Value.Type.SCALAR && r.getScalar.getValue > 0.0 && r.getName == resourceName) { val usage = Math.min(remain, r.getScalar.getValue) - requestedResources += Resource.newBuilder() - .setName(resourceName) - .setRole(r.getRole) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(usage).build()) - .build() + requestedResources += createResource(resourceName, usage, Some(r.getRole)) remain -= usage - Resource.newBuilder() - .setName(resourceName) - .setRole(r.getRole) - .setType(Value.Type.SCALAR) - .setScalar(Value.Scalar.newBuilder().setValue(r.getScalar.getValue - usage).build()) - .build() + createResource(resourceName, r.getScalar.getValue - usage, Some(r.getRole)) } else { r } @@ -172,8 +186,10 @@ private[mesos] trait MesosSchedulerUtils extends Logging { } // Filter any resource that has depleted. - (remainingResources.filter(r => r.getType != Type.SCALAR || r.getScalar.getValue > 0.0).toList, - requestedResources.toList) + val filteredResources = + remainingResources.filter(r => r.getType != Value.Type.SCALAR || r.getScalar.getValue > 0.0) + + (filteredResources.toList, requestedResources.toList) } /** Helper method to get the key,value-set pair for a Mesos Attribute protobuf */ diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala index 3f1692917a357..4b504df7b8851 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -22,7 +22,7 @@ import java.util.Collections import org.apache.mesos.Protos.Value.Scalar import org.apache.mesos.Protos._ -import org.apache.mesos.SchedulerDriver +import org.apache.mesos.{Protos, Scheduler, SchedulerDriver} import org.mockito.Matchers._ import org.mockito.Mockito._ import org.mockito.Matchers @@ -60,7 +60,16 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite taskScheduler: TaskSchedulerImpl, driver: SchedulerDriver): CoarseMesosSchedulerBackend = { val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") { - mesosDriver = driver + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = driver markRegistered() } backend.start() @@ -80,6 +89,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite test("mesos supports killing and limiting executors") { val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) @@ -87,7 +97,7 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite sparkConf.set("spark.driver.port", "1234") val backend = createSchedulerBackend(taskScheduler, driver) - val minMem = backend.calculateTotalMemory(sc).toInt + val minMem = backend.calculateTotalMemory(sc) val minCpu = 4 val mesosOffers = new java.util.ArrayList[Offer] @@ -130,11 +140,12 @@ class CoarseMesosSchedulerBackendSuite extends SparkFunSuite test("mesos supports killing and relaunching tasks with executors") { val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) val taskScheduler = mock[TaskSchedulerImpl] when(taskScheduler.sc).thenReturn(sc) val backend = createSchedulerBackend(taskScheduler, driver) - val minMem = backend.calculateTotalMemory(sc).toInt + 1024 + val minMem = backend.calculateTotalMemory(sc) + 1024 val minCpu = 4 val mesosOffers = new java.util.ArrayList[Offer] diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index edcf563af021f..5ed30f64d705f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -42,14 +42,6 @@ import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, SparkFunSui class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext with MockitoSugar { test("check spark-class location correctly") { - def createResource(name: String, value: Double): Resource = { - Resource.newBuilder() - .setName(name) - .setScalar(Scalar.newBuilder().setValue(value).build()) - .setType(Value.Type.SCALAR) - .build() - } - val conf = new SparkConf conf.set("spark.mesos.executor.home" , "/mesos-home") @@ -69,7 +61,9 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") - val resources = List(createResource("cpus", 4), createResource("mem", 1024)) + val resources = List( + mesosSchedulerBackend.createResource("cpus", 4), + mesosSchedulerBackend.createResource("mem", 1024)) // uri is null. val (executorInfo, _) = mesosSchedulerBackend.createExecutorInfo(resources, "test-id") assert(executorInfo.getCommand.getValue === @@ -103,7 +97,8 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") - val execInfo = backend.createExecutorInfo("mockExecutor") + val (execInfo, _) = backend.createExecutorInfo( + List(backend.createResource("cpus", 4)), "mockExecutor") assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) val portmaps = execInfo.getContainer.getDocker.getPortMappingsList assert(portmaps.get(0).getHostPort.equals(80)) @@ -204,7 +199,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi ) verify(driver, times(1)).declineOffer(mesosOffers.get(1).getId) verify(driver, times(1)).declineOffer(mesosOffers.get(2).getId) - assert(capture.getValue.size() == 1) + assert(capture.getValue.size() === 1) val taskInfo = capture.getValue.iterator().next() assert(taskInfo.getName.equals("n1")) val cpus = taskInfo.getResourcesList.get(0) @@ -301,10 +296,10 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi any(classOf[Filters]) ) - assert(capture.getValue.size() == 1) + assert(capture.getValue.size() === 1) val taskInfo = capture.getValue.iterator().next() assert(taskInfo.getName.equals("n1")) - assert(taskInfo.getResourcesCount == 1) + assert(taskInfo.getResourcesCount === 1) val cpusDev = taskInfo.getResourcesList.get(0) assert(cpusDev.getName.equals("cpus")) assert(cpusDev.getScalar.getValue.equals(1.0))