From 71a3e597df653c93ebc31d731d61f6e21d932cd9 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Fri, 5 Dec 2014 05:55:42 +0000 Subject: [PATCH 01/15] Launch External Shuffle Service with mesos --- .../StandaloneWorkerShuffleService.scala | 1 - .../CoarseGrainedExecutorBackend.scala | 20 +++-- .../CoarseGrainedMesosExecutorBackend.scala | 73 +++++++++++++++++++ .../mesos/CoarseMesosSchedulerBackend.scala | 8 +- 4 files changed, 91 insertions(+), 11 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index b9798963bab0a..d61019ca816fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -31,7 +31,6 @@ import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler * * Optionally requires SASL authentication in order to read. See [[SecurityManager]]. */ -private[worker] class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: SecurityManager) extends Logging { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 9a4adfbbb3d71..94d0f71b0a6e4 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -33,7 +33,7 @@ import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} -private[spark] class CoarseGrainedExecutorBackend( +protected[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, @@ -99,15 +99,15 @@ private[spark] class CoarseGrainedExecutorBackend( } } -private[spark] object CoarseGrainedExecutorBackend extends Logging { - - private def run( +trait CoarseGrainedExecutorBackendRunner extends Logging { + def run[T <: CoarseGrainedExecutorBackend]( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, - workerUrl: Option[String]) { + workerUrl: Option[String], + klass: Class[T]) { SignalLogger.register(log) @@ -148,6 +148,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { env.actorSystem.awaitTermination() } } +} + +protected[spark] object CoarseGrainedExecutorBackend + extends CoarseGrainedExecutorBackendRunner { def main(args: Array[String]) { args.length match { @@ -161,9 +165,11 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging { // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode) // and CoarseMesosSchedulerBackend (for mesos mode). 
case 5 => - run(args(0), args(1), args(2), args(3).toInt, args(4), None) + run(args(0), args(1), args(2), args(3).toInt, args(4), None, + classOf[CoarseGrainedExecutorBackend]) case x if x > 5 => - run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5))) + run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)), + classOf[CoarseGrainedExecutorBackend]) } } } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala new file mode 100644 index 0000000000000..d52e5c3850e19 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.executor + +import org.apache.spark.executor.{CoarseGrainedExecutorBackendRunner, CoarseGrainedExecutorBackend} +import org.apache.spark.SecurityManager +import org.apache.spark.SparkConf +import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService +import akka.actor.ActorSystem + +private[spark] class CoarseGrainedMesosExecutorBackend( + driverUrl: String, + executorId: String, + hostPort: String, + cores: Int, + sparkProperties: Seq[(String, String)], + actorSystem: ActorSystem) + extends CoarseGrainedExecutorBackend(driverUrl, executorId, hostPort, + cores, sparkProperties, actorSystem) { + + lazy val shuffleService: StandaloneWorkerShuffleService = { + val executorConf = new SparkConf + new StandaloneWorkerShuffleService(executorConf, new SecurityManager(executorConf)) + } + + override def preStart() { + shuffleService.startIfEnabled() + super.preStart() + } + + override def postStop() { + shuffleService.stop() + super.postStop() + } +} + +private[spark] object CoarseGrainedMesosExecutorBackend + extends CoarseGrainedExecutorBackendRunner { + + def main(args: Array[String]) { + args.length match { + case x if x < 5 => + System.err.println( + // Worker url is used in spark standalone mode to enforce fate-sharing with worker + "Usage: CoarseGrainedMesosExecutorBackend " + + " [] ") + System.exit(1) + + // NB: These arguments are provided by CoarseMesosSchedulerBackend (for mesos mode). 
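+      // The positional arguments map to run(): driverUrl, executorId, hostname,
+      // cores, appId, and an optional workerUrl.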
+ case 5 => + run(args(0), args(1), args(2), args(3).toInt, args(4), None, + classOf[CoarseGrainedMesosExecutorBackend]) + case x if x > 5 => + run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)), + classOf[CoarseGrainedMesosExecutorBackend]) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 5289661eb896b..24c57da7359c9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -153,7 +153,8 @@ private[spark] class CoarseMesosSchedulerBackend( if (uri == null) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath command.setValue( - "%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format( + "%s \"%s\" org.apache.spark.executor.CoarseGrainedMesosExecutorBackend " + + "%s %s %s %d %s".format( prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId)) } else { @@ -162,8 +163,9 @@ private[spark] class CoarseMesosSchedulerBackend( val basename = uri.split('/').last.split('.').head command.setValue( ("cd %s*; %s " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s") - .format(basename, prefixEnv, driverUrl, offer.getSlaveId.getValue, + "./bin/spark-class org.apache.spark.executor.CoarseGrainedMesosExecutorBackend " + + "%s %s %s %d %s").format( + basename, prefixEnv, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId)) command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) } From e90dd4a5211c23f74fa2d62a40de020c0cf4891c Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Sat, 13 Dec 2014 12:17:47 -0800 Subject: [PATCH 02/15] Support total and kill executors in coarse grained mesos mode. 
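
The two CoarseGrainedSchedulerBackend callbacks overridden below are what
dynamic allocation calls into. A rough driver-side sketch of the calls this
patch enables (the SparkContext setup and master URL are illustrative
assumptions, not part of the patch):

    // Hypothetical driver-side usage; "s1" stands for a Mesos slave ID,
    // which this backend also uses as the executor ID.
    val conf = new SparkConf().setMaster("mesos://master:5050").setAppName("demo")
    val sc = new SparkContext(conf)
    sc.requestExecutors(2)        // backend caps executorLimit via doRequestTotalExecutors()
    sc.killExecutors(Seq("s1"))   // backend kills the Mesos task on slave "s1"
                                  // via doKillExecutors()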
--- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../mesos/CoarseMesosSchedulerBackend.scala | 61 ++++++++-- .../CoarseMesosSchedulerBackendSuite.scala | 113 ++++++++++++++++++ 3 files changed, 168 insertions(+), 10 deletions(-) create mode 100644 core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ff5d796ee2766..0215d3f73cc64 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1002,7 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, + assert(master.contains("mesos") || assert(master.contains("yarn") || dynamicAllocationTesting, "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => @@ -1020,7 +1020,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(master.contains("yarn") || dynamicAllocationTesting, + assert(master.contains("mesos") || assert(master.contains("yarn") || dynamicAllocationTesting, "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 24c57da7359c9..8c9c1e1108de9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -32,6 +32,7 @@ import org.apache.spark.{Logging, SparkContext, SparkEnv, SparkException} import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils +import com.google.common.collect.{BiMap, HashBiMap} /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -69,14 +70,18 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveIdsWithExecutors = new HashSet[String] - val taskIdToSlaveId = new HashMap[Int, String] + val taskIdToSlaveId = HashBiMap.create[Int, String]() + val failuresBySlaveId = new HashMap[String, Int] // How many times tasks on each slave failed + val pendingRemovedSlaveIds = new HashSet[String] val extraCoresPerSlave = conf.getInt("spark.mesos.extra.cores", 0) var nextMesosTaskId = 0 + var executorLimit: Option[Int] = None + @volatile var appId: String = _ def newMesosTaskId(): Int = { @@ -153,8 +158,8 @@ private[spark] class CoarseMesosSchedulerBackend( if (uri == null) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath command.setValue( - "%s \"%s\" org.apache.spark.executor.CoarseGrainedMesosExecutorBackend " + - "%s %s %s %d %s".format( + ("%s \"%s\" org.apache.spark.executor.CoarseGrainedMesosExecutorBackend " + + "%s %s %s %d %s").format( prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId)) } else { @@ -204,10 +209,11 @@ private[spark] 
class CoarseMesosSchedulerBackend(
     val filters = Filters.newBuilder().setRefuseSeconds(-1).build()
     for (offer <- offers) {
-      val slaveId = offer.getSlaveId.toString
+      val slaveId = offer.getSlaveId.getValue
       val mem = getResource(offer.getResourcesList, "mem")
       val cpus = getResource(offer.getResourcesList, "cpus").toInt
-      if (totalCoresAcquired < maxCores &&
+      if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) &&
+        totalCoresAcquired < maxCores &&
         mem >= MemoryUtils.calculateTotalMemory(sc) &&
         cpus >= 1 &&
         failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES &&
         !slaveIdsWithExecutors.contains(slaveId)) {
         // Launch an executor on the slave
         val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired)
         totalCoresAcquired += cpusToUse
         val taskId = newMesosTaskId()
         taskIdToSlaveId(taskId) = slaveId
         slaveIdsWithExecutors += slaveId
         coresByTaskId(taskId) = cpusToUse
         val task = MesosTaskInfo.newBuilder()
           .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build())
           .setSlaveId(offer.getSlaveId)
           .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave))
           .setName("Task " + taskId)
           .addResources(createResource("cpus", cpusToUse))
           .addResources(createResource("mem",
@@ -231,9 +237,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         d.launchTasks(
           Collections.singleton(offer.getId), Collections.singletonList(task), filters)
       } else {
-        // Filter it out
-        d.launchTasks(
-          Collections.singleton(offer.getId), Collections.emptyList[MesosTaskInfo](), filters)
+        d.declineOffer(offer.getId)
       }
     }
   }
@@ -273,6 +277,7 @@ private[spark] class CoarseMesosSchedulerBackend(
         val slaveId = taskIdToSlaveId(taskId)
         slaveIdsWithExecutors -= slaveId
         taskIdToSlaveId -= taskId
+        pendingRemovedSlaveIds -= slaveId
         // Remove the cores we have remembered for this task, if it's in the hashmap
         for (cores <- coresByTaskId.get(taskId)) {
           totalCoresAcquired -= cores
@@ -311,6 +316,11 @@ private[spark] class CoarseMesosSchedulerBackend(
       if (slaveIdsWithExecutors.contains(slaveId.getValue)) {
         // Note that the slave ID corresponds to the executor ID on that slave
         slaveIdsWithExecutors -= slaveId.getValue
+        val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse()
+        if (slaveIdToTaskId.contains(slaveId.getValue)) {
+          taskIdToSlaveId.remove(slaveIdToTaskId.get(slaveId.getValue))
+        }
+        pendingRemovedSlaveIds -= slaveId.getValue
         removeExecutor(slaveId.getValue, "Mesos slave lost")
       }
     }
@@ -327,4 +337,39 @@ private[spark] class CoarseMesosSchedulerBackend(
     super.applicationId
   }
 
+  override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
+    // We don't truly know if we can fulfill the full amount of executors
+    // since at coarse grain it depends on the amount of slaves available.
+    logInfo("Capping the total amount of executors to " + requestedTotal)
+    executorLimit = Option(requestedTotal)
+    true
+  }
+
+  override def doKillExecutors(executorIds: Seq[String]): Boolean = {
+    if (driver == null) {
+      logWarning("Asked to kill executors before the executor was started.")
+      return false
+    }
+
+    val slaveIdToTaskId = taskIdToSlaveId.inverse()
+    for (executorId <- executorIds) {
+      if (slaveIdToTaskId.contains(executorId)) {
+        driver.killTask(
+          TaskID.newBuilder().setValue(slaveIdToTaskId.get(executorId).toString).build)
+        pendingRemovedSlaveIds += executorId
+      } else {
+        logWarning("Unable to find executor ID '" + executorId + "' in Mesos scheduler")
+      }
+    }
+
+    assert(pendingRemovedSlaveIds.size <= taskIdToSlaveId.size)
+
+    // We cannot simply decrement from the existing executor limit, as we may not be able
+    // to launch as many executors as the limit allows. But we assume that if we are asked
+    // to kill executors, the scheduler wants to set a limit that is less than the number
+    // of executors that have been launched. Therefore, we take the number of executors
+    // already launched minus the executors being killed as the new limit.
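+    // For example (hypothetical numbers): with 5 executors launched and 2 kills
+    // pending, the new limit becomes 5 - 2 = 3.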
+ executorLimit = Option(taskIdToSlaveId.size - pendingRemovedSlaveIds.size) + true + } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala new file mode 100644 index 0000000000000..945e561098a59 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.mesos + +import org.scalatest.FunSuite +import org.apache.spark.{SparkEnv, SparkConf, SparkContext, LocalSparkContext} +import org.scalatest.mock.EasyMockSugar +import org.apache.spark.scheduler.cluster.mesos.{MemoryUtils, CoarseMesosSchedulerBackend} +import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.Value.Scalar +import org.easymock.EasyMock +import org.apache.mesos.SchedulerDriver +import org.apache.spark.scheduler.TaskSchedulerImpl +import scala.collection.mutable +import akka.actor.ActorSystem +import java.util.Collections + +class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { + def createOffer(id: Int, mem: Int, cpu: Int) = { + val builder = Offer.newBuilder() + builder.addResourcesBuilder() + .setName("mem") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(mem)) + builder.addResourcesBuilder() + .setName("cpus") + .setType(Value.Type.SCALAR) + .setScalar(Scalar.newBuilder().setValue(cpu)) + builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() + } + + test("mesos supports killing and limiting executors") { + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val se = EasyMock.createMock(classOf[SparkEnv]) + val actorSystem = EasyMock.createMock(classOf[ActorSystem]) + val sparkConf = new SparkConf + EasyMock.expect(se.actorSystem).andReturn(actorSystem) + EasyMock.replay(se) + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.conf).andReturn(sparkConf).anyTimes() + EasyMock.expect(sc.env).andReturn(se) + EasyMock.replay(sc) + + EasyMock.expect(taskScheduler.sc).andReturn(sc) + EasyMock.replay(taskScheduler) + + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + 
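+    // These two properties feed the driverUrl that the backend passes to each
+    // executor; with SparkEnv's defaults it looks roughly like
+    // akka.tcp://sparkDriver@driverHost:1234/user/CoarseGrainedScheduler.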
+ val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + mesosOffers.add(createOffer(1, minMem, minCpu)) + + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)), + EasyMock.anyObject(), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)).once + + EasyMock.expect( + driver.killTask(TaskID.newBuilder().setValue("0").build()) + ).andReturn(Status.valueOf(1)) + + EasyMock.replay(driver) + + val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") + backend.driver = driver + backend.resourceOffers(driver, mesosOffers) + + assert(backend.doKillExecutors(Seq("s1"))) + EasyMock.verify(driver) + assert(backend.executorLimit.get.equals(0)) + + val mesosOffers2 = new java.util.ArrayList[Offer] + mesosOffers2.add(createOffer(2, minMem, minCpu)) + backend.resourceOffers(driver, mesosOffers) + // Verify we didn't launch any new executor + assert(backend.slaveIdsWithExecutors.size.equals(1)) + assert(backend.pendingRemovedSlaveIds.size.equals(1)) + + backend.doRequestTotalExecutors(2) + backend.resourceOffers(driver, mesosOffers) + assert(backend.slaveIdsWithExecutors.size.equals(2)) + backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build()) + assert(backend.slaveIdsWithExecutors.size.equals(1)) + assert(backend.pendingRemovedSlaveIds.size.equals(0)) + } +} From daeee340840596e3fe599433e7fcf191597ce9a8 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 16 Dec 2014 17:36:56 -0800 Subject: [PATCH 03/15] Propogate the shuffle service setting. --- .../deploy/worker/StandaloneWorkerShuffleService.scala | 1 + .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 6 ++++++ 2 files changed, 7 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index d61019ca816fa..69c64811a0d5b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -54,6 +54,7 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") server = transportContext.createServer(port) } + logInfo(s"Skip launching shuffle service as it's not enabled.") } def stop() { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 8c9c1e1108de9..49b04afec0d18 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -140,6 +140,12 @@ private[spark] class CoarseMesosSchedulerBackend( .setValue(extraJavaOpts) .build()) + environment.addVariables( + Environment.Variable.newBuilder() + .setName("spark.shuffle.service.enabled") + .setValue(conf.getBoolean("spark.shuffle.service.enabled", false).toString) + .build()) + sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() .setName(key) From dbbedc921b60278c7d2a55650db3f7ec1bc31244 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Fri, 19 Dec 2014 17:27:42 -0800 Subject: [PATCH 04/15] Implement a 
new executor for coarse grained mesos mode. --- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../CoarseGrainedExecutorBackend.scala | 20 +- .../CoarseGrainedMesosExecutorBackend.scala | 200 ++++++++++++++---- .../mesos/CoarseMesosSchedulerBackend.scala | 170 ++++++++++----- .../CoarseMesosSchedulerBackendSuite.scala | 6 +- 5 files changed, 286 insertions(+), 114 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 0215d3f73cc64..2afb1c96c6f8f 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1002,7 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { - assert(master.contains("mesos") || assert(master.contains("yarn") || dynamicAllocationTesting, + assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting, "Requesting executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => @@ -1020,7 +1020,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ @DeveloperApi override def killExecutors(executorIds: Seq[String]): Boolean = { - assert(master.contains("mesos") || assert(master.contains("yarn") || dynamicAllocationTesting, + assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting, "Killing executors is currently only supported in YARN mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index 94d0f71b0a6e4..9a4adfbbb3d71 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -33,7 +33,7 @@ import org.apache.spark.scheduler.TaskDescription import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages._ import org.apache.spark.util.{ActorLogReceive, AkkaUtils, SignalLogger, Utils} -protected[spark] class CoarseGrainedExecutorBackend( +private[spark] class CoarseGrainedExecutorBackend( driverUrl: String, executorId: String, hostPort: String, @@ -99,15 +99,15 @@ protected[spark] class CoarseGrainedExecutorBackend( } } -trait CoarseGrainedExecutorBackendRunner extends Logging { - def run[T <: CoarseGrainedExecutorBackend]( +private[spark] object CoarseGrainedExecutorBackend extends Logging { + + private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, - workerUrl: Option[String], - klass: Class[T]) { + workerUrl: Option[String]) { SignalLogger.register(log) @@ -148,10 +148,6 @@ trait CoarseGrainedExecutorBackendRunner extends Logging { env.actorSystem.awaitTermination() } } -} - -protected[spark] object CoarseGrainedExecutorBackend - extends CoarseGrainedExecutorBackendRunner { def main(args: Array[String]) { args.length match { @@ -165,11 +161,9 @@ protected[spark] object CoarseGrainedExecutorBackend // NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode) // and CoarseMesosSchedulerBackend (for mesos mode). 
      case 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), None,
-          classOf[CoarseGrainedExecutorBackend])
+        run(args(0), args(1), args(2), args(3).toInt, args(4), None)
       case x if x > 5 =>
-        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)),
-          classOf[CoarseGrainedExecutorBackend])
+        run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)))
     }
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
index d52e5c3850e19..d98f5fb16f474 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
@@ -17,57 +17,173 @@ package org.apache.spark.executor
 
-import org.apache.spark.executor.{CoarseGrainedExecutorBackendRunner, CoarseGrainedExecutorBackend}
-import org.apache.spark.SecurityManager
-import org.apache.spark.SparkConf
+import org.apache.spark.{SparkConf, Logging, SecurityManager}
+import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver, MesosNativeLibrary}
+import org.apache.spark.util.{Utils, SignalLogger}
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.mesos.Protos._
 import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService
-import akka.actor.ActorSystem
-
-private[spark] class CoarseGrainedMesosExecutorBackend(
-    driverUrl: String,
-    executorId: String,
-    hostPort: String,
-    cores: Int,
-    sparkProperties: Seq[(String, String)],
-    actorSystem: ActorSystem)
-  extends CoarseGrainedExecutorBackend(driverUrl, executorId, hostPort,
-    cores, sparkProperties, actorSystem) {
-
-  lazy val shuffleService: StandaloneWorkerShuffleService = {
-    val executorConf = new SparkConf
-    new StandaloneWorkerShuffleService(executorConf, new SecurityManager(executorConf))
+import scala.collection.JavaConversions._
+import scala.io.Source
+import java.io.PrintWriter
+
+/**
+ * The coarse-grained Mesos executor backend is responsible for launching the shuffle service
+ * and the CoarseGrainedExecutorBackend actor.
+ * It assumes the scheduler has detected that the shuffle service is enabled and therefore
+ * launches this class instead of CoarseGrainedExecutorBackend directly.
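+ *
+ * A rough lifecycle sketch (callback names as implemented below):
+ * {{{
+ * registered()  - starts the shuffle service for this slave
+ * launchTask()  - forks the serialized CoarseGrainedExecutorBackend command as a
+ *                 subprocess and mirrors its stdout/stderr
+ * killTask()    - destroys the subprocess; the shuffle service keeps running
+ * shutdown()    - stops the subprocess and the shuffle service
+ * }}}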
+ */ +private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf) + extends MesosExecutor + with Logging { + + private var shuffleService: StandaloneWorkerShuffleService = null + private var driver: ExecutorDriver = null + private var executorProc: Process = null + private var taskId: TaskID = null + + override def registered( + driver: ExecutorDriver, + executorInfo: ExecutorInfo, + frameworkInfo: FrameworkInfo, + slaveInfo: SlaveInfo) { + + this.driver = driver + logInfo("Coarse Grain Mesos Executor '" + executorInfo.getExecutorId.getValue + + "' is registered.") + + if (shuffleService == null) { + sparkConf.set("spark.shuffle.service.enabled", "true") + shuffleService = new StandaloneWorkerShuffleService(sparkConf, new SecurityManager(sparkConf)) + shuffleService.startIfEnabled() + } } - override def preStart() { - shuffleService.startIfEnabled() - super.preStart() + override def launchTask(d: ExecutorDriver, taskInfo: TaskInfo) { + if (executorProc != null) { + logError("Received LaunchTask while executor is already running") + val status = TaskStatus.newBuilder() + .setState(TaskState.TASK_FAILED) + .setMessage("Received LaunchTask while executor is already running") + .build() + d.sendStatusUpdate(status) + return + } + + // We are launching the CoarseGrainedExecutorBackend via subprocess + // because the backend is designed to run in its own process. + // Since it's a shared class we are preserving the existing behavior + // and launching it as a subprocess here. + val command = Utils.deserialize[String](taskInfo.getData().toByteArray) + val pb = new ProcessBuilder(command) + + val currentEnvVars = pb.environment() + for (variable <- taskInfo.getExecutor.getCommand.getEnvironment.getVariablesList()) { + currentEnvVars.put(variable.getName, variable.getValue) + } + + executorProc = pb.start() + + new Thread("stderr reader for task " + taskInfo.getTaskId.getValue) { + override def run() { + for (line <- Source.fromInputStream(executorProc.getErrorStream).getLines) { + System.err.println(line) + } + } + }.start() + + new Thread("stdout reader for task " + taskInfo.getTaskId.getValue) { + override def run() { + for (line <- Source.fromInputStream(executorProc.getInputStream).getLines) { + System.out.println(line) + } + } + }.start() + + new Thread("process waiter for mesos executor for task " + taskInfo.getTaskId.getValue) { + override def run() { + executorProc.waitFor() + val (state, msg) = if (executorProc.exitValue() == 0) { + (TaskState.TASK_FINISHED, "") + } else { + (TaskState.TASK_FAILED, "Exited with status: " + executorProc.exitValue().toString) + } + + // We leave the shuffle service running after the task. + cleanup(state, msg) + } + }.start() + + taskId = taskInfo.getTaskId } - override def postStop() { - shuffleService.stop() - super.postStop() + override def error(d: ExecutorDriver, message: String) { + logError("Error from Mesos: " + message) } -} -private[spark] object CoarseGrainedMesosExecutorBackend - extends CoarseGrainedExecutorBackendRunner { + override def killTask(d: ExecutorDriver, t: TaskID) { + if (executorProc == null) { + logError("Received killtask when no process is initialized") + return + } + + // We only destroy the coarse grained executor but leave the shuffle + // service running for other tasks that might be reusing this executor. + // This is no-op if the process already finished. 
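+    // (On POSIX platforms Process.destroy() delivers SIGTERM, and destroying a
+    // process that has already exited is a no-op.)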
+ executorProc.destroy() + cleanup(TaskState.TASK_KILLED) + } + + def cleanup(state: TaskState, msg: String = ""): Unit = synchronized { + if (driver == null) { + logError("Cleaning up process but driver is not initialized") + return + } + + if (executorProc == null) { + logDebug("Process is not started or already cleaned up") + return + } + + assert(taskId != null) + + driver.sendStatusUpdate(TaskStatus.newBuilder() + .setState(state) + .setMessage(msg) + .setTaskId(taskId) + .build) + + executorProc = null + taskId = null + } + + override def reregistered(d: ExecutorDriver, p2: SlaveInfo) {} + + override def disconnected(d: ExecutorDriver) {} + + override def frameworkMessage(d: ExecutorDriver, data: Array[Byte]) {} + + override def shutdown(d: ExecutorDriver) { + if (executorProc != null) { + killTask(d, taskId) + } + + if (shuffleService != null) { + shuffleService.stop() + shuffleService = null + } + } +} +private[spark] object CoarseGrainedMesosExecutorBackend extends Logging { def main(args: Array[String]) { - args.length match { - case x if x < 5 => - System.err.println( - // Worker url is used in spark standalone mode to enforce fate-sharing with worker - "Usage: CoarseGrainedMesosExecutorBackend " + - " [] ") - System.exit(1) - - // NB: These arguments are provided by CoarseMesosSchedulerBackend (for mesos mode). - case 5 => - run(args(0), args(1), args(2), args(3).toInt, args(4), None, - classOf[CoarseGrainedMesosExecutorBackend]) - case x if x > 5 => - run(args(0), args(1), args(2), args(3).toInt, args(4), Some(args(5)), - classOf[CoarseGrainedMesosExecutorBackend]) + SignalLogger.register(log) + SparkHadoopUtil.get.runAsSparkUser { () => + MesosNativeLibrary.load() + val sparkConf = new SparkConf() + // Create a new Executor and start it running + val runner = new CoarseGrainedMesosExecutorBackend(sparkConf) + new MesosExecutorDriver(runner).run() } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 49b04afec0d18..53c1b473e4e00 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -33,6 +33,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import com.google.common.collect.{BiMap, HashBiMap} +import org.apache.mesos.protobuf.ByteString /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -52,7 +53,7 @@ private[spark] class CoarseMesosSchedulerBackend( with MScheduler with Logging { - val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures + val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures // Lock used to wait for scheduler to be registered var isRegistered = false @@ -62,13 +63,13 @@ private[spark] class CoarseMesosSchedulerBackend( var driver: SchedulerDriver = null // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] var totalCoresAcquired = 0 - val slaveIdsWithExecutors = new HashSet[String] 
+ val slaveIdsWithTasks = new HashSet[String] val taskIdToSlaveId = HashBiMap.create[Int, String]() @@ -96,6 +97,7 @@ private[spark] class CoarseMesosSchedulerBackend( synchronized { new Thread("CoarseMesosSchedulerBackend driver") { setDaemon(true) + override def run() { val scheduler = CoarseMesosSchedulerBackend.this val fwInfo = FrameworkInfo.newBuilder().setUser(sc.sparkUser).setName(sc.appName).build() @@ -114,12 +116,27 @@ private[spark] class CoarseMesosSchedulerBackend( } } - def createCommand(offer: Offer, numCores: Int): CommandInfo = { - val executorSparkHome = conf.getOption("spark.mesos.executor.home") - .orElse(sc.getSparkHome()) - .getOrElse { - throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") - } + // Set the environment variable through a command prefix + // to append to the existing value of the variable + lazy val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => + Utils.libraryPathEnvPrefix(Seq(p)) + }.getOrElse("") + + lazy val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( + SparkEnv.driverActorSystemName, + conf.get("spark.driver.host"), + conf.get("spark.driver.port"), + CoarseGrainedSchedulerBackend.ACTOR_NAME) + + lazy val executorUri = conf.get("spark.executor.uri", null) + + lazy val executorSparkHome = conf.getOption("spark.mesos.executor.home") + .orElse(sc.getSparkHome()) + .getOrElse { + throw new SparkException("Executor Spark home `spark.mesos.executor.home` is not set!") + } + + def createBaseCommand(offer: Offer): CommandInfo.Builder = { val environment = Environment.newBuilder() val extraClassPath = conf.getOption("spark.executor.extraClassPath") extraClassPath.foreach { cp => @@ -128,24 +145,12 @@ private[spark] class CoarseMesosSchedulerBackend( } val extraJavaOpts = conf.get("spark.executor.extraJavaOptions", "") - // Set the environment variable through a command prefix - // to append to the existing value of the variable - val prefixEnv = conf.getOption("spark.executor.extraLibraryPath").map { p => - Utils.libraryPathEnvPrefix(Seq(p)) - }.getOrElse("") - environment.addVariables( Environment.Variable.newBuilder() .setName("SPARK_EXECUTOR_OPTS") .setValue(extraJavaOpts) .build()) - environment.addVariables( - Environment.Variable.newBuilder() - .setName("spark.shuffle.service.enabled") - .setValue(conf.getBoolean("spark.shuffle.service.enabled", false).toString) - .build()) - sc.executorEnvs.foreach { case (key, value) => environment.addVariables(Environment.Variable.newBuilder() .setName(key) @@ -154,32 +159,56 @@ private[spark] class CoarseMesosSchedulerBackend( } val command = CommandInfo.newBuilder() .setEnvironment(environment) - val driverUrl = "akka.tcp://%s@%s:%s/user/%s".format( - SparkEnv.driverActorSystemName, - conf.get("spark.driver.host"), - conf.get("spark.driver.port"), - CoarseGrainedSchedulerBackend.ACTOR_NAME) - - val uri = conf.get("spark.executor.uri", null) - if (uri == null) { + + if (executorUri != null) { + command.addUris(CommandInfo.URI.newBuilder().setValue(executorUri)) + } + + command + } + + def createTaskCommandString(offer: Offer, numCores: Int) = { + if (executorUri == null) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath - command.setValue( - ("%s \"%s\" org.apache.spark.executor.CoarseGrainedMesosExecutorBackend " + + ("%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend " + "%s %s %s %d %s").format( prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores, appId)) + 
offer.getHostname, numCores, appId) + } else { + // Grab everything to the first '.'. We'll use that and '*' to + // glob the directory "correctly". + val basename = executorUri.split('/').last.split('.').head + ("cd %s*; %s " + + "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend " + + "%s %s %s %d %s").format( + basename, prefixEnv, driverUrl, offer.getSlaveId.getValue, + offer.getHostname, numCores, appId) + } + } + + def createTaskCommand(offer: Offer, numCores: Int): CommandInfo = { + val command = createBaseCommand(offer) + command.setValue(createTaskCommandString(offer, numCores)).build + } + + def createExecutorCommand(offer: Offer): CommandInfo = { + val command = createBaseCommand(offer) + + if (executorUri == null) { + val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath + command.setValue( + ("%s \"%s\" org.apache.spark.executor.CoarseGrainedMesosExecutorBackend").format( + prefixEnv, runScript)) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". - val basename = uri.split('/').last.split('.').head + val basename = executorUri.split('/').last.split('.').head command.setValue( ("cd %s*; %s " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedMesosExecutorBackend " + - "%s %s %s %d %s").format( - basename, prefixEnv, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores, appId)) - command.addUris(CommandInfo.URI.newBuilder().setValue(uri)) + "./bin/spark-class org.apache.spark.executor.CoarseGrainedMesosExecutorBackend ") + .format(basename, prefixEnv)) } + command.build() } @@ -206,6 +235,25 @@ private[spark] class CoarseMesosSchedulerBackend( override def reregistered(d: SchedulerDriver, masterInfo: MasterInfo) {} + lazy val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false) + + lazy val shuffleServiceMem = if (shuffleServiceEnabled) { + conf.getInt("spark.mesos.shuffle.service.mem", 1024) + } else { + 0 + } + + lazy val shuffleServiceCpu = if (shuffleServiceEnabled) { + conf.getInt("spark.mesos.shuffle.service.cpu", 1) + } else { + 0 + } + + def minMemRequired = MemoryUtils.calculateTotalMemory(sc) + shuffleServiceMem.toDouble + + + def minCpuRequired = 1 + shuffleServiceCpu + /** * Method called by Mesos to offer resources on slaves. We respond by launching an executor, * unless we've already launched more than we wanted to. 
@@ -219,29 +267,43 @@ private[spark] class CoarseMesosSchedulerBackend( val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) && - totalCoresAcquired < maxCores && - mem >= MemoryUtils.calculateTotalMemory(sc) && - cpus >= 1 && - failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithExecutors.contains(slaveId)) { + totalCoresAcquired < maxCores && + mem >= minMemRequired && + cpus >= minCpuRequired && + failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && + !slaveIdsWithTasks.contains(slaveId)) { // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse val taskId = newMesosTaskId() taskIdToSlaveId(taskId) = slaveId - slaveIdsWithExecutors += slaveId + slaveIdsWithTasks += slaveId coresByTaskId(taskId) = cpusToUse - val task = MesosTaskInfo.newBuilder() + val builder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) - .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave)) .setName("Task " + taskId) - .addResources(createResource("cpus", cpusToUse)) - .addResources(createResource("mem", + + if (!shuffleServiceEnabled) { + builder.setCommand(createTaskCommand(offer, cpusToUse + extraCoresPerSlave)) + .addResources(createResource("cpus", cpusToUse)) + .addResources(createResource("mem", MemoryUtils.calculateTotalMemory(sc))) - .build() + } else { + builder.setExecutor( + ExecutorInfo.newBuilder() + .addResources(createResource("cpus", shuffleServiceCpu)) + .addResources(createResource("mem", shuffleServiceMem)) + .setCommand(createExecutorCommand(offer)).build() + ) + + builder.setData( + ByteString.copyFrom( + Utils.serialize(createTaskCommandString(offer, cpusToUse - shuffleServiceCpu)))) + } + d.launchTasks( - Collections.singleton(offer.getId), Collections.singletonList(task), filters) + Collections.singleton(offer.getId), Collections.singletonList(builder.build()), filters) } else { d.declineOffer(offer.getId) } @@ -281,7 +343,7 @@ private[spark] class CoarseMesosSchedulerBackend( synchronized { if (isFinished(state)) { val slaveId = taskIdToSlaveId(taskId) - slaveIdsWithExecutors -= slaveId + slaveIdsWithTasks -= slaveId taskIdToSlaveId -= taskId pendingRemovedSlaveIds -= slaveId // Remove the cores we have remembered for this task, if it's in the hashmap @@ -294,7 +356,7 @@ private[spark] class CoarseMesosSchedulerBackend( failuresBySlaveId(slaveId) = failuresBySlaveId.getOrElse(slaveId, 0) + 1 if (failuresBySlaveId(slaveId) >= MAX_SLAVE_FAILURES) { logInfo("Blacklisting Mesos slave " + slaveId + " due to too many failures; " + - "is Spark installed on it?") + "is Spark installed on it?") } } driver.reviveOffers() // In case we'd rejected everything before but have now lost a node @@ -319,9 +381,9 @@ private[spark] class CoarseMesosSchedulerBackend( override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { logInfo("Mesos slave lost: " + slaveId.getValue) synchronized { - if (slaveIdsWithExecutors.contains(slaveId.getValue)) { + if (slaveIdsWithTasks.contains(slaveId.getValue)) { // Note that the slave ID corresponds to the executor ID on that slave - slaveIdsWithExecutors -= slaveId.getValue + slaveIdsWithTasks -= slaveId.getValue val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() if (slaveIdToTaskId.contains(slaveId.getValue)) { 
taskIdToSlaveId.remove(slaveIdToTaskId.get(slaveId.getValue)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala index 945e561098a59..796cc84f4c0a0 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -100,14 +100,14 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w mesosOffers2.add(createOffer(2, minMem, minCpu)) backend.resourceOffers(driver, mesosOffers) // Verify we didn't launch any new executor - assert(backend.slaveIdsWithExecutors.size.equals(1)) + assert(backend.slaveIdsWithTasks.size.equals(1)) assert(backend.pendingRemovedSlaveIds.size.equals(1)) backend.doRequestTotalExecutors(2) backend.resourceOffers(driver, mesosOffers) - assert(backend.slaveIdsWithExecutors.size.equals(2)) + assert(backend.slaveIdsWithTasks.size.equals(2)) backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build()) - assert(backend.slaveIdsWithExecutors.size.equals(1)) + assert(backend.slaveIdsWithTasks.size.equals(1)) assert(backend.pendingRemovedSlaveIds.size.equals(0)) } } From 07ef8a085bb4000ee0c962c3d2a03a3d1abc8188 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Tue, 23 Dec 2014 21:28:09 -0800 Subject: [PATCH 05/15] Launch executor with shell and add traces --- .../StandaloneWorkerShuffleService.scala | 3 ++- .../CoarseGrainedMesosExecutorBackend.scala | 18 +++++++++++++++--- .../mesos/CoarseMesosSchedulerBackend.scala | 16 +++++++++++++--- 3 files changed, 30 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index 69c64811a0d5b..c167b012746f2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -53,8 +53,9 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu require(server == null, "Shuffle server already started") logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") server = transportContext.createServer(port) + } else { + logInfo(s"Skip launching shuffle service as it's not enabled.") } - logInfo(s"Skip launching shuffle service as it's not enabled.") } def stop() { diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala index d98f5fb16f474..ad06239894211 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala @@ -25,7 +25,7 @@ import org.apache.mesos.Protos._ import org.apache.spark.deploy.worker.StandaloneWorkerShuffleService import scala.collection.JavaConversions._ import scala.io.Source -import java.io.PrintWriter +import java.io.{File, PrintWriter} /** * The Coarse grained Mesos executor backend is responsible for launching the shuffle service @@ -63,6 +63,8 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf) if (executorProc != null) { logError("Received LaunchTask while executor is already 
running") val status = TaskStatus.newBuilder() + .setTaskId(taskInfo.getTaskId) + .setSlaveId(taskInfo.getSlaveId) .setState(TaskState.TASK_FAILED) .setMessage("Received LaunchTask while executor is already running") .build() @@ -75,7 +77,10 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf) // Since it's a shared class we are preserving the existing behavior // and launching it as a subprocess here. val command = Utils.deserialize[String](taskInfo.getData().toByteArray) - val pb = new ProcessBuilder(command) + + // Mesos only work on linux platforms, as mesos command executor also + // executes with /bin/sh -c, we assume this will also work under mesos execution. + val pb = new ProcessBuilder(List("/bin/sh", "-c", command)) val currentEnvVars = pb.environment() for (variable <- taskInfo.getExecutor.getCommand.getEnvironment.getVariablesList()) { @@ -122,11 +127,18 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf) } override def killTask(d: ExecutorDriver, t: TaskID) { - if (executorProc == null) { + if (taskId == null) { logError("Received killtask when no process is initialized") return } + if (!taskId.getValue.equals(t.getValue)) { + logError("Asked to kill task '" + t.getValue + "' but executor is running task '" + + taskId.getValue + "'") + return + } + + assert(executorProc != null) // We only destroy the coarse grained executor but leave the shuffle // service running for other tasks that might be reusing this executor. // This is no-op if the process already finished. diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 53c1b473e4e00..52d96b3c22f5f 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -263,9 +263,12 @@ private[spark] class CoarseMesosSchedulerBackend( val filters = Filters.newBuilder().setRefuseSeconds(-1).build() for (offer <- offers) { + val slaveId = offer.getSlaveId.getValue val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt + logTrace("Received offer id: " + offer.getId.getValue + ", cpu: " + cpus.toString + + ", mem: " + mem.toString) if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) && totalCoresAcquired < maxCores && mem >= minMemRequired && @@ -290,18 +293,25 @@ private[spark] class CoarseMesosSchedulerBackend( .addResources(createResource("mem", MemoryUtils.calculateTotalMemory(sc))) } else { + val taskCpus = cpusToUse - shuffleServiceCpu + val taskMem = mem - shuffleServiceMem builder.setExecutor( ExecutorInfo.newBuilder() + .setExecutorId(ExecutorID.newBuilder().setValue(slaveId).build) .addResources(createResource("cpus", shuffleServiceCpu)) .addResources(createResource("mem", shuffleServiceMem)) - .setCommand(createExecutorCommand(offer)).build() - ) + .setCommand(createExecutorCommand(offer)).build()) + .addResources(createResource("cpus", taskCpus)) + .addResources(createResource("mem", taskMem)) builder.setData( ByteString.copyFrom( - Utils.serialize(createTaskCommandString(offer, cpusToUse - shuffleServiceCpu)))) + Utils.serialize(createTaskCommandString(offer, taskCpus)))) } + logTrace("Launching task with offer id: " + offer.getId.getValue + + ", task: " + builder.build()) + d.launchTasks( 
          Collections.singleton(offer.getId), Collections.singletonList(builder.build()), filters)
       } else {

From 74b23577b31fa80e6fd067b0ddfc8dc6cebda807 Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Tue, 30 Dec 2014 00:12:42 -0800
Subject: [PATCH 06/15] Fix destroying executor.

---
 .../CoarseGrainedMesosExecutorBackend.scala   | 29 +++++---
 .../mesos/CoarseMesosSchedulerBackend.scala   | 73 +++++++++++--------
 2 files changed, 62 insertions(+), 40 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
index ad06239894211..18a62af39aefa 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
@@ -41,16 +41,16 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
   private var driver: ExecutorDriver = null
   private var executorProc: Process = null
   private var taskId: TaskID = null
+  @volatile var killed = false
 
   override def registered(
       driver: ExecutorDriver,
       executorInfo: ExecutorInfo,
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
-
     this.driver = driver
     logInfo("Coarse Grain Mesos Executor '" + executorInfo.getExecutorId.getValue +
-      "' is registered.")
+        "' is registered.")
 
     if (shuffleService == null) {
       sparkConf.set("spark.shuffle.service.enabled", "true")
@@ -72,15 +72,20 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
       return
     }
 
+    killed = false
+
+    logInfo("Launching task id: " + taskInfo.getTaskId.getValue)
+
     // We are launching the CoarseGrainedExecutorBackend via subprocess
     // because the backend is designed to run in its own process.
     // Since it's a shared class we are preserving the existing behavior
     // and launching it as a subprocess here.
-    val command = Utils.deserialize[String](taskInfo.getData().toByteArray)
+    val command = "exec " + Utils.deserialize[String](taskInfo.getData().toByteArray)
+
+    logInfo("Running command: " + command)
 
-    // Mesos only work on linux platforms, as mesos command executor also
-    // executes with /bin/sh -c, we assume this will also work under mesos execution.
-    val pb = new ProcessBuilder(List("/bin/sh", "-c", command))
+    // Mesos only works on Linux platforms, so we assume bash is available if Mesos is used.
+    val pb = new ProcessBuilder("/bin/bash", "-c", command)
 
     val currentEnvVars = pb.environment()
     for (variable <- taskInfo.getExecutor.getCommand.getEnvironment.getVariablesList()) {
@@ -105,15 +110,21 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
       }
     }.start()
 
+    driver.sendStatusUpdate(TaskStatus.newBuilder()
+      .setState(TaskState.TASK_RUNNING)
+      .setTaskId(taskInfo.getTaskId)
+      .build)
+
     new Thread("process waiter for mesos executor for task " + taskInfo.getTaskId.getValue) {
       override def run() {
         executorProc.waitFor()
-        val (state, msg) = if (executorProc.exitValue() == 0) {
+        val (state, msg) = if (killed) {
+          (TaskState.TASK_KILLED, "")
+        } else if (executorProc.exitValue() == 0) {
           (TaskState.TASK_FINISHED, "")
         } else {
           (TaskState.TASK_FAILED, "Exited with status: " + executorProc.exitValue().toString)
         }
-
         // We leave the shuffle service running after the task.
cleanup(state, msg) } @@ -139,11 +150,11 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf) } assert(executorProc != null) + killed = true // We only destroy the coarse grained executor but leave the shuffle // service running for other tasks that might be reusing this executor. // This is no-op if the process already finished. executorProc.destroy() - cleanup(TaskState.TASK_KILLED) } def cleanup(state: TaskState, msg: String = ""): Unit = synchronized { diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 52d96b3c22f5f..af0721696869e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -34,6 +34,7 @@ import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend import org.apache.spark.util.Utils import com.google.common.collect.{BiMap, HashBiMap} import org.apache.mesos.protobuf.ByteString +import java.util.concurrent.locks.ReentrantLock /** * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds @@ -83,6 +84,11 @@ private[spark] class CoarseMesosSchedulerBackend( var executorLimit: Option[Int] = None + // Introducing a new lock for protecting above shared state. We avoid locking on the class object + // as it can result in deadlock when synchronized method in this class calls a parent's method + // that is also trying to lock on the class object. + var stateLock = new ReentrantLock() + @volatile var appId: String = _ def newMesosTaskId(): Int = { @@ -167,23 +173,26 @@ private[spark] class CoarseMesosSchedulerBackend( command } - def createTaskCommandString(offer: Offer, numCores: Int) = { - if (executorUri == null) { + def createTaskCommandString( + offer: Offer, + numCores: Int, + executorUri: String = executorUri, + sparkHome: String = null): String = { + val sparkClassCommand = if (executorUri == null && sparkHome == null) { + "./bin/spark-class " + } else if (executorUri == null) { val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath - ("%s \"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend " + - "%s %s %s %d %s").format( - prefixEnv, runScript, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores, appId) + "%s \"%s\" ".format(prefixEnv, runScript) } else { // Grab everything to the first '.'. We'll use that and '*' to // glob the directory "correctly". val basename = executorUri.split('/').last.split('.').head - ("cd %s*; %s " + - "./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend " + - "%s %s %s %d %s").format( - basename, prefixEnv, driverUrl, offer.getSlaveId.getValue, - offer.getHostname, numCores, appId) + "cd %s*; %s ./bin/spark-class ".format(basename, prefixEnv) } + sparkClassCommand + + "org.apache.spark.executor.CoarseGrainedExecutorBackend " + + "%s %s %s %d %s".format( + driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId) } def createTaskCommand(offer: Offer, numCores: Int): CommandInfo = { @@ -259,11 +268,10 @@ private[spark] class CoarseMesosSchedulerBackend( * unless we've already launched more than we wanted to. 
*/ override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { - synchronized { + stateLock.synchronized { val filters = Filters.newBuilder().setRefuseSeconds(-1).build() for (offer <- offers) { - val slaveId = offer.getSlaveId.getValue val mem = getResource(offer.getResourcesList, "mem") val cpus = getResource(offer.getResourcesList, "cpus").toInt @@ -306,7 +314,7 @@ private[spark] class CoarseMesosSchedulerBackend( builder.setData( ByteString.copyFrom( - Utils.serialize(createTaskCommandString(offer, taskCpus)))) + Utils.serialize(createTaskCommandString(offer, taskCpus, null, null)))) } logTrace("Launching task with offer id: " + offer.getId.getValue + @@ -350,12 +358,9 @@ private[spark] class CoarseMesosSchedulerBackend( val taskId = status.getTaskId.getValue.toInt val state = status.getState logInfo("Mesos task " + taskId + " is now " + state) - synchronized { + stateLock.synchronized { if (isFinished(state)) { val slaveId = taskIdToSlaveId(taskId) - slaveIdsWithTasks -= slaveId - taskIdToSlaveId -= taskId - pendingRemovedSlaveIds -= slaveId // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores @@ -369,6 +374,8 @@ private[spark] class CoarseMesosSchedulerBackend( "is Spark installed on it?") } } + + executorTerminated(d, status.getSlaveId, "Executor finished with state: " + state) driver.reviveOffers() // In case we'd rejected everything before but have now lost a node } } @@ -388,19 +395,23 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} + def executorTerminated(d: SchedulerDriver, slaveId: SlaveID, reason: String) { + if (slaveIdsWithTasks.contains(slaveId.getValue)) { + // Note that the slave ID corresponds to the executor ID on that slave + slaveIdsWithTasks -= slaveId.getValue + val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() + if (slaveIdToTaskId.contains(slaveId.getValue)) { + taskIdToSlaveId.remove(slaveIdToTaskId.get(slaveId.getValue)) + } + pendingRemovedSlaveIds -= slaveId.getValue + removeExecutor(slaveId.getValue, reason) + } + } + override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { logInfo("Mesos slave lost: " + slaveId.getValue) - synchronized { - if (slaveIdsWithTasks.contains(slaveId.getValue)) { - // Note that the slave ID corresponds to the executor ID on that slave - slaveIdsWithTasks -= slaveId.getValue - val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() - if (slaveIdToTaskId.contains(slaveId.getValue)) { - taskIdToSlaveId.remove(slaveIdToTaskId.get(slaveId.getValue)) - } - pendingRemovedSlaveIds -= slaveId.getValue - removeExecutor(slaveId.getValue, "Mesos slave lost") - } + stateLock.synchronized { + executorTerminated(d, slaveId, "Mesos slave lost: " + slaveId.getValue) } } @@ -415,7 +426,7 @@ private[spark] class CoarseMesosSchedulerBackend( super.applicationId } - override def doRequestTotalExecutors(requestedTotal: Int): Boolean = { + override def doRequestTotalExecutors(requestedTotal: Int): Boolean = stateLock.synchronized { // We don't truly know if we can fulfill the full amount of executors // since at coarse grain it depends on the amount of slaves available. 
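    // Note that `stateLock.synchronized` locks on the monitor of the lock object
    // itself, not on `this`; any private object would serve the same purpose.
    // A sketch of the pattern (not part of the patch):
    //   private val stateLock = new AnyRef
    //   stateLock.synchronized { /* read or mutate the shared bookkeeping */ }
    // which cannot deadlock against a superclass method synchronizing on `this`.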
logInfo("Capping the total amount of executors to " + requestedTotal) @@ -423,7 +434,7 @@ private[spark] class CoarseMesosSchedulerBackend( true } - override def doKillExecutors(executorIds: Seq[String]): Boolean = { + override def doKillExecutors(executorIds: Seq[String]): Boolean = stateLock.synchronized { if (driver == null) { logWarning("Asked to kill executors before the executor was started.") return false From a85d73dfb0252b86055e9a90069a47a2116e0d17 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Wed, 31 Dec 2014 12:10:19 -0800 Subject: [PATCH 07/15] Only process status update if task is still tracked. --- .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index af0721696869e..dd4c5cdf2b9b3 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -357,8 +357,13 @@ private[spark] class CoarseMesosSchedulerBackend( override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { val taskId = status.getTaskId.getValue.toInt val state = status.getState - logInfo("Mesos task " + taskId + " is now " + state) stateLock.synchronized { + if(!taskIdToSlaveId.containsKey(taskId)) { + return + } + + logInfo("Mesos task " + taskId + " is now " + state) + if (isFinished(state)) { val slaveId = taskIdToSlaveId(taskId) // Remove the cores we have remembered for this task, if it's in the hashmap From 3cab809ff2bb670a35034773d15759e40f27f5ca Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Wed, 31 Dec 2014 15:08:16 -0800 Subject: [PATCH 08/15] Generate new spark executor id on each launch and fix tests --- .../mesos/CoarseMesosSchedulerBackend.scala | 54 +++++++++++-------- .../CoarseMesosSchedulerBackendSuite.scala | 27 ++++++++-- 2 files changed, 54 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index dd4c5cdf2b9b3..81a497d7ddfc5 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -173,9 +173,12 @@ private[spark] class CoarseMesosSchedulerBackend( command } + def sparkExecutorId(slaveId: String, taskId: String) = "%s/%s".format(slaveId, taskId) + def createTaskCommandString( offer: Offer, numCores: Int, + taskId: String, executorUri: String = executorUri, sparkHome: String = null): String = { val sparkClassCommand = if (executorUri == null && sparkHome == null) { @@ -192,12 +195,13 @@ private[spark] class CoarseMesosSchedulerBackend( sparkClassCommand + "org.apache.spark.executor.CoarseGrainedExecutorBackend " + "%s %s %s %d %s".format( - driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId) + driverUrl, sparkExecutorId(offer.getSlaveId.getValue, taskId), + offer.getHostname, numCores, appId) } - def createTaskCommand(offer: Offer, numCores: Int): CommandInfo = { + def createTaskCommand(offer: Offer, numCores: Int, taskId: String): CommandInfo = { val command = createBaseCommand(offer) - 
command.setValue(createTaskCommandString(offer, numCores)).build + command.setValue(createTaskCommandString(offer, numCores, taskId)).build } def createExecutorCommand(offer: Offer): CommandInfo = { @@ -290,13 +294,15 @@ private[spark] class CoarseMesosSchedulerBackend( taskIdToSlaveId(taskId) = slaveId slaveIdsWithTasks += slaveId coresByTaskId(taskId) = cpusToUse + val taskIdString: String = taskId.toString val builder = MesosTaskInfo.newBuilder() - .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) + .setTaskId(TaskID.newBuilder().setValue(taskIdString).build()) .setSlaveId(offer.getSlaveId) .setName("Task " + taskId) if (!shuffleServiceEnabled) { - builder.setCommand(createTaskCommand(offer, cpusToUse + extraCoresPerSlave)) + builder + .setCommand(createTaskCommand(offer, cpusToUse + extraCoresPerSlave, taskIdString)) .addResources(createResource("cpus", cpusToUse)) .addResources(createResource("mem", MemoryUtils.calculateTotalMemory(sc))) @@ -314,7 +320,8 @@ private[spark] class CoarseMesosSchedulerBackend( builder.setData( ByteString.copyFrom( - Utils.serialize(createTaskCommandString(offer, taskCpus, null, null)))) + Utils.serialize( + createTaskCommandString(offer, taskCpus, taskIdString, null, null)))) } logTrace("Launching task with offer id: " + offer.getId.getValue + @@ -380,7 +387,7 @@ private[spark] class CoarseMesosSchedulerBackend( } } - executorTerminated(d, status.getSlaveId, "Executor finished with state: " + state) + executorTerminated(d, status.getSlaveId.getValue, "Executor finished with state: " + state) driver.reviveOffers() // In case we'd rejected everything before but have now lost a node } } @@ -400,24 +407,24 @@ private[spark] class CoarseMesosSchedulerBackend( override def frameworkMessage(d: SchedulerDriver, e: ExecutorID, s: SlaveID, b: Array[Byte]) {} - def executorTerminated(d: SchedulerDriver, slaveId: SlaveID, reason: String) { - if (slaveIdsWithTasks.contains(slaveId.getValue)) { - // Note that the slave ID corresponds to the executor ID on that slave - slaveIdsWithTasks -= slaveId.getValue - val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() - if (slaveIdToTaskId.contains(slaveId.getValue)) { - taskIdToSlaveId.remove(slaveIdToTaskId.get(slaveId.getValue)) + def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String) { + stateLock.synchronized { + if (slaveIdsWithTasks.contains(slaveId)) { + slaveIdsWithTasks -= slaveId + val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() + if (slaveIdToTaskId.contains(slaveId)) { + val taskId: Int = slaveIdToTaskId.get(slaveId) + taskIdToSlaveId.remove(taskId) + removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason) + } + pendingRemovedSlaveIds -= slaveId } - pendingRemovedSlaveIds -= slaveId.getValue - removeExecutor(slaveId.getValue, reason) } } override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { logInfo("Mesos slave lost: " + slaveId.getValue) - stateLock.synchronized { - executorTerminated(d, slaveId, "Mesos slave lost: " + slaveId.getValue) - } + executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) } override def executorLost(d: SchedulerDriver, e: ExecutorID, s: SlaveID, status: Int) { @@ -435,7 +442,7 @@ private[spark] class CoarseMesosSchedulerBackend( // We don't truly know if we can fulfill the full amount of executors // since at coarse grain it depends on the amount of slaves available. 
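    // Spark executor IDs are now composite, "<slaveId>/<taskId>", so a fresh
    // executor ID is generated for each launch even when a slave hosts several
    // tasks over time. A sketch with illustrative values:
    //   sparkExecutorId("20150108-054052-S0", "7") // => "20150108-054052-S0/7"
    //   "20150108-054052-S0/7".split("/")(0)       // => "20150108-054052-S0"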
logInfo("Capping the total amount of executors to " + requestedTotal) - executorLimit = Option(requestedTotal); + executorLimit = Option(requestedTotal) true } @@ -447,10 +454,11 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveIdToTaskId = taskIdToSlaveId.inverse() for (executorId <- executorIds) { - if (slaveIdToTaskId.contains(executorId)) { + val slaveId = executorId.split("/")(0) + if (slaveIdToTaskId.contains(slaveId)) { driver.killTask( - TaskID.newBuilder().setValue(slaveIdToTaskId.get(executorId).toString).build) - pendingRemovedSlaveIds += executorId + TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build) + pendingRemovedSlaveIds += slaveId } else { logWarning("Unable to find executor Id '" + executorId + "' in Mesos scheduler") } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala index 796cc84f4c0a0..527242f528f56 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -86,28 +86,47 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w driver.killTask(TaskID.newBuilder().setValue("0").build()) ).andReturn(Status.valueOf(1)) + EasyMock.expect( + driver.declineOffer(OfferID.newBuilder().setValue("o2").build()) + ).andReturn(Status.valueOf(1)) + EasyMock.replay(driver) val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master") backend.driver = driver backend.resourceOffers(driver, mesosOffers) - assert(backend.doKillExecutors(Seq("s1"))) - EasyMock.verify(driver) + assert(backend.doKillExecutors(Seq("s1/0"))) assert(backend.executorLimit.get.equals(0)) val mesosOffers2 = new java.util.ArrayList[Offer] mesosOffers2.add(createOffer(2, minMem, minCpu)) - backend.resourceOffers(driver, mesosOffers) + backend.resourceOffers(driver, mesosOffers2) // Verify we didn't launch any new executor assert(backend.slaveIdsWithTasks.size.equals(1)) assert(backend.pendingRemovedSlaveIds.size.equals(1)) + EasyMock.verify(driver) + + EasyMock.reset(driver) + + EasyMock.expect( + driver.launchTasks( + EasyMock.eq(Collections.singleton(mesosOffers2.get(0).getId)), + EasyMock.anyObject(), + EasyMock.anyObject(classOf[Filters]) + ) + ).andReturn(Status.valueOf(1)).once + + EasyMock.replay(driver) + backend.doRequestTotalExecutors(2) - backend.resourceOffers(driver, mesosOffers) + backend.resourceOffers(driver, mesosOffers2) assert(backend.slaveIdsWithTasks.size.equals(2)) backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build()) assert(backend.slaveIdsWithTasks.size.equals(1)) assert(backend.pendingRemovedSlaveIds.size.equals(0)) + + EasyMock.verify(driver) } } From 61d8873cc6c3941ab215d2ac98d8c99373f0e0c1 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Fri, 2 Jan 2015 15:36:12 -0800 Subject: [PATCH 09/15] Address review feedback --- core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++-- .../spark/executor/CoarseGrainedMesosExecutorBackend.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 2afb1c96c6f8f..4e7f8920997d2 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1003,7 +1003,7 @@ 
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @DeveloperApi
   override def requestExecutors(numAdditionalExecutors: Int): Boolean = {
     assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting,
-      "Requesting executors is currently only supported in YARN mode")
+      "Requesting executors is currently only supported in YARN or Mesos mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.requestExecutors(numAdditionalExecutors)
@@ -1021,7 +1021,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   @DeveloperApi
   override def killExecutors(executorIds: Seq[String]): Boolean = {
     assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting,
-      "Killing executors is currently only supported in YARN mode")
+      "Killing executors is currently only supported in YARN or Mesos mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
         b.killExecutors(executorIds)
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
index 18a62af39aefa..2eebe48cc3454 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedMesosExecutorBackend.scala
@@ -49,7 +49,7 @@ private[spark] class CoarseGrainedMesosExecutorBackend(val sparkConf: SparkConf)
       frameworkInfo: FrameworkInfo,
       slaveInfo: SlaveInfo) {
     this.driver = driver
-    logInfo("Coarse Grain Mesos Executor '" + executorInfo.getExecutorId.getValue +
+    logInfo("Coarse Grained Mesos Executor '" + executorInfo.getExecutorId.getValue +
       "' is registered.")
 
     if (shuffleService == null) {

From 4ca04ab4a7909f52ee681a53618748f1f53fa35d Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Mon, 5 Jan 2015 18:00:56 -0800
Subject: [PATCH 10/15] Add documentation for new config added

---
 docs/running-on-mesos.md | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 78358499fd01f..e171f240318b9 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -226,6 +226,20 @@ See the [configuration page](configuration.html) for information on Spark config
     The final total amount of memory allocated is the maximum value between executor memory plus memoryOverhead, and overhead fraction (1.07) plus the executor memory.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.shuffle.service.cpu</code></td>
+  <td>1</td>
+  <td>
+    The amount of CPU that the Mesos coarse-grained scheduler will request for launching the shuffle service. The shuffle service is launched on each slave.
+  </td>
+</tr>
+<tr>
+  <td><code>spark.mesos.shuffle.service.mem</code></td>
+  <td>384</td>
+  <td>
+    The amount of memory that the Mesos coarse-grained scheduler will request for launching the shuffle service. The shuffle service is launched on each slave.
+  </td>
+</tr>
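A minimal `spark-defaults.conf` sketch combining these settings (the values are
illustrative; `spark.shuffle.service.enabled` turns the external shuffle service
itself on):

    spark.shuffle.service.enabled    true
    spark.mesos.shuffle.service.cpu  1
    spark.mesos.shuffle.service.mem  1024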
 </table>
 
 # Troubleshooting and Debugging

From cb8ae343f9a80980897115afdfca3e6aef3bbff8 Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Mon, 5 Jan 2015 18:07:06 -0800
Subject: [PATCH 11/15] Fix documentation

---
 docs/running-on-mesos.md | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index e171f240318b9..8d9bf26ec5ff0 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -231,13 +231,15 @@ See the [configuration page](configuration.html) for information on Spark config
   <td>1</td>
   <td>
     The amount of CPU that the Mesos coarse-grained scheduler will request for launching the shuffle service. The shuffle service is launched on each slave.
+    This only applies if the shuffle service is enabled and the scheduler is running in coarse-grained mode.
   </td>
 </tr>
 <tr>
   <td><code>spark.mesos.shuffle.service.mem</code></td>
-  <td>384</td>
+  <td>1024</td>
   <td>
     The amount of memory that the Mesos coarse-grained scheduler will request for launching the shuffle service. The shuffle service is launched on each slave.
+    This only applies if the shuffle service is enabled and the scheduler is running in coarse-grained mode.
   </td>
 </tr>
 </table>

From 0adc4007c568624242916197a0baa3c4c4721f2b Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Wed, 7 Jan 2015 23:13:27 -0800
Subject: [PATCH 12/15] Fix requiring executor resources when executor is still running

---
 .../mesos/CoarseMesosSchedulerBackend.scala   | 47 ++++++++++++++-----
 .../CoarseMesosSchedulerBackendSuite.scala    |  7 +--
 2 files changed, 38 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index 81a497d7ddfc5..8bfc1f24f93c1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -36,6 +36,10 @@ import com.google.common.collect.{BiMap, HashBiMap}
 import org.apache.mesos.protobuf.ByteString
 import java.util.concurrent.locks.ReentrantLock
 
+case class SlaveStatus(var executorRunning: Boolean, var taskRunning: Boolean) {
+  def notRunning = !taskRunning && !executorRunning
+}
+
 /**
  * A SchedulerBackend that runs tasks on Mesos, but uses "coarse-grained" tasks, where it holds
  * onto each Mesos node for the duration of the Spark job instead of relinquishing cores whenever
@@ -70,7 +74,7 @@ private[spark] class CoarseMesosSchedulerBackend(
   val coresByTaskId = new HashMap[Int, Int]
   var totalCoresAcquired = 0
 
-  val slaveIdsWithTasks = new HashSet[String]
+  val slaveStatuses = new HashMap[String, SlaveStatus]
 
   val taskIdToSlaveId = HashBiMap.create[Int, String]()
 
@@ -262,11 +266,6 @@ private[spark] class CoarseMesosSchedulerBackend(
     0
   }
 
-  def minMemRequired = MemoryUtils.calculateTotalMemory(sc) + shuffleServiceMem.toDouble
-
-  def minCpuRequired = 1 + shuffleServiceCpu
-
-
   /**
    * Method called by Mesos to offer resources on slaves. We respond by launching an executor,
    * unless we've already launched more than we wanted to.
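// SlaveStatus above tracks the task and executor lifetimes on a slave
// independently. A sketch of the intended transitions, mirroring the code in
// the hunks below (assuming the shuffle service runs as the Mesos executor):
//   launch without shuffle service: SlaveStatus(executorRunning = false, taskRunning = true)
//   launch with shuffle service:    SlaveStatus(executorRunning = true,  taskRunning = true)
//   task terminates:                taskRunning = false; entry dropped once notRunning
//   slave lost:                     entry dropped entirely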
@@ -281,18 +280,29 @@ private[spark] class CoarseMesosSchedulerBackend( val cpus = getResource(offer.getResourcesList, "cpus").toInt logTrace("Received offer id: " + offer.getId.getValue + ", cpu: " + cpus.toString + ", mem: " + mem.toString) + + var minCpuRequired = 1 + var minMemRequired = MemoryUtils.calculateTotalMemory(sc) + + slaveStatuses.get(slaveId).map { s => + if (s.executorRunning) { + minCpuRequired += shuffleServiceCpu + minMemRequired += shuffleServiceMem + } + } + if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) && totalCoresAcquired < maxCores && - mem >= minMemRequired && - cpus >= minCpuRequired && failuresBySlaveId.getOrElse(slaveId, 0) < MAX_SLAVE_FAILURES && - !slaveIdsWithTasks.contains(slaveId)) { + (!slaveStatuses.contains(slaveId) || !slaveStatuses(slaveId).taskRunning) && + mem >= minMemRequired && + cpus >= minCpuRequired) { + // Launch an executor on the slave val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse val taskId = newMesosTaskId() taskIdToSlaveId(taskId) = slaveId - slaveIdsWithTasks += slaveId coresByTaskId(taskId) = cpusToUse val taskIdString: String = taskId.toString val builder = MesosTaskInfo.newBuilder() @@ -301,12 +311,14 @@ private[spark] class CoarseMesosSchedulerBackend( .setName("Task " + taskId) if (!shuffleServiceEnabled) { + slaveStatuses(slaveId) = SlaveStatus(false, true) builder .setCommand(createTaskCommand(offer, cpusToUse + extraCoresPerSlave, taskIdString)) .addResources(createResource("cpus", cpusToUse)) .addResources(createResource("mem", MemoryUtils.calculateTotalMemory(sc))) } else { + slaveStatuses(slaveId) = SlaveStatus(true, true) val taskCpus = cpusToUse - shuffleServiceCpu val taskMem = mem - shuffleServiceMem builder.setExecutor( @@ -327,6 +339,10 @@ private[spark] class CoarseMesosSchedulerBackend( logTrace("Launching task with offer id: " + offer.getId.getValue + ", task: " + builder.build()) + val status = slaveStatuses(slaveId) + status.taskRunning = true + status.executorRunning = true + d.launchTasks( Collections.singleton(offer.getId), Collections.singletonList(builder.build()), filters) } else { @@ -365,7 +381,7 @@ private[spark] class CoarseMesosSchedulerBackend( val taskId = status.getTaskId.getValue.toInt val state = status.getState stateLock.synchronized { - if(!taskIdToSlaveId.containsKey(taskId)) { + if (!taskIdToSlaveId.containsKey(taskId)) { return } @@ -409,8 +425,7 @@ private[spark] class CoarseMesosSchedulerBackend( def executorTerminated(d: SchedulerDriver, slaveId: String, reason: String) { stateLock.synchronized { - if (slaveIdsWithTasks.contains(slaveId)) { - slaveIdsWithTasks -= slaveId + if (slaveStatuses.contains(slaveId)) { val slaveIdToTaskId: BiMap[String, Int] = taskIdToSlaveId.inverse() if (slaveIdToTaskId.contains(slaveId)) { val taskId: Int = slaveIdToTaskId.get(slaveId) @@ -418,6 +433,11 @@ private[spark] class CoarseMesosSchedulerBackend( removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason) } pendingRemovedSlaveIds -= slaveId + val status = slaveStatuses(slaveId) + status.taskRunning = false + if (status.notRunning) { + slaveStatuses -= slaveId + } } } } @@ -425,6 +445,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def slaveLost(d: SchedulerDriver, slaveId: SlaveID) { logInfo("Mesos slave lost: " + slaveId.getValue) executorTerminated(d, slaveId.getValue, "Mesos slave lost: " + slaveId.getValue) + slaveStatuses -= slaveId.getValue } override def executorLost(d: SchedulerDriver, e: ExecutorID, s: 
SlaveID, status: Int) { diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala index 527242f528f56..3bec6baa14bfb 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -103,7 +103,8 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w mesosOffers2.add(createOffer(2, minMem, minCpu)) backend.resourceOffers(driver, mesosOffers2) // Verify we didn't launch any new executor - assert(backend.slaveIdsWithTasks.size.equals(1)) + assert(backend.slaveStatuses.size.equals(1)) + assert(backend.slaveStatuses.values.iterator.next().taskRunning.equals(true)) assert(backend.pendingRemovedSlaveIds.size.equals(1)) EasyMock.verify(driver) @@ -122,9 +123,9 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w backend.doRequestTotalExecutors(2) backend.resourceOffers(driver, mesosOffers2) - assert(backend.slaveIdsWithTasks.size.equals(2)) + assert(backend.slaveStatuses.size.equals(2)) backend.slaveLost(driver, SlaveID.newBuilder().setValue("s1").build()) - assert(backend.slaveIdsWithTasks.size.equals(1)) + assert(backend.slaveStatuses.size.equals(1)) assert(backend.pendingRemovedSlaveIds.size.equals(0)) EasyMock.verify(driver) From a276a43d0683adc776f8c390949e3903b546856e Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Thu, 8 Jan 2015 00:19:57 -0800 Subject: [PATCH 13/15] Add relaunch task test --- .../mesos/CoarseMesosSchedulerBackend.scala | 12 ++- .../CoarseMesosSchedulerBackendSuite.scala | 87 +++++++++++++++++-- 2 files changed, 87 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index 8bfc1f24f93c1..f8391a957b230 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -37,7 +37,7 @@ import org.apache.mesos.protobuf.ByteString import java.util.concurrent.locks.ReentrantLock case class SlaveStatus(var executorRunning: Boolean, var taskRunning: Boolean) { - def notRunning = !taskRunning && !executorRunning + def finished = !taskRunning && !executorRunning } /** @@ -284,11 +284,9 @@ private[spark] class CoarseMesosSchedulerBackend( var minCpuRequired = 1 var minMemRequired = MemoryUtils.calculateTotalMemory(sc) - slaveStatuses.get(slaveId).map { s => - if (s.executorRunning) { - minCpuRequired += shuffleServiceCpu - minMemRequired += shuffleServiceMem - } + if (!slaveStatuses.contains(slaveId) || !slaveStatuses(slaveId).executorRunning) { + minCpuRequired += shuffleServiceCpu + minMemRequired += shuffleServiceMem } if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) && @@ -435,7 +433,7 @@ private[spark] class CoarseMesosSchedulerBackend( pendingRemovedSlaveIds -= slaveId val status = slaveStatuses(slaveId) status.taskRunning = false - if (status.notRunning) { + if (status.finished) { slaveStatuses -= slaveId } } diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala 
b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala index 3bec6baa14bfb..96787b771eda6 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/CoarseMesosSchedulerBackendSuite.scala @@ -31,7 +31,7 @@ import akka.actor.ActorSystem import java.util.Collections class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with EasyMockSugar { - def createOffer(id: Int, mem: Int, cpu: Int) = { + def createOffer(offerId: String, slaveId: String, mem: Int, cpu: Int) = { val builder = Offer.newBuilder() builder.addResourcesBuilder() .setName("mem") @@ -41,8 +41,8 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w .setName("cpus") .setType(Value.Type.SCALAR) .setScalar(Scalar.newBuilder().setValue(cpu)) - builder.setId(OfferID.newBuilder().setValue(s"o${id.toString}").build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) - .setSlaveId(SlaveID.newBuilder().setValue(s"s${id.toString}")).setHostname(s"host${id.toString}").build() + builder.setId(OfferID.newBuilder().setValue(offerId).build()).setFrameworkId(FrameworkID.newBuilder().setValue("f1")) + .setSlaveId(SlaveID.newBuilder().setValue(slaveId)).setHostname(s"host${slaveId}").build() } test("mesos supports killing and limiting executors") { @@ -72,7 +72,7 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w val minCpu = 4 val mesosOffers = new java.util.ArrayList[Offer] - mesosOffers.add(createOffer(1, minMem, minCpu)) + mesosOffers.add(createOffer("o1", "s1", minMem, minCpu)) EasyMock.expect( driver.launchTasks( @@ -100,7 +100,7 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w assert(backend.executorLimit.get.equals(0)) val mesosOffers2 = new java.util.ArrayList[Offer] - mesosOffers2.add(createOffer(2, minMem, minCpu)) + mesosOffers2.add(createOffer("o2", "s2", minMem, minCpu)) backend.resourceOffers(driver, mesosOffers2) // Verify we didn't launch any new executor assert(backend.slaveStatuses.size.equals(1)) @@ -130,4 +130,81 @@ class CoarseMesosSchedulerBackendSuite extends FunSuite with LocalSparkContext w EasyMock.verify(driver) } + + test("mesos supports killing and relaunching tasks with executors") { + val driver = EasyMock.createMock(classOf[SchedulerDriver]) + val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl]) + + val se = EasyMock.createMock(classOf[SparkEnv]) + val actorSystem = EasyMock.createMock(classOf[ActorSystem]) + val sparkConf = new SparkConf + EasyMock.expect(se.actorSystem).andReturn(actorSystem) + EasyMock.replay(se) + val sc = EasyMock.createMock(classOf[SparkContext]) + EasyMock.expect(sc.executorMemory).andReturn(100).anyTimes() + EasyMock.expect(sc.getSparkHome()).andReturn(Option("/path")).anyTimes() + EasyMock.expect(sc.executorEnvs).andReturn(new mutable.HashMap).anyTimes() + EasyMock.expect(sc.conf).andReturn(sparkConf).anyTimes() + EasyMock.expect(sc.env).andReturn(se) + EasyMock.replay(sc) + + EasyMock.expect(taskScheduler.sc).andReturn(sc) + EasyMock.replay(taskScheduler) + + // Enable shuffle service so it will require extra resources + sparkConf.set("spark.shuffle.service.enabled", "true") + sparkConf.set("spark.driver.host", "driverHost") + sparkConf.set("spark.driver.port", "1234") + + val minMem = MemoryUtils.calculateTotalMemory(sc).toInt + 1024 + val minCpu = 4 + + val mesosOffers = new java.util.ArrayList[Offer] + 
    mesosOffers.add(createOffer("o1", "s1", minMem, minCpu))
+
+    EasyMock.expect(
+      driver.launchTasks(
+        EasyMock.eq(Collections.singleton(mesosOffers.get(0).getId)),
+        EasyMock.anyObject(),
+        EasyMock.anyObject(classOf[Filters])
+      )
+    ).andReturn(Status.valueOf(1)).once
+
+    val offer2 = createOffer("o2", "s1", minMem, 1)
+
+    EasyMock.expect(
+      driver.launchTasks(
+        EasyMock.eq(Collections.singleton(offer2.getId)),
+        EasyMock.anyObject(),
+        EasyMock.anyObject(classOf[Filters])
+      )
+    ).andReturn(Status.valueOf(1)).once
+
+    EasyMock.expect(driver.reviveOffers()).andReturn(Status.valueOf(1)).once
+
+    EasyMock.replay(driver)
+
+    val backend = new CoarseMesosSchedulerBackend(taskScheduler, sc, "master")
+    backend.driver = driver
+    backend.resourceOffers(driver, mesosOffers)
+
+    // Simulate task killed, but executor is still running
+    val status = TaskStatus.newBuilder()
+      .setTaskId(TaskID.newBuilder().setValue("0").build())
+      .setSlaveId(SlaveID.newBuilder().setValue("s1").build())
+      .setState(TaskState.TASK_KILLED)
+      .build
+
+    backend.statusUpdate(driver, status)
+    assert(backend.slaveStatuses("s1").taskRunning.equals(false))
+    assert(backend.slaveStatuses("s1").executorRunning.equals(true))
+
+    mesosOffers.clear()
+    mesosOffers.add(offer2)
+    backend.resourceOffers(driver, mesosOffers)
+    assert(backend.slaveStatuses("s1").taskRunning.equals(true))
+    assert(backend.slaveStatuses("s1").executorRunning.equals(true))
+
+    EasyMock.verify(driver)
+  }
 }

From 770042adcb4e362c122d898321aa6adac96376bd Mon Sep 17 00:00:00 2001
From: Timothy Chen
Date: Thu, 8 Jan 2015 10:20:39 -0800
Subject: [PATCH 14/15] Only deduct shuffle resources when executor is launched.

---
 .../cluster/mesos/CoarseMesosSchedulerBackend.scala | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
index f8391a957b230..ff12505a16ce4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala
@@ -283,10 +283,12 @@ private[spark] class CoarseMesosSchedulerBackend(
 
         var minCpuRequired = 1
         var minMemRequired = MemoryUtils.calculateTotalMemory(sc)
+        var shuffleResourcesUsed = false
 
         if (!slaveStatuses.contains(slaveId) || !slaveStatuses(slaveId).executorRunning) {
           minCpuRequired += shuffleServiceCpu
           minMemRequired += shuffleServiceMem
+          shuffleResourcesUsed = true
         }
 
         if (taskIdToSlaveId.size < executorLimit.getOrElse(Int.MaxValue) &&
@@ -317,8 +319,14 @@ private[spark] class CoarseMesosSchedulerBackend(
               MemoryUtils.calculateTotalMemory(sc)))
         } else {
           slaveStatuses(slaveId) = SlaveStatus(true, true)
-          val taskCpus = cpusToUse - shuffleServiceCpu
-          val taskMem = mem - shuffleServiceMem
+
+          // Deduct the shuffle service resources so that what remains goes to the task.
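+          // For example (illustrative numbers): with shuffleServiceCpu = 1 and
+          // shuffleServiceMem = 1024, a fresh slave offering 4 cpus and 3072 MB
+          // leaves 3 cpus and 2048 MB for the task, while a slave whose shuffle
+          // service is already running keeps the whole offer for the task.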
+ val (taskCpus, taskMem) = if (shuffleResourcesUsed) { + (cpusToUse - shuffleServiceCpu, mem - shuffleServiceMem) + } else { + (cpusToUse, mem) + } + builder.setExecutor( ExecutorInfo.newBuilder() .setExecutorId(ExecutorID.newBuilder().setValue(slaveId).build) From 99415c3bc9973f2f80faaf7f5742b3bc860bc900 Mon Sep 17 00:00:00 2001 From: Timothy Chen Date: Thu, 8 Jan 2015 10:44:18 -0800 Subject: [PATCH 15/15] Address review feedback --- core/src/main/scala/org/apache/spark/SparkContext.scala | 2 +- .../spark/deploy/worker/StandaloneWorkerShuffleService.scala | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 4e7f8920997d2..a65fe58a275ff 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1003,7 +1003,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli @DeveloperApi override def requestExecutors(numAdditionalExecutors: Int): Boolean = { assert(master.contains("mesos") || master.contains("yarn") || dynamicAllocationTesting, - "Requesting executors is currently only supported in YARN or Mesos mode") + "Requesting executors is currently only supported in YARN or Mesos coarse-grained mode") schedulerBackend match { case b: CoarseGrainedSchedulerBackend => b.requestExecutors(numAdditionalExecutors) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala index c167b012746f2..d61019ca816fa 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/StandaloneWorkerShuffleService.scala @@ -53,8 +53,6 @@ class StandaloneWorkerShuffleService(sparkConf: SparkConf, securityManager: Secu require(server == null, "Shuffle server already started") logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") server = transportContext.createServer(port) - } else { - logInfo(s"Skip launching shuffle service as it's not enabled.") } }
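Taken together, the series lets the coarse-grained Mesos backend participate in
dynamic executor allocation. A minimal sketch of driving the new code paths from
a SparkContext (the composite "<slaveId>/<taskId>" executor ID format comes from
the scheduler; the literal values here are illustrative):

    sc.requestExecutors(2)                        // raises the executor limit
    sc.killExecutors(Seq("20150108-054052-S0/0")) // kills the Mesos task but
                                                  // leaves the shuffle service running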