Skip to content

Commit

Permalink
All Spark processes should support spark-defaults.conf, config file
Browse files Browse the repository at this point in the history
  • Loading branch information
witgo committed Oct 15, 2014
1 parent 7b4f39f commit c45d20c
Show file tree
Hide file tree
Showing 8 changed files with 133 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

package org.apache.spark.deploy

import java.io.{File, FileInputStream, IOException}
import java.util.Properties
import java.util.jar.JarFile

import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

import org.apache.spark.SparkException
import org.apache.spark.util.Utils

/**
Expand Down Expand Up @@ -63,9 +60,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
val defaultProperties = new HashMap[String, String]()
if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
Option(propertiesFile).foreach { filename =>
val file = new File(filename)
SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
if (k.startsWith("spark")) {
Utils.getPropertiesFromFile(filename).foreach { case (k, v) =>
if (k.startsWith("spark.")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
} else {
Expand All @@ -90,19 +86,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
*/
private def mergeSparkProperties(): Unit = {
// Use common defaults file, if not specified by user
if (propertiesFile == null) {
val sep = File.separator
val sparkHomeConfig = env.get("SPARK_HOME").map(sparkHome => s"${sparkHome}${sep}conf")
val confDir = env.get("SPARK_CONF_DIR").orElse(sparkHomeConfig)

confDir.foreach { sparkConfDir =>
val defaultPath = s"${sparkConfDir}${sep}spark-defaults.conf"
val file = new File(defaultPath)
if (file.exists()) {
propertiesFile = file.getAbsolutePath
}
}
}
propertiesFile = Option(propertiesFile).getOrElse(Utils.getDefaultPropertiesFile)

val properties = HashMap[String, String]()
properties.putAll(defaultSparkProperties)
Expand Down Expand Up @@ -397,23 +381,3 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
SparkSubmit.exitFn()
}
}

object SparkSubmitArguments {
/** Load properties present in the given file. */
def getPropertiesFromFile(file: File): Seq[(String, String)] = {
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")
val inputStream = new FileInputStream(file)
try {
val properties = new Properties()
properties.load(inputStream)
properties.stringPropertyNames().toSeq.map(k => (k, properties(k).trim))
} catch {
case e: IOException =>
val message = s"Failed when loading Spark properties file $file"
throw new SparkException(message, e)
} finally {
inputStream.close()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ private[spark] object SparkSubmitDriverBootstrapper {
assume(bootstrapDriver != null, "SPARK_SUBMIT_BOOTSTRAP_DRIVER must be set")

// Parse the properties file for the equivalent spark.driver.* configs
val properties = SparkSubmitArguments.getPropertiesFromFile(new File(propertiesFile)).toMap
val properties = Utils.getPropertiesFromFile(propertiesFile)
val confDriverMemory = properties.get("spark.driver.memory")
val confLibraryPath = properties.get("spark.driver.extraLibraryPath")
val confClasspath = properties.get("spark.driver.extraClassPath")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
package org.apache.spark.deploy.history

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

/**
* Command-line parser for the master.
*/
private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]) {
private var logDir: String = null
private var propertiesFile: String = null

parse(args.toList)

Expand All @@ -32,22 +34,34 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
case ("--dir" | "-d") :: value :: tail =>
logDir = value
conf.set("spark.history.fs.logDirectory", value)
System.setProperty("spark.history.fs.logDirectory", value)
parse(tail)

case ("--help" | "-h") :: tail =>
printUsageAndExit(0)

case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case Nil =>

case _ =>
printUsageAndExit(1)
}
}

// This mutates the SparkConf, so all accesses to it must be made after this line
Utils.loadDefaultSparkProperties(conf, propertiesFile)

private def printUsageAndExit(exitCode: Int) {
System.err.println(
"""
|Usage: HistoryServer
|Usage: HistoryServer [options]
|
|Options:
| --properties-file FILE Path to a custom Spark properties file.
| Default is conf/spark-defaults.conf.
|
|Configuration options can be set by setting the corresponding JVM system property.
|History Server options are always available; additional options depend on the provider.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
var host = Utils.localHostName()
var port = 7077
var webUiPort = 8080
var propertiesFile: String = null

// Check for settings in environment variables
if (System.getenv("SPARK_MASTER_HOST") != null) {
Expand All @@ -38,12 +39,16 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_MASTER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_MASTER_WEBUI_PORT").toInt
}

parse(args.toList)

// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

if (conf.contains("spark.master.ui.port")) {
webUiPort = conf.get("spark.master.ui.port").toInt
}

parse(args.toList)

def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Utils.checkHost(value, "ip no longer supported, please use hostname " + value)
Expand All @@ -63,7 +68,11 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)

case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)

case Nil => {}
Expand All @@ -83,7 +92,9 @@ private[spark] class MasterArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip HOST Hostname to listen on (deprecated, please use --host or -h) \n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: 7077)\n" +
" --webui-port PORT Port for web UI (default: 8080)")
" --webui-port PORT Port for web UI (default: 8080)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
var memory = inferDefaultMemory()
var masters: Array[String] = null
var workDir: String = null
var propertiesFile: String = null

// Check for settings in environment variables
if (System.getenv("SPARK_WORKER_PORT") != null) {
Expand All @@ -47,16 +48,19 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
if (System.getenv("SPARK_WORKER_WEBUI_PORT") != null) {
webUiPort = System.getenv("SPARK_WORKER_WEBUI_PORT").toInt
}
if (conf.contains("spark.worker.ui.port")) {
webUiPort = conf.get("spark.worker.ui.port").toInt
}
if (System.getenv("SPARK_WORKER_DIR") != null) {
workDir = System.getenv("SPARK_WORKER_DIR")
}

parse(args.toList)

checkWorkerMemory()
// This mutates the SparkConf, so all accesses to it must be made after this line
propertiesFile = Utils.loadDefaultSparkProperties(conf, propertiesFile)

if (conf.contains("spark.worker.ui.port")) {
webUiPort = conf.get("spark.worker.ui.port").toInt
}

def parse(args: List[String]): Unit = args match {
case ("--ip" | "-i") :: value :: tail =>
Expand Down Expand Up @@ -89,7 +93,11 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
webUiPort = value
parse(tail)

case ("--help" | "-h") :: tail =>
case ("--properties-file") :: value :: tail =>
propertiesFile = value
parse(tail)

case ("--help") :: tail =>
printUsageAndExit(0)

case value :: tail =>
Expand Down Expand Up @@ -124,7 +132,9 @@ private[spark] class WorkerArguments(args: Array[String], conf: SparkConf) {
" -i HOST, --ip IP Hostname to listen on (deprecated, please use --host or -h)\n" +
" -h HOST, --host HOST Hostname to listen on\n" +
" -p PORT, --port PORT Port to listen on (default: random)\n" +
" --webui-port PORT Port for web UI (default: 8081)")
" --webui-port PORT Port for web UI (default: 8081)\n" +
" --properties-file FILE Path to a custom Spark properties file.\n" +
" Default is conf/spark-defaults.conf.")
System.exit(exitCode)
}

Expand Down
58 changes: 58 additions & 0 deletions core/src/main/scala/org/apache/spark/util/Utils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1410,6 +1410,64 @@ private[spark] object Utils extends Logging {
}
}

/**
* Load default Spark properties from the given file. If no file is provided,
* use the common defaults file. This mutates state in the given SparkConf and
* in this JVM's system properties if the config specified in the file is not
* already set. Return the path of the properties file used.
*/
def loadDefaultSparkProperties(conf: SparkConf, filePath: String = null): String = {
val path = Option(filePath).getOrElse(getDefaultPropertiesFile)
Option(path).foreach { confFile =>
getPropertiesFromFile(confFile).filter { case (k, v) =>
k.startsWith("spark.")
}.foreach { case (k, v) =>
conf.setIfMissing(k, v)
sys.props.getOrElseUpdate(k, v)
}
}
path
}

/** Load properties present in the given file. */
def getPropertiesFromFile(filename: String): Map[String, String] = {
val file = new File(filename)
require(file.exists(), s"Properties file $file does not exist")
require(file.isFile(), s"Properties file $file is not a normal file")

val inReader = new InputStreamReader(new FileInputStream(file), "UTF-8")
try {
val properties = new Properties()
properties.load(inReader)
properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap
} catch {
case e: IOException =>
throw new SparkException(s"Failed when loading Spark properties from $filename", e)
} finally {
inReader.close()
}
}

/** Return the path of the default Spark properties file. */
def getDefaultPropertiesFile(): String = {
val s = File.separator
def getAbsolutePath(filePath: String): String = {
Option(filePath)
.map(t => new File(t))
.filter(_.isFile)
.map(_.getAbsolutePath).orNull
}

val configFile = sys.env.get("SPARK_CONF_DIR")
.map(t => s"$t${s}spark-defaults.conf")
.map(getAbsolutePath).orNull

Option(configFile).getOrElse(sys.env.get("SPARK_HOME")
.map(t => s"${t}${s}conf${s}spark-defaults.conf")
.map(getAbsolutePath)
.orNull)
}

/** Return a nice string representation of the exception, including the stack trace. */
def exceptionString(e: Exception): String = {
if (e == null) "" else exceptionString(getFormattedClassName(e), e.getMessage, e.getStackTrace)
Expand Down
19 changes: 19 additions & 0 deletions core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import com.google.common.base.Charsets
import com.google.common.io.Files
import org.scalatest.FunSuite

import org.apache.spark.SparkConf

class UtilsSuite extends FunSuite {

test("bytesToString") {
Expand Down Expand Up @@ -332,4 +334,21 @@ class UtilsSuite extends FunSuite {
assert(!tempFile2.exists())
}

test("loading properties from file") {
val outFile = File.createTempFile("test-load-spark-properties", "test")
try {
System.setProperty("spark.test.fileNameLoadB", "2")
Files.write("spark.test.fileNameLoadA true\n" +
"spark.test.fileNameLoadB 1\n", outFile, Charsets.UTF_8)
val properties = Utils.getPropertiesFromFile(outFile.getAbsolutePath)
properties
.filter { case (k, v) => k.startsWith("spark.")}
.foreach { case (k, v) => sys.props.getOrElseUpdate(k, v)}
val sparkConf = new SparkConf
assert(sparkConf.getBoolean("spark.test.fileNameLoadA", false) === true)
assert(sparkConf.getInt("spark.test.fileNameLoadB", 1) === 2)
} finally {
outFile.delete()
}
}
}
7 changes: 7 additions & 0 deletions docs/monitoring.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,13 @@ follows:
one implementation, provided by Spark, which looks for application logs stored in the
file system.</td>
</tr>
<tr>
<td>spark.history.fs.logDirectory</td>
<td>(none)</td>
<td>
Directory that contains application event logs to be loaded by the history server
</td>
</tr>
<tr>
<td>spark.history.fs.updateInterval</td>
<td>10</td>
Expand Down

0 comments on commit c45d20c

Please sign in to comment.