Skip to content

Commit

Permalink
[LIVY-647]Fix travis failed on batch session should not be gc-ed unti…
Browse files Browse the repository at this point in the history
…l application is finished
  • Loading branch information
runzhiwang committed Sep 2, 2019
1 parent d0d8028 commit 794353a
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
Expand Up @@ -153,7 +153,14 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
}
}

def collectGarbage(): Future[Iterable[Unit]] = {
/**
* @param excludeMocked whether to exclude mocked session.
* If some thread (e.g. GarbageCollector) check the state of mocked session
* when the test doing doReturn(s).when(session).state,
* exception will be threw by the thread.
* So some thread (e.g. GarbageCollector) had better exclude mocked session.
*/
def collectGarbage(excludeMocked: Boolean): Future[Iterable[Unit]] = {
def expired(session: Session): Boolean = {
session.state match {
case s: FinishedSessionState =>
Expand All @@ -173,7 +180,14 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
}
}

Future.sequence(all().filter(expired).map { s =>
def notMocked(session: Session): Boolean = {
if (excludeMocked) {
!session.toString().contains("Mock for")
}
true
}

Future.sequence(all().filter(notMocked).filter(expired).map { s =>
info(s"Deleting $s because it was inactive for more than ${sessionTimeout / 1e6} ms.")
delete(s)
})
Expand Down Expand Up @@ -205,7 +219,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](

override def run(): Unit = {
while (true) {
collectGarbage()
collectGarbage(true)
Thread.sleep(60 * 1000)
}
}
Expand Down
Expand Up @@ -54,7 +54,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
val session = manager.register(new MockSession(manager.nextId(), null, livyConf))
manager.get(session.id).isDefined should be(true)
eventually(timeout(5 seconds), interval(100 millis)) {
Await.result(manager.collectGarbage(), Duration.Inf)
Await.result(manager.collectGarbage(false), Duration.Inf)
manager.get(session.id) should be(None)
}
}
Expand All @@ -69,7 +69,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
manager.get(session2.id).isDefined should be(true)
session2.serverState = SessionState.Busy
eventually(timeout(5 seconds), interval(100 millis)) {
Await.result(manager.collectGarbage(), Duration.Inf)
Await.result(manager.collectGarbage(false), Duration.Inf)
(manager.get(session1.id).isDefined, manager.get(session2.id).isDefined) should
be (false, true)
}
Expand Down Expand Up @@ -129,7 +129,7 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit

def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
doReturn(s).when(session).state
Await.result(sm.collectGarbage(), Duration.Inf)
Await.result(sm.collectGarbage(false), Duration.Inf)
fn(sm)
}

Expand Down

0 comments on commit 794353a

Please sign in to comment.