Skip to content

Commit

Permalink
[KYUUBI #2419] Release engine during closing kyuubi server session if…
Browse files Browse the repository at this point in the history
… share level is connection

### _Why are the changes needed?_

close #2419

We need to clean up the ProcBuilder process and engine application when the session is closed.

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #2482 from wForget/KYUUBI-2419.

Closes #2419

2690b5d [wforget] comment
ae8be05 [wforget] revert BatchJobSubmission
4fe3c2f [wforget] [KYUUBI-2419] Destroy the ProcBuilder process and call killApplication during closing session

Authored-by: wforget <643348094@qq.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
wForget authored and ulysses-you committed Apr 28, 2022
1 parent af162b1 commit 93f13ef
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ private[kyuubi] class EngineRef(

private val clientPoolName: String = conf.get(ENGINE_POOL_NAME)

private var builder: ProcBuilder = _

@VisibleForTesting
private[kyuubi] val subdomain: String = conf.get(ENGINE_SHARE_LEVEL_SUBDOMAIN) match {
case Some(_subdomain) => _subdomain
Expand Down Expand Up @@ -162,7 +164,7 @@ private[kyuubi] class EngineRef(
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
val started = System.currentTimeMillis()
conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
val builder = engineType match {
builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
new SparkProcessBuilder(appUser, conf, extraEngineLog)
Expand Down Expand Up @@ -230,4 +232,17 @@ private[kyuubi] class EngineRef(
create(discoveryClient, extraEngineLog)
}
}

def close(): Unit = {
if (shareLevel == CONNECTION && builder != null) {
try {
val clusterManager = builder.clusterManager()
builder.close(true)
engineManager.killApplication(clusterManager, engineRefId)
} catch {
case e: Exception =>
warn(s"Error closing engine builder, engineRefId: $engineRefId", e)
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -237,12 +237,12 @@ trait ProcBuilder {
process
}

def close(): Unit = synchronized {
def close(destroyProcess: Boolean = !waitCompletion): Unit = synchronized {
if (logCaptureThread != null) {
logCaptureThread.interrupt()
logCaptureThread = null
}
if (!waitCompletion && process != null) {
if (destroyProcess && process != null) {
info("Destroy the process, since waitCompletion is false.")
process.destroyForcibly()
process = null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ class KyuubiSessionImpl(
try {
if (_client != null) _client.closeSession()
} finally {
if (engine != null) engine.close()
sessionEvent.endTime = System.currentTimeMillis()
EventBus.post(sessionEvent)
MetricsSystem.tracing(_.decCount(MetricRegistry.name(CONN_OPEN, user)))
Expand Down

0 comments on commit 93f13ef

Please sign in to comment.