Skip to content

Commit

Permalink
[KYUUBI-160]create spark context thread-safely (#161)
Browse files Browse the repository at this point in the history
* fix #160 create spark context thread-safely

* using object wait
  • Loading branch information
yaooqinn committed Mar 6, 2019
1 parent 7631ac3 commit 70c5c6d
Showing 1 changed file with 12 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ class SparkSessionWithUGI(
user: UserGroupInformation,
conf: SparkConf,
cache: SparkSessionCacheManager) extends Logging {
import SparkSessionWithUGI._

private var _sparkSession: SparkSession = _
private val userName: String = user.getShortUserName
private val promisedSparkContext = Promise[SparkContext]()
Expand Down Expand Up @@ -125,28 +127,29 @@ class SparkSessionWithUGI(
}
}

private def getOrCreate(sessionConf: Map[String, String]): Unit = synchronized {
private def getOrCreate(
sessionConf: Map[String, String]): Unit = SPARK_INSTANTIATION_LOCK.synchronized {
val totalRounds = math.max(conf.get(BACKEND_SESSION_WAIT_OTHER_TIMES).toInt, 15)
var checkRound = totalRounds
val interval = conf.getTimeAsMs(BACKEND_SESSION_WAIT_OTHER_INTERVAL)
// if user's sc is being constructed by another
while (SparkSessionWithUGI.isPartiallyConstructed(userName)) {
wait(interval)
while (isPartiallyConstructed(userName)) {
checkRound -= 1
if (checkRound <= 0) {
throw new KyuubiSQLException(s"A partially constructed SparkContext for [$userName] " +
s"has last more than ${totalRounds * interval / 1000} seconds")
}
info(s"A partially constructed SparkContext for [$userName], $checkRound times countdown.")
SPARK_INSTANTIATION_LOCK.wait(interval)
}

cache.getAndIncrease(userName) match {
case Some(ss) =>
_sparkSession = ss.newSession()
configureSparkSession(sessionConf)
case _ =>
SparkSessionWithUGI.setPartiallyConstructed(userName)
notifyAll()
setPartiallyConstructed(userName)
SPARK_INSTANTIATION_LOCK.notifyAll()
create(sessionConf)
}
}
Expand Down Expand Up @@ -183,7 +186,7 @@ class SparkSessionWithUGI(
sparkException.foreach(ke.addSuppressed)
throw ke
} finally {
SparkSessionWithUGI.setFullyConstructed(userName)
setFullyConstructed(userName)
newContext.join()
}

Expand Down Expand Up @@ -221,6 +224,9 @@ class SparkSessionWithUGI(
}

object SparkSessionWithUGI {

val SPARK_INSTANTIATION_LOCK = new Object()

private val userSparkContextBeingConstruct = new MHSet[String]()

def setPartiallyConstructed(user: String): Unit = {
Expand Down

0 comments on commit 70c5c6d

Please sign in to comment.