Skip to content

Commit 4b42c73

Browse files
committed
[KYUUBI #2308][SUB-TASK][KPIP-4] Batch job configuration ignore list and pre-defined configuration in server-side
### _Why are the changes needed?_ To close #2308. Support ignoring some batch configuration items and pre-defining some configurations on the server side. ### _How was this patch tested?_ - [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible - [ ] Add screenshots for manual tests if appropriate - [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request Closes #2386 from turboFei/KYUUBI_2308_kpip4_batch_conf. Closes #2308 2e517ec [Fei Wang] [KYUUBI #2308][SUB-TASK][KPIP-4] Batch job configuration ignore list and server predefined configurations Authored-by: Fei Wang <fwang12@ebay.com> Signed-off-by: Fei Wang <fwang12@ebay.com>
1 parent 1a58aaf commit 4b42c73

File tree

7 files changed

+60
-4
lines changed

7 files changed

+60
-4
lines changed

docs/deployment/settings.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,7 @@ Key | Default | Meaning | Type | Since
159159
Key | Default | Meaning | Type | Since
160160
--- | --- | --- | --- | ---
161161
<code>kyuubi.batch.application.check.interval</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'>PT5S</div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>The interval to check batch job application information.</div>|<div style='width: 30pt'>duration</div>|<div style='width: 20pt'>1.6.0</div>
162+
<code>kyuubi.batch.conf.ignore.list</code>|<div style='width: 65pt;word-wrap: break-word;white-space: normal'></div>|<div style='width: 170pt;word-wrap: break-word;white-space: normal'>A comma separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering. You can also pre-define some config for batch job submission with prefix: kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master` for spark batch job with key `kyuubi.batchConf.spark.spark.master`.</div>|<div style='width: 30pt'>seq</div>|<div style='width: 20pt'>1.6.0</div>
162163

163164

164165
### Credentials

kyuubi-common/src/main/scala/org/apache/kyuubi/config/KyuubiConf.scala

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,11 @@ case class KyuubiConf(loadSysDefault: Boolean = true) extends Logging {
103103
sys.env ++ getAllWithPrefix(KYUUBI_ENGINE_ENV_PREFIX, "")
104104
}
105105

106+
/** Get all pre-defined batch conf of the given batch type as a map, with the prefix stripped */
107+
def getBatchConf(batchType: String): Map[String, String] = {
108+
getAllWithPrefix(s"$KYUUBI_BATCH_CONF_PREFIX.${batchType.toLowerCase(Locale.ROOT)}", "")
109+
}
110+
106111
/**
107112
* Retrieve key-value pairs from [[KyuubiConf]] starting with `dropped.remainder`, and put them to
108113
* the result map with the `dropped` of key being dropped.
@@ -165,6 +170,7 @@ object KyuubiConf {
165170
final val KYUUBI_CONF_FILE_NAME = "kyuubi-defaults.conf"
166171
final val KYUUBI_HOME = "KYUUBI_HOME"
167172
final val KYUUBI_ENGINE_ENV_PREFIX = "kyuubi.engineEnv"
173+
final val KYUUBI_BATCH_CONF_PREFIX = "kyuubi.batchConf"
168174

169175
val kyuubiConfEntries: java.util.Map[String, ConfigEntry[_]] =
170176
java.util.Collections.synchronizedMap(new java.util.HashMap[String, ConfigEntry[_]]())
@@ -813,6 +819,21 @@ object KyuubiConf {
813819
.timeConf
814820
.createWithDefaultString("PT5S")
815821

822+
val BATCH_CONF_IGNORE_LIST: ConfigEntry[Seq[String]] =
823+
buildConf("kyuubi.batch.conf.ignore.list")
824+
.doc("A comma separated list of ignored keys for batch conf. If the batch conf contains" +
825+
" any of them, the key and the corresponding value will be removed silently during batch" +
826+
" job submission." +
827+
" Note that this rule is for server-side protection defined via administrators to" +
828+
" prevent some essential configs from tampering." +
829+
" You can also pre-define some config for batch job submission with prefix:" +
830+
" kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master`" +
831+
" for spark batch job with key `kyuubi.batchConf.spark.spark.master`.")
832+
.version("1.6.0")
833+
.stringConf
834+
.toSequence()
835+
.createWithDefault(Nil)
836+
816837
val SERVER_EXEC_POOL_SIZE: ConfigEntry[Int] =
817838
buildConf("kyuubi.backend.server.exec.pool.size")
818839
.doc("Number of threads in the operation execution thread pool of Kyuubi server")

kyuubi-common/src/main/scala/org/apache/kyuubi/session/SessionManager.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,13 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
162162

163163
private var _confRestrictList: Set[String] = _
164164
private var _confIgnoreList: Set[String] = _
165+
private var _batchConfIgnoreList: Set[String] = _
165166
private lazy val _confRestrictMatchList: Set[String] =
166167
_confRestrictList.filter(_.endsWith(".*")).map(_.stripSuffix(".*"))
167168
private lazy val _confIgnoreMatchList: Set[String] =
168169
_confIgnoreList.filter(_.endsWith(".*")).map(_.stripSuffix(".*"))
170+
private lazy val _batchConfIgnoreMatchList: Set[String] =
171+
_batchConfIgnoreList.filter(_.endsWith(".*")).map(_.stripSuffix(".*"))
169172

170173
// strip prefix and validate whether the key is restricted, ignored or valid
171174
def validateKey(key: String, value: String): Option[(String, String)] = {
@@ -206,6 +209,20 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
206209
case (k, v) => validateKey(k, v)
207210
}
208211

212+
// validate whether a batch key should be ignored
213+
def validateBatchKey(key: String, value: String): Option[(String, String)] = {
214+
if (_batchConfIgnoreMatchList.exists(key.startsWith(_)) || _batchConfIgnoreList.contains(key)) {
215+
warn(s"$key is a ignored batch key according to the server-side configuration")
216+
None
217+
} else {
218+
Some((key, value))
219+
}
220+
}
221+
222+
def validateBatchConf(config: Map[String, String]): Map[String, String] = config.flatMap {
223+
case (k, v) => validateBatchKey(k, v)
224+
}
225+
209226
override def initialize(conf: KyuubiConf): Unit = synchronized {
210227
addService(operationManager)
211228
initOperationLogRootDir()
@@ -232,6 +249,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
232249

233250
_confRestrictList = conf.get(SESSION_CONF_RESTRICT_LIST).toSet
234251
_confIgnoreList = conf.get(SESSION_CONF_IGNORE_LIST).toSet
252+
_batchConfIgnoreList = conf.get(BATCH_CONF_IGNORE_LIST).toSet
235253

236254
execPool = ThreadUtils.newDaemonQueuedThreadPool(
237255
poolSize,

kyuubi-common/src/test/scala/org/apache/kyuubi/config/KyuubiConfSuite.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,4 +171,12 @@ class KyuubiConfSuite extends KyuubiFunSuite {
171171
kyuubiConf.set(ENGINE_SHARE_LEVEL_SUBDOMAIN.key, path)
172172
assert(kyuubiConf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN).get == path)
173173
}
174+
175+
test("get pre-defined batch conf for different batch types") {
176+
val kyuubiConf = KyuubiConf()
177+
kyuubiConf.set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.yarn.tags", "kyuubi")
178+
kyuubiConf.set(s"$KYUUBI_BATCH_CONF_PREFIX.flink.yarn.tags", "kyuubi")
179+
assert(kyuubiConf.getBatchConf("spark") == Map("spark.yarn.tags" -> "kyuubi"))
180+
assert(kyuubiConf.getBatchConf("flink") == Map("yarn.tags" -> "kyuubi"))
181+
}
174182
}

kyuubi-server/src/main/scala/org/apache/kyuubi/operation/BatchJobSubmission.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,11 +82,12 @@ class BatchJobSubmission(session: KyuubiBatchSessionImpl, batchRequest: BatchReq
8282
private def submitBatchJob(): Unit = {
8383
builder = Option(batchRequest.batchType).map(_.toUpperCase(Locale.ROOT)) match {
8484
case Some("SPARK") =>
85+
val batchSparkConf = session.sessionConf.getBatchConf("spark")
8586
new SparkBatchProcessBuilder(
8687
session.user,
8788
session.sessionConf,
8889
session.batchId,
89-
batchRequest,
90+
batchRequest.copy(conf = batchSparkConf ++ batchRequest.conf),
9091
getOperationLog)
9192

9293
case _ =>

kyuubi-server/src/main/scala/org/apache/kyuubi/session/KyuubiBatchSessionImpl.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,12 @@ class KyuubiBatchSessionImpl(
4040
override val handle: SessionHandle = sessionManager.newBatchSessionHandle(protocol)
4141
val batchId: String = handle.identifier.toString
4242

43+
// TODO: Support batch conf advisor
44+
override val normalizedConf: Map[String, String] =
45+
sessionManager.validateBatchConf(Option(batchRequest.conf).getOrElse(Map.empty))
46+
4347
private[kyuubi] lazy val batchJobSubmissionOp = sessionManager.operationManager
44-
.newBatchJobSubmissionOperation(this, batchRequest)
48+
.newBatchJobSubmissionOperation(this, batchRequest.copy(conf = normalizedConf))
4549

4650
private val sessionEvent = KyuubiSessionEvent(this)
4751
EventBus.post(sessionEvent)

kyuubi-server/src/test/scala/org/apache/kyuubi/operation/KyuubiBatchYarnClusterSuite.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,10 @@ import org.apache.kyuubi.session.{KyuubiBatchSessionImpl, KyuubiSessionManager}
3232
class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
3333
override protected val connectionConf: Map[String, String] = Map.empty
3434

35-
override protected val kyuubiServerConf: KyuubiConf = KyuubiConf()
35+
override protected val kyuubiServerConf: KyuubiConf = {
36+
KyuubiConf().set(s"$KYUUBI_BATCH_CONF_PREFIX.spark.spark.master", "yarn")
37+
.set(BATCH_CONF_IGNORE_LIST, Seq("spark.master"))
38+
}
3639

3740
private def sessionManager(): KyuubiSessionManager =
3841
server.backendService.sessionManager.asInstanceOf[KyuubiSessionManager]
@@ -47,7 +50,7 @@ class KyuubiBatchYarnClusterSuite extends WithKyuubiServerOnYarn {
4750
sparkProcessBuilder.mainClass,
4851
"spark-batch-submission",
4952
Map(
50-
"spark.master" -> "yarn",
53+
"spark.master" -> "local",
5154
s"spark.${ENGINE_SPARK_MAX_LIFETIME.key}" -> "5000",
5255
s"spark.${ENGINE_CHECK_INTERVAL.key}" -> "1000"),
5356
Seq.empty[String])

0 commit comments

Comments
 (0)