[SPARK-21456][MESOS] Make the driver failover_timeout configurable
## What changes were proposed in this pull request?

Current behavior: in Mesos cluster mode, the driver failover_timeout is set to zero. If the driver temporarily loses connectivity with the Mesos master, the framework will be torn down and all executors killed.

Proposed change: make the failover_timeout configurable via a new option, spark.mesos.driver.failoverTimeout. The default value is still zero.
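
As a rough illustration of how an application might opt in to the new option, here is a minimal sketch. It assumes `spark-core` is on the classpath; the object name, the application name, and the 3600-second value are hypothetical choices for the example and are not part of this patch.

```scala
import org.apache.spark.SparkConf

object FailoverTimeoutExample {
  // Option name introduced by this change.
  val FailoverTimeoutKey = "spark.mesos.driver.failoverTimeout"

  // Builds a conf with the given timeout in seconds.
  // loadDefaults = false keeps the example independent of any
  // spark.* system properties set in the surrounding JVM.
  def buildConf(timeoutSeconds: Double): SparkConf =
    new SparkConf(false)
      .setAppName("failover-timeout-example") // hypothetical app name
      .set(FailoverTimeoutKey, timeoutSeconds.toString)

  def main(args: Array[String]): Unit = {
    val conf = buildConf(3600.0)
    // Falls back to 0.0, the documented default, when the option is unset.
    println(conf.getDouble(FailoverTimeoutKey, 0.0))
  }
}
```

The same key can presumably also be supplied at submission time with `--conf`, as with other `spark.mesos.*` options.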

Note: with non-zero failover_timeout, an explicit teardown is needed in some cases. This is captured in https://issues.apache.org/jira/browse/SPARK-21458

## How was this patch tested?

Added a unit test to make sure the config option is set while creating the scheduler driver.

Ran an integration test with mesosphere/spark showing that, with a non-zero failover_timeout, the Spark job still finishes after the driver is disconnected from the master.

Author: Susan X. Huynh <xhuynh@mesosphere.com>

Closes #18674 from susanxhuynh/sh-mesos-failover-timeout.
susanxhuynh authored and Marcelo Vanzin committed Jul 19, 2017
1 parent c972918 commit c42ef95
Showing 4 changed files with 57 additions and 2 deletions.
11 changes: 11 additions & 0 deletions docs/running-on-mesos.md
@@ -545,6 +545,17 @@ See the [configuration page](configuration.html) for information on Spark config
Fetcher Cache</a>
</td>
</tr>
<tr>
<td><code>spark.mesos.driver.failoverTimeout</code></td>
<td><code>0.0</code></td>
<td>
The amount of time (in seconds) that the master will wait for the
driver to reconnect, after being temporarily disconnected, before
it tears down the driver framework by killing all its
executors. The default value is zero, meaning no timeout: if the
driver disconnects, the master immediately tears down the framework.
</td>
</tr>
</table>

# Troubleshooting and Debugging
@@ -58,9 +58,16 @@ package object config {

private [spark] val DRIVER_LABELS =
ConfigBuilder("spark.mesos.driver.labels")
- .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value" +
+ .doc("Mesos labels to add to the driver. Labels are free-form key-value pairs. Key-value " +
"pairs should be separated by a colon, and commas used to list more than one." +
"Ex. key:value,key2:value2")
.stringConf
.createOptional

private [spark] val DRIVER_FAILOVER_TIMEOUT =
ConfigBuilder("spark.mesos.driver.failoverTimeout")
.doc("Amount of time in seconds that the master will wait to hear from the driver, " +
"during a temporary disconnection, before tearing down all the executors.")
.doubleConf
.createWithDefault(0.0)
}
@@ -29,6 +29,7 @@ import org.apache.mesos.Protos.{TaskInfo => MesosTaskInfo, _}
import org.apache.mesos.SchedulerDriver

import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskState}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config
import org.apache.spark.network.netty.SparkTransportConf
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
@@ -177,7 +178,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
sc.conf,
sc.conf.getOption("spark.mesos.driver.webui.url").orElse(sc.ui.map(_.webUrl)),
None,
- None,
+ Some(sc.conf.get(DRIVER_FAILOVER_TIMEOUT)),
sc.conf.getOption("spark.mesos.driver.frameworkId")
)

@@ -33,6 +33,7 @@ import org.scalatest.mock.MockitoSugar
import org.scalatest.BeforeAndAfter

import org.apache.spark.{LocalSparkContext, SecurityManager, SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.deploy.mesos.config._
import org.apache.spark.internal.config._
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
@@ -369,6 +370,41 @@ class MesosCoarseGrainedSchedulerBackendSuite extends SparkFunSuite
backend.start()
}

test("failover timeout is set in created scheduler driver") {
val failoverTimeoutIn = 3600.0
initializeSparkConf(Map(DRIVER_FAILOVER_TIMEOUT.key -> failoverTimeoutIn.toString))
sc = new SparkContext(sparkConf)

val taskScheduler = mock[TaskSchedulerImpl]
when(taskScheduler.sc).thenReturn(sc)

val driver = mock[SchedulerDriver]
when(driver.start()).thenReturn(Protos.Status.DRIVER_RUNNING)

val securityManager = mock[SecurityManager]

val backend = new MesosCoarseGrainedSchedulerBackend(
taskScheduler, sc, "master", securityManager) {
override protected def createSchedulerDriver(
masterUrl: String,
scheduler: Scheduler,
sparkUser: String,
appName: String,
conf: SparkConf,
webuiUrl: Option[String] = None,
checkpoint: Option[Boolean] = None,
failoverTimeout: Option[Double] = None,
frameworkId: Option[String] = None): SchedulerDriver = {
markRegistered()
assert(failoverTimeout.isDefined)
assert(failoverTimeout.get.equals(failoverTimeoutIn))
driver
}
}

backend.start()
}

test("honors unset spark.mesos.containerizer") {
setBackend(Map("spark.mesos.executor.docker.image" -> "test"))
