From 2e5664c63416fed1a9954fd2ed6c71773eed34ed Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Thu, 26 May 2016 13:18:00 +0530 Subject: [PATCH 1/2] [SPARK-15555] [Mesos] Driver with --supervise option cannot be killed in Mesos mode --- .../spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 73bd4c58e16fc..98496775931b4 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -616,7 +616,6 @@ private[spark] class MesosClusterScheduler( */ private def shouldRelaunch(state: MesosTaskState): Boolean = { state == MesosTaskState.TASK_FAILED || - state == MesosTaskState.TASK_KILLED || state == MesosTaskState.TASK_LOST } From a6473e39080ae649d1ec01d4f3bef5785c113e15 Mon Sep 17 00:00:00 2001 From: Devaraj K Date: Thu, 9 Jun 2016 16:40:29 +0530 Subject: [PATCH 2/2] Added test for the change --- .../mesos/MesosClusterSchedulerSuite.scala | 57 ++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala index a32423dc4fdeb..db60bd6a9055b 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterSchedulerSuite.scala @@ -21,7 +21,7 @@ import java.util.{Collection, Collections, Date} import scala.collection.JavaConverters._ -import org.apache.mesos.Protos._ +import org.apache.mesos.Protos.{TaskState => MesosTaskState, _} import org.apache.mesos.Protos.Value.{Scalar, Type} import org.apache.mesos.SchedulerDriver import org.mockito.{ArgumentCaptor, Matchers} @@ -172,4 +172,59 @@ class MesosClusterSchedulerSuite extends SparkFunSuite with LocalSparkContext wi assert(escape(s"onlywrap${char}this") === wrapped(s"onlywrap${char}this")) }) } + + test("can kill supervised drivers") { + val driver = mock[SchedulerDriver] + val conf = new SparkConf() + conf.setMaster("mesos://localhost:5050") + conf.setAppName("spark mesos") + scheduler = new MesosClusterScheduler( + new BlackHoleMesosClusterPersistenceEngineFactory, conf) { + override def start(): Unit = { + ready = true + mesosDriver = driver + } + } + scheduler.start() + + val response = scheduler.submitDriver( + new MesosDriverDescription("d1", "jar", 100, 1, true, command, + Map(("spark.mesos.executor.home", "test"), ("spark.app.name", "test")), "s1", new Date())) + assert(response.success) + val slaveId = SlaveID.newBuilder().setValue("s1").build() + val offer = Offer.newBuilder() + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1).build()).setName("cpus").setType(Type.SCALAR)) + .addResources( + Resource.newBuilder().setRole("*") + .setScalar(Scalar.newBuilder().setValue(1000).build()) + .setName("mem") + .setType(Type.SCALAR)) + .setId(OfferID.newBuilder().setValue("o1").build()) + .setFrameworkId(FrameworkID.newBuilder().setValue("f1").build()) + .setSlaveId(slaveId) + .setHostname("host1") + .build() + // Offer the resource to launch the submitted driver + scheduler.resourceOffers(driver, Collections.singletonList(offer)) + var state = scheduler.getSchedulerState() + assert(state.launchedDrivers.size == 1) + // Issue the request to kill the launched driver + val killResponse = scheduler.killDriver(response.submissionId) + assert(killResponse.success) + + val taskStatus = TaskStatus.newBuilder() + .setTaskId(TaskID.newBuilder().setValue(response.submissionId).build()) + .setSlaveId(slaveId) + .setState(MesosTaskState.TASK_KILLED) + .build() + // Update the status of the killed task + scheduler.statusUpdate(driver, taskStatus) + // Driver should be moved to finishedDrivers for kill + state = scheduler.getSchedulerState() + assert(state.pendingRetryDrivers.isEmpty) + assert(state.launchedDrivers.isEmpty) + assert(state.finishedDrivers.size == 1) + } }