Skip to content

Commit

Permalink
Merge pull request #148 from yaooqinn/KYUUBI-147
Browse files Browse the repository at this point in the history
User Latest Logout information should be cleaned after backend session is stopped by session cache manager
  • Loading branch information
yaooqinn committed Jan 25, 2019
2 parents 8979fc5 + df77304 commit 9c606c1
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package yaooqinn.kyuubi.spark

import java.text.SimpleDateFormat
import java.util.Date
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

Expand All @@ -40,11 +42,11 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
new ThreadFactoryBuilder()
.setDaemon(true).setNameFormat(getClass.getSimpleName + "-%d").build())

private[this] val userToSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)]
private[this] val userLatestLogout = new ConcurrentHashMap[String, Long]
private[this] var idleTimeout: Long = _
private val userToSession = new ConcurrentHashMap[String, (SparkSession, AtomicInteger)]
private val userLatestLogout = new ConcurrentHashMap[String, Long]
private var idleTimeout: Long = _

private[this] val sessionCleaner = new Runnable {
private val sessionCleaner = new Runnable {
override def run(): Unit = {
userToSession.asScala.foreach {
case (user, (session, _)) if session.sparkContext.isStopped =>
Expand All @@ -66,7 +68,12 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
}
}

private[this] def removeSparkSession(user: String): Unit = {
private def removeSparkSession(user: String): Unit = {
Option(userLatestLogout.remove(user)) match {
case Some(t) => info(s"User [$user] last time logout at " +
new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(t)))
case _ =>
}
userToSession.remove(user)
KyuubiServerMonitor.detachUITab(user)
}
Expand All @@ -76,9 +83,10 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
}

def getAndIncrease(user: String): Option[SparkSession] = {
Some(userToSession.get(user)) match {
Option(userToSession.get(user)) match {
case Some((ss, times)) if !ss.sparkContext.isStopped =>
info(s"SparkSession for [$user] is reused for ${times.incrementAndGet()} time(s) after + 1")
val currentTime = times.incrementAndGet()
info(s"SparkSession for [$user] is reused for $currentTime time(s) after + 1")
Some(ss)
case _ =>
info(s"SparkSession for [$user] isn't cached, will create a new one.")
Expand All @@ -87,10 +95,11 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
}

def decrease(user: String): Unit = {
Some(userToSession.get(user)) match {
Option(userToSession.get(user)) match {
case Some((ss, times)) if !ss.sparkContext.isStopped =>
userLatestLogout.put(user, System.currentTimeMillis())
info(s"SparkSession for [$user] is reused for ${times.decrementAndGet()} time(s) after -1")
val currentTime = times.decrementAndGet()
info(s"SparkSession for [$user] is reused for $currentTime time(s) after - 1")
case _ =>
warn(s"SparkSession for [$user] was not found in the cache.")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar

import yaooqinn.kyuubi.service.State
import yaooqinn.kyuubi.utils.ReflectUtils

class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers with MockitoSugar {

Expand Down Expand Up @@ -60,13 +61,39 @@ class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers with Moc
val userName = KyuubiSparkUtil.getCurrentUserName
val sc = mock[SparkContext]
when(ss.sparkContext).thenReturn(sc)

cache.decrease(userName) // None
cache.getAndIncrease(userName) // None

cache.set(userName, ss)
when(sc.isStopped).thenReturn(false)
cache.decrease(userName)
cache.getAndIncrease(userName)
cache.set(userName, ss)

when(sc.isStopped).thenReturn(true)
cache.getAndIncrease(userName)

when(sc.isStopped).thenReturn(false)
cache.decrease(userName)
when(sc.isStopped).thenReturn(true)
cache.decrease(userName)

Thread.sleep(2000)
val field = cache.getClass.getDeclaredField("sessionCleaner")
field.setAccessible(true)

when(sc.isStopped).thenReturn(false)
val runnable = field.get(cache).asInstanceOf[Runnable]
runnable.run() // > 0

when(sc.isStopped).thenReturn(false)
cache.decrease(userName)
when(sc.isStopped).thenReturn(false)
runnable.run() // not expiry
when(sc.isStopped).thenReturn(false)
ReflectUtils.setFieldValue(
cache, "yaooqinn$kyuubi$spark$SparkSessionCacheManager$$idleTimeout", 0)
runnable.run()
System.clearProperty("SPARK_YARN_MODE")
cache.stop()
}

Expand Down

0 comments on commit 9c606c1

Please sign in to comment.