Skip to content

Commit

Permalink
[LIVY-647]Fix travis failed on batch session should not be gc-ed unti…
Browse files Browse the repository at this point in the history
…l application is finished
  • Loading branch information
runzhiwang committed Sep 3, 2019
1 parent d0d8028 commit b019117
Showing 1 changed file with 66 additions and 33 deletions.
Expand Up @@ -29,7 +29,7 @@ import org.scalatest.mock.MockitoSugar.mock

import org.apache.livy.{LivyBaseUnitTestSuite, LivyConf}
import org.apache.livy.server.batch.{BatchRecoveryMetadata, BatchSession}
import org.apache.livy.server.interactive.InteractiveSession
import org.apache.livy.server.interactive.{InteractiveRecoveryMetadata, InteractiveSession}
import org.apache.livy.server.recovery.SessionStore
import org.apache.livy.sessions.Session.RecoveryMetadata

Expand Down Expand Up @@ -98,40 +98,53 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
}

it("batch session should not be gc-ed until application is finished") {
val sessionId = 24
val session = mock[BatchSession]
when(session.id).thenReturn(sessionId)
when(session.name).thenReturn(None)
when(session.stop()).thenReturn(Future {})
when(session.lastActivity).thenReturn(System.nanoTime())

var sessionId = 24
val conf = new LivyConf().set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
val sm = new BatchSessionManager(conf, mock[SessionStore], Some(Seq(session)))
testSessionGC(session, sm)
val sessionStore = mock[SessionStore]
when(sessionStore.getAllSessions[BatchRecoveryMetadata]("batch"))
.thenReturn(Seq.empty)
val sm = new BatchSessionManager(conf, sessionStore)

// Batch session should not be gc-ed when alive
for (s <- Seq(SessionState.Running,
SessionState.Idle,
SessionState.Recovering,
SessionState.NotStarted,
SessionState.Busy,
SessionState.ShuttingDown)) {
sessionId = sessionId + 1
val session = mock[BatchSession]
mockSessionFieldAndMethod(session, s, sessionId)
sm.register(session)

Await.result(sm.collectGarbage(), Duration.Inf)
sm.get(session.id) should be (Some(session))
}

// Stopped session should be gc-ed after retained timeout
for (s <- Seq(SessionState.Error(),
SessionState.Success(),
SessionState.Dead())) {
sessionId = sessionId + 1
val session = mock[BatchSession]
mockSessionFieldAndMethod(session, s, sessionId)
sm.register(session)

eventually(timeout(30 seconds), interval(100 millis)) {
Await.result(sm.collectGarbage(), Duration.Inf)
sm.get(session.id) should be (None)
}
}
}

it("interactive session should not gc-ed if session timeout check is off") {
val sessionId = 24
val session = mock[InteractiveSession]
when(session.id).thenReturn(sessionId)
when(session.name).thenReturn(None)
when(session.stop()).thenReturn(Future {})
when(session.lastActivity).thenReturn(System.nanoTime())
when(session.heartbeatExpired).thenReturn(false)

var sessionId = 24
val conf = new LivyConf().set(LivyConf.SESSION_TIMEOUT_CHECK, false)
.set(LivyConf.SESSION_STATE_RETAIN_TIME, "1s")
val sm = new InteractiveSessionManager(conf, mock[SessionStore], Some(Seq(session)))
testSessionGC(session, sm)
}

def testSessionGC(session: Session, sm: SessionManager[_, _]): Unit = {

def changeStateAndCheck(s: SessionState)(fn: SessionManager[_, _] => Unit): Unit = {
doReturn(s).when(session).state
Await.result(sm.collectGarbage(), Duration.Inf)
fn(sm)
}
val sessionStore = mock[SessionStore]
when(sessionStore.getAllSessions[InteractiveRecoveryMetadata]("interactive"))
.thenReturn(Seq.empty)
val sm = new InteractiveSessionManager(conf, mock[SessionStore])

// Batch session should not be gc-ed when alive
for (s <- Seq(SessionState.Running,
Expand All @@ -140,18 +153,38 @@ class SessionManagerSpec extends FunSpec with Matchers with LivyBaseUnitTestSuit
SessionState.NotStarted,
SessionState.Busy,
SessionState.ShuttingDown)) {
changeStateAndCheck(s) { sm => sm.get(session.id) should be (Some(session)) }
sessionId = sessionId + 1
val session = mock[InteractiveSession]
mockSessionFieldAndMethod(session, s, sessionId)
sm.register(session)

Await.result(sm.collectGarbage(), Duration.Inf)
sm.get(session.id) should be (Some(session))
}

// Stopped session should be gc-ed after retained timeout
for (s <- Seq(SessionState.Error(),
SessionState.Success(),
SessionState.Dead())) {
eventually(timeout(30 seconds), interval(100 millis)) {
changeStateAndCheck(s) { sm => sm.get(session.id) should be (None) }
}
sessionId = sessionId + 1
val session = mock[InteractiveSession]
mockSessionFieldAndMethod(session, s, sessionId)
sm.register(session)

eventually(timeout(30 seconds), interval(100 millis)) {
Await.result(sm.collectGarbage(), Duration.Inf)
sm.get(session.id) should be (None)
}
}
}

def mockSessionFieldAndMethod(session: Session, state: SessionState, sessionId: Int) : Unit = {
when(session.id).thenReturn(sessionId)
when(session.name).thenReturn(None)
when(session.stop()).thenReturn(Future {})
when(session.lastActivity).thenReturn(System.nanoTime())
when(session.state).thenReturn(state)
}
}

describe("BatchSessionManager") {
Expand Down

0 comments on commit b019117

Please sign in to comment.