Skip to content

Commit

Permalink
[KYUUBI #1960] Skip creating sparksession and starting engine if the …
Browse files Browse the repository at this point in the history
…max initialization time exceeds

<!--
Thanks for sending a pull request!

Here are some tips for you:
  1. If this is your first time, please read our contributor guidelines: https://kyuubi.readthedocs.io/en/latest/community/contributions.html
  2. If the PR is related to an issue in https://github.com/apache/incubator-kyuubi/issues, add '[KYUUBI #XXXX]' in your PR title, e.g., '[KYUUBI #XXXX] Your PR title ...'.
  3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][KYUUBI #XXXX] Your PR title ...'.
-->

### _Why are the changes needed?_
<!--
Please clarify why the changes are needed. For instance,
  1. If you add a feature, you can talk about the use case of it.
  2. If you fix a bug, you can clarify why it is a bug.
-->
close #1960

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [X] Add screenshots for manual tests if appropriate

Test steps:
1. Start another application to fill the queue.
2. Modify the `stop-application.sh` script so that it does not work (the orphaned application cannot be stopped).
3. Submit a Kyuubi request.
4. After waiting for timeout, stop the application in step 1.

We can get the following results:
1. The kyuubi request exits after timeout.
2. After obtaining resources, the orphaned engine app fails to execute and exits immediately.
![image](https://user-images.githubusercontent.com/17894939/155467408-f9c5d9d3-ccb9-47a6-b697-1d64f42e427e.png)
![image](https://user-images.githubusercontent.com/17894939/155467301-7ee01616-10dd-4c25-9375-02c0ae88091e.png)

- [ ] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #1966 from wForget/KYUUBI-1960.

Closes #1960

0f977c8 [Wang Zhen] spotless
9bb2178 [Wang Zhen] Merge branch 'master' of https://github.com/apache/incubator-kyuubi into KYUUBI-1960
3effa4d [wForget] fix
f9f13c6 [wForget] fix test
abdc1f3 [wForget] fix
48b4bac [wForget] [KYUUBI-1960] Skip creating sparksession and starting engine if the max initialization time exceeds

Lead-authored-by: wForget <643348094@qq.com>
Co-authored-by: Wang Zhen <wangzhen07@qiyi.com>
Signed-off-by: ulysses-you <ulyssesyou@apache.org>
  • Loading branch information
2 people authored and ulysses-you committed Mar 1, 2022
1 parent 6d94fee commit 4148e5b
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 25 deletions.
Expand Up @@ -31,6 +31,7 @@ import org.apache.kyuubi.{KyuubiException, Logging, Utils}
import org.apache.kyuubi.Utils._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.spark.SparkSQLEngine.{countDownLatch, currentEngine}
import org.apache.kyuubi.engine.spark.events.{EngineEvent, EngineEventsStore, EventLoggingService}
import org.apache.kyuubi.events.EventLogging
Expand Down Expand Up @@ -70,6 +71,8 @@ case class SparkSQLEngine(spark: SparkSession) extends Serverable("SparkSQLEngin

object SparkSQLEngine extends Logging {

val sparkConf: SparkConf = new SparkConf()

val kyuubiConf: KyuubiConf = KyuubiConf()

var currentEngine: Option[SparkSQLEngine] = None
Expand All @@ -78,8 +81,7 @@ object SparkSQLEngine extends Logging {

private val countDownLatch = new CountDownLatch(1)

def createSpark(): SparkSession = {
val sparkConf = new SparkConf()
def setupConf(): Unit = {
val rootDir = sparkConf.getOption("spark.repl.classdir").getOrElse(getLocalDir(sparkConf))
val outputDir = Utils.createTempDir(root = rootDir, namePrefix = "repl")
sparkConf.setIfMissing("spark.sql.execution.topKSortFallbackThreshold", "10000")
Expand Down Expand Up @@ -112,11 +114,16 @@ object SparkSQLEngine extends Logging {
debug(s"KyuubiConf: $k = $v")
}
}
}

def createSpark(): SparkSession = {
val session = SparkSession.builder.config(sparkConf).getOrCreate
(kyuubiConf.get(ENGINE_INITIALIZE_SQL) ++ kyuubiConf.get(ENGINE_SESSION_INITIALIZE_SQL))
.foreach { sqlStr =>
session.sparkContext.setJobGroup(appName, sqlStr, interruptOnCancel = true)
session.sparkContext.setJobGroup(
"engine_initializing_queries",
sqlStr,
interruptOnCancel = true)
debug(s"Execute session initializing sql: $sqlStr")
session.sql(sqlStr).isEmpty
}
Expand Down Expand Up @@ -168,29 +175,43 @@ object SparkSQLEngine extends Logging {

def main(args: Array[String]): Unit = {
SignalRegister.registerLogger(logger)
var spark: SparkSession = null
try {
spark = createSpark()
setupConf()
val startedTime = System.currentTimeMillis()
val submitTime = kyuubiConf.getOption(KYUUBI_ENGINE_SUBMIT_TIME_KEY) match {
case Some(t) => t.toLong
case _ => startedTime
}
val initTimeout = kyuubiConf.get(ENGINE_INIT_TIMEOUT)
val totalInitTime = startedTime - submitTime
if (totalInitTime > initTimeout) {
throw new KyuubiException(s"The total engine initialization time ($totalInitTime ms)" +
s" exceeds `kyuubi.session.engine.initialize.timeout` ($initTimeout ms)," +
s" and submitted at $submitTime.")
} else {
var spark: SparkSession = null
try {
startEngine(spark)
// blocking main thread
countDownLatch.await()
spark = createSpark()
try {
startEngine(spark)
// blocking main thread
countDownLatch.await()
} catch {
case e: KyuubiException => currentEngine match {
case Some(engine) =>
engine.stop()
val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
EventLogging.onEvent(event)
error(event, e)
case _ => error("Current SparkSQLEngine is not created.")
}

}
} catch {
case e: KyuubiException => currentEngine match {
case Some(engine) =>
engine.stop()
val event = EngineEvent(engine).copy(diagnostic = e.getMessage)
EventLogging.onEvent(event)
error(event, e)
case _ => error("Current SparkSQLEngine is not created.")
}

}
} catch {
case t: Throwable => error(s"Failed to instantiate SparkSession: ${t.getMessage}", t)
} finally {
if (spark != null) {
spark.stop()
case t: Throwable => error(s"Failed to instantiate SparkSession: ${t.getMessage}", t)
} finally {
if (spark != null) {
spark.stop()
}
}
}
}
Expand Down
Expand Up @@ -66,6 +66,7 @@ trait WithSparkSQLEngine extends KyuubiFunSuite {

SparkSession.clearActiveSession()
SparkSession.clearDefaultSession()
SparkSQLEngine.setupConf()
spark = SparkSQLEngine.createSpark()
SparkSQLEngine.startEngine(spark)
engine = SparkSQLEngine.currentEngine.get
Expand Down
Expand Up @@ -20,4 +20,5 @@ package org.apache.kyuubi.config
/**
 * Reserved configuration keys used internally by Kyuubi to pass metadata
 * between the server and engines. These keys are set by Kyuubi itself and
 * are not intended to be configured by end users.
 */
object KyuubiReservedKeys {
  // Identifies the session user on whose behalf a session/operation runs.
  final val KYUUBI_SESSION_USER_KEY = "kyuubi.session.user"
  // Identifies an individual statement (operation) within a session.
  final val KYUUBI_STATEMENT_ID_KEY = "kyuubi.statement.id"
  // Engine submission timestamp (epoch millis), set by the server when it
  // launches the engine process and read by the engine at startup to decide
  // whether the max initialization time has already been exceeded.
  final val KYUUBI_ENGINE_SUBMIT_TIME_KEY = "kyuubi.engine.submit.time"
}
Expand Up @@ -32,6 +32,7 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.kyuubi.{KYUUBI_VERSION, KyuubiSQLException, Logging, Utils}
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.config.KyuubiConf._
import org.apache.kyuubi.config.KyuubiReservedKeys.KYUUBI_ENGINE_SUBMIT_TIME_KEY
import org.apache.kyuubi.engine.EngineType.{EngineType, FLINK_SQL, SPARK_SQL, TRINO}
import org.apache.kyuubi.engine.ShareLevel.{CONNECTION, GROUP, SERVER, ShareLevel}
import org.apache.kyuubi.engine.flink.FlinkProcessBuilder
Expand Down Expand Up @@ -178,6 +179,8 @@ private[kyuubi] class EngineRef(

conf.set(HA_ZK_NAMESPACE, engineSpace)
conf.set(HA_ZK_ENGINE_REF_ID, engineRefId)
val started = System.currentTimeMillis()
conf.set(KYUUBI_ENGINE_SUBMIT_TIME_KEY, String.valueOf(started))
val builder = engineType match {
case SPARK_SQL =>
conf.setIfMissing(SparkProcessBuilder.APP_KEY, defaultEngineName)
Expand All @@ -203,7 +206,6 @@ private[kyuubi] class EngineRef(
try {
info(s"Launching engine:\n$builder")
val process = builder.start
val started = System.currentTimeMillis()
var exitValue: Option[Int] = None
while (engineRef.isEmpty) {
if (exitValue.isEmpty && process.waitFor(1, TimeUnit.SECONDS)) {
Expand Down

0 comments on commit 4148e5b

Please sign in to comment.