Skip to content

Commit

Permalink
[LIVY-415] Use objects and abstract classes for Kind and SessionSt…
Browse files Browse the repository at this point in the history
…ate.

## What changes were proposed in this pull request?

- Use a singleton (object) rather than defining a case class and instantiating it every time an object of that type is needed. Using an object improves code efficiency and readability.
- Use abstract classes to set up methods with default definitions to follow the DRY principles.

## How was this patch tested?

All the existing tests were modified to use the new changes. The changes were then tested on Travis with existing settings to make sure they are compatible with the upstream repository.

Please review https://livy.incubator.apache.org/community/ before opening a pull request.

I have read through the page and felt like this change does not require a JIRA. If you feel otherwise, I can create a JIRA and then attach it to the PR.

Author: Arun Allamsetty <arun.allamsetty@gmail.com>

Closes #62 from aa8y/singleton.
  • Loading branch information
aa8y authored and jerryshao committed Nov 15, 2017
1 parent ef5dccb commit 5471544
Show file tree
Hide file tree
Showing 27 changed files with 142 additions and 186 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -275,9 +275,9 @@ private class HttpClientTestBootstrap extends LifeCycle {
when(session.id).thenReturn(id)
when(session.appId).thenReturn(None)
when(session.appInfo).thenReturn(AppInfo())
when(session.state).thenReturn(SessionState.Idle())
when(session.state).thenReturn(SessionState.Idle)
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark())
when(session.kind).thenReturn(Spark)
when(session.stop()).thenReturn(Future.successful(()))
require(HttpClientSpec.session == null, "Session already created?")
HttpClientSpec.session = session
Expand Down
28 changes: 11 additions & 17 deletions core/src/main/scala/org/apache/livy/sessions/Kind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,27 @@ import com.fasterxml.jackson.core.{JsonGenerator, JsonParser, JsonToken}
import com.fasterxml.jackson.databind._
import com.fasterxml.jackson.databind.module.SimpleModule

sealed trait Kind
case class Spark() extends Kind {
override def toString: String = "spark"
sealed abstract class Kind(val name: String) {
override def toString: String = name
}

case class PySpark() extends Kind {
override def toString: String = "pyspark"
}
object Spark extends Kind("spark")

case class SparkR() extends Kind {
override def toString: String = "sparkr"
}
object PySpark extends Kind("pyspark")

case class Shared() extends Kind {
override def toString: String = "shared"
}
object SparkR extends Kind("sparkr")

object Shared extends Kind("shared")

object Kind {

def apply(kind: String): Kind = kind match {
case "spark" | "scala" => Spark()
case "pyspark" | "python" => PySpark()
case "sparkr" | "r" => SparkR()
case "shared" => Shared()
case "spark" | "scala" => Spark
case "pyspark" | "python" => PySpark
case "sparkr" | "r" => SparkR
case "shared" => Shared
case other => throw new IllegalArgumentException(s"Invalid kind: $other")
}

}

class SessionKindModule extends SimpleModule("SessionKind") {
Expand Down
106 changes: 35 additions & 71 deletions core/src/main/scala/org/apache/livy/sessions/SessionState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,91 +17,55 @@

package org.apache.livy.sessions

sealed trait SessionState {
/** Returns true if the State represents a process that can eventually execute commands */
def isActive: Boolean
sealed abstract class SessionState(val state: String, val isActive: Boolean) {
override def toString: String = state
}

sealed trait FinishedSessionState extends SessionState {
/** When session is finished. */
def time: Long
}
class FinishedSessionState(
override val state: String,
override val isActive: Boolean,
val time: Long
) extends SessionState(state, isActive)

object SessionState {

def apply(s: String): SessionState = {
s match {
case "not_started" => NotStarted()
case "starting" => Starting()
case "recovering" => Recovering()
case "idle" => Idle()
case "running" => Running()
case "busy" => Busy()
case "shutting_down" => ShuttingDown()
case "error" => Error()
case "dead" => Dead()
case "success" => Success()
case _ => throw new IllegalArgumentException(s"Illegal session state: $s")
}
def apply(s: String): SessionState = s match {
case "not_started" => NotStarted
case "starting" => Starting
case "recovering" => Recovering
case "idle" => Idle
case "running" => Running
case "busy" => Busy
case "shutting_down" => ShuttingDown
case "error" => Error
case "dead" => Dead
case "success" => Success
case _ => throw new IllegalArgumentException(s"Illegal session state: $s")
}

case class NotStarted() extends SessionState {
override def isActive: Boolean = true

override def toString: String = "not_started"
}
object NotStarted extends SessionState("not_started", true)

case class Starting() extends SessionState {
override def isActive: Boolean = true
object Starting extends SessionState("starting", true)

override def toString: String = "starting"
}
object Recovering extends SessionState("recovering", true)

case class Recovering() extends SessionState {
override def isActive: Boolean = true
object Idle extends SessionState("idle", true)

override def toString: String = "recovering"
}
object Running extends SessionState("running", true)

case class Idle() extends SessionState {
override def isActive: Boolean = true
object Busy extends SessionState("busy", true)

override def toString: String = "idle"
}
object ShuttingDown extends SessionState("shutting_down", false)

case class Running() extends SessionState {
override def isActive: Boolean = true

override def toString: String = "running"
}

case class Busy() extends SessionState {
override def isActive: Boolean = true

override def toString: String = "busy"
}
case class Error(override val time: Long) extends
FinishedSessionState("error", true, time)
object Error extends Error(System.nanoTime)

case class ShuttingDown() extends SessionState {
override def isActive: Boolean = false
case class Dead(override val time: Long) extends
FinishedSessionState("dead", false, time)
object Dead extends Dead(System.nanoTime)

override def toString: String = "shutting_down"
}

case class Error(time: Long = System.nanoTime()) extends FinishedSessionState {
override def isActive: Boolean = true

override def toString: String = "error"
}

case class Dead(time: Long = System.nanoTime()) extends FinishedSessionState {
override def isActive: Boolean = false

override def toString: String = "dead"
}

case class Success(time: Long = System.nanoTime()) extends FinishedSessionState {
override def isActive: Boolean = false

override def toString: String = "success"
}
case class Success(override val time: Long) extends
FinishedSessionState("success", false, time)
object Success extends Success(System.nanoTime)
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
}

class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
def verifySessionRunning(): Unit = verifySessionState(SessionState.Running())
def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
def verifySessionDead(): Unit = verifySessionState(SessionState.Dead)
def verifySessionRunning(): Unit = verifySessionState(SessionState.Running)
def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success)
}

class InteractiveSession(id: Int) extends Session(id, INTERACTIVE_TYPE) {
Expand Down Expand Up @@ -226,11 +226,11 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
.setBody(mapper.writeValueAsString(requestBody))
.execute()

verifySessionState(SessionState.Dead())
verifySessionState(SessionState.Dead)
}

def verifySessionIdle(): Unit = {
verifySessionState(SessionState.Idle())
verifySessionState(SessionState.Idle)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
test("deleting a session should kill YARN app") {
val output = newOutputPath()
withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
s.verifySessionState(SessionState.Running())
s.verifySessionState(SessionState.Running)
s.snapshot().appInfo.driverLogUrl.value should include ("containerlogs")

val appId = s.appId()
Expand All @@ -102,7 +102,7 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
test("killing YARN app should change batch state to dead") {
val output = newOutputPath()
withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
s.verifySessionState(SessionState.Running())
s.verifySessionState(SessionState.Running)
val appId = s.appId()

// Kill the YARN app and check batch state should be KILLED.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.livy.test.framework.{BaseIntegrationTestSuite, LivyRestClient}

class InteractiveIT extends BaseIntegrationTestSuite {
test("basic interactive session") {
withNewSession(Spark()) { s =>
withNewSession(Spark) { s =>
s.run("val sparkVersion = sc.version").result().left.foreach(info(_))
s.run("1+1").verifyResult("res0: Int = 2")
s.run("""sc.getConf.get("spark.executor.instances")""").verifyResult("res1: String = 1")
Expand Down Expand Up @@ -67,7 +67,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}

pytest("pyspark interactive session") {
withNewSession(PySpark()) { s =>
withNewSession(PySpark) { s =>
s.run("1+1").verifyResult("2")
s.run("sqlContext").verifyResult(startsWith("<pyspark.sql.context.HiveContext"))
s.run("sc.parallelize(range(100)).map(lambda x: x * 2).reduce(lambda x, y: x + y)")
Expand All @@ -82,7 +82,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}

rtest("R interactive session") {
withNewSession(SparkR()) { s =>
withNewSession(SparkR) { s =>
// R's output sometimes includes the count of statements, which makes it annoying to test
// things. This helps a bit.
val curr = new AtomicInteger()
Expand All @@ -102,14 +102,14 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}

test("application kills session") {
withNewSession(Spark()) { s =>
withNewSession(Spark) { s =>
s.runFatalStatement("System.exit(0)")
}
}

test("should kill RSCDriver if it doesn't respond to end session") {
val testConfName = s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_END_SESSION.key()}"
withNewSession(Spark(), Map(testConfName -> "true")) { s =>
withNewSession(Spark, Map(testConfName -> "true")) { s =>
val appId = s.appId()
s.stop()
val appReport = cluster.yarnClient.getApplicationReport(appId)
Expand All @@ -120,7 +120,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
test("should kill RSCDriver if it didn't register itself in time") {
val testConfName =
s"${RSCConf.LIVY_SPARK_PREFIX}${RSCConf.Entry.TEST_STUCK_START_DRIVER.key()}"
withNewSession(Spark(), Map(testConfName -> "true"), false) { s =>
withNewSession(Spark, Map(testConfName -> "true"), false) { s =>
eventually(timeout(2 minutes), interval(5 seconds)) {
val appId = s.appId()
appId should not be null
Expand All @@ -133,7 +133,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
test("user jars are properly imported in Scala interactive sessions") {
// Include a popular Java library to test importing user jars.
val sparkConf = Map("spark.jars.packages" -> "org.codehaus.plexus:plexus-utils:3.0.24")
withNewSession(Spark(), sparkConf) { s =>
withNewSession(Spark, sparkConf) { s =>
// Check is the library loaded in JVM in the proper class loader.
s.run("Thread.currentThread.getContextClassLoader.loadClass" +
"""("org.codehaus.plexus.util.FileUtils")""")
Expand All @@ -153,7 +153,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
test("heartbeat should kill expired session") {
// Set it to 2s because verifySessionIdle() is calling GET every second.
val heartbeatTimeout = Duration.create("2s")
withNewSession(Spark(), Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
withNewSession(Spark, Map.empty, true, heartbeatTimeout.toSeconds.toInt) { s =>
// If the test reaches here, that means verifySessionIdle() is successfully keeping the
// session alive. Now verify heartbeat is killing expired session.
Thread.sleep(heartbeatTimeout.toMillis * 2)
Expand All @@ -162,7 +162,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
}

test("recover interactive session") {
withNewSession(Spark()) { s =>
withNewSession(Spark) { s =>
val stmt1 = s.run("1")
stmt1.verifyResult("res0: Int = 1")

Expand All @@ -182,7 +182,7 @@ class InteractiveIT extends BaseIntegrationTestSuite {
s.verifySessionDoesNotExist()

// Verify new session doesn't reuse old session id.
withNewSession(Spark(), Map.empty, false) { s1 =>
withNewSession(Spark, Map.empty, false) { s1 =>
s1.id should be > s.id
}
}
Expand Down
8 changes: 4 additions & 4 deletions repl/src/main/scala/org/apache/livy/repl/ReplDriver.scala
Original file line number Diff line number Diff line change
Expand Up @@ -93,24 +93,24 @@ class ReplDriver(conf: SparkConf, livyConf: RSCConf)

override protected def createWrapper(msg: BaseProtocol.BypassJobRequest): BypassJobWrapper = {
Kind(msg.jobType) match {
case PySpark() if session.interpreter(PySpark()).isDefined =>
case PySpark if session.interpreter(PySpark).isDefined =>
new BypassJobWrapper(this, msg.id,
new BypassPySparkJob(msg.serializedJob,
session.interpreter(PySpark()).get.asInstanceOf[PythonInterpreter]))
session.interpreter(PySpark).get.asInstanceOf[PythonInterpreter]))
case _ => super.createWrapper(msg)
}
}

override protected def addFile(path: String): Unit = {
if (!ClientConf.TEST_MODE) {
session.interpreter(PySpark()).foreach { _.asInstanceOf[PythonInterpreter].addFile(path) }
session.interpreter(PySpark).foreach { _.asInstanceOf[PythonInterpreter].addFile(path) }
}
super.addFile(path)
}

override protected def addJarOrPyFile(path: String): Unit = {
if (!ClientConf.TEST_MODE) {
session.interpreter(PySpark())
session.interpreter(PySpark)
.foreach { _.asInstanceOf[PythonInterpreter].addPyFile(this, conf, path) }
}
super.addJarOrPyFile(path)
Expand Down

0 comments on commit 5471544

Please sign in to comment.