Skip to content

Commit

Permalink
add ut
Browse files Browse the repository at this point in the history
  • Loading branch information
yaooqinn committed May 16, 2019
1 parent 19f67fa commit 7b811f8
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class SparkSessionCacheManager private(name: String) extends AbstractService(nam
userToSession.asScala.foreach {
case (user, ssc) if ssc.isCrashed => removeSparkSession(user, doCheck = true)
case (user, ssc) => tryStopIdledCached(user, ssc)
case _ =>
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package yaooqinn.kyuubi.spark

import org.apache.spark._
import org.apache.spark.sql.SparkSession
import org.mockito.Mockito.when
import org.scalatest.Matchers
import org.scalatest.mock.MockitoSugar

import yaooqinn.kyuubi.service.State

class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers {
class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers with MockitoSugar {

override def afterAll(): Unit = {
System.clearProperty("SPARK_YARN_MODE")
Expand All @@ -39,57 +41,94 @@ class SparkSessionCacheManagerSuite extends SparkFunSuite with Matchers {
}

test("init cache") {
val cache = new SparkSessionCacheManager()
val conf = new SparkConf()
KyuubiSparkUtil.setupCommonConfig(conf)
cache.init(conf)
cache.getStartTime should be(0)
cache.getConf should be(conf)
cache.getServiceState should be(State.INITED)
cache.stop()
withCacheMgrInit { (cache, conf) =>
cache.getStartTime should be(0)
cache.getConf should be(conf)
cache.getServiceState should be(State.INITED)
}
}

test("start cache") {
val cache = new SparkSessionCacheManager()
val conf = new SparkConf()
KyuubiSparkUtil.setupCommonConfig(conf)
cache.init(conf)
cache.start()
cache.getStartTime / 1000 should be(System.currentTimeMillis() / 1000)
cache.getConf should be(conf)
cache.getServiceState should be(State.STARTED)
cache.stop()
withCacheMgrStarted { (cache, conf) =>
cache.getStartTime / 1000 should be(System.currentTimeMillis() / 1000)
cache.getConf should be(conf)
cache.getServiceState should be(State.STARTED)
}
}

test("stop cache") {
val cache = new SparkSessionCacheManager()
val conf = new SparkConf().set(KyuubiConf.BACKEND_SESSION_CHECK_INTERVAL.key, "1s")
KyuubiSparkUtil.setupCommonConfig(conf)
cache.init(conf)
cache.start()
Thread.sleep(2000)
cache.stop()
cache.getConf should be(conf)
cache.getServiceState should be(State.STOPPED)
withCacheMgrStarted { (cache, conf) =>
cache.stop()
cache.getConf should be(conf)
cache.getServiceState should be(State.STOPPED)
}
}

test("spark session cache should be null if max cache time reached") {
withCacheMgrStarted { (cache, conf) =>
val session = SparkSession.builder().config(conf).getOrCreate()
val userName = KyuubiSparkUtil.getCurrentUserName
cache.set(userName, session)
assert(cache.getAndIncrease(userName).nonEmpty)
cache.decrease(userName)
Thread.sleep(10000)
val maybeCache2 = cache.getAndIncrease(userName)
assert(maybeCache2.isEmpty, s"reason, crash ${maybeCache2.map(_.isCrashed).mkString}")
session.stop()
}
}

test("spark session cleaner thread test") {
withCacheMgrStarted { (cacheManager, conf) =>
val session = SparkSession.builder().config(conf).getOrCreate()
val ss1 = mock[SparkSession]
val sc1 = mock[SparkContext]

when(ss1.sparkContext).thenReturn(sc1)
when(sc1.isStopped).thenReturn(true)
when(ss1.conf).thenReturn(session.conf)

cacheManager.set("alice", ss1)
cacheManager.set("bob", session)
val latestLogout = System.currentTimeMillis() - 2 * KyuubiSparkUtil.timeStringAsMs(
conf.get(KyuubiConf.BACKEND_SESSION_IDLE_TIMEOUT.key))
cacheManager.getAndIncrease("bob").foreach { c =>
c.updateLogoutTime(latestLogout)
c.decReuseTimeAndGet
c.decReuseTimeAndGet
}
Thread.sleep(1000)
assert(cacheManager.getAndIncrease("alice").isEmpty)
assert(cacheManager.getAndIncrease("bob").isEmpty)
assert(cacheManager.getAndIncrease("tom").isEmpty)
}
}

def withCacheMgrInit(f: (SparkSessionCacheManager, SparkConf) => Unit): Unit = {
val cache = new SparkSessionCacheManager()
val conf = new SparkConf()
try {
cache.init(conf)
f(cache, conf)
} finally {
cache.stop()
}
}

def withCacheMgrStarted(f: (SparkSessionCacheManager, SparkConf) => Unit): Unit = {
val cache = new SparkSessionCacheManager()
val conf = new SparkConf().setMaster("local")
.set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s")
.set(KyuubiConf.BACKEND_SESSION_CHECK_INTERVAL.key, "1s")
.set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "5s")
.set(KyuubiConf.BACKEND_SESSION_CHECK_INTERVAL.key, "50ms")
KyuubiSparkUtil.setupCommonConfig(conf)
val session = SparkSession.builder().config(conf).getOrCreate()
cache.init(conf)
cache.start()
val userName = KyuubiSparkUtil.getCurrentUserName
cache.set(userName, session)
assert(cache.getAndIncrease(userName).nonEmpty)
cache.decrease(userName)
Thread.sleep(12500)
val maybeCache2 = cache.getAndIncrease(userName)
assert(maybeCache2.isEmpty, s"reason: ${maybeCache2.map(_.isCrashed).mkString}")
session.stop()
cache.stop()

try {
cache.init(conf)
cache.start()
f(cache, conf)
} finally {
cache.stop()
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,18 @@ import org.apache.spark.{KyuubiConf, KyuubiSparkUtil, SparkConf, SparkFunSuite}
import org.apache.spark.sql.SparkSession

class SparkSessionCacheSuite extends SparkFunSuite {
private val conf = new SparkConf()
.setMaster("local")
.set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s")
KyuubiSparkUtil.setupCommonConfig(conf)
private val spark = SparkSession.builder().config(conf).getOrCreate()

test("spark session cache") {
val conf = new SparkConf()
.setMaster("local")
.set(KyuubiConf.BACKEND_SESSION_MAX_CACHE_TIME.key, "10s")
KyuubiSparkUtil.setupCommonConfig(conf)
val spark = SparkSession.builder().config(conf).getOrCreate()
override def afterAll(): Unit = {
spark.stop()
super.afterAll()
}

test("spark session cache") {
val cache = SparkSessionCache.init(spark)
assert(!cache.isCrashed)
assert(!cache.isIdled)
Expand All @@ -43,8 +47,19 @@ class SparkSessionCacheSuite extends SparkFunSuite {
Thread.sleep(10000)
assert(cache.decReuseTimeAndGet === 0)
assert(cache.isExpired)
spark.stop()
assert(cache.isCrashed)
assert(cache.needClear)
}

test("cache status idled") {
val cache = SparkSessionCache.init(spark)
assert(!cache.isIdled, "cache is not idled, reuse time = 1")
cache.decReuseTimeAndGet
assert(!cache.isIdled, "cache is not idled, reuse time = 0, but latest logout is unset")
val latestLogout = System.currentTimeMillis() - 2 * KyuubiSparkUtil.timeStringAsMs(
spark.conf.get(KyuubiConf.BACKEND_SESSION_IDLE_TIMEOUT.key))
cache.updateLogoutTime(latestLogout)
assert(cache.isIdled, "cache is idled, reuse time = 0, idle timeout exceeded")
cache.incReuseTimeAndGet
assert(!cache.isIdled, "cache is not idled, idle timeout exceeded however reuse time = 1")
}
}

0 comments on commit 7b811f8

Please sign in to comment.