Fix standalone-cluster mode
The problem was that Spark properties were not propagated to the driver.
The solution is simple: pass the properties as part of the driver
description, so that the command that launches the driver automatically
sets the Spark properties as its Java system properties, which are then
loaded by SparkConf.
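
For illustration, a minimal sketch of that round trip (simplified, not the actual Spark code; the real logic lives in CommandUtils and SparkConf, shown in the diff below, and the object name here is invented):

object PropertyPropagationSketch {
  // Worker side: render each Spark property as a JVM system property flag,
  // e.g. Map("spark.master" -> "spark://host:7077") becomes
  // Seq("-Dspark.master=spark://host:7077") on the driver's command line.
  def toJavaOpts(sparkProps: Map[String, String]): Seq[String] =
    sparkProps.map { case (k, v) => s"-D$k=$v" }.toSeq

  // Driver side: a SparkConf created with loadDefaults = true picks up every
  // JVM system property whose key starts with "spark.", roughly like this:
  def loadSparkProps(): Map[String, String] =
    sys.props.filterKeys(_.startsWith("spark.")).toMap
}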
andrewor14 committed Jul 22, 2014
1 parent fd9da51 commit 855256e
Showing 11 changed files with 18 additions and 17 deletions.
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/deploy/Client.scala
@@ -17,8 +17,6 @@

package org.apache.spark.deploy

- import scala.collection.JavaConversions._
- import scala.collection.mutable.Map
import scala.concurrent._

import akka.actor._
@@ -50,8 +48,8 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
// TODO: We could add an env variable here and intercept it in `sc.addJar` that would
// truncate filesystem paths similar to what YARN does. For now, we just require
// people call `addJar` assuming the jar is in the same directory.
- val env = Map[String, String]()
- System.getenv().foreach { case (k, v) => env(k) = v }
+ val env = sys.env
+ val props = conf.getAll.toMap

val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

@@ -68,7 +66,7 @@ private class ClientActor(driverArgs: ClientArguments, conf: SparkConf) extends
val javaOptionsConf = "spark.driver.extraJavaOptions"
val javaOpts = sys.props.get(javaOptionsConf)
val command = new Command(mainClass, Seq("{{WORKER_URL}}", driverArgs.mainClass) ++
- driverArgs.driverOptions, env, classPathEntries, libraryPathEntries, javaOpts)
+ driverArgs.driverOptions, env, props, classPathEntries, libraryPathEntries, javaOpts)

val driverDescription = new DriverDescription(
driverArgs.jarUrl,
1 change: 1 addition & 0 deletions core/src/main/scala/org/apache/spark/deploy/Command.scala
@@ -23,6 +23,7 @@ private[spark] case class Command(
mainClass: String,
arguments: Seq[String],
environment: Map[String, String],
+ sparkProps: Map[String, String],
classPathEntries: Seq[String],
libraryPathEntries: Seq[String],
extraJavaOptions: Option[String] = None) {
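
For reference, a hypothetical construction of the extended Command, with the new sparkProps field in the position the diff gives it (all argument values below are invented for illustration):

val cmd = Command(
  mainClass = "org.apache.spark.deploy.worker.DriverWrapper",
  arguments = Seq("{{WORKER_URL}}", "com.example.UserApp"),  // user main class is hypothetical
  environment = sys.env,
  sparkProps = Map("spark.master" -> "spark://host:7077"),   // propagated to the driver's JVM
  classPathEntries = Seq(),
  libraryPathEntries = Seq(),
  extraJavaOptions = Some("-verbose:gc"))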
2 changes: 0 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -132,8 +132,6 @@ object SparkSubmit {
(clusterManager, deployMode) match {
case (MESOS, CLUSTER) =>
printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
- case (STANDALONE, CLUSTER) =>
- printErrorAndExit("Cluster deploy mode is currently not supported for Standalone clusters.")
case (_, CLUSTER) if args.isPython =>
printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
case (_, CLUSTER) if isShell(args.primaryResource) =>
core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,8 +49,8 @@ private[spark] object TestClient {
val (actorSystem, port) = AkkaUtils.createActorSystem("spark", Utils.localIpAddress, 0,
conf = conf, securityManager = new SecurityManager(conf))
val desc = new ApplicationDescription(
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Seq(),
Seq()), Some("dummy-spark-home"), "ignored")
"TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map(), Map(),
Seq(), Seq()), Some("dummy-spark-home"), "ignored")
val listener = new TestListener
val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
client.start()
core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala
@@ -67,6 +67,9 @@ object CommandUtils extends Logging {

val permGenOpt = Seq("-XX:MaxPermSize=128m")

+ // Convert Spark properties to java system properties
+ val sparkOpts = command.sparkProps.map { case (k, v) => s"-D$k=$v" }

// Figure out our classpath with the external compute-classpath script
val ext = if (System.getProperty("os.name").startsWith("Windows")) ".cmd" else ".sh"
val classPath = Utils.executeAndGetOutput(
@@ -75,7 +78,7 @@
val userClassPath = command.classPathEntries ++ Seq(classPath)

Seq("-cp", userClassPath.filterNot(_.isEmpty).mkString(File.pathSeparator)) ++
- permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
+ sparkOpts ++ permGenOpt ++ libraryOpts ++ extraOpts ++ workerLocalOpts ++ memoryOpts
}

/** Spawn a thread that will redirect a given stream to a file */
core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -79,6 +79,7 @@ private[spark] class DriverRunner(
driverDesc.command.mainClass,
driverDesc.command.arguments.map(substituteVariables),
driverDesc.command.environment,
+ driverDesc.command.sparkProps,
classPath,
driverDesc.command.libraryPathEntries,
driverDesc.command.extraJavaOptions)
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -118,6 +118,7 @@ private[spark] class ExecutorRunner(
appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
appDesc.command.environment,
+ appDesc.command.sparkProps,
appDesc.command.classPathEntries,
appDesc.command.libraryPathEntries,
appDesc.command.extraJavaOptions)
core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -54,9 +54,8 @@ private[spark] class SparkDeploySchedulerBackend(
cp.split(java.io.File.pathSeparator)
}

- val command = Command(
- "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs,
- classPathEntries, libraryPathEntries, extraJavaOpts)
+ val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
+ args, sc.executorEnvs, conf.getAll.toMap, classPathEntries, libraryPathEntries, extraJavaOpts)
val sparkHome = sc.getSparkHome()
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
sparkHome, sc.ui.appUIAddress, sc.eventLogger.map(_.logDir))
core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -88,7 +88,7 @@ class JsonProtocolSuite extends FunSuite {
}

def createAppDesc(): ApplicationDescription = {
- val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Seq(), Seq())
+ val cmd = new Command("mainClass", List("arg1", "arg2"), Map(), Map(), Seq(), Seq())
new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
}

@@ -101,7 +101,7 @@

def createDriverCommand() = new Command(
"org.apache.spark.FakeClass", Seq("some arg --and-some options -g foo"),
Map(("K1", "V1"), ("K2", "V2")), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
Map(("K1", "V1"), ("K2", "V2")), Map(), Seq("cp1", "cp2"), Seq("lp1", "lp2"), Some("-Dfoo")
)

def createDriverDesc() = new DriverDescription("hdfs://some-dir/some.jar", 100, 3,
core/src/test/scala/org/apache/spark/deploy/worker/DriverRunnerTest.scala
@@ -29,7 +29,7 @@ import org.apache.spark.deploy.{Command, DriverDescription}

class DriverRunnerTest extends FunSuite {
private def createDriverRunner() = {
- val command = new Command("mainClass", Seq(), Map(), Seq(), Seq())
+ val command = new Command("mainClass", Seq(), Map(), Map(), Seq(), Seq())
val driverDescription = new DriverDescription("jarUrl", 512, 1, true, command)
new DriverRunner("driverId", new File("workDir"), new File("sparkHome"), driverDescription,
null, "akka://1.2.3.4/worker/")
core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -29,7 +29,7 @@ class ExecutorRunnerTest extends FunSuite {
def f(s:String) = new File(s)
val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq()),
Command("foo", Seq(), Map(), Map(), Seq(), Seq()),
sparkHome, "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
