/
LivyConf.scala
298 lines (238 loc) · 11.8 KB
/
LivyConf.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
/*
* Licensed to Cloudera, Inc. under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. Cloudera, Inc. licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.cloudera.livy
import java.io.File
import java.lang.{Boolean => JBoolean, Long => JLong}
import java.util.{Map => JMap}
import scala.collection.JavaConverters._
import org.apache.hadoop.conf.Configuration
import com.cloudera.livy.client.common.ClientConf
import com.cloudera.livy.client.common.ClientConf.ConfEntry
import com.cloudera.livy.client.common.ClientConf.DeprecatedConf
object LivyConf {

  /** A Livy configuration entry: a key together with its default value. */
  case class Entry(override val key: String, override val dflt: AnyRef) extends ConfEntry

  /** Overloads that box primitive defaults into the matching Java wrapper type. */
  object Entry {
    def apply(key: String, dflt: Boolean): Entry = Entry(key, JBoolean.valueOf(dflt))
    def apply(key: String, dflt: Int): Entry = Entry(key, Integer.valueOf(dflt))
    def apply(key: String, dflt: Long): Entry = Entry(key, JLong.valueOf(dflt))
  }

  val TEST_MODE = ClientConf.TEST_MODE

  val SPARK_HOME = Entry("livy.server.spark-home", null)
  val LIVY_SPARK_MASTER = Entry("livy.spark.master", "local")
  val LIVY_SPARK_DEPLOY_MODE = Entry("livy.spark.deploy-mode", null)

  // The Spark version and its related Scala version. These are internal configurations that are
  // set by LivyServer and used during session creation. They usually do not need to be set by
  // hand, unless running with an unofficial Spark + Scala combination
  // (like Spark 2.0 + Scala 2.10, Spark 1.6 + Scala 2.11).
  val LIVY_SPARK_SCALA_VERSION = Entry("livy.spark.scala-version", null)
  val LIVY_SPARK_VERSION = Entry("livy.spark.version", null)

  val SESSION_STAGING_DIR = Entry("livy.session.staging-dir", null)
  val FILE_UPLOAD_MAX_SIZE = Entry("livy.file.upload.max.size", 100L * 1024 * 1024)
  val LOCAL_FS_WHITELIST = Entry("livy.file.local-dir-whitelist", null)
  val ENABLE_HIVE_CONTEXT = Entry("livy.repl.enable-hive-context", false)

  val ENVIRONMENT = Entry("livy.environment", "production")

  val SERVER_HOST = Entry("livy.server.host", "0.0.0.0")
  val SERVER_PORT = Entry("livy.server.port", 8998)

  val UI_ENABLED = Entry("livy.ui.enabled", true)

  val REQUEST_HEADER_SIZE = Entry("livy.server.request-header.size", 131072)
  val RESPONSE_HEADER_SIZE = Entry("livy.server.response-header.size", 131072)

  val CSRF_PROTECTION = Entry("livy.server.csrf-protection.enabled", false)

  val IMPERSONATION_ENABLED = Entry("livy.impersonation.enabled", false)
  val SUPERUSERS = Entry("livy.superusers", null)

  val ACCESS_CONTROL_ENABLED = Entry("livy.server.access-control.enabled", false)
  val ACCESS_CONTROL_USERS = Entry("livy.server.access-control.users", null)

  val SSL_KEYSTORE = Entry("livy.keystore", null)
  val SSL_KEYSTORE_PASSWORD = Entry("livy.keystore.password", null)
  val SSL_KEY_PASSWORD = Entry("livy.key-password", null)

  val AUTH_TYPE = Entry("livy.server.auth.type", null)
  val AUTH_KERBEROS_PRINCIPAL = Entry("livy.server.auth.kerberos.principal", null)
  val AUTH_KERBEROS_KEYTAB = Entry("livy.server.auth.kerberos.keytab", null)
  val AUTH_KERBEROS_NAME_RULES = Entry("livy.server.auth.kerberos.name-rules", "DEFAULT")

  val HEARTBEAT_WATCHDOG_INTERVAL = Entry("livy.server.heartbeat-watchdog.interval", "1m")

  val LAUNCH_KERBEROS_PRINCIPAL = Entry("livy.server.launch.kerberos.principal", null)
  val LAUNCH_KERBEROS_KEYTAB = Entry("livy.server.launch.kerberos.keytab", null)
  val LAUNCH_KERBEROS_REFRESH_INTERVAL =
    Entry("livy.server.launch.kerberos.refresh-interval", "1h")
  val KINIT_FAIL_THRESHOLD = Entry("livy.server.launch.kerberos.kinit-fail-threshold", 5)

  /**
   * Recovery mode of Livy. Possible values:
   * off: Default. Recovery is turned off. Every time Livy shuts down, it stops and forgets all
   *      sessions.
   * recovery: Livy persists session info to the state store. When Livy restarts, it recovers
   *           previous sessions from the state store.
   * Both livy.server.recovery.state-store and livy.server.recovery.state-store.url must be set
   * to configure the state store.
   */
  val RECOVERY_MODE = Entry("livy.server.recovery.mode", "off")

  /**
   * Where Livy should store its state for recovery. Possible values:
   * <empty>: Default. State store disabled.
   * filesystem: Store state on a file system.
   * zookeeper: Store state in a Zookeeper instance.
   */
  val RECOVERY_STATE_STORE = Entry("livy.server.recovery.state-store", null)

  /**
   * For the filesystem state store, the path of the state store directory,
   * e.g. file:///tmp/livy or hdfs:///. Please don't use a filesystem that doesn't support
   * atomic rename (e.g. S3).
   * For zookeeper, the address of the Zookeeper servers, e.g. host1:port1,host2:port2
   */
  val RECOVERY_STATE_STORE_URL = Entry("livy.server.recovery.state-store.url", "")

  // If Livy can't find the YARN app within this time, it considers the app lost.
  val YARN_APP_LOOKUP_TIMEOUT = Entry("livy.server.yarn.app-lookup-timeout", "60s")

  // How often Livy polls YARN to refresh the YARN app state.
  val YARN_POLL_INTERVAL = Entry("livy.server.yarn.poll-interval", "5s")

  // Number of days to keep Livy server request logs.
  val REQUEST_LOG_RETAIN_DAYS = Entry("livy.server.request-log-retain.days", 5)

  // Comma-separated list of REPL related jars.
  val REPL_JARS = Entry("livy.repl.jars", null)

  // Comma-separated list of RSC related jars.
  val RSC_JARS = Entry("livy.rsc.jars", null)

  // Timeout of the Livy session leakage check.
  val YARN_APP_LEAKAGE_CHECK_TIMEOUT =
    Entry("livy.server.yarn.app-leakage.check-timeout", "600s")

  // How often to check for Livy session leakage.
  val YARN_APP_LEAKAGE_CHECK_INTERVAL =
    Entry("livy.server.yarn.app-leakage.check-interval", "60s")

  // Whether the session timeout should be checked. It is checked by default, which means an
  // inactive session will be stopped after "livy.server.session.timeout".
  val SESSION_TIMEOUT_CHECK = Entry("livy.server.session.timeout-check", true)

  // How long a session may stay inactive before it is gc-ed.
  val SESSION_TIMEOUT = Entry("livy.server.session.timeout", "1h")

  // How long the state of a finished session is kept in memory.
  val SESSION_STATE_RETAIN_TIME = Entry("livy.server.session.state-retain.sec", "600s")

  // Names of the Spark configuration keys referenced by Livy.
  val SPARK_MASTER = "spark.master"
  val SPARK_DEPLOY_MODE = "spark.submit.deployMode"
  val SPARK_JARS = "spark.jars"
  val SPARK_FILES = "spark.files"
  val SPARK_ARCHIVES = "spark.yarn.dist.archives"
  val SPARK_PY_FILES = "spark.submit.pyFiles"

  /**
   * Spark configurations holding lists of files that users can attach to their jobs in one
   * way or another. Livy has to pre-process them to make sure the user is able to read the
   * files (in case they reference local files), and to hand Spark correct URIs based on the
   * Livy config.
   *
   * This entry lets administrators register additional configurations, in case something is
   * missing from the hardcoded list or a newer version of Spark introduces new configs.
   */
  val SPARK_FILE_LISTS = Entry("livy.spark.file-list-configs", null)

  // File-list configurations that are always pre-processed, regardless of SPARK_FILE_LISTS.
  private val HARDCODED_SPARK_FILE_LISTS: Seq[String] =
    Seq(SPARK_JARS, SPARK_FILES, SPARK_ARCHIVES, SPARK_PY_FILES) ++ Seq(
      "spark.yarn.archive",
      "spark.yarn.dist.files",
      "spark.yarn.dist.jars",
      "spark.yarn.jar",
      "spark.yarn.jars"
    )

  /** A deprecated configuration key, the version that deprecated it, and an optional message. */
  case class DepConf(
      override val key: String,
      override val version: String,
      override val deprecationMessage: String = "")
    extends DeprecatedConf

  // Maps the key of each current entry to the deprecated key it replaces.
  private val configsWithAlternatives: Map[String, DeprecatedConf] = {
    // Every alternative listed below was deprecated in this version.
    val since = "0.4"
    Seq(
      LIVY_SPARK_DEPLOY_MODE -> "livy.spark.deployMode",
      LIVY_SPARK_SCALA_VERSION -> "livy.spark.scalaVersion",
      ENABLE_HIVE_CONTEXT -> "livy.repl.enableHiveContext",
      CSRF_PROTECTION -> "livy.server.csrf_protection.enabled",
      ACCESS_CONTROL_ENABLED -> "livy.server.access_control.enabled",
      ACCESS_CONTROL_USERS -> "livy.server.access_control.users",
      AUTH_KERBEROS_NAME_RULES -> "livy.server.auth.kerberos.name_rules",
      LAUNCH_KERBEROS_REFRESH_INTERVAL -> "livy.server.launch.kerberos.refresh_interval",
      KINIT_FAIL_THRESHOLD -> "livy.server.launch.kerberos.kinit_fail_threshold",
      YARN_APP_LEAKAGE_CHECK_TIMEOUT -> "livy.server.yarn.app-leakage.check_timeout",
      YARN_APP_LEAKAGE_CHECK_INTERVAL -> "livy.server.yarn.app-leakage.check_interval"
    ).map { case (entry, oldKey) =>
      entry.key -> DepConf(oldKey, since)
    }.toMap[String, DeprecatedConf]
  }

  // Deprecated configurations that have no replacement, indexed by their key.
  private val deprecatedConfigs: Map[String, DeprecatedConf] = {
    // There are no deprecated configs without alternatives currently.
    val configs = Seq.empty[DepConf]
    configs.map(cfg => cfg.key -> cfg).toMap[String, DeprecatedConf]
  }

}
/**
 * Livy server configuration, built on top of [[ClientConf]].
 *
 * @param loadDefaults whether to also load values from the Java system properties
 */
class LivyConf(loadDefaults: Boolean) extends ClientConf[LivyConf](null) {

  import LivyConf._

  private lazy val _superusers = configToSeq(SUPERUSERS)

  private lazy val _allowedUsers = configToSeq(ACCESS_CONTROL_USERS).toSet

  lazy val hadoopConf = new Configuration()

  /** Whitelisted local directories, each normalized to end with exactly one "/". */
  lazy val localFsWhitelist = configToSeq(LOCAL_FS_WHITELIST).map { path =>
    // Strip every trailing separator (not just one) so the result ends with a single "/".
    path.replaceAll("/+$", "") + "/"
  }

  /** The hardcoded Spark file-list configurations plus any configured via SPARK_FILE_LISTS. */
  lazy val sparkFileLists = HARDCODED_SPARK_FILE_LISTS ++ configToSeq(SPARK_FILE_LISTS)

  /**
   * Create a LivyConf that loads defaults from the Java system properties.
   */
  def this() = this(true)

  if (loadDefaults) {
    loadFromMap(sys.props)
  }

  /**
   * Load the "livy."-prefixed properties of a file located in the Livy configuration directory.
   * Nothing is loaded when the configuration directory or the file does not exist.
   *
   * @param name name of the file, relative to the configuration directory
   * @return this configuration, to allow call chaining
   */
  def loadFromFile(name: String): LivyConf = {
    getConfigFile(name)
      .map(Utils.getPropertiesFromFile)
      .foreach(loadFromMap)
    this
  }

  /** Return true if spark master starts with yarn. */
  def isRunningOnYarn(): Boolean = sparkMaster().startsWith("yarn")

  /** Return the spark deploy mode Livy sessions should use; None when unset or empty. */
  def sparkDeployMode(): Option[String] = Option(get(LIVY_SPARK_DEPLOY_MODE)).filterNot(_.isEmpty)

  /**
   * Return the location of the spark home directory, falling back to the SPARK_HOME
   * environment variable when the configuration entry is not set.
   */
  def sparkHome(): Option[String] = Option(get(SPARK_HOME)).orElse(sys.env.get("SPARK_HOME"))

  /** Return the spark master Livy sessions should use. */
  def sparkMaster(): String = get(LIVY_SPARK_MASTER)

  /**
   * Return the path to the spark-submit executable.
   *
   * @throws NoSuchElementException if the spark home directory is not configured
   */
  def sparkSubmit(): String = {
    val home = sparkHome().getOrElse {
      // Same exception type as the former Option.get, but with an actionable message.
      throw new NoSuchElementException(
        s"Spark home is not set: configure ${SPARK_HOME.key} or the SPARK_HOME " +
          "environment variable.")
    }
    Seq(home, "bin", "spark-submit").mkString(File.separator)
  }

  /** Return the list of superusers. */
  def superusers(): Seq[String] = _superusers

  /** Return the set of users allowed to use Livy via SPNEGO. */
  def allowedUsers(): Set[String] = _allowedUsers

  // LIVY_CONF_DIR takes precedence; otherwise fall back to the "conf" directory under
  // LIVY_HOME. The directory is only used if it exists.
  private val configDir: Option[File] = {
    sys.env.get("LIVY_CONF_DIR")
      .orElse(sys.env.get("LIVY_HOME").map(path => s"$path${File.separator}conf"))
      .map(new File(_))
      .filter(_.exists())
  }

  /** Return the named file from the configuration directory, if both exist. */
  private def getConfigFile(name: String): Option[File] = {
    configDir.map(new File(_, name)).filter(_.exists())
  }

  /** Set every key/value pair whose key starts with "livy."; all other pairs are ignored. */
  private def loadFromMap(map: Iterable[(String, String)]): Unit = {
    map.foreach { case (k, v) =>
      if (k.startsWith("livy.")) {
        set(k, v)
      }
    }
  }

  /**
   * Split the value of a list-valued entry on commas and/or spaces.
   *
   * Empty tokens are dropped: String.split yields an empty element for an empty value or a
   * value starting with a separator, and such an element would otherwise turn into a bogus
   * list item (e.g. a "/" whitelist entry, or an empty user name).
   *
   * @param entry the configuration entry to read
   * @return the non-empty items of the list; empty when the entry is unset
   */
  private def configToSeq(entry: LivyConf.Entry): Seq[String] = {
    Option(get(entry))
      .map(_.split("[, ]+").filter(_.nonEmpty).toSeq)
      .getOrElse(Nil)
  }

  override def getConfigsWithAlternatives: JMap[String, DeprecatedConf] = {
    configsWithAlternatives.asJava
  }

  override def getDeprecatedConfigs: JMap[String, DeprecatedConf] = {
    deprecatedConfigs.asJava
  }

}