Skip to content

Commit

Permalink
[KYUUBI #1473] Exit gracefully when engine idle
Browse files Browse the repository at this point in the history
### _Why are the changes needed?_
#1473

It is not graceful to exit when engine is idle, which may cause AM to try again.
The current workaround is to configure `spark.yarn.maxAppAttempts`.

```
yarn.ApplicationMaster: Final app status: FAILED, exitCode: 16, (reason: Shutdown hook called before final status was reported.)
```

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [ ] [Run test](https://kyuubi.readthedocs.io/en/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #1474 from cxzl25/KYUUBI-1473.

Closes #1473

dc07b38 [sychen] assert engine
6df54eb [sychen] Exit gracefully when engine idle

Authored-by: sychen <sychen@trip.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
cxzl25 authored and ulysses-you committed Dec 1, 2021
1 parent 9d8476a commit 4bf17a2
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 4 deletions.
Expand Up @@ -30,7 +30,7 @@ import org.apache.kyuubi.{KyuubiException, Logging}
import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.engine.spark.SparkSQLEngine.countDownLatch
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, EventLoggingService}
import org.apache.kyuubi.ha.HighAvailabilityConf._
import org.apache.kyuubi.ha.client.RetryPolicies
Expand All @@ -54,7 +54,10 @@ case class SparkSQLEngine(
super.start()
// Start engine self-terminating checker after all services are ready and it can be reached by
// all servers in engine spaces.
backendService.sessionManager.startTerminatingChecker()
backendService.sessionManager.startTerminatingChecker(() => {
assert(currentEngine.isDefined)
currentEngine.get.stop()
})
}

override protected def stopServer(): Unit = {
Expand Down
Expand Up @@ -265,7 +265,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
timeoutChecker.scheduleWithFixedDelay(checkTask, interval, interval, TimeUnit.MILLISECONDS)
}

private[kyuubi] def startTerminatingChecker(): Unit = if (!isServer) {
private[kyuubi] def startTerminatingChecker(stop: () => Unit): Unit = if (!isServer) {
// initialize `_latestLogoutTime` at start
_latestLogoutTime = System.currentTimeMillis()
val interval = conf.get(ENGINE_CHECK_INTERVAL)
Expand All @@ -275,7 +275,7 @@ abstract class SessionManager(name: String) extends CompositeService(name) {
if (!shutdown &&
System.currentTimeMillis() - latestLogoutTime > idleTimeout && getOpenSessionCount <= 0) {
info(s"Idled for more than $idleTimeout ms, terminating")
sys.exit(0)
stop()
}
}
}
Expand Down

0 comments on commit 4bf17a2

Please sign in to comment.