From c858fa8e51691021018e628255725e7b930f7666 Mon Sep 17 00:00:00 2001 From: Marcelo Vanzin Date: Fri, 11 Dec 2015 09:39:29 -0800 Subject: [PATCH] Move code around, create object for core config keys. Hopefully makes approach clearer. --- .../scala/org/apache/spark/SparkConf.scala | 225 +---------------- .../org/apache/spark/config/ConfigEntry.scala | 239 ++++++++++++++++++ .../apache/spark/config/CoreConfigKeys.scala | 72 ++++++ .../org/apache/spark/SparkConfSuite.scala | 107 -------- .../spark/config/ConfigEntrySuite.scala | 134 ++++++++++ .../scala/org/apache/spark/sql/SQLConf.scala | 55 ++-- .../org/apache/spark/sql/SQLContext.scala | 13 +- .../apache/spark/sql/hive/HiveContext.scala | 4 +- .../yarn/AMDelegationTokenRenewer.scala | 1 + .../spark/deploy/yarn/ApplicationMaster.scala | 1 + .../org/apache/spark/deploy/yarn/Client.scala | 1 + .../spark/deploy/yarn/ClientArguments.scala | 1 + .../spark/deploy/yarn/ExecutorRunnable.scala | 1 + ...yPreferredContainerPlacementStrategy.scala | 3 +- .../spark/deploy/yarn/YarnConfigKeys.scala | 52 +--- .../deploy/yarn/YarnSparkHadoopUtil.scala | 9 +- 16 files changed, 501 insertions(+), 417 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/config/ConfigEntry.scala create mode 100644 core/src/main/scala/org/apache/spark/config/CoreConfigKeys.scala create mode 100644 core/src/test/scala/org/apache/spark/config/ConfigEntrySuite.scala diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index 171e34812eae2..74ee86de42381 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -24,7 +24,8 @@ import scala.collection.mutable.LinkedHashSet import org.apache.avro.{SchemaNormalization, Schema} -import org.apache.spark.network.util.{ByteUnit, JavaUtils} +import org.apache.spark.config.{ConfigEntry, OptionalConfigEntry} +import org.apache.spark.network.util.JavaUtils import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.Utils @@ -75,12 +76,12 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } - private[spark] def set[T](entry: ConfEntry[T], value: T): SparkConf = { + private[spark] def set[T](entry: ConfigEntry[T], value: T): SparkConf = { set(entry.key, entry.stringConverter(value)) this } - private[spark] def set[T](entry: OptionalConfEntry[T], value: T): SparkConf = { + private[spark] def set[T](entry: OptionalConfigEntry[T], value: T): SparkConf = { set(entry.key, entry._stringConverter(value)) this } @@ -159,14 +160,14 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { this } - private[spark] def setIfMissing[T](entry: ConfEntry[T], value: T): SparkConf = { + private[spark] def setIfMissing[T](entry: ConfigEntry[T], value: T): SparkConf = { if (settings.putIfAbsent(entry.key, entry.stringConverter(value)) == null) { logDeprecationWarning(entry.key) } this } - private[spark] def setIfMissing[T](entry: OptionalConfEntry[T], value: T): SparkConf = { + private[spark] def setIfMissing[T](entry: OptionalConfigEntry[T], value: T): SparkConf = { if (settings.putIfAbsent(entry.key, entry._stringConverter(value)) == null) { logDeprecationWarning(entry.key) } @@ -223,7 +224,7 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { getOption(key).getOrElse(defaultValue) } - private[spark] def get[T](entry: ConfEntry[T]): T = { + private[spark] def get[T](entry: ConfigEntry[T]): T = { entry.readFrom(this) } @@ 
-723,216 +724,4 @@ private[spark] object SparkConf extends Logging { version: String, translation: String => String = null) - /** - * An entry contains all meta information for a configuration. - * - * @param key the key for the configuration - * @param defaultValue the default value for the configuration - * @param valueConverter how to convert a string to the value. It should throw an exception if the - * string does not have the required format. - * @param stringConverter how to convert a value to a string that the user can use it as a valid - * string value. It's usually `toString`. But sometimes, a custom converter - * is necessary. E.g., if T is List[String], `a, b, c` is better than - * `List(a, b, c)`. - * @param doc the document for the configuration - * @param isPublic if this configuration is public to the user. If it's `false`, this - * configuration is only used internally and we should not expose it to the user. - * @tparam T the value type - */ - private[spark] class ConfEntry[T] ( - val key: String, - val defaultValue: Option[T], - val valueConverter: String => T, - val stringConverter: T => String, - val doc: String, - val isPublic: Boolean) { - - def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("") - - /** - * Returns a new ConfEntry that wraps the value in an Option, for config entries that do - * not require a value. - */ - def optional: OptionalConfEntry[T] = { - require(!defaultValue.isDefined, s"$this has a default value, cannot be optional.") - new OptionalConfEntry(key, valueConverter, stringConverter, doc, isPublic) - } - - def readFrom(conf: SparkConf): T = { - conf.getOption(key).map(valueConverter).orElse(defaultValue).getOrElse( - throw new NoSuchElementException(s"$key is not set.")) - } - - override def toString: String = { - s"ConfEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)" - } - } - - private[spark] class OptionalConfEntry[T]( - override val key: String, - val _valueConverter: String => T, - val _stringConverter: T => String, - override val doc: String, - override val isPublic: Boolean) - extends ConfEntry[Option[T]](key, None, s => Some(_valueConverter(s)), - null, doc, isPublic) { - - override def readFrom(conf: SparkConf): Option[T] = { - conf.getOption(key).map(_valueConverter) - } - - } - - private class FallbackConfEntry[T] ( - override val key: String, - override val doc: String, - override val isPublic: Boolean, - private val fallback: ConfEntry[T]) - extends ConfEntry[T](key, None, fallback.valueConverter, fallback.stringConverter, - doc, isPublic) { - - override def defaultValueString: String = { - defaultValue.map(stringConverter).getOrElse(s"") - } - - override def readFrom(conf: SparkConf): T = { - conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf)) - } - } - - def intConf( - key: String, - defaultValue: Option[Int] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Int] = - new ConfEntry(key, defaultValue, { v => - try { - v.toInt - } catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"$key should be int, but was $v") - } - }, _.toString, doc, isPublic) - - def longConf( - key: String, - defaultValue: Option[Long] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Long] = - new ConfEntry(key, defaultValue, { v => - try { - v.toLong - } catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"$key should be long, but was $v") - } - }, _.toString, doc, isPublic) - - def 
doubleConf( - key: String, - defaultValue: Option[Double] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Double] = - new ConfEntry(key, defaultValue, { v => - try { - v.toDouble - } catch { - case _: NumberFormatException => - throw new IllegalArgumentException(s"$key should be double, but was $v") - } - }, _.toString, doc, isPublic) - - def booleanConf( - key: String, - defaultValue: Option[Boolean] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Boolean] = - new ConfEntry(key, defaultValue, { v => - try { - v.toBoolean - } catch { - case _: IllegalArgumentException => - throw new IllegalArgumentException(s"$key should be boolean, but was $v") - } - }, _.toString, doc, isPublic) - - def stringConf( - key: String, - defaultValue: Option[String] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[String] = - new ConfEntry(key, defaultValue, v => v, v => v, doc, isPublic) - - def enumConf[T]( - key: String, - valueConverter: String => T, - validValues: Set[T], - defaultValue: Option[T] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[T] = - new ConfEntry(key, defaultValue, v => { - val _v = valueConverter(v) - if (!validValues.contains(_v)) { - throw new IllegalArgumentException( - s"The value of $key should be one of ${validValues.mkString(", ")}, but was $v") - } - _v - }, _.toString, doc, isPublic) - - def seqConf[T]( - key: String, - valueConverter: String => T, - defaultValue: Option[Seq[T]] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Seq[T]] = { - def stringToSeq(str: String): Seq[T] = { - str.split(",").map(_.trim()).filter(_.nonEmpty).map(valueConverter) - } - - new ConfEntry( - key, defaultValue, stringToSeq, _.mkString(","), doc, isPublic) - } - - def stringSeqConf( - key: String, - defaultValue: Option[Seq[String]] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Seq[String]] = { - seqConf(key, s => s, defaultValue, doc, isPublic) - } - - def timeConf( - key: String, - unit: TimeUnit, - defaultValue: Option[String] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Long] = { - def timeFromString(str: String): Long = JavaUtils.timeStringAs(str, unit) - def timeToString(v: Long): String = TimeUnit.MILLISECONDS.convert(v, unit) + "ms" - - new ConfEntry( - key, defaultValue.map(timeFromString), timeFromString, timeToString, doc, isPublic) - } - - def bytesConf( - key: String, - unit: ByteUnit, - defaultValue: Option[String] = None, - doc: String = "", - isPublic: Boolean = true): ConfEntry[Long] = { - def byteFromString(str: String): Long = JavaUtils.byteStringAs(str, unit) - def byteToString(v: Long): String = unit.convertTo(v, ByteUnit.BYTE) + "b" - - new ConfEntry( - key, defaultValue.map(byteFromString), byteFromString, byteToString, doc, isPublic) - } - - def fallbackConf[T]( - key: String, - fallback: ConfEntry[T], - doc: String = "", - isPublic: Boolean = true): ConfEntry[T] = { - new FallbackConfEntry(key, doc, isPublic, fallback) - } - } diff --git a/core/src/main/scala/org/apache/spark/config/ConfigEntry.scala b/core/src/main/scala/org/apache/spark/config/ConfigEntry.scala new file mode 100644 index 0000000000000..6c2704f451201 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/config/ConfigEntry.scala @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.config
+
+import java.util.concurrent.TimeUnit
+
+import org.apache.spark.SparkConf
+import org.apache.spark.network.util.{ByteUnit, JavaUtils}
+
+/**
+ * An entry contains all the meta information for a configuration.
+ *
+ * @param key the key for the configuration
+ * @param defaultValue the default value for the configuration
+ * @param valueConverter how to convert a string to the value. It should throw an exception if the
+ *                       string does not have the required format.
+ * @param stringConverter how to convert a value to a string that the user can use as a valid
+ *                        string value. It's usually `toString`, but sometimes a custom converter
+ *                        is necessary. E.g., if T is List[String], `a, b, c` is better than
+ *                        `List(a, b, c)`.
+ * @param doc the documentation for the configuration
+ * @param isPublic if this configuration is public to the user. If it's `false`, this
+ *                 configuration is only used internally and we should not expose it to the user.
+ * @tparam T the value type
+ */
+private[spark] class ConfigEntry[T] (
+    val key: String,
+    val defaultValue: Option[T],
+    val valueConverter: String => T,
+    val stringConverter: T => String,
+    val doc: String,
+    val isPublic: Boolean) {
+
+  def defaultValueString: String = defaultValue.map(stringConverter).getOrElse("")
+
+  /**
+   * Returns a new ConfigEntry that wraps the value in an Option, for config entries that do
+   * not require a value.
+ */ + def optional: OptionalConfigEntry[T] = { + require(!defaultValue.isDefined, s"$this has a default value, cannot be optional.") + new OptionalConfigEntry(key, valueConverter, stringConverter, doc, isPublic) + } + + def readFrom(conf: SparkConf): T = { + conf.getOption(key).map(valueConverter).orElse(defaultValue).getOrElse( + throw new NoSuchElementException(s"$key is not set.")) + } + + override def toString: String = { + s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)" + } +} + +private[spark] class OptionalConfigEntry[T]( + override val key: String, + val _valueConverter: String => T, + val _stringConverter: T => String, + override val doc: String, + override val isPublic: Boolean) + extends ConfigEntry[Option[T]](key, None, s => Some(_valueConverter(s)), + null, doc, isPublic) { + + override def readFrom(conf: SparkConf): Option[T] = { + conf.getOption(key).map(_valueConverter) + } + +} + +private class FallbackConfigEntry[T] ( + override val key: String, + override val doc: String, + override val isPublic: Boolean, + private val fallback: ConfigEntry[T]) + extends ConfigEntry[T](key, None, fallback.valueConverter, fallback.stringConverter, + doc, isPublic) { + + override def defaultValueString: String = { + defaultValue.map(stringConverter).getOrElse(s"") + } + + override def readFrom(conf: SparkConf): T = { + conf.getOption(key).map(valueConverter).getOrElse(fallback.readFrom(conf)) + } +} + +private[spark] object ConfigEntry { + + def intConf( + key: String, + defaultValue: Option[Int] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Int] = + new ConfigEntry(key, defaultValue, { v => + try { + v.toInt + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be int, but was $v") + } + }, _.toString, doc, isPublic) + + def longConf( + key: String, + defaultValue: Option[Long] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Long] = + new ConfigEntry(key, defaultValue, { v => + try { + v.toLong + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be long, but was $v") + } + }, _.toString, doc, isPublic) + + def doubleConf( + key: String, + defaultValue: Option[Double] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Double] = + new ConfigEntry(key, defaultValue, { v => + try { + v.toDouble + } catch { + case _: NumberFormatException => + throw new IllegalArgumentException(s"$key should be double, but was $v") + } + }, _.toString, doc, isPublic) + + def booleanConf( + key: String, + defaultValue: Option[Boolean] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Boolean] = + new ConfigEntry(key, defaultValue, { v => + try { + v.toBoolean + } catch { + case _: IllegalArgumentException => + throw new IllegalArgumentException(s"$key should be boolean, but was $v") + } + }, _.toString, doc, isPublic) + + def stringConf( + key: String, + defaultValue: Option[String] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[String] = + new ConfigEntry(key, defaultValue, v => v, v => v, doc, isPublic) + + def enumConf[T]( + key: String, + valueConverter: String => T, + validValues: Set[T], + defaultValue: Option[T] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[T] = + new ConfigEntry(key, defaultValue, v => { + val _v = valueConverter(v) + if (!validValues.contains(_v)) { + throw new IllegalArgumentException( + s"The value of $key should be one of 
${validValues.mkString(", ")}, but was $v") + } + _v + }, _.toString, doc, isPublic) + + def seqConf[T]( + key: String, + valueConverter: String => T, + defaultValue: Option[Seq[T]] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Seq[T]] = { + def stringToSeq(str: String): Seq[T] = { + str.split(",").map(_.trim()).filter(_.nonEmpty).map(valueConverter) + } + + new ConfigEntry( + key, defaultValue, stringToSeq, _.mkString(","), doc, isPublic) + } + + def stringSeqConf( + key: String, + defaultValue: Option[Seq[String]] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Seq[String]] = { + seqConf(key, s => s, defaultValue, doc, isPublic) + } + + def timeConf( + key: String, + unit: TimeUnit, + defaultValue: Option[String] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Long] = { + def timeFromString(str: String): Long = JavaUtils.timeStringAs(str, unit) + def timeToString(v: Long): String = TimeUnit.MILLISECONDS.convert(v, unit) + "ms" + + new ConfigEntry( + key, defaultValue.map(timeFromString), timeFromString, timeToString, doc, isPublic) + } + + def bytesConf( + key: String, + unit: ByteUnit, + defaultValue: Option[String] = None, + doc: String = "", + isPublic: Boolean = true): ConfigEntry[Long] = { + def byteFromString(str: String): Long = JavaUtils.byteStringAs(str, unit) + def byteToString(v: Long): String = unit.convertTo(v, ByteUnit.BYTE) + "b" + + new ConfigEntry( + key, defaultValue.map(byteFromString), byteFromString, byteToString, doc, isPublic) + } + + def fallbackConf[T]( + key: String, + fallback: ConfigEntry[T], + doc: String = "", + isPublic: Boolean = true): ConfigEntry[T] = { + new FallbackConfigEntry(key, doc, isPublic, fallback) + } + +} diff --git a/core/src/main/scala/org/apache/spark/config/CoreConfigKeys.scala b/core/src/main/scala/org/apache/spark/config/CoreConfigKeys.scala new file mode 100644 index 0000000000000..8bf64f779c2ab --- /dev/null +++ b/core/src/main/scala/org/apache/spark/config/CoreConfigKeys.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.config + +import org.apache.spark.launcher.SparkLauncher + +private[spark] object CoreConfigKeys { + import ConfigEntry._ + + val DRIVER_CLASS_PATH = stringConf(SparkLauncher.DRIVER_EXTRA_CLASSPATH).optional + + val DRIVER_JAVA_OPTIONS = stringConf(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).optional + + val DRIVER_LIBRARY_PATH = stringConf(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).optional + + val DRIVER_USER_CLASS_PATH_FIRST = booleanConf("spark.driver.userClassPathFirst", + defaultValue = Some(false)) + + val EXECUTOR_CLASS_PATH = stringConf(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).optional + + val EXECUTOR_JAVA_OPTIONS = stringConf(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).optional + + val EXECUTOR_LIBRARY_PATH = stringConf(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).optional + + val EXECUTOR_USER_CLASS_PATH_FIRST = booleanConf("spark.executor.userClassPathFirst", + defaultValue = Some(false)) + + val IS_PYTHON_APP = booleanConf("spark.yarn.isPython", + defaultValue = Some(false), + isPublic = false) + + val CPUS_PER_TASK = intConf("spark.task.cpus", + defaultValue = Some(1)) + + val DYN_ALLOCATION_MIN_EXECUTORS = intConf("spark.dynamicAllocation.minExecutors", + defaultValue = Some(0)) + + val DYN_ALLOCATION_INITIAL_EXECUTORS = fallbackConf("spark.dynamicAllocation.initialExecutors", + fallback = DYN_ALLOCATION_MIN_EXECUTORS) + + val DYN_ALLOCATION_MAX_EXECUTORS = intConf("spark.dynamicAllocation.maxExecutors", + defaultValue = Some(Int.MaxValue)) + + val SHUFFLE_SERVICE_ENABLED = booleanConf("spark.shuffle.service.enabled", + defaultValue = Some(false)) + + val KEYTAB = stringConf("spark.yarn.keytab", + doc = "Location of user's keytab.") + .optional + + val PRINCIPAL = stringConf("spark.yarn.principal", + doc = "Name of the Kerberos principal.") + .optional + + val EXECUTOR_INSTANCES = intConf("spark.executor.instances").optional + +} diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index d83b390a14907..6756fb627ed79 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -267,113 +267,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst assert(RpcUtils.lookupRpcTimeout(conf).duration === (4 seconds)) } - test("conf entry: int") { - val conf = new SparkConf() - val iConf = SparkConf.intConf("spark.int", defaultValue = Some(1)) - assert(conf.get(iConf) === 1) - conf.set(iConf, 2) - assert(conf.get(iConf) === 2) - } - - test("conf entry: long") { - val conf = new SparkConf() - val lConf = SparkConf.longConf("spark.long") - conf.set(lConf, 1234L) - assert(conf.get(lConf) === 1234L) - } - - test("conf entry: double") { - val conf = new SparkConf() - val dConf = SparkConf.doubleConf("spark.double") - conf.set(dConf, 20.0) - assert(conf.get(dConf) === 20.0) - } - - test("conf entry: boolean") { - val conf = new SparkConf() - val bConf = SparkConf.booleanConf("spark.boolean", defaultValue = Some(false)) - assert(!conf.get(bConf)) - conf.set(bConf, true) - assert(conf.get(bConf)) - } - - test("conf entry: required") { - val conf = new SparkConf() - val requiredConf = SparkConf.longConf("spark.noDefault") - intercept[NoSuchElementException] { - conf.get(requiredConf) - } - } - - test("conf entry: optional") { - val conf = new SparkConf() - val optionalConf = SparkConf.intConf("spark.optional").optional - assert(conf.get(optionalConf) === None) - conf.set(optionalConf, 1) 
- assert(conf.get(optionalConf) === Some(1)) - } - - test("conf entry: fallback") { - val conf = new SparkConf() - val parentConf = SparkConf.intConf("spark.int", defaultValue = Some(1)) - val confWithFallback = SparkConf.fallbackConf("spark.fallback", parentConf) - assert(conf.get(confWithFallback) === 1) - conf.set(confWithFallback, 2) - assert(conf.get(parentConf) === 1) - assert(conf.get(confWithFallback) === 2) - } - - test("conf entry: time") { - val conf = new SparkConf() - val time = SparkConf.timeConf("spark.time", TimeUnit.SECONDS, defaultValue = Some("1h")) - assert(conf.get(time) === 3600L) - conf.set(time.key, "1m") - assert(conf.get(time) === 60L) - } - - test("conf entry: bytes") { - val conf = new SparkConf() - val bytes = SparkConf.bytesConf("spark.bytes", ByteUnit.KiB, defaultValue = Some("1m")) - assert(conf.get(bytes) === 1024L) - conf.set(bytes.key, "1k") - assert(conf.get(bytes) === 1L) - } - - test("conf entry: string seq") { - val conf = new SparkConf() - val seq = SparkConf.stringSeqConf("spark.seq") - conf.set(seq.key, "1,,2, 3 , , 4") - assert(conf.get(seq) === Seq("1", "2", "3", "4")) - conf.set(seq, Seq("1", "2")) - assert(conf.get(seq) === Seq("1", "2")) - } - - test("conf entry: enum") { - val conf = new SparkConf() - val enum = SparkConf.enumConf("spark.enum", v => v, Set("a", "b", "c"), - defaultValue = Some("a")) - assert(conf.get(enum) === "a") - - conf.set(enum, "b") - assert(conf.get(enum) === "b") - - conf.set(enum, "d") - val enumError = intercept[IllegalArgumentException] { - conf.get(enum) - } - assert(enumError.getMessage === s"The value of ${enum.key} should be one of a, b, c, but was d") - } - - test("conf entry: conversion error") { - val conf = new SparkConf() - val conversionTest = SparkConf.doubleConf("spark.conversionTest") - conf.set(conversionTest.key, "abc") - val conversionError = intercept[IllegalArgumentException] { - conf.get(conversionTest) - } - assert(conversionError.getMessage === s"${conversionTest.key} should be double, but was abc") - } - } class Class1 {} diff --git a/core/src/test/scala/org/apache/spark/config/ConfigEntrySuite.scala b/core/src/test/scala/org/apache/spark/config/ConfigEntrySuite.scala new file mode 100644 index 0000000000000..e1b2336d46960 --- /dev/null +++ b/core/src/test/scala/org/apache/spark/config/ConfigEntrySuite.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.config + +import java.util.concurrent.TimeUnit + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.network.util.ByteUnit + +class ConfigEntrySuite extends SparkFunSuite { + + test("conf entry: int") { + val conf = new SparkConf() + val iConf = ConfigEntry.intConf("spark.int", defaultValue = Some(1)) + assert(conf.get(iConf) === 1) + conf.set(iConf, 2) + assert(conf.get(iConf) === 2) + } + + test("conf entry: long") { + val conf = new SparkConf() + val lConf = ConfigEntry.longConf("spark.long") + conf.set(lConf, 1234L) + assert(conf.get(lConf) === 1234L) + } + + test("conf entry: double") { + val conf = new SparkConf() + val dConf = ConfigEntry.doubleConf("spark.double") + conf.set(dConf, 20.0) + assert(conf.get(dConf) === 20.0) + } + + test("conf entry: boolean") { + val conf = new SparkConf() + val bConf = ConfigEntry.booleanConf("spark.boolean", defaultValue = Some(false)) + assert(!conf.get(bConf)) + conf.set(bConf, true) + assert(conf.get(bConf)) + } + + test("conf entry: required") { + val conf = new SparkConf() + val requiredConf = ConfigEntry.longConf("spark.noDefault") + intercept[NoSuchElementException] { + conf.get(requiredConf) + } + } + + test("conf entry: optional") { + val conf = new SparkConf() + val optionalConf = ConfigEntry.intConf("spark.optional").optional + assert(conf.get(optionalConf) === None) + conf.set(optionalConf, 1) + assert(conf.get(optionalConf) === Some(1)) + } + + test("conf entry: fallback") { + val conf = new SparkConf() + val parentConf = ConfigEntry.intConf("spark.int", defaultValue = Some(1)) + val confWithFallback = ConfigEntry.fallbackConf("spark.fallback", parentConf) + assert(conf.get(confWithFallback) === 1) + conf.set(confWithFallback, 2) + assert(conf.get(parentConf) === 1) + assert(conf.get(confWithFallback) === 2) + } + + test("conf entry: time") { + val conf = new SparkConf() + val time = ConfigEntry.timeConf("spark.time", TimeUnit.SECONDS, defaultValue = Some("1h")) + assert(conf.get(time) === 3600L) + conf.set(time.key, "1m") + assert(conf.get(time) === 60L) + } + + test("conf entry: bytes") { + val conf = new SparkConf() + val bytes = ConfigEntry.bytesConf("spark.bytes", ByteUnit.KiB, defaultValue = Some("1m")) + assert(conf.get(bytes) === 1024L) + conf.set(bytes.key, "1k") + assert(conf.get(bytes) === 1L) + } + + test("conf entry: string seq") { + val conf = new SparkConf() + val seq = ConfigEntry.stringSeqConf("spark.seq") + conf.set(seq.key, "1,,2, 3 , , 4") + assert(conf.get(seq) === Seq("1", "2", "3", "4")) + conf.set(seq, Seq("1", "2")) + assert(conf.get(seq) === Seq("1", "2")) + } + + test("conf entry: enum") { + val conf = new SparkConf() + val enum = ConfigEntry.enumConf("spark.enum", v => v, Set("a", "b", "c"), + defaultValue = Some("a")) + assert(conf.get(enum) === "a") + + conf.set(enum, "b") + assert(conf.get(enum) === "b") + + conf.set(enum, "d") + val enumError = intercept[IllegalArgumentException] { + conf.get(enum) + } + assert(enumError.getMessage === s"The value of ${enum.key} should be one of a, b, c, but was d") + } + + test("conf entry: conversion error") { + val conf = new SparkConf() + val conversionTest = ConfigEntry.doubleConf("spark.conversionTest") + conf.set(conversionTest.key, "abc") + val conversionError = intercept[IllegalArgumentException] { + conf.get(conversionTest) + } + assert(conversionError.getMessage === s"${conversionTest.key} should be double, but was abc") + } + +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index a3f8aacb935a5..f3ac32919c40d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConverters._ import org.apache.parquet.hadoop.ParquetOutputCommitter import org.apache.spark.SparkConf -import org.apache.spark.SparkConf.ConfEntry +import org.apache.spark.config.ConfigEntry import org.apache.spark.sql.catalyst.CatalystConf //////////////////////////////////////////////////////////////////////////////////////////////////// @@ -36,11 +36,11 @@ import org.apache.spark.sql.catalyst.CatalystConf private[spark] object SQLConf { private val sqlConfEntries = java.util.Collections.synchronizedMap( - new java.util.HashMap[String, ConfEntry[_]]()) + new java.util.HashMap[String, ConfigEntry[_]]()) - private def register[T](entry: ConfEntry[T]): ConfEntry[T] = sqlConfEntries.synchronized { + private def register[T](entry: ConfigEntry[T]): ConfigEntry[T] = sqlConfEntries.synchronized { require(!sqlConfEntries.containsKey(entry.key), - s"Duplicate SQLConfEntry. ${entry.key} has been registered") + s"Duplicate SQLConfigEntry. ${entry.key} has been registered") sqlConfEntries.put(entry.key, entry) entry } @@ -49,29 +49,30 @@ private[spark] object SQLConf { key: String, defaultValue: Option[Int] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[Int] = - register(SparkConf.intConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) + isPublic: Boolean = true): ConfigEntry[Int] = + register(ConfigEntry.intConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) private[sql] def longConf( key: String, defaultValue: Option[Long] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[Long] = - register(SparkConf.longConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) + isPublic: Boolean = true): ConfigEntry[Long] = + register(ConfigEntry.longConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) private[sql] def doubleConf( key: String, defaultValue: Option[Double] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[Double] = - register(SparkConf.doubleConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) + isPublic: Boolean = true): ConfigEntry[Double] = + register(ConfigEntry.doubleConf(key, defaultValue = defaultValue, doc = doc, + isPublic = isPublic)) private[sql] def booleanConf( key: String, defaultValue: Option[Boolean] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[Boolean] = { - register(SparkConf.booleanConf(key, defaultValue = defaultValue, doc = doc, + isPublic: Boolean = true): ConfigEntry[Boolean] = { + register(ConfigEntry.booleanConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) } @@ -79,8 +80,9 @@ private[spark] object SQLConf { key: String, defaultValue: Option[String] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[String] = - register(SparkConf.stringConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) + isPublic: Boolean = true): ConfigEntry[String] = + register(ConfigEntry.stringConf(key, defaultValue = defaultValue, doc = doc, + isPublic = isPublic)) private[sql] def enumConf[T]( key: String, @@ -88,8 +90,8 @@ private[spark] object SQLConf { validValues: Set[T], defaultValue: Option[T] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[T] = { - register(SparkConf.enumConf(key, 
valueConverter, validValues, defaultValue = defaultValue, + isPublic: Boolean = true): ConfigEntry[T] = { + register(ConfigEntry.enumConf(key, valueConverter, validValues, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) } @@ -98,8 +100,8 @@ private[spark] object SQLConf { valueConverter: String => T, defaultValue: Option[Seq[T]] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[Seq[T]] = { - register(SparkConf.seqConf(key, valueConverter, defaultValue = defaultValue, doc = doc, + isPublic: Boolean = true): ConfigEntry[Seq[T]] = { + register(ConfigEntry.seqConf(key, valueConverter, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) } @@ -107,8 +109,8 @@ private[spark] object SQLConf { key: String, defaultValue: Option[Seq[String]] = None, doc: String = "", - isPublic: Boolean = true): ConfEntry[Seq[String]] = { - register(SparkConf.stringSeqConf(key, defaultValue = defaultValue, doc = doc, + isPublic: Boolean = true): ConfigEntry[Seq[String]] = { + register(ConfigEntry.stringSeqConf(key, defaultValue = defaultValue, doc = doc, isPublic = isPublic)) } @@ -408,7 +410,6 @@ private[spark] object SQLConf { * SQLConf is thread-safe (internally synchronized, so safe to be used in multiple threads). */ private[sql] class SQLConf extends Serializable with CatalystConf { - import SparkConf._ import SQLConf._ /** Only low degree of contention is expected for conf, thus NOT using ConcurrentHashMap. */ @@ -529,7 +530,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { } /** Set the given Spark SQL configuration property. */ - def setConf[T](entry: ConfEntry[T], value: T): Unit = { + def setConf[T](entry: ConfigEntry[T], value: T): Unit = { require(entry != null, "entry cannot be null") require(value != null, s"value cannot be null for key: ${entry.key}") require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") @@ -548,19 +549,19 @@ private[sql] class SQLConf extends Serializable with CatalystConf { /** * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue`. This is useful when `defaultValue` in ConfEntry is not the + * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the * desired one. */ - def getConf[T](entry: ConfEntry[T], defaultValue: T): T = { + def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = { require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") Option(settings.get(entry.key)).map(entry.valueConverter).getOrElse(defaultValue) } /** * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue` in [[ConfEntry]]. + * yet, return `defaultValue` in [[ConfigEntry]]. */ - def getConf[T](entry: ConfEntry[T]): T = { + def getConf[T](entry: ConfigEntry[T]): T = { require(sqlConfEntries.get(entry.key) == entry, s"$entry is not registered") Option(settings.get(entry.key)).map(entry.valueConverter).orElse(entry.defaultValue). 
getOrElse(throw new NoSuchElementException(entry.key)) @@ -600,7 +601,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { settings.remove(key) } - private[spark] def unsetConf(entry: ConfEntry[_]): Unit = { + private[spark] def unsetConf(entry: ConfigEntry[_]): Unit = { settings.remove(entry.key) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index d384c2c408181..62e73e847447a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -28,6 +28,7 @@ import scala.util.control.NonFatal import org.apache.spark.annotation.{DeveloperApi, Experimental} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} +import org.apache.spark.config.ConfigEntry import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd} import org.apache.spark.sql.catalyst.analysis._ @@ -72,8 +73,6 @@ class SQLContext private[sql]( self => - import SparkConf._ - def this(sparkContext: SparkContext) = { this(sparkContext, new CacheManager, SQLContext.createListenerAndUI(sparkContext), true) } @@ -129,7 +128,7 @@ class SQLContext private[sql]( def setConf(props: Properties): Unit = conf.setConf(props) /** Set the given Spark SQL configuration property. */ - private[sql] def setConf[T](entry: ConfEntry[T], value: T): Unit = conf.setConf(entry, value) + private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = conf.setConf(entry, value) /** * Set the given Spark SQL configuration property. @@ -149,16 +148,16 @@ class SQLContext private[sql]( /** * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue` in [[ConfEntry]]. + * yet, return `defaultValue` in [[ConfigEntry]]. */ - private[sql] def getConf[T](entry: ConfEntry[T]): T = conf.getConf(entry) + private[sql] def getConf[T](entry: ConfigEntry[T]): T = conf.getConf(entry) /** * Return the value of Spark SQL configuration property for the given key. If the key is not set - * yet, return `defaultValue`. This is useful when `defaultValue` in ConfEntry is not the + * yet, return `defaultValue`. This is useful when `defaultValue` in ConfigEntry is not the * desired one. 
*/ - private[sql] def getConf[T](entry: ConfEntry[T], defaultValue: T): T = { + private[sql] def getConf[T](entry: ConfigEntry[T], defaultValue: T): T = { conf.getConf(entry, defaultValue) } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 6ed0fe0a64b86..1eb73885c82ac 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -37,7 +37,7 @@ import org.apache.hadoop.hive.ql.parse.VariableSubstitution import org.apache.hadoop.hive.serde2.io.{DateWritable, TimestampWritable} import org.apache.hadoop.util.VersionInfo -import org.apache.spark.SparkConf.ConfEntry +import org.apache.spark.config.ConfigEntry import org.apache.spark.api.java.JavaSparkContext import org.apache.spark.sql._ import org.apache.spark.sql.SQLConf._ @@ -447,7 +447,7 @@ class HiveContext private[hive]( hiveconf.set(key, value) } - override private[sql] def setConf[T](entry: ConfEntry[T], value: T): Unit = { + override private[sql] def setConf[T](entry: ConfigEntry[T], value: T): Unit = { setConf(entry.key, entry.stringConverter(value)) } diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala index cec14442172f6..dc40c44756200 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/AMDelegationTokenRenewer.scala @@ -27,6 +27,7 @@ import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.{Logging, SparkConf} +import org.apache.spark.config.CoreConfigKeys._ import org.apache.spark.util.ThreadUtils /* diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ca1fe70744277..f88eca4d1b234 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration import org.apache.spark.rpc._ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkEnv, SparkException, SparkUserAppException} +import org.apache.spark.config.CoreConfigKeys._ import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.deploy.history.HistoryServer import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend, YarnSchedulerBackend} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala index 58b210af8fe86..34d0f3ad85d4f 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala @@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException import org.apache.hadoop.yarn.util.Records import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext, SparkException} +import org.apache.spark.config.CoreConfigKeys._ import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle, YarnCommandBuilderUtils} import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.util.Utils diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala 
b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala index e0952902f9925..b8129e6e24c05 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ClientArguments.scala @@ -20,6 +20,7 @@ package org.apache.spark.deploy.yarn import scala.collection.mutable.ArrayBuffer import org.apache.spark.{SparkConf, SparkException} +import org.apache.spark.config.CoreConfigKeys._ import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._ import org.apache.spark.util.{IntParam, MemoryParam, Utils} diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala index a1b0a10f3d41f..6f150457961c1 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala @@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.ipc.YarnRPC import org.apache.hadoop.yarn.util.{ConverterUtils, Records} import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} +import org.apache.spark.config.CoreConfigKeys._ import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.network.util.JavaUtils import org.apache.spark.util.Utils diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala index c380a4dc0ccf9..3e57c93c36f8c 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/LocalityPreferredContainerPlacementStrategy.scala @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.util.RackResolver import org.apache.spark.SparkConf +import org.apache.spark.config.CoreConfigKeys private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) @@ -85,7 +86,7 @@ private[yarn] class LocalityPreferredContainerPlacementStrategy( val resource: Resource) { // Number of CPUs per task - private val CPUS_PER_TASK = sparkConf.get(YarnConfigKeys.CPUS_PER_TASK) + private val CPUS_PER_TASK = sparkConf.get(CoreConfigKeys.CPUS_PER_TASK) /** * Calculate each container's node locality and rack locality diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnConfigKeys.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnConfigKeys.scala index 24e6833430246..d29975fba938e 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnConfigKeys.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnConfigKeys.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.yarn import java.util.concurrent.TimeUnit -import org.apache.spark.SparkConf._ +import org.apache.spark.config.ConfigEntry._ import org.apache.spark.network.util.ByteUnit private[spark] object YarnConfigKeys { @@ -173,56 +173,6 @@ private[spark] object YarnConfigKeys { val CREDENTIAL_FILE_MAX_COUNT = intConf("spark.yarn.credentials.file.retention.count", defaultValue = Some(5)) - // TODO: options below should be in core, either because they're not YARN-specific, or because - // they're modified by code that resides in core. 
- - val DRIVER_CLASS_PATH = stringConf("spark.driver.extraClassPath").optional - - val DRIVER_JAVA_OPTIONS = stringConf("spark.driver.extraJavaOptions").optional - - val DRIVER_LIBRARY_PATH = stringConf("spark.driver.extraLibraryPath").optional - - val DRIVER_USER_CLASS_PATH_FIRST = booleanConf("spark.driver.userClassPathFirst", - defaultValue = Some(false)) - - val EXECUTOR_CLASS_PATH = stringConf("spark.executor.extraClassPath").optional - - val EXECUTOR_JAVA_OPTIONS = stringConf("spark.executor.extraJavaOptions").optional - - val EXECUTOR_LIBRARY_PATH = stringConf("spark.executor.extraLibraryPath").optional - - val EXECUTOR_USER_CLASS_PATH_FIRST = booleanConf("spark.executor.userClassPathFirst", - defaultValue = Some(false)) - - val IS_PYTHON_APP = booleanConf("spark.yarn.isPython", - defaultValue = Some(false), - isPublic = false) - - val CPUS_PER_TASK = intConf("spark.task.cpus", - defaultValue = Some(1)) - - val DYN_ALLOCATION_MIN_EXECUTORS = intConf("spark.dynamicAllocation.minExecutors", - defaultValue = Some(0)) - - val DYN_ALLOCATION_INITIAL_EXECUTORS = fallbackConf("spark.dynamicAllocation.initialExecutors", - fallback = DYN_ALLOCATION_MIN_EXECUTORS) - - val DYN_ALLOCATION_MAX_EXECUTORS = intConf("spark.dynamicAllocation.maxExecutors", - defaultValue = Some(Int.MaxValue)) - - val SHUFFLE_SERVICE_ENABLED = booleanConf("spark.shuffle.service.enabled", - defaultValue = Some(false)) - - val KEYTAB = stringConf("spark.yarn.keytab", - doc = "Location of user's keytab.") - .optional - - val PRINCIPAL = stringConf("spark.yarn.principal", - doc = "Name of the Kerberos principal.") - .optional - - val EXECUTOR_INSTANCES = intConf("spark.executor.instances").optional - /* Private configs. */ val CREDENTIALS_FILE_PATH = stringConf("spark.yarn.credentials.file", diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala index 0f83047785a16..49e0e17454804 100644 --- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala +++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala @@ -40,6 +40,7 @@ import org.apache.hadoop.yarn.api.ApplicationConstants.Environment import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, Priority} import org.apache.hadoop.yarn.util.ConverterUtils +import org.apache.spark.config.CoreConfigKeys import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.launcher.YarnCommandBuilderUtils import org.apache.spark.{SecurityManager, SparkConf, SparkException} @@ -395,9 +396,9 @@ object YarnSparkHadoopUtil { conf: SparkConf, numExecutors: Int = DEFAULT_NUMBER_EXECUTORS): Int = { if (Utils.isDynamicAllocationEnabled(conf)) { - val minNumExecutors = conf.get(YarnConfigKeys.DYN_ALLOCATION_MIN_EXECUTORS) - val initialNumExecutors = conf.get(YarnConfigKeys.DYN_ALLOCATION_INITIAL_EXECUTORS) - val maxNumExecutors = conf.get(YarnConfigKeys.DYN_ALLOCATION_MAX_EXECUTORS) + val minNumExecutors = conf.get(CoreConfigKeys.DYN_ALLOCATION_MIN_EXECUTORS) + val initialNumExecutors = conf.get(CoreConfigKeys.DYN_ALLOCATION_INITIAL_EXECUTORS) + val maxNumExecutors = conf.get(CoreConfigKeys.DYN_ALLOCATION_MAX_EXECUTORS) require(initialNumExecutors >= minNumExecutors && initialNumExecutors <= maxNumExecutors, s"initial executor number $initialNumExecutors must between min executor number" + s"$minNumExecutors and max executor number $maxNumExecutors") @@ -407,7 +408,7 @@ object YarnSparkHadoopUtil { val 
targetNumExecutors = sys.env.get("SPARK_EXECUTOR_INSTANCES").map(_.toInt).getOrElse(numExecutors) // System property can override environment variable. - conf.get(YarnConfigKeys.EXECUTOR_INSTANCES).getOrElse(targetNumExecutors) + conf.get(CoreConfigKeys.EXECUTOR_INSTANCES).getOrElse(targetNumExecutors) } } }
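
Usage sketch (illustrative only, not part of the patch): the typed entries introduced above are read and
written through the new SparkConf overloads added in this change. The `spark.example.*` keys and the
`ConfigExample` object below are made up for illustration; `ConfigEntry`, its factory methods, and the
typed `SparkConf.get`/`set` overloads are all `private[spark]`, so the sketch assumes code living under
the org.apache.spark package.

package org.apache.spark.config

import org.apache.spark.SparkConf

object ConfigExample {
  // Hypothetical entries, defined only for this sketch.
  val MAX_RETRIES = ConfigEntry.intConf("spark.example.maxRetries", defaultValue = Some(3))
  val CHECKPOINT_DIR = ConfigEntry.stringConf("spark.example.checkpointDir").optional
  val INITIAL_RETRIES = ConfigEntry.fallbackConf("spark.example.initialRetries", fallback = MAX_RETRIES)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(false)

    // Unset entries fall back to their default, or None for optional entries.
    assert(conf.get(MAX_RETRIES) == 3)
    assert(conf.get(CHECKPOINT_DIR).isEmpty)

    // Typed setter/getter pair added to SparkConf by this patch.
    conf.set(MAX_RETRIES, 5)
    assert(conf.get(MAX_RETRIES) == 5)

    // A fallback entry reads its parent's value when it is not set itself.
    assert(conf.get(INITIAL_RETRIES) == 5)
  }
}

The fallback case mirrors how CoreConfigKeys.DYN_ALLOCATION_INITIAL_EXECUTORS defers to
DYN_ALLOCATION_MIN_EXECUTORS in this patch.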