Skip to content

Commit

Permalink
Merge pull request #132 from yaooqinn/KYUUBI-125
Browse files Browse the repository at this point in the history
[KYUUBI-125][FOLLOWUP]session ugi is not wrapping sparkcontext initializing
  • Loading branch information
yaooqinn committed Dec 11, 2018
2 parents df1abd4 + 9556edd commit 6886529
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,18 @@ class SparkSessionWithUGI(
private var initialDatabase: Option[String] = None
private var sparkException: Option[Throwable] = None

private def newContext(): Thread = {
new Thread(s"Start-SparkContext-$userName") {
private lazy val newContext: Thread = {
val threadName = "SparkContext-Starter-" + userName
new Thread(threadName) {
override def run(): Unit = {
try {
promisedSparkContext.trySuccess {
new SparkContext(conf)
}
} catch {
case NonFatal(e) => sparkException = Some(e)
case e: Exception =>
sparkException = Some(e)
throw e
}
}
}
Expand Down Expand Up @@ -156,10 +159,9 @@ class SparkSessionWithUGI(
conf.setAppName(appName)
configureSparkConf(sessionConf)
val totalWaitTime: Long = conf.getTimeAsSeconds(BACKEND_SESSTION_INIT_TIMEOUT)
val newContextThread = newContext()
try {
KyuubiHadoopUtil.doAs(user) {
newContextThread.start()
newContext.start()
val context =
Await.result(promisedSparkContext.future, Duration(totalWaitTime, TimeUnit.SECONDS))
_sparkSession = ReflectUtils.newInstance(
Expand All @@ -182,9 +184,7 @@ class SparkSessionWithUGI(
throw ke
} finally {
SparkSessionWithUGI.setFullyConstructed(userName)
if (newContextThread.isAlive) {
newContextThread.join()
}
newContext.join()
}

KyuubiServerMonitor.setListener(userName, new KyuubiServerListener(conf))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
classOf[SparkSession].getName,
Seq(classOf[SparkContext]),
Seq(sc)).asInstanceOf[SparkSession]

cache.init(conf)
cache.start()
cache.set(userName, spark)
Expand Down Expand Up @@ -80,9 +80,9 @@ class SparkSessionWithUGISuite extends SparkFunSuite {

test("test init failed with no such database") {
val sparkSessionWithUGI = new SparkSessionWithUGI(user, conf, cache)
intercept[NoSuchDatabaseException](sparkSessionWithUGI.init(Map("use:database" -> "fakedb")))
assert(ReflectUtils.getFieldValue(sparkSessionWithUGI,
"yaooqinn$kyuubi$spark$SparkSessionWithUGI$$initialDatabase") === Some("use fakedb"))
val e = intercept[NoSuchDatabaseException](
sparkSessionWithUGI.init(Map("use:database" -> "fakedb")))
assert(e.getMessage().contains("fakedb"))
assert(cache.getAndIncrease(userName).nonEmpty)
}

Expand Down Expand Up @@ -179,4 +179,12 @@ class SparkSessionWithUGISuite extends SparkFunSuite {
assert(sc.isStopped)
}
}

test("user name should be switched") {
val proxyUserName = "Kent"
val proxyUser = UserGroupInformation.createProxyUser(proxyUserName, user)
val sparkSessionWithUGI = new SparkSessionWithUGI(proxyUser, conf, cache)
sparkSessionWithUGI.init(Map.empty)
assert(sparkSessionWithUGI.sparkSession.sparkContext.sparkUser === proxyUserName)
}
}

0 comments on commit 6886529

Please sign in to comment.