
Commit 63245dd

[KYUUBI-196] Refactoring SparkSessionWithUGI's get and create SparkContext logic (#197)

* fix 194 Refactoring SparkSessionWithUGI's get and create sc logic

* upgrade scala-maven-plugin to 3.4.6

* fix travis

* fix travis again

* fix travis open jdk8

* fix travis open jdk8

* fix travis open jdk8

* fix travis open jdk8

* fix travis open jdk8

* add state to get yarn applications

* promise complete

* fix #69 capture spark exceptions

* typo

* fix #196 add ut and log

* typo

* typo

* add ut
yaooqinn committed Jun 17, 2019
1 parent e295995 commit 63245dd
Showing 13 changed files with 212 additions and 230 deletions.
15 changes: 11 additions & 4 deletions .travis.yml
@@ -1,13 +1,16 @@
 language: scala
 scala:
 - 2.11.8
+jdk:
+- openjdk8

 cache:
 directories:
 - $HOME/.m2
 - ./build

 before_deploy:
+- jdk_switcher use openjdk8
 - ./build/dist --tgz

 deploy:
@@ -30,16 +33,20 @@ jobs:
 include:
 - stage: spark2.1
   language: scala
-  script: ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
+  script:
+  - ./build/mvn clean install -Pspark-2.1 -Dmaven.javadoc.skip=true -B -V
 - stage: spark2.2
   language: scala
-  script: ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
+  script:
+  - ./build/mvn clean install -Pspark-2.2 -Dmaven.javadoc.skip=true -B -V
 - stage: spark2.3
   language: scala
-  script: ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
+  script:
+  - ./build/mvn clean install -Pspark-2.3 -Dmaven.javadoc.skip=true -B -V
 - stage: spark2.4
   language: scala
-  script: ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V
+  script:
+  - ./build/mvn clean install -Pspark-2.4 -Dmaven.javadoc.skip=true -B -V

 after_success:
 - bash <(curl -s https://codecov.io/bash)
2 changes: 0 additions & 2 deletions docs/configurations.md
@@ -81,8 +81,6 @@ spark.kyuubi.<br />frontend.session.check.operation| true |Session will be consi

 Name|Default|Description
 ---|---|---
-spark.kyuubi.<br />backend.session.wait.other.times | 60 | How many times to check when another session with the same user is initializing SparkContext. Total Time will be times by `spark.kyuubi.backend.session.wait.other.interval`.
-spark.kyuubi.<br />backend.session.wait.other.interval|1s|The interval for checking whether other thread with the same user has completed SparkContext instantiation.
 spark.kyuubi.<br />backend.session.init.timeout|60s|How long we suggest the server to give up instantiating SparkContext.
 spark.kyuubi.<br />backend.session.check.interval|5min|The check interval for backend session a.k.a SparkSession timeout.
 spark.kyuubi.<br />backend.session.idle.timeout|30min|How long the SparkSession instance will be cached after user logout. Using cached SparkSession can significantly cut the startup time for SparkContext, which makes sense for queries that are short lived. The timeout is calculated from when all sessions of the user are disconnected
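With the two polling knobs removed, spark.kyuubi.backend.session.init.timeout is the single bound on how long a session waits for SparkContext instantiation. A minimal sketch of tuning it through a SparkConf (values are illustrative; in a real deployment these keys would normally live in spark-defaults.conf):

    import org.apache.spark.SparkConf

    // Illustrative values: give a busy YARN cluster two minutes to bring up
    // a SparkContext before the session attempt is aborted.
    val conf = new SparkConf()
      .set("spark.kyuubi.backend.session.init.timeout", "2min")
      .set("spark.kyuubi.backend.session.idle.timeout", "30min")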
17 changes: 1 addition & 16 deletions kyuubi-server/src/main/scala/org/apache/spark/KyuubiConf.scala
@@ -241,7 +241,7 @@ object KyuubiConf {
       .timeConf(TimeUnit.SECONDS)
       .createWithDefault(TimeUnit.SECONDS.toSeconds(20L))

-  val FRONTEND_LOGIN_BEBACKOFF_SLOT_LENGTH: ConfigEntry[Long] =
+  val FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH: ConfigEntry[Long] =
     KyuubiConfigBuilder("spark.kyuubi.frontend.backoff.slot.length")
       .doc("Time to back off during login to Kyuubi Server.")
       .timeConf(TimeUnit.MILLISECONDS)
@@ -251,21 +251,6 @@
   // SparkSession //
   /////////////////////////////////////////////////////////////////////////////////////////////////

-  val BACKEND_SESSION_WAIT_OTHER_TIMES: ConfigEntry[Int] =
-    KyuubiConfigBuilder("spark.kyuubi.backend.session.wait.other.times")
-      .doc("How many times to check when another session with the same user is initializing " +
-        "SparkContext. Total Time will be times by " +
-        "`spark.kyuubi.backend.session.wait.other.interval`")
-      .intConf
-      .createWithDefault(60)
-
-  val BACKEND_SESSION_WAIT_OTHER_INTERVAL: ConfigEntry[Long] =
-    KyuubiConfigBuilder("spark.kyuubi.backend.session.wait.other.interval")
-      .doc("The interval for checking whether other thread with the same user has completed" +
-        " SparkContext instantiation.")
-      .timeConf(TimeUnit.MILLISECONDS)
-      .createWithDefault(TimeUnit.SECONDS.toMillis(1L))
-
   val BACKEND_SESSION_INIT_TIMEOUT: ConfigEntry[Long] =
     KyuubiConfigBuilder("spark.kyuubi.backend.session.init.timeout")
       .doc("How long we suggest the server to give up instantiating SparkContext")
KyuubiSparkUtil.scala
@@ -113,7 +113,6 @@ object KyuubiSparkUtil extends Logging {
   val HIVE_VAR_PREFIX: Regex = """set:hivevar:([^=]+)""".r
   val USE_DB = "use:database"
   val QUEUE: String = SPARK_PREFIX + YARN_PREFIX + "queue"
-  val DEPRECATED_QUEUE = "mapred.job.queue.name"
   val HDFS_CLIENT_CACHE: String = SPARK_HADOOP_PREFIX + "fs.hdfs.impl.disable.cache"
   val HDFS_CLIENT_CACHE_DEFAULT = "true"
   val FILE_CLIENT_CACHE: String = SPARK_HADOOP_PREFIX + "fs.file.impl.disable.cache"
SparkEnv.scala
@@ -166,7 +166,7 @@ object SparkEnv extends Logging {
    */
   def get: SparkEnv = {
     debug(s"Kyuubi: Get SparkEnv for $user")
-    envs.getOrDefault(user, envs.values().asScala.headOption.getOrElse(env))
+    Option(envs.get(user)).getOrElse(envs.values().asScala.headOption.getOrElse(env))
   }

   /**
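The getOrDefault call above computed its fallback (a scan of envs.values()) on every lookup, because a plain method argument is evaluated eagerly. Option(...).getOrElse defers the fallback behind a by-name parameter, so it runs only on a miss. A self-contained sketch of the difference (names are illustrative):

    import java.util.concurrent.ConcurrentHashMap

    object DefaultEvaluationDemo extends App {
      val envs = new ConcurrentHashMap[String, String]()
      envs.put("alice", "alice-env")

      def fallback(): String = { println("fallback evaluated"); "shared-env" }

      // Plain method call: fallback() runs even though "alice" is present.
      envs.getOrDefault("alice", fallback())          // prints "fallback evaluated"

      // By-name parameter: the fallback runs only when the lookup misses.
      Option(envs.get("alice")).getOrElse(fallback()) // prints nothing
      Option(envs.get("bob")).getOrElse(fallback())   // prints "fallback evaluated"
    }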
AbstractOperation.scala
@@ -76,11 +76,6 @@ abstract class AbstractOperation(
     }
   }

-  override def cancel(): Unit = {
-    setState(CANCELED)
-    throw new UnsupportedOperationException("KyuubiOperation.cancel()")
-  }
-
   protected def setHasResultSet(hasResultSet: Boolean): Unit = {
     this.hasResultSet = hasResultSet
     opHandle.setHasResultSet(hasResultSet)
MetadataOperation.scala
@@ -33,6 +33,11 @@ abstract class MetadataOperation(session: KyuubiSession, opType: OperationType)

   setHasResultSet(true)

+  override def cancel(): Unit = {
+    setState(CANCELED)
+    throw new UnsupportedOperationException("MetadataOperation.cancel()")
+  }
+
   override def close(): Unit = {
     setState(CLOSED)
     cleanupOperationLog()
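Note: moving the unsupported cancel() from AbstractOperation down to MetadataOperation narrows the restriction to metadata requests only; presumably this frees other operation types to override cancel() with a real implementation.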
FrontendService.scala
@@ -18,7 +18,7 @@
 package yaooqinn.kyuubi.server

 import java.net.{InetAddress, ServerSocket}
-import java.util.concurrent.{SynchronousQueue, ThreadPoolExecutor, TimeUnit}
+import java.util.concurrent.{SynchronousQueue, TimeUnit}

 import scala.collection.JavaConverters._
 import scala.util.{Failure, Try}
@@ -28,7 +28,6 @@ import org.apache.hive.service.cli.thrift._
 import org.apache.spark.{KyuubiConf, SparkConf}
 import org.apache.spark.KyuubiConf._
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.thrift.protocol.{TBinaryProtocol, TProtocol}
 import org.apache.thrift.server.{ServerContext, TServer, TServerEventHandler, TThreadPoolServer}
 import org.apache.thrift.transport.{TServerSocket, TTransport}
@@ -57,7 +56,7 @@ class FrontendService private(name: String, beService: BackendService, OOMHook:

   private val OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS)

-  private var serverEventHandler: TServerEventHandler = new FeTServerEventHandler
+  private val serverEventHandler: TServerEventHandler = new FeTServerEventHandler
   private var currentServerContext: ThreadLocal[ServerContext] = _

   private var server: Option[TServer] = None
@@ -612,7 +611,7 @@ class FrontendService private(name: String, beService: BackendService, OOMHook:
     // Server args
     val maxMessageSize = conf.get(FRONTEND_MAX_MESSAGE_SIZE).toInt
     val requestTimeout = conf.getTimeAsSeconds(FRONTEND_LOGIN_TIMEOUT).toInt
-    val beBackoffSlotLength = conf.getTimeAsMs(FRONTEND_LOGIN_BEBACKOFF_SLOT_LENGTH).toInt
+    val beBackoffSlotLength = conf.getTimeAsMs(FRONTEND_LOGIN_BACKOFF_SLOT_LENGTH).toInt
     val args = new TThreadPoolServer.Args(tSocket)
       .processorFactory(processorFactory)
       .transportFactory(transportFactory)
SparkSessionWithUGI.scala
@@ -17,16 +17,15 @@

 package yaooqinn.kyuubi.spark

-import java.util.concurrent.TimeUnit
+import java.util.UUID
+import java.util.concurrent.{ConcurrentHashMap, TimeUnit}

-import scala.collection.mutable.{HashSet => MHSet}
-import scala.concurrent.{Await, Promise}
-import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.{Await, Promise, TimeoutException}
 import scala.concurrent.duration.Duration
-import scala.util.control.NonFatal
+import scala.util.Try

 import org.apache.hadoop.security.UserGroupInformation
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{KyuubiSparkUtil, SparkConf, SparkContext}
 import org.apache.spark.KyuubiConf._
 import org.apache.spark.KyuubiSparkUtil._
 import org.apache.spark.sql.SparkSession
@@ -42,43 +41,19 @@ class SparkSessionWithUGI(
     conf: SparkConf,
     cacheMgr: SparkSessionCacheManager) extends Logging {
   import SparkSessionWithUGI._
+  import KyuubiHadoopUtil._

   private var _sparkSession: SparkSession = _
   private val userName: String = user.getShortUserName
   private val promisedSparkContext = Promise[SparkContext]()
   private var initialDatabase: Option[String] = None
-  private var sparkException: Option[Throwable] = None
+  private val startTime = System.currentTimeMillis()
+  private val timeout = KyuubiSparkUtil.timeStringAsMs(conf.get(BACKEND_SESSION_INIT_TIMEOUT))

   private lazy val newContext: Thread = {
     val threadName = "SparkContext-Starter-" + userName
     new Thread(threadName) {
-      override def run(): Unit = {
-        try {
-          promisedSparkContext.trySuccess {
-            new SparkContext(conf)
-          }
-        } catch {
-          case e: Exception =>
-            sparkException = Some(e)
-            throw e
-        }
-      }
+      override def run(): Unit = promisedSparkContext.complete(Try(new SparkContext(conf)))
     }
   }
-
-  /**
-   * Invoke SparkContext.stop() if not succeed initializing it
-   */
-  private def stopContext(): Unit = {
-    promisedSparkContext.future.map { sc =>
-      warn(s"Error occurred during initializing SparkContext for $userName, stopping")
-      try {
-        sc.stop
-      } catch {
-        case NonFatal(e) => error(s"Error Stopping $userName's SparkContext", e)
-      } finally {
-        System.setProperty("SPARK_YARN_MODE", "true")
-      }
-    }
-  }
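The starter thread's body collapses into a single promisedSparkContext.complete(Try(...)), which records success and failure alike, so the separate sparkException field is no longer needed: a construction error surfaces as a Failure and is rethrown to whichever thread is blocked in Await.result. A self-contained sketch of the pattern (names are illustrative):

    import scala.concurrent.{Await, Promise}
    import scala.concurrent.duration._
    import scala.util.{Failure, Try}

    object PromiseCompleteDemo extends App {
      val promised = Promise[Int]()
      val starter = new Thread("starter") {
        // Try captures a thrown exception as Failure instead of losing it in the thread.
        override def run(): Unit = promised.complete(Try(sys.error("context init failed")))
      }
      starter.start()
      // Await.result rethrows the recorded exception in the waiting thread.
      Try(Await.result(promised.future, 10.seconds)) match {
        case Failure(e) => println(s"surfaced: ${e.getMessage}")
        case ok         => println(ok)
      }
      starter.join()
    }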

@@ -90,7 +65,6 @@ class SparkSessionWithUGI(
   private def configureSparkConf(sessionConf: Map[String, String]): Unit = {
     for ((key, value) <- sessionConf) {
       key match {
-        case HIVE_VAR_PREFIX(DEPRECATED_QUEUE) => conf.set(QUEUE, value)
         case HIVE_VAR_PREFIX(k) =>
           if (k.startsWith(SPARK_PREFIX)) {
             conf.set(k, value)

@@ -127,70 +101,71 @@
   }

   private def getOrCreate(
-      sessionConf: Map[String, String]): Unit = SPARK_INSTANTIATION_LOCK.synchronized {
-    val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt, 15)
-    var checkRound = totalRounds
-    val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL)
-    // if user's sc is being constructed by another
-    while (isPartiallyConstructed(userName)) {
-      checkRound -= 1
-      if (checkRound <= 0) {
-        throw new KyuubiSQLException(s"A partially constructed SparkContext for [$userName] " +
-          s"has last more than ${totalRounds * interval / 1000} seconds")
-      }
-      info(s"A partially constructed SparkContext for [$userName], $checkRound times countdown.")
-      SPARK_INSTANTIATION_LOCK.wait(interval)
-    }
-
+      sessionConf: Map[String, String]): Unit = {
     cacheMgr.getAndIncrease(userName) match {
       case Some(ssc) =>
         _sparkSession = ssc.spark.newSession()
         configureSparkSession(sessionConf)
+      case _ if isPartiallyConstructed(userName) =>
+        if (System.currentTimeMillis() - startTime > timeout) {
+          throw new KyuubiSQLException(userName + " has a constructing sc, timeout, aborting")
+        } else {
+          SPARK_INSTANTIATION_LOCK.synchronized {
+            SPARK_INSTANTIATION_LOCK.wait(1000)
+          }
+          getOrCreate(sessionConf)
+        }
       case _ =>
-        setPartiallyConstructed(userName)
-        SPARK_INSTANTIATION_LOCK.notifyAll()
-        create(sessionConf)
+        SPARK_INSTANTIATION_LOCK.synchronized {
+          if (isPartiallyConstructed(userName)) {
+            getOrCreate(sessionConf)
+          } else {
+            setPartiallyConstructed(userName)
+          }
+        }
+        if (_sparkSession == null) {
+          create(sessionConf)
+        }
     }
   }
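The rewritten getOrCreate replaces the counted polling loop with three cases: a cached SparkSession for the user is reused through newSession(); if another thread is still constructing that user's SparkContext, the caller sleeps in one-second slices and recurses until the init timeout (measured from startTime) expires; otherwise it claims construction, re-checking isPartiallyConstructed inside SPARK_INSTANTIATION_LOCK so that two racing threads cannot both fall through to create().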

   private def create(sessionConf: Map[String, String]): Unit = {
-    info(s"--------- Create new SparkSession for $userName ----------")
-    // kyuubi|user name|canonical host name| port
+    // kyuubi|user name|canonical host name|port|uuid
     val appName = Seq(
-      "kyuubi", userName, conf.get(FRONTEND_BIND_HOST), conf.get(FRONTEND_BIND_PORT)).mkString("|")
+      "kyuubi",
+      userName,
+      conf.get(FRONTEND_BIND_HOST),
+      conf.get(FRONTEND_BIND_PORT),
+      UUID.randomUUID().toString).mkString("|")
     conf.setAppName(appName)
     configureSparkConf(sessionConf)
-    val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSION_INIT_TIMEOUT)
+    info(s"Create new SparkSession for " + userName + " as " + appName)

     try {
-      KyuubiHadoopUtil.doAs(user) {
+      doAs(user) {
         newContext.start()
         val context =
-          Await.result(promisedSparkContext.future, Duration(totalWaitTime, TimeUnit.SECONDS))
+          Await.result(promisedSparkContext.future, Duration(timeout, TimeUnit.SECONDS))
         _sparkSession = ReflectUtils.newInstance(
           classOf[SparkSession].getName,
           Seq(classOf[SparkContext]),
           Seq(context)).asInstanceOf[SparkSession]
       }
+      cacheMgr.set(userName, _sparkSession)
     } catch {
-      case e: Exception =>
+      case e: TimeoutException =>
         if (conf.getOption("spark.master").contains("yarn")) {
-          KyuubiHadoopUtil.doAsAndLogNonFatal(user) {
-            KyuubiHadoopUtil.killYarnAppByName(appName)
-          }
+          doAsAndLogNonFatal(user)(killYarnAppByName(appName))
         }
-        stopContext()
-        val cause = findCause(e)
         val msg =
           s"""
-            |Get SparkSession for [$userName] failed
-            |Diagnosis: ${sparkException.map(_.getMessage).getOrElse(cause.getMessage)}
-            |Please check if the specified yarn queue [${conf.getOption(QUEUE)
-            .getOrElse("")}] is available or has sufficient resources left
+            |Failed to get SparkSession for [$userName]
+            |Diagnosis: ${e.getMessage}
+            |Please check whether the specified yarn queue [${conf.getOption(QUEUE)
+            .getOrElse("")}] has sufficient resources left
           """.stripMargin
-        val ke = new KyuubiSQLException(msg, "08S01", 1001, cause)
-        sparkException.foreach(ke.addSuppressed)
-        throw ke
+        throw new KyuubiSQLException(msg, "08S01", 1001, e)
+      case e: Exception => throw new KyuubiSQLException(e)
     } finally {
       setFullyConstructed(userName)
       newContext.join()
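Two details carry the rest of the refactor: the application name now ends in a random UUID (for example kyuubi|alice|host-1|10009|e3b0c442-..., with user and host illustrative), so a timed-out attempt can be killed on YARN by exact name without touching a concurrent attempt for the same user; and only a TimeoutException triggers killYarnAppByName plus the queue diagnosis, while any other failure is wrapped directly in a KyuubiSQLException, which retires both the sparkException side channel and the old stopContext() cleanup.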
@@ -209,9 +184,7 @@

     try {
       initialDatabase.foreach { db =>
-        KyuubiHadoopUtil.doAs(user) {
-          _sparkSession.sql(db)
-        }
+        doAs(user)(_sparkSession.sql(db))
       }
     } catch {
       case e: Exception =>
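The doAs(user)(_sparkSession.sql(db)) one-liner reads naturally because doAs-style helpers take the action in a second, by-name parameter list. A sketch of the assumed shape (not the actual KyuubiHadoopUtil source) over Hadoop's UserGroupInformation:

    import java.security.PrivilegedExceptionAction
    import org.apache.hadoop.security.UserGroupInformation

    // Assumed helper shape: run a by-name block as the given user.
    def doAs[T](user: UserGroupInformation)(body: => T): T =
      user.doAs(new PrivilegedExceptionAction[T] {
        override def run(): T = body
      })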
@@ -233,14 +206,14 @@ object SparkSessionWithUGI {

   val SPARK_INSTANTIATION_LOCK = new Object()

-  private val userSparkContextBeingConstruct = new MHSet[String]()
+  private val userSparkContextBeingConstruct = new ConcurrentHashMap[String, Boolean]()

   def setPartiallyConstructed(user: String): Unit = {
-    userSparkContextBeingConstruct.add(user)
+    userSparkContextBeingConstruct.put(user, true)
   }

   def isPartiallyConstructed(user: String): Boolean = {
-    userSparkContextBeingConstruct.contains(user)
+    Option(userSparkContextBeingConstruct.get(user)).getOrElse(false)
   }

   def setFullyConstructed(user: String): Unit = {
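Replacing the unsynchronized mutable.HashSet with a ConcurrentHashMap makes the constructing-users registry safe to probe outside SPARK_INSTANTIATION_LOCK. A sketch of the same idea using a concurrent key set, an alternative shape rather than what the commit chose:

    import java.util.concurrent.ConcurrentHashMap

    object ConstructingRegistry {
      // A concurrent set view gives the same thread safety without the
      // Boolean values or the Option(null) handling above.
      private val constructing = ConcurrentHashMap.newKeySet[String]()

      def setPartiallyConstructed(user: String): Unit = constructing.add(user)
      def isPartiallyConstructed(user: String): Boolean = constructing.contains(user)
      def setFullyConstructed(user: String): Unit = constructing.remove(user)
    }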

