Abstract usages of converting spark opts to java opts
andrewor14 committed Jul 25, 2014
1 parent 79f63a3 commit b890949
Showing 4 changed files with 29 additions and 14 deletions.
23 changes: 22 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkConf.scala
@@ -40,6 +40,8 @@ import scala.collection.mutable.HashMap
  */
 class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
 
+  import SparkConf._
+
   /** Create a SparkConf that loads defaults from system properties and the classpath */
   def this() = this(true)
 
@@ -198,7 +200,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
      *
      * E.g. spark.akka.option.x.y.x = "value"
      */
-    getAll.filter {case (k, v) => k.startsWith("akka.")}
+    getAll.filter { case (k, _) => isAkkaConf(k) }
 
   /** Does the configuration contain a given parameter? */
   def contains(key: String): Boolean = settings.contains(key)
@@ -292,3 +294,22 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
     settings.toArray.sorted.map{case (k, v) => k + "=" + v}.mkString("\n")
   }
 }
+
+private[spark] object SparkConf {
+  /**
+   * Return whether the given config is an akka config (e.g. akka.actor.provider).
+   * Note that this does not include spark-specific akka configs (e.g. spark.akka.timeout).
+   */
+  def isAkkaConf(name: String): Boolean = name.startsWith("akka.")
+
+  /**
+   * Return whether the given config should be passed to an executor on start-up.
+   *
+   * When connecting to the scheduler, the executor backend needs certain akka and authentication
+   * settings to connect to the scheduler, while the rest of the spark configs can be inherited
+   * from the driver later.
+   */
+  def isExecutorStartupConf(name: String): Boolean = {
+    isAkkaConf(name) || name.startsWith("spark.akka") || name.startsWith("spark.auth")
+  }
+}
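
The two new helpers are plain string-prefix predicates, so their behavior is easy to check by hand. The keys below are hypothetical examples chosen to exercise each branch, not values taken from this commit:

SparkConf.isAkkaConf("akka.actor.provider")               // true: raw akka config
SparkConf.isAkkaConf("spark.akka.timeout")                // false: spark-specific akka config
SparkConf.isExecutorStartupConf("akka.actor.provider")    // true
SparkConf.isExecutorStartupConf("spark.akka.timeout")     // true
SparkConf.isExecutorStartupConf("spark.authenticate")     // true
SparkConf.isExecutorStartupConf("spark.executor.memory")  // false: inherited from the driver later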
6 changes: 2 additions & 4 deletions core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster
 
-import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.{Logging, SparkConf, SparkContext}
 import org.apache.spark.deploy.{ApplicationDescription, Command}
 import org.apache.spark.deploy.client.{AppClient, AppClientListener}
 import org.apache.spark.scheduler.{ExecutorExited, ExecutorLossReason, SlaveLost, TaskSchedulerImpl}
@@ -55,9 +55,7 @@ private[spark] class SparkDeploySchedulerBackend(
     }
 
     // Start executors with a few necessary configs for registering with the scheduler
-    val sparkJavaOpts = Utils.sparkJavaOpts(conf, (key: String) =>
-      key.startsWith("spark.akka") || key.startsWith("spark.auth")
-    )
+    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
     val javaOpts = sparkJavaOpts ++ extraJavaOpts
     val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
       args, sc.executorEnvs, classPathEntries, libraryPathEntries, javaOpts)
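
Utils.sparkJavaOpts is the abstraction the commit title refers to: it turns (optionally filtered) SparkConf entries into -D Java options. Its implementation is not part of this diff; a minimal sketch of its likely shape, inferred only from the two call sites in this commit (one passes a key filter, one does not), would be:

// Sketch only, not the committed source of org.apache.spark.util.Utils.sparkJavaOpts.
// The default filter keeps every key, matching the single-argument call in ClientBase below.
def sparkJavaOpts(conf: SparkConf, filterKey: String => Boolean = _ => true): Seq[String] = {
  conf.getAll
    .filter { case (k, _) => filterKey(k) }
    .map { case (k, v) => s"-D$k=$v" }   // any quoting/escaping of v is omitted in this sketch
}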
6 changes: 3 additions & 3 deletions yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala
@@ -37,7 +37,9 @@ import org.apache.hadoop.yarn.api.protocolrecords._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.Records
+
 import org.apache.spark.{SparkException, Logging, SparkConf, SparkContext}
+import org.apache.spark.util.Utils
 
 /**
  * The entry point (starting in Client#main() and Client#run()) for launching Spark on YARN. The
@@ -383,9 +385,7 @@ trait ClientBase extends Logging {
     // Forward the Spark configuration to the application master / executors.
     // TODO: it might be nicer to pass these as an internal environment variable rather than
     // as Java options, due to complications with string parsing of nested quotes.
-    for ((k, v) <- sparkConf.getAll) {
-      javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\""
-    }
+    javaOpts ++= Utils.sparkJavaOpts(sparkConf)
 
     if (args.amClass == classOf[ApplicationMaster].getName) {
       sparkConf.getOption("spark.driver.extraJavaOptions")
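
Here no filter predicate is passed, so every configuration entry is forwarded to the application master and executors. Under the sketch of sparkJavaOpts above, and with a hypothetical configuration containing spark.app.name=MyApp and spark.executor.memory=2g, the call would expand to:

// Illustrative values only, assuming the sketch of sparkJavaOpts given earlier.
Utils.sparkJavaOpts(sparkConf)
// => Seq("-Dspark.app.name=MyApp", "-Dspark.executor.memory=2g")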
8 changes: 2 additions & 6 deletions yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnableUtil.scala
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
 import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.util.Utils
 
 trait ExecutorRunnableUtil extends Logging {
 
@@ -66,12 +67,7 @@ trait ExecutorRunnableUtil extends Logging {
     // registers with the Scheduler and transfers the spark configs. Since the Executor backend
     // uses Akka to connect to the scheduler, the akka settings are needed as well as the
     // authentication settings.
-    sparkConf.getAll.
-      filter { case (k, v) => k.startsWith("spark.auth") || k.startsWith("spark.akka") }.
-      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
-
-    sparkConf.getAkkaConf.
-      foreach { case (k, v) => javaOpts += "-D" + k + "=" + "\\\"" + v + "\\\"" }
+    javaOpts ++= Utils.sparkJavaOpts(sparkConf, SparkConf.isExecutorStartupConf)
 
     // Commenting it out for now - so that people can refer to the properties if required. Remove
     // it once cpuset version is pushed out.
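
The single call replaces two separate loops: one over spark.auth*/spark.akka* keys and one over the raw akka.* keys returned by getAkkaConf. Since isExecutorStartupConf matches exactly those three prefixes, the same keys are selected. A small equivalence check (hypothetical, not part of the commit) makes that concrete:

// Keys selected by the old two-pass filtering ...
val oldKeys = sparkConf.getAll.map(_._1).filter { k =>
  k.startsWith("spark.auth") || k.startsWith("spark.akka") || k.startsWith("akka.")
}
// ... are the same keys the new predicate selects in one pass.
val newKeys = sparkConf.getAll.map(_._1).filter(SparkConf.isExecutorStartupConf)
assert(oldKeys.toSet == newKeys.toSet)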
