[SPARK-24794][CORE] Driver launched through rest should use all masters
## What changes were proposed in this pull request?

In standalone cluster mode, one can launch a driver with supervise mode
enabled. The StandaloneRestServer class uses the host and port of the
current master as the spark.master property while launching the driver
(even when running in HA mode). This class also ignores the spark.master
property passed as part of the request.

Due to the above problem, if the Spark masters switch for some reason and
the driver is killed unexpectedly and relaunched, it will try to connect
only to the master specified as -Dspark.master in the driver command. But
that master may now be in STANDBY mode, and after trying multiple times,
the SparkContext will kill itself (even though the secondary master was
alive and healthy).

This change picks up the spark.master property from the request and uses it
to launch the driver process. As a result, the driver process carries both
masters in its -Dspark.master property, so even if the masters switch, the
SparkContext can still connect to the ALIVE master and work correctly.
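
Concretely, the patched servlet rewrites the REST port in the submitted master list to the cluster port of the active master before setting spark.master. A minimal sketch of that rewrite, with illustrative hosts and the default ports:

```scala
// Same rewrite as the patch, with illustrative values: the request's
// spark.master lists both HA masters on the REST port (6066), and the
// servlet swaps in the cluster port parsed from the active masterUrl (7077).
val masters = Option("spark://host1:6066,host2:6066") // spark.master from the request
val masterPort = 7077     // parsed from masterUrl via Utils.extractHostPortFromSparkUrl
val masterRestPort = 6066 // spark.master.rest.port (default 6066)
val updatedMasters = masters
  .map(_.replace(s":$masterRestPort", s":$masterPort"))
  .getOrElse("spark://host1:7077") // fall back to the active master alone
assert(updatedMasters == "spark://host1:7077,host2:7077")
```

The driver is then launched with both masters in -Dspark.master, so a supervised relaunch can fail over to whichever master is ALIVE.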

## How was this patch tested?
This patch was manually tested on a standalone cluster running 2.2.1. It was rebased on the current master branch and all tests were executed. I have added a unit test for this change (but since I am new, I hope I have covered everything).

Closes #21816 from bsikander/rest_driver_fix.

Authored-by: Behroz Sikander <behroz.sikander@sap.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
bsikander authored and srowen committed Oct 25, 2018
1 parent 65c653f commit 002f9c1
Showing 2 changed files with 31 additions and 1 deletion.
core/src/main/scala/org/apache/spark/deploy/rest/StandaloneRestServer.scala
@@ -138,6 +138,16 @@ private[rest] class StandaloneSubmitRequestServlet(
     val driverExtraClassPath = sparkProperties.get("spark.driver.extraClassPath")
     val driverExtraLibraryPath = sparkProperties.get("spark.driver.extraLibraryPath")
     val superviseDriver = sparkProperties.get("spark.driver.supervise")
+    // The semantics of "spark.master" and the masterUrl are different. While the
+    // property "spark.master" could contain all registered masters, masterUrl
+    // contains only the active master. To make sure a Spark driver can recover
+    // in a multi-master setup, we use the "spark.master" property while submitting
+    // the driver.
+    val masters = sparkProperties.get("spark.master")
+    val (_, masterPort) = Utils.extractHostPortFromSparkUrl(masterUrl)
+    val masterRestPort = this.conf.getInt("spark.master.rest.port", 6066)
+    val updatedMasters = masters.map(
+      _.replace(s":$masterRestPort", s":$masterPort")).getOrElse(masterUrl)
     val appArgs = request.appArgs
     // Filter SPARK_LOCAL_(IP|HOSTNAME) environment variables from being set on the remote system.
     val environmentVariables =
@@ -146,7 +156,7 @@ private[rest] class StandaloneSubmitRequestServlet(
     // Construct driver description
     val conf = new SparkConf(false)
       .setAll(sparkProperties)
-      .set("spark.master", masterUrl)
+      .set("spark.master", updatedMasters)
     val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
     val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
core/src/test/scala/org/apache/spark/deploy/rest/StandaloneRestSubmitSuite.scala
@@ -83,6 +83,26 @@ class StandaloneRestSubmitSuite extends SparkFunSuite with BeforeAndAfterEach {
     assert(submitResponse.success)
   }
 
+  test("create submission with multiple masters") {
+    val submittedDriverId = "your-driver-id"
+    val submitMessage = "my driver is submitted"
+    val masterUrl = startDummyServer(submitId = submittedDriverId, submitMessage = submitMessage)
+    val conf = new SparkConf(loadDefaults = false)
+    val RANDOM_PORT = 9000
+    val allMasters = s"$masterUrl,${Utils.localHostName()}:$RANDOM_PORT"
+    conf.set("spark.master", allMasters)
+    conf.set("spark.app.name", "dreamer")
+    val appArgs = Array("one", "two", "six")
+    // main method calls this
+    val response = new RestSubmissionClientApp().run("app-resource", "main-class", appArgs, conf)
+    val submitResponse = getSubmitResponse(response)
+    assert(submitResponse.action === Utils.getFormattedClassName(submitResponse))
+    assert(submitResponse.serverSparkVersion === SPARK_VERSION)
+    assert(submitResponse.message === submitMessage)
+    assert(submitResponse.submissionId === submittedDriverId)
+    assert(submitResponse.success)
+  }
+
   test("create submission from main method") {
     val submittedDriverId = "your-driver-id"
     val submitMessage = "my driver is submitted"
