/
package.scala
310 lines (248 loc) · 12.4 KB
/
package.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.internal
import java.util.concurrent.TimeUnit
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.util.Utils
/**
 * Typed declarations of Spark configuration entries.
 *
 * Each entry is created through `ConfigBuilder` from its key, a value type and, where one is
 * declared, a default value or a fallback entry. Every entry is `private[spark]`.
 *
 * Declaration order is significant: entries handed to `fallbackConf` are declared before the
 * entries that reference them.
 */
package object config {

  // ---------------------------------------------------------------------------------------------
  // Driver entries (the first three keys are taken from SparkLauncher constants)
  // ---------------------------------------------------------------------------------------------

  private[spark] val DRIVER_CLASS_PATH =
    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_CLASSPATH).stringConf.createOptional

  private[spark] val DRIVER_JAVA_OPTIONS =
    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_JAVA_OPTIONS).stringConf.createOptional

  private[spark] val DRIVER_LIBRARY_PATH =
    ConfigBuilder(SparkLauncher.DRIVER_EXTRA_LIBRARY_PATH).stringConf.createOptional

  private[spark] val DRIVER_USER_CLASS_PATH_FIRST =
    ConfigBuilder("spark.driver.userClassPathFirst").booleanConf.createWithDefault(false)

  // Byte-size entry built with ByteUnit.MiB; the default is given in string form ("1g").
  private[spark] val DRIVER_MEMORY = ConfigBuilder("spark.driver.memory")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("1g")

  // ---------------------------------------------------------------------------------------------
  // Executor entries (the first three keys are taken from SparkLauncher constants)
  // ---------------------------------------------------------------------------------------------

  private[spark] val EXECUTOR_CLASS_PATH =
    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_CLASSPATH).stringConf.createOptional

  private[spark] val EXECUTOR_JAVA_OPTIONS =
    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_JAVA_OPTIONS).stringConf.createOptional

  private[spark] val EXECUTOR_LIBRARY_PATH =
    ConfigBuilder(SparkLauncher.EXECUTOR_EXTRA_LIBRARY_PATH).stringConf.createOptional

  private[spark] val EXECUTOR_USER_CLASS_PATH_FIRST =
    ConfigBuilder("spark.executor.userClassPathFirst").booleanConf.createWithDefault(false)

  // Byte-size entry built with ByteUnit.MiB; the default is given in string form ("1g").
  private[spark] val EXECUTOR_MEMORY = ConfigBuilder("spark.executor.memory")
    .bytesConf(ByteUnit.MiB)
    .createWithDefaultString("1g")

  // Marked internal() on the builder.
  private[spark] val IS_PYTHON_APP = ConfigBuilder("spark.yarn.isPython")
    .internal()
    .booleanConf
    .createWithDefault(false)

  private[spark] val CPUS_PER_TASK = ConfigBuilder("spark.task.cpus").intConf.createWithDefault(1)

  // ---------------------------------------------------------------------------------------------
  // Dynamic allocation and shuffle service
  // ---------------------------------------------------------------------------------------------

  private[spark] val DYN_ALLOCATION_MIN_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.minExecutors").intConf.createWithDefault(0)

  // Declared with DYN_ALLOCATION_MIN_EXECUTORS as its fallback entry, so it must come after it.
  private[spark] val DYN_ALLOCATION_INITIAL_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.initialExecutors")
      .fallbackConf(DYN_ALLOCATION_MIN_EXECUTORS)

  private[spark] val DYN_ALLOCATION_MAX_EXECUTORS =
    ConfigBuilder("spark.dynamicAllocation.maxExecutors").intConf.createWithDefault(Int.MaxValue)

  private[spark] val SHUFFLE_SERVICE_ENABLED =
    ConfigBuilder("spark.shuffle.service.enabled").booleanConf.createWithDefault(false)

  // ---------------------------------------------------------------------------------------------
  // Kerberos credentials
  // ---------------------------------------------------------------------------------------------

  private[spark] val KEYTAB = ConfigBuilder("spark.yarn.keytab")
    .doc("Location of user's keytab.")
    .stringConf
    .createOptional

  private[spark] val PRINCIPAL = ConfigBuilder("spark.yarn.principal")
    .doc("Name of the Kerberos principal.")
    .stringConf
    .createOptional

  private[spark] val EXECUTOR_INSTANCES = ConfigBuilder("spark.executor.instances")
    .intConf
    .createOptional

  // Sequence-of-strings entry (stringConf.toSequence), marked internal(); default is Nil.
  private[spark] val PY_FILES = ConfigBuilder("spark.submit.pyFiles")
    .internal()
    .stringConf
    .toSequence
    .createWithDefault(Nil)

  private[spark] val MAX_TASK_FAILURES =
    ConfigBuilder("spark.task.maxFailures")
      .intConf
      .createWithDefault(4)

  // Blacklist confs
  private[spark] val BLACKLIST_ENABLED =
    ConfigBuilder("spark.blacklist.enabled")
      .booleanConf
      .createOptional

  private[spark] val MAX_TASK_ATTEMPTS_PER_EXECUTOR =
    ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerExecutor")
      .intConf
      .createWithDefault(1)

  private[spark] val MAX_TASK_ATTEMPTS_PER_NODE =
    ConfigBuilder("spark.blacklist.task.maxTaskAttemptsPerNode")
      .intConf
      .createWithDefault(2)

  private[spark] val MAX_FAILURES_PER_EXEC =
    ConfigBuilder("spark.blacklist.application.maxFailedTasksPerExecutor")
      .intConf
      .createWithDefault(2)

  private[spark] val MAX_FAILURES_PER_EXEC_STAGE =
    ConfigBuilder("spark.blacklist.stage.maxFailedTasksPerExecutor")
      .intConf
      .createWithDefault(2)

  private[spark] val MAX_FAILED_EXEC_PER_NODE =
    ConfigBuilder("spark.blacklist.application.maxFailedExecutorsPerNode")
      .intConf
      .createWithDefault(2)

  private[spark] val MAX_FAILED_EXEC_PER_NODE_STAGE =
    ConfigBuilder("spark.blacklist.stage.maxFailedExecutorsPerNode")
      .intConf
      .createWithDefault(2)

  // Time entry built with TimeUnit.MILLISECONDS; optional, no default declared here.
  private[spark] val BLACKLIST_TIMEOUT_CONF =
    ConfigBuilder("spark.blacklist.timeout")
      .timeConf(TimeUnit.MILLISECONDS)
      .createOptional

  private[spark] val BLACKLIST_KILL_ENABLED =
    ConfigBuilder("spark.blacklist.killBlacklistedExecutors")
      .booleanConf
      .createWithDefault(false)

  // Time entry built with TimeUnit.MILLISECONDS; optional and marked internal().
  private[spark] val BLACKLIST_LEGACY_TIMEOUT_CONF =
    ConfigBuilder("spark.scheduler.executorTaskBlacklistTime")
      .internal()
      .timeConf(TimeUnit.MILLISECONDS)
      .createOptional
  // End blacklist confs

  private[spark] val LISTENER_BUS_EVENT_QUEUE_SIZE =
    ConfigBuilder("spark.scheduler.listenerbus.eventqueue.size")
      .intConf
      .createWithDefault(10000)

  // This property sets the root namespace for metrics reporting
  private[spark] val METRICS_NAMESPACE = ConfigBuilder("spark.metrics.namespace")
    .stringConf
    .createOptional

  private[spark] val PYSPARK_DRIVER_PYTHON = ConfigBuilder("spark.pyspark.driver.python")
    .stringConf
    .createOptional

  private[spark] val PYSPARK_PYTHON = ConfigBuilder("spark.pyspark.python")
    .stringConf
    .createOptional

  // To limit memory usage, we only track information for a fixed number of tasks
  private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
    .intConf
    .createWithDefault(100000)

  // To limit how many applications are shown in the History Server summary ui
  private[spark] val HISTORY_UI_MAX_APPS =
    ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Int.MaxValue)

  // ---------------------------------------------------------------------------------------------
  // I/O encryption
  // ---------------------------------------------------------------------------------------------

  private[spark] val IO_ENCRYPTION_ENABLED = ConfigBuilder("spark.io.encryption.enabled")
    .booleanConf
    .createWithDefault(false)

  private[spark] val IO_ENCRYPTION_KEYGEN_ALGORITHM =
    ConfigBuilder("spark.io.encryption.keygen.algorithm")
      .stringConf
      .createWithDefault("HmacSHA1")

  // Only the values 128, 192 and 256 pass checkValues.
  private[spark] val IO_ENCRYPTION_KEY_SIZE_BITS = ConfigBuilder("spark.io.encryption.keySizeBits")
    .intConf
    .checkValues(Set(128, 192, 256))
    .createWithDefault(128)

  private[spark] val IO_CRYPTO_CIPHER_TRANSFORMATION =
    ConfigBuilder("spark.io.crypto.cipher.transformation")
      .internal()
      .stringConf
      .createWithDefaultString("AES/CTR/NoPadding")

  // ---------------------------------------------------------------------------------------------
  // Driver addresses and block manager ports
  // ---------------------------------------------------------------------------------------------

  // The default is whatever Utils.localHostName() returns when this entry is created.
  private[spark] val DRIVER_HOST_ADDRESS = ConfigBuilder("spark.driver.host")
    .doc("Address of driver endpoints.")
    .stringConf
    .createWithDefault(Utils.localHostName())

  // Declared with DRIVER_HOST_ADDRESS as its fallback entry, so it must come after it.
  private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
    .doc("Address where to bind network listen sockets on the driver.")
    .fallbackConf(DRIVER_HOST_ADDRESS)

  private[spark] val BLOCK_MANAGER_PORT = ConfigBuilder("spark.blockManager.port")
    .doc("Port to use for the block manager when a more specific setting is not provided.")
    .intConf
    .createWithDefault(0)

  // Declared with BLOCK_MANAGER_PORT as its fallback entry, so it must come after it.
  private[spark] val DRIVER_BLOCK_MANAGER_PORT = ConfigBuilder("spark.driver.blockManager.port")
    .doc("Port to use for the block manager on the driver.")
    .fallbackConf(BLOCK_MANAGER_PORT)

  // ---------------------------------------------------------------------------------------------
  // File reading
  // ---------------------------------------------------------------------------------------------

  private[spark] val IGNORE_CORRUPT_FILES = ConfigBuilder("spark.files.ignoreCorruptFiles")
    .doc("Whether to ignore corrupt files. If true, the Spark jobs will continue to run when " +
      "encountering corrupted or non-existing files and contents that have been read will still " +
      "be returned.")
    .booleanConf
    .createWithDefault(false)

  private[spark] val APP_CALLER_CONTEXT = ConfigBuilder("spark.log.callerContext")
    .stringConf
    .createOptional

  private[spark] val FILES_MAX_PARTITION_BYTES = ConfigBuilder("spark.files.maxPartitionBytes")
    .doc("The maximum number of bytes to pack into a single partition when reading files.")
    .longConf
    // Long literal so the size arithmetic is carried out in 64 bits (128 MiB).
    .createWithDefault(128L * 1024 * 1024)

  private[spark] val FILES_OPEN_COST_IN_BYTES = ConfigBuilder("spark.files.openCostInBytes")
    .doc("The estimated cost to open a file, measured by the number of bytes could be scanned in" +
      " the same time. This is used when putting multiple files into a partition. It's better to" +
      " over estimate, then the partitions with small files will be faster than partitions with" +
      " bigger files.")
    .longConf
    // Long literal so the size arithmetic is carried out in 64 bits (4 MiB).
    .createWithDefault(4L * 1024 * 1024)

  // ---------------------------------------------------------------------------------------------
  // Redaction
  // ---------------------------------------------------------------------------------------------

  private[spark] val SECRET_REDACTION_PATTERN =
    ConfigBuilder("spark.redaction.regex")
      .doc("Regex to decide which Spark configuration properties and environment variables in " +
        "driver and executor environments contain sensitive information. When this regex matches " +
        "a property key or value, the value is redacted from the environment UI and various logs " +
        "like YARN and event logs.")
      .regexConf
      .createWithDefault("(?i)secret|password".r)

  private[spark] val STRING_REDACTION_PATTERN =
    ConfigBuilder("spark.redaction.string.regex")
      .doc("Regex to decide which parts of strings produced by Spark contain sensitive " +
        "information. When this regex matches a string part, that string part is replaced by a " +
        "dummy value. This is currently used to redact the output of SQL explain commands.")
      .regexConf
      .createOptional

  // ---------------------------------------------------------------------------------------------
  // Network authentication and encryption
  // ---------------------------------------------------------------------------------------------

  private[spark] val NETWORK_AUTH_ENABLED =
    ConfigBuilder("spark.authenticate")
      .booleanConf
      .createWithDefault(false)

  private[spark] val SASL_ENCRYPTION_ENABLED =
    ConfigBuilder("spark.authenticate.enableSaslEncryption")
      .booleanConf
      .createWithDefault(false)

  private[spark] val NETWORK_ENCRYPTION_ENABLED =
    ConfigBuilder("spark.network.crypto.enabled")
      .booleanConf
      .createWithDefault(false)

  private[spark] val CHECKPOINT_COMPRESS =
    ConfigBuilder("spark.checkpoint.compress")
      .doc("Whether to compress RDD checkpoints. Generally a good idea. Compression will use " +
        "spark.io.compression.codec.")
      .booleanConf
      .createWithDefault(false)

  // ---------------------------------------------------------------------------------------------
  // Shuffle fetch limits
  // ---------------------------------------------------------------------------------------------

  private[spark] val SHUFFLE_ACCURATE_BLOCK_THRESHOLD =
    ConfigBuilder("spark.shuffle.accurateBlockThreshold")
      .doc("When we compress the size of shuffle blocks in HighlyCompressedMapStatus, we will " +
        "record the size accurately if it's above this config. This helps to prevent OOM by " +
        "avoiding underestimating shuffle block size when fetch shuffle blocks.")
      .bytesConf(ByteUnit.BYTE)
      // Long literal so the size arithmetic is carried out in 64 bits (100 MiB).
      .createWithDefault(100L * 1024 * 1024)

  // checkValue rejects any value that is not strictly positive.
  private[spark] val REDUCER_MAX_BLOCKS_IN_FLIGHT_PER_ADDRESS =
    ConfigBuilder("spark.reducer.maxBlocksInFlightPerAddress")
      .doc("This configuration limits the number of remote blocks being fetched per reduce task" +
        " from a given host port. When a large number of blocks are being requested from a given" +
        " address in a single fetch or simultaneously, this could crash the serving executor or" +
        " Node Manager. This is especially useful to reduce the load on the Node Manager when" +
        " external shuffle is enabled. You can mitigate the issue by setting it to a lower value.")
      .intConf
      .checkValue(_ > 0, "The max no. of blocks in flight cannot be non-positive.")
      .createWithDefault(Int.MaxValue)

  private[spark] val REDUCER_MAX_REQ_SIZE_SHUFFLE_TO_MEM =
    ConfigBuilder("spark.reducer.maxReqSizeShuffleToMem")
      .doc("The blocks of a shuffle request will be fetched to disk when size of the request is " +
        "above this threshold. This is to avoid a giant request takes too much memory. We can " +
        "enable this config by setting a specific value(e.g. 200m). Note that this config can " +
        "be enabled only when the shuffle service is newer than Spark-2.2 or the shuffle" +
        " service is disabled.")
      .bytesConf(ByteUnit.BYTE)
      .createWithDefault(Long.MaxValue)
}