/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal.config
// ====================================================================================
// The guideline for naming configurations
// ====================================================================================
/*
In general, the config name should be a noun that describes its basic purpose. It's
recommended to add a prefix to the config name to make its scope clearer. For example,
`spark.scheduler.mode` clearly indicates that this config is for the scheduler.

A config name can have multiple structured prefixes, similar to a qualified Java class
name. Each prefix behaves like a namespace. We should only create a namespace if it's
meaningful and can be shared by multiple configs. For example,
`buffer.inMemoryThreshold` is preferred over `buffer.in.memory.threshold`.

The following are best practices for naming configs in some common cases:

1. When adding configs for a big feature, it's better to create an umbrella config that
   can turn the feature on/off, with a name like `featureName.enabled`. The other configs
   of this feature should be put under the `featureName` namespace. For example:
   - spark.sql.cbo.enabled
   - spark.sql.cbo.starSchemaDetection
   - spark.sql.cbo.joinReorder.enabled
   - spark.sql.cbo.joinReorder.dp.threshold
2. When adding a boolean config, the name should be a verb that describes what
   happens if this config is set to true, e.g. `spark.shuffle.sort.useRadixSort`.
3. When adding a config to specify a time duration, it's better to put the time unit
   in the config name. For example, `featureName.timeoutMs` clearly indicates
   that the time unit is milliseconds. The config entry should be created with
   `ConfigBuilder#timeConf`, to support time strings like `2 minutes`.
*/
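
/*
As an illustrative sketch only (these keys and defaults are made up, and recent Spark
versions also require a `.version(...)` call), a feature following the guidelines above
could be declared with `ConfigBuilder` like this:

  private[spark] val SOME_FEATURE_ENABLED = ConfigBuilder("spark.someFeature.enabled")
    .doc("Whether to enable the hypothetical feature.")
    .booleanConf
    .createWithDefault(false)

  private[spark] val SOME_FEATURE_TIMEOUT = ConfigBuilder("spark.someFeature.timeoutMs")
    .doc("Timeout for the hypothetical feature; accepts time strings like `2 minutes`.")
    .timeConf(TimeUnit.MILLISECONDS)
    .createWithDefaultString("2min")
*/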
/**
 * An entry contains all meta information for a configuration.
 *
 * When applying variable substitution to config values, only references starting with "spark." are
 * considered in the default namespace. For known Spark configuration keys (i.e. those created using
 * `ConfigBuilder`), references will also consider the default value when it exists.
 *
 * Variable expansion is also applied to the default values of config entries that have a default
 * value declared as a string.
 *
 * @param key the key for the configuration
 * @param prependedKey the key for the configuration which will be prepended
 * @param prependSeparator the separator which is used for prepending
 * @param alternatives alternative keys that are checked, in order, when the primary key is unset
 * @param valueConverter how to convert a string to the value. It should throw an exception if the
 *                       string does not have the required format.
 * @param stringConverter how to convert a value to a string that the user can use as a valid
 *                        string value. It's usually `toString`. But sometimes, a custom converter
 *                        is necessary. E.g., if T is List[String], `a, b, c` is better than
 *                        `List(a, b, c)`.
 * @param doc the documentation for the configuration
 * @param isPublic if this configuration is public to the user. If it's `false`, this
 *                 configuration is only used internally and we should not expose it to users.
 * @tparam T the value type
 */
private[spark] abstract class ConfigEntry[T] (
val key: String,
val prependedKey: Option[String],
val prependSeparator: String,
val alternatives: List[String],
val valueConverter: String => T,
val stringConverter: T => String,
val doc: String,
val isPublic: Boolean) {
import ConfigEntry._
registerEntry(this)
def defaultValueString: String
protected def readString(reader: ConfigReader): Option[String] = {
val values = Seq(
prependedKey.flatMap(reader.get(_)),
alternatives.foldLeft(reader.get(key))((res, nextKey) => res.orElse(reader.get(nextKey)))
).flatten
if (values.nonEmpty) {
Some(values.mkString(prependSeparator))
} else {
None
}
}
def readFrom(reader: ConfigReader): T
def defaultValue: Option[T] = None
override def toString: String = {
s"ConfigEntry(key=$key, defaultValue=$defaultValueString, doc=$doc, public=$isPublic)"
}
}
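
/*
A sketch of how `readString` combines keys (the keys below are hypothetical, for
illustration only). With key = "spark.foo.options", alternatives = List("spark.foo.opts"),
prependedKey = Some("spark.foo.defaultOptions") and prependSeparator = " ", a reader that
has

  spark.foo.defaultOptions = "-Da=1"
  spark.foo.options        = "-Db=2"

yields Some("-Da=1 -Db=2"): the prepended value comes first, then the value of the primary
key; the alternatives are consulted in order only when the primary key is unset.
*/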
private class ConfigEntryWithDefault[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
_defaultValue: T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(
key,
prependedKey,
prependSeparator,
alternatives,
valueConverter,
stringConverter,
doc,
isPublic
) {
override def defaultValue: Option[T] = Some(_defaultValue)
override def defaultValueString: String = stringConverter(_defaultValue)
def readFrom(reader: ConfigReader): T = {
readString(reader).map(valueConverter).getOrElse(_defaultValue)
}
}
private class ConfigEntryWithDefaultFunction[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
_defaultFunction: () => T,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(
key,
prependedKey,
prependSeparator,
alternatives,
valueConverter,
stringConverter,
doc,
isPublic
) {
override def defaultValue: Option[T] = Some(_defaultFunction())
override def defaultValueString: String = stringConverter(_defaultFunction())
def readFrom(reader: ConfigReader): T = {
readString(reader).map(valueConverter).getOrElse(_defaultFunction())
}
}
private class ConfigEntryWithDefaultString[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
_defaultValue: String,
valueConverter: String => T,
stringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry(
key,
prependedKey,
prependSeparator,
alternatives,
valueConverter,
stringConverter,
doc,
isPublic
) {
override def defaultValue: Option[T] = Some(valueConverter(_defaultValue))
override def defaultValueString: String = _defaultValue
def readFrom(reader: ConfigReader): T = {
val value = readString(reader).getOrElse(reader.substitute(_defaultValue))
valueConverter(value)
}
}
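
/*
Note that the string default goes through `reader.substitute` before conversion, so a
default may reference another config. As a hypothetical illustration, a default of
"${spark.buffer.size}" would be expanded against the reader before `valueConverter` runs.
This is why this class exists separately from `ConfigEntryWithDefault`, whose default is
already a typed value and is never substituted.
*/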
/**
* A config entry that does not have a default value.
*/
private[spark] class OptionalConfigEntry[T](
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
val rawValueConverter: String => T,
val rawStringConverter: T => String,
doc: String,
isPublic: Boolean)
extends ConfigEntry[Option[T]](
key,
prependedKey,
prependSeparator,
alternatives,
s => Some(rawValueConverter(s)),
v => v.map(rawStringConverter).orNull,
doc,
isPublic
) {
override def defaultValueString: String = ConfigEntry.UNDEFINED
override def readFrom(reader: ConfigReader): Option[T] = {
readString(reader).map(rawValueConverter)
}
}
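
/*
For an optional entry the value type is Option[T]: `readFrom` returns None when neither
the key, the alternatives, nor the prepended key is set. The string converter renders
None as null, and since there is no default, `defaultValueString` reports "<undefined>".
*/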
/**
* A config entry whose default value is defined by another config entry.
*/
private[spark] class FallbackConfigEntry[T] (
key: String,
prependedKey: Option[String],
prependSeparator: String,
alternatives: List[String],
doc: String,
isPublic: Boolean,
val fallback: ConfigEntry[T])
extends ConfigEntry[T](
key,
prependedKey,
prependSeparator,
alternatives,
fallback.valueConverter,
fallback.stringConverter,
doc,
isPublic
) {
override def defaultValueString: String = s"<value of ${fallback.key}>"
override def readFrom(reader: ConfigReader): T = {
readString(reader).map(valueConverter).getOrElse(fallback.readFrom(reader))
}
}
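
/*
Sketch of fallback resolution (the keys here are illustrative): an entry with
key = "spark.shuffle.io.connectionTimeout" and `fallback` pointing at the entry for
"spark.network.timeout" first reads its own key (and alternatives); only when those are
all unset does it delegate to `fallback.readFrom`, which in turn applies the fallback
entry's own default.
*/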
private[spark] object ConfigEntry {
val UNDEFINED = "<undefined>"
private val knownConfigs = new java.util.concurrent.ConcurrentHashMap[String, ConfigEntry[_]]()
def registerEntry(entry: ConfigEntry[_]): Unit = {
val existing = knownConfigs.putIfAbsent(entry.key, entry)
require(existing == null, s"Config entry ${entry.key} already registered!")
}
def findEntry(key: String): ConfigEntry[_] = knownConfigs.get(key)
}
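
/*
A minimal usage sketch of the registry (assuming the entry was created via
`ConfigBuilder`, which registers it through the `ConfigEntry` constructor; the key is
hypothetical):

  val entry = ConfigEntry.findEntry("spark.someFeature.enabled")  // null if unknown
  if (entry != null) {
    println(entry.defaultValueString)
  }

Registering two entries under the same key fails fast with an IllegalArgumentException
from `require`, which catches accidental duplicate definitions at class-load time.
*/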