Skip to content

Commit

Permalink
Move code around, create object for core config keys.
Browse files Browse the repository at this point in the history
Hopefully makes approach clearer.
  • Loading branch information
Marcelo Vanzin committed Dec 11, 2015
1 parent 93736c2 commit c858fa8
Show file tree
Hide file tree
Showing 16 changed files with 501 additions and 417 deletions.
225 changes: 7 additions & 218 deletions core/src/main/scala/org/apache/spark/SparkConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ import scala.collection.mutable.LinkedHashSet

import org.apache.avro.{SchemaNormalization, Schema}

import org.apache.spark.network.util.{ByteUnit, JavaUtils}
import org.apache.spark.config.{ConfigEntry, OptionalConfigEntry}
import org.apache.spark.network.util.JavaUtils
import org.apache.spark.serializer.KryoSerializer
import org.apache.spark.util.Utils

Expand Down Expand Up @@ -75,12 +76,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

private[spark] def set[T](entry: ConfEntry[T], value: T): SparkConf = {
private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = {
set(entry.key, entry.stringConverter(value))
this
}

private[spark] def set[T](entry: OptionalConfEntry[T], value: T): SparkConf = {
private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
set(entry.key, entry._stringConverter(value))
this
}
Expand Down Expand Up @@ -159,14 +160,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
this
}

private[spark] def setIfMissing[T](entry: ConfEntry[T], value: T): SparkConf = {
private[spark] def setIfMissing[T](entry: ConfigEntry[T], value: T): SparkConf = {
if (settings.putIfAbsent(entry.key, entry.stringConverter(value)) == null) {
logDeprecationWarning(entry.key)
}
this
}

private[spark] def setIfMissing[T](entry: OptionalConfEntry[T], value: T): SparkConf = {
private[spark] def setIfMissing[T](entry: OptionalConfigEntry[T], value: T): SparkConf = {
if (settings.putIfAbsent(entry.key, entry._stringConverter(value)) == null) {
logDeprecationWarning(entry.key)
}
Expand Down Expand Up @@ -223,7 +224,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
getOption(key).getOrElse(defaultValue)
}

private[spark] def get[T](entry: ConfEntry[T]): T = {
private[spark] def get[T](entry: ConfigEntry[T]): T = {
entry.readFrom(this)
}

Expand Down Expand Up @@ -723,216 +724,4 @@ private[spark] object SparkConf extends Logging {
version: String,
translation: String => String = null)

/**
* An entry contains all meta information for a configuration.
*
* @param key the key for the configuration
* @param defaultValue the default value for the configuration
* @param valueConverter how to convert a string to the value. It should throw an exception if the
* string does not have the required format.
* @param stringConverter how to convert a value to a string that the user can use it as a valid
* string value. It's usually `toString`. But sometimes, a custom converter
* is necessary. E.g., if T is List[String], `a, b, c` is better than
* `List(a, b, c)`.
* @param doc the document for the configuration
* @param isPublic if this configuration is public to the user. If it's `false`, this
* configuration is only used internally and we should not expose it to the user.
* @tparam T the value type
*/
private[spark] class ConfEntry[T] (
val key: String,
val defaultValue: Option[T],
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
val isPublic: Boolean) {

def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("<undefined>")

/**
* Returns a new ConfEntry that wraps the value in an Option, for config entries that do
* not require a value.
*/
def optional: OptionalConfEntry[T] = {
require(!defaultValue.isDefined, s"$this has a default value, cannot be optional.")
new OptionalConfEntry(key, valueConverter, stringConverter, doc, isPublic)
}

def readFrom(conf: SparkConf): T = {
conf.getOption(key).map(valueConverter).orElse(defaultValue).getOrElse(
throw new NoSuchElementException(s"$key is not set."))
}

override def toString: String = {
s"ConfEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}
}

private[spark] class OptionalConfEntry[T](
override val key: String,
val _valueConverter: String => T,
val _stringConverter: T => String,
override val doc: String,
override val isPublic: Boolean)
extends ConfEntry[Option[T]](key, None, s => Some(_valueConverter(s)),
null, doc, isPublic) {

override def readFrom(conf: SparkConf): Option[T] = {
conf.getOption(key).map(_valueConverter)
}

}

private class FallbackConfEntry[T] (
override val key: String,
override val doc: String,
override val isPublic: Boolean,
private val fallback: ConfEntry[T])
extends ConfEntry[T](key, None, fallback.valueConverter, fallback.stringConverter,
doc, isPublic) {

override def defaultValueString: String = {
defaultValue.map(stringConverter).getOrElse(s"<value of ${fallback.key}>")
}

override def readFrom(conf: SparkConf): T = {
conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf))
}
}

def intConf(
key: String,
defaultValue: Option[Int] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Int] =
new ConfEntry(key, defaultValue, { v =>
try {
v.toInt
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"$key should be int, but was $v")
}
}, _.toString, doc, isPublic)

def longConf(
key: String,
defaultValue: Option[Long] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Long] =
new ConfEntry(key, defaultValue, { v =>
try {
v.toLong
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"$key should be long, but was $v")
}
}, _.toString, doc, isPublic)

def doubleConf(
key: String,
defaultValue: Option[Double] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Double] =
new ConfEntry(key, defaultValue, { v =>
try {
v.toDouble
} catch {
case _: NumberFormatException =>
throw new IllegalArgumentException(s"$key should be double, but was $v")
}
}, _.toString, doc, isPublic)

def booleanConf(
key: String,
defaultValue: Option[Boolean] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Boolean] =
new ConfEntry(key, defaultValue, { v =>
try {
v.toBoolean
} catch {
case _: IllegalArgumentException =>
throw new IllegalArgumentException(s"$key should be boolean, but was $v")
}
}, _.toString, doc, isPublic)

def stringConf(
key: String,
defaultValue: Option[String] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[String] =
new ConfEntry(key, defaultValue, v => v, v => v, doc, isPublic)

def enumConf[T](
key: String,
valueConverter: String => T,
validValues: Set[T],
defaultValue: Option[T] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[T] =
new ConfEntry(key, defaultValue, v => {
val _v = valueConverter(v)
if (!validValues.contains(_v)) {
throw new IllegalArgumentException(
s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v")
}
_v
}, _.toString, doc, isPublic)

def seqConf[T](
key: String,
valueConverter: String => T,
defaultValue: Option[Seq[T]] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Seq[T]] = {
def stringToSeq(str: String): Seq[T] = {
str.split(",").map(_.trim()).filter(_.nonEmpty).map(valueConverter)
}

new ConfEntry(
key, defaultValue, stringToSeq, _.mkString(","), doc, isPublic)
}

def stringSeqConf(
key: String,
defaultValue: Option[Seq[String]] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Seq[String]] = {
seqConf(key, s => s, defaultValue, doc, isPublic)
}

def timeConf(
key: String,
unit: TimeUnit,
defaultValue: Option[String] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Long] = {
def timeFromString(str: String): Long = JavaUtils.timeStringAs(str, unit)
def timeToString(v: Long): String = TimeUnit.MILLISECONDS.convert(v, unit) + "ms"

new ConfEntry(
key, defaultValue.map(timeFromString), timeFromString, timeToString, doc, isPublic)
}

def bytesConf(
key: String,
unit: ByteUnit,
defaultValue: Option[String] = None,
doc: String = "",
isPublic: Boolean = true): ConfEntry[Long] = {
def byteFromString(str: String): Long = JavaUtils.byteStringAs(str, unit)
def byteToString(v: Long): String = unit.convertTo(v, ByteUnit.BYTE) + "b"

new ConfEntry(
key, defaultValue.map(byteFromString), byteFromString, byteToString, doc, isPublic)
}

def fallbackConf[T](
key: String,
fallback: ConfEntry[T],
doc: String = "",
isPublic: Boolean = true): ConfEntry[T] = {
new FallbackConfEntry(key, doc, isPublic, fallback)
}

}
Loading

0 comments on commit c858fa8

Please sign in to comment.