From c42ef953343073a50ef04c5ce848b574ff7f2238 Mon Sep 17 00:00:00 2001 From: "Susan X. Huynh" Date: Wed, 19 Jul 2017 15:11:06 -0700 Subject: [PATCH] [SPARK-21456][MESOS] Make the driver failover_timeout configurable ## What changes were proposed in this pull request? Current behavior: in Mesos cluster mode, the driver failover_timeout is set to zero. If the driver temporarily loses connectivity with the Mesos master, the framework will be torn down and all executors killed. Proposed change: make the failover_timeout configurable via a new option, spark.mesos.driver.failoverTimeout. The default value is still zero. Note: with non-zero failover_timeout, an explicit teardown is needed in some cases. This is captured in https://issues.apache.org/jira/browse/SPARK-21458 ## How was this patch tested? Added a unit test to make sure the config option is set while creating the scheduler driver. Ran an integration test with mesosphere/spark showing that with a non-zero failover_timeout the Spark job finishes after a driver is disconnected from the master. Author: Susan X. Huynh Closes #18674 from susanxhuynh/sh-mesos-failover-timeout. --- docs/running-on-mesos.md | 11 ++++++ .../apache/spark/deploy/mesos/config.scala | 9 ++++- .../MesosCoarseGrainedSchedulerBackend.scala | 3 +- ...osCoarseGrainedSchedulerBackendSuite.scala | 36 +++++++++++++++++++ 4 files changed, 57 insertions(+), 2 deletions(-) diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 7401b63e022c1..cf257c06c9516 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config Fetcher Cache + + spark.mesos.driver.failoverTimeout + 0.0 + + The amount of time (in seconds) that the master will wait for the + driver to reconnect, after being temporarily disconnected, before + it tears down the driver framework by killing all its + executors. The default value is zero, meaning no timeout: if the + driver disconnects, the master immediately tears down the framework. + + # Troubleshooting and Debugging diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala index 56d697f359614..6c8619e3c3c13 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/deploy/mesos/config.scala @@ -58,9 +58,16 @@ package object config { private [spark] val DRIVER_LABELS = ConfigBuilder("spark.mesos.driver.labels") - .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" + + .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " + "pairs should be separated by a colon, and commas used to list more than one." + "Ex. key:value,key2:value2") .stringConf .createOptional + + private [spark] val DRIVER_FAILOVER_TIMEOUT = + ConfigBuilder("spark.mesos.driver.failoverTimeout") + .doc("Amount of time in seconds that the master will wait to hear from the driver, " + + "during a temporary disconnection, before tearing down all the executors.") + .doubleConf + .createWithDefault(0.0) } diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala index 7dd42c41aa7c2..6e7f41dad34ba 100644 --- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala +++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala @@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _} import org.apache.mesos.SchedulerDriver import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient @@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend( sc.conf, sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)), None, - None, + Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)), sc.conf.getOption("spark.mesos.driver.frameworkId") ) diff --git a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala index 7cca5fedb31eb..d9ff4a403ea36 100644 --- a/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala +++ b/resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackendSuite.scala @@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar import org.scalatest.BeforeAndAfter import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.deploy.mesos.config._ import org.apache.spark.internal.config._ import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef} @@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite backend.start() } + test("failover timeout is set in created scheduler driver") { + val failoverTimeoutIn = 3600.0 + initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString)) + sc = new SparkContext(sparkConf) + + val taskScheduler = mock[TaskSchedulerImpl] + when(taskScheduler.sc).thenReturn(sc) + + val driver = mock[SchedulerDriver] + when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING) + + val securityManager = mock[SecurityManager] + + val backend = new MesosCoarseGrainedSchedulerBackend( + taskScheduler, sc, "master", securityManager) { + override protected def createSchedulerDriver( + masterUrl: String, + scheduler: Scheduler, + sparkUser: String, + appName: String, + conf: SparkConf, + webuiUrl: Option[String] = None, + checkpoint: Option[Boolean] = None, + failoverTimeout: Option[Double] = None, + frameworkId: Option[String] = None): SchedulerDriver = { + markRegistered() + assert(failoverTimeout.isDefined) + assert(failoverTimeout.get.equals(failoverTimeoutIn)) + driver + } + } + + backend.start() + } + test("honors unset spark.mesos.containerizer") { setBackend(Map("spark.mesos.executor.docker.image" -> "test"))