Skip to content

Commit

Permalink
[KYUUBI #3959] Warn if batch application exceed the starvation timeou…
Browse files Browse the repository at this point in the history
…t and only log the batch application state change

### _Why are the changes needed?_

Refer the spark scheduler, warn that batch application maybe starved.
<img width="766" alt="image" src="https://user-images.githubusercontent.com/6757692/206888162-663a3ce0-597a-4818-8f7e-04e7be06cef4.png">

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #3959 from turboFei/only_state_change.

Closes #3959

6ab5942 [fwang12] show the batch statvation timeout warn
74be3f4 [fwang12] only state change

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
  • Loading branch information
turboFei committed Dec 12, 2022
1 parent 4948056 commit 92b262b
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/deployment/settings.md
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ kyuubi.backend.server.exec.pool.wait.queue.size|100|Size of the wait queue for t
Key | Default | Meaning | Type | Since
--- | --- | --- | --- | ---
kyuubi.batch.application.check.interval|PT5S|The interval to check batch job application information.|duration|1.6.0
kyuubi.batch.application.starvation.timeout|PT3M|Threshold above which to warn batch application may be starved.|duration|1.7.0
kyuubi.batch.conf.ignore.list||A comma separated list of ignored keys for batch conf. If the batch conf contains any of them, the key and the corresponding value will be removed silently during batch job submission. Note that this rule is for server-side protection defined via administrators to prevent some essential configs from tampering. You can also pre-define some config for batch job submission with prefix: kyuubi.batchConf.[batchType]. For example, you can pre-define `spark.master` for spark batch job with key `kyuubi.batchConf.spark.spark.master`.|seq|1.6.0


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1287,6 +1287,13 @@ object KyuubiConf {
.timeConf
.createWithDefaultString("PT5S")

val BATCH_APPLICATION_STARVATION_TIMEOUT: ConfigEntry[Long] =
buildConf("kyuubi.batch.application.starvation.timeout")
.doc("Threshold above which to warn batch application may be starved.")
.version("1.7.0")
.timeConf
.createWithDefault(Duration.ofMinutes(3).toMillis)

val BATCH_CONF_IGNORE_LIST: ConfigEntry[Seq[String]] =
buildConf("kyuubi.batch.conf.ignore.list")
.doc("A comma separated list of ignored keys for batch conf. If the batch conf contains" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class BatchJobSubmission(

private val applicationCheckInterval =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_CHECK_INTERVAL)
private val applicationStarvationTimeout =
session.sessionConf.get(KyuubiConf.BATCH_APPLICATION_STARVATION_TIMEOUT)

private def updateBatchMetadata(): Unit = {
val endTime =
Expand Down Expand Up @@ -203,15 +205,25 @@ class BatchJobSubmission(

private def submitAndMonitorBatchJob(): Unit = {
var appStatusFirstUpdated = false
var lastStarvationCheckTime = createTime
try {
info(s"Submitting $batchType batch[$batchId] job:\n$builder")
val process = builder.start
applicationInfo = currentApplicationInfo
while (!applicationFailed(applicationInfo) && process.isAlive) {
if (!appStatusFirstUpdated && applicationInfo.isDefined) {
setStateIfNotCanceled(OperationState.RUNNING)
updateBatchMetadata()
appStatusFirstUpdated = true
if (!appStatusFirstUpdated) {
if (applicationInfo.isDefined) {
setStateIfNotCanceled(OperationState.RUNNING)
updateBatchMetadata()
appStatusFirstUpdated = true
} else {
val currentTime = System.currentTimeMillis()
if (currentTime - lastStarvationCheckTime > applicationStarvationTimeout) {
lastStarvationCheckTime = currentTime
warn(s"Batch[$batchId] has not started, check the Kyuubi server to ensure" +
s" that batch jobs can be submitted.")
}
}
}
process.waitFor(applicationCheckInterval, TimeUnit.MILLISECONDS)
applicationInfo = currentApplicationInfo
Expand Down Expand Up @@ -254,7 +266,7 @@ class BatchJobSubmission(
while (applicationInfo.isDefined && !applicationTerminated(applicationInfo)) {
Thread.sleep(applicationCheckInterval)
val newApplicationStatus = currentApplicationInfo
if (newApplicationStatus != applicationInfo) {
if (newApplicationStatus.map(_.state) != applicationInfo.map(_.state)) {
applicationInfo = newApplicationStatus
info(s"Batch report for $batchId, $applicationInfo")
}
Expand Down

0 comments on commit 92b262b

Please sign in to comment.