Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ private class HttpClientTestBootstrap extends LifeCycle {
when(session.name).thenReturn(None)
when(session.appId).thenReturn(None)
when(session.appInfo).thenReturn(AppInfo())
when(session.state).thenReturn(SessionState.Idle)
when(session.state).thenReturn(SessionState.Idle())
when(session.proxyUser).thenReturn(None)
when(session.kind).thenReturn(Spark)
when(session.stop()).thenReturn(Future.successful(()))
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/scala/org/apache/livy/sessions/Kind.scala
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ object Kind {
case "sql" => SQL
case other => throw new IllegalArgumentException(s"Invalid kind: $other")
}

val kinds: Seq[Kind] = Seq(Spark, PySpark, SparkR, Shared, SQL)

@o-shevchenko o-shevchenko Aug 28, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also move object SQL and other kinds inside object Kind to be able to use:
import org.apache.livy.sessions.Kind._ instead of import org.apache.livy.sessions.{Kind, PySpark, SessionState, Spark, SparkR, SQL}
It looks more convenient.
Is it critical for backward compatibility?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is better to use values everywhere (instead of kinds/states)?


def isValid(kind: String): Boolean = {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where did this get called?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is public API to use as I described in the description

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then how do users use this public api?

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to verify that we can create StateSession/Kind object from a string without additional logic for catching exceptions.
There is an example in the description.

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you need to add livy-core dependency in your project if you are asking about it

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you need to create StateSession or Kind object?

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to simply use it in the code instead of hardcoding string values like "idle", sometimes I need to check session state or wait for state, for example, create a session and wait for status "idle" to be able to run statement. I don't want to use string variables everywhere. I need an enumeration. I can create my own in my code which totally duplicates Livy enum but better to use it from Livy. it is the typical case, isn't it?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's the case. But from my point I never expect to expose this to users

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, having a method to check if the object is a part of the enum is common practice for Scala, it can simplify using of your enum to don't force user to catch exception or write own method to check if object exists in enum values (if exist, we don't have it now in Kind.scala, SessionState.scala that's why I have added sequences of values in these classes)

kinds.map(_.name).contains(kind)
}
}

class SessionKindModule extends SimpleModule("SessionKind") {
Expand Down
35 changes: 21 additions & 14 deletions core/src/main/scala/org/apache/livy/sessions/SessionState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -30,33 +30,33 @@ class FinishedSessionState(
object SessionState {

def apply(s: String): SessionState = s match {
case "not_started" => NotStarted
case "starting" => Starting
case "recovering" => Recovering
case "idle" => Idle
case "running" => Running
case "busy" => Busy
case "shutting_down" => ShuttingDown
case "not_started" => NotStarted()
case "starting" => Starting()
case "recovering" => Recovering()
case "idle" => Idle()
case "running" => Running()
case "busy" => Busy()
case "shutting_down" => ShuttingDown()
case "error" => Error()
case "dead" => Dead()
case "killed" => Killed()
case "success" => Success()
case _ => throw new IllegalArgumentException(s"Illegal session state: $s")
}

object NotStarted extends SessionState("not_started", true)
case class NotStarted() extends SessionState("not_started", true)

object Starting extends SessionState("starting", true)
case class Starting() extends SessionState("starting", true)

object Recovering extends SessionState("recovering", true)
case class Recovering() extends SessionState("recovering", true)

object Idle extends SessionState("idle", true)
case class Idle() extends SessionState("idle", true)

object Running extends SessionState("running", true)
case class Running() extends SessionState("running", true)

object Busy extends SessionState("busy", true)
case class Busy() extends SessionState("busy", true)

object ShuttingDown extends SessionState("shutting_down", false)
case class ShuttingDown() extends SessionState("shutting_down", false)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's purpose of changes here to switch to case class?

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To unify access to enumeration type and to be able to create sequence of all states

val states: Seq[SessionState] = Seq(NotStarted(), Starting(), Recovering(), Idle(), Running(),
    Busy(), ShuttingDown(), Killed(), Error(), Dead(), Success())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The changes here will create many small SessionState objects, which is unnecessary and can be replaced by singleton.

@o-shevchenko o-shevchenko Sep 2, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we need to use either object or class to be able to create a sequence of states. as far as I see, you changed case class to object for FinishedSessionState classes to fix the problem with time variable initialization, so we can't use singleton everywhere. therefore, I think this is not critical to use case class instead of object for other states.


case class Killed(override val time: Long = System.nanoTime()) extends
FinishedSessionState("killed", false, time)
Expand All @@ -69,4 +69,11 @@ object SessionState {

case class Success(override val time: Long = System.nanoTime()) extends
FinishedSessionState("success", false, time)

val states: Seq[SessionState] = Seq(NotStarted(), Starting(), Recovering(), Idle(), Running(),
Busy(), ShuttingDown(), Killed(), Error(), Dead(), Success())

def isValid(state: String): Boolean = {
states.map(_.state).contains(state)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
class BatchSession(id: Int) extends Session(id, BATCH_TYPE) {
def verifySessionDead(): Unit = verifySessionState(SessionState.Dead())
def verifySessionKilled(): Unit = verifySessionState(SessionState.Killed())
def verifySessionRunning(): Unit = verifySessionState(SessionState.Running)
def verifySessionRunning(): Unit = verifySessionState(SessionState.Running())
def verifySessionSuccess(): Unit = verifySessionState(SessionState.Success())
}

Expand Down Expand Up @@ -240,7 +240,7 @@ class LivyRestClient(val httpClient: AsyncHttpClient, val livyEndpoint: String)
}

def verifySessionIdle(): Unit = {
verifySessionState(SessionState.Idle)
verifySessionState(SessionState.Idle())
}

def verifySessionKilled(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
test("deleting a session should kill YARN app") {
val output = newOutputPath()
withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
s.verifySessionState(SessionState.Running)
s.verifySessionState(SessionState.Running())
s.snapshot().appInfo.driverLogUrl.value should include ("containerlogs")

val appId = s.appId()
Expand All @@ -100,7 +100,7 @@ class BatchIT extends BaseIntegrationTestSuite with BeforeAndAfterAll {
test("killing YARN app should change batch state to dead") {
val output = newOutputPath()
withTestLib(classOf[SimpleSparkApp], List(output, "false")) { s =>
s.verifySessionState(SessionState.Running)
s.verifySessionState(SessionState.Running())
val appId = s.appId()

// Kill the YARN app and check batch state should be KILLED.
Expand Down
10 changes: 5 additions & 5 deletions repl/src/main/scala/org/apache/livy/repl/Session.scala
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class Session(

private implicit val formats = DefaultFormats

private var _state: SessionState = SessionState.NotStarted
private var _state: SessionState = SessionState.NotStarted()

// Number of statements kept in driver's memory
private val numRetainedStatements = livyConf.getInt(RSCConf.Entry.RETAINED_STATEMENTS)
Expand Down Expand Up @@ -120,7 +120,7 @@ class Session(

def start(): Future[SparkEntries] = {
val future = Future {
changeState(SessionState.Starting)
changeState(SessionState.Starting())

// Always start SparkInterpreter after beginning, because we rely on SparkInterpreter to
// initialize SparkContext and create SparkEntries.
Expand All @@ -133,7 +133,7 @@ class Session(
interpGroup.put(Spark, sparkInterp)
}

changeState(SessionState.Idle)
changeState(SessionState.Idle())
entries
}(interpreterExecutor)

Expand Down Expand Up @@ -263,12 +263,12 @@ class Session(
private def executeCode(interp: Option[Interpreter],
executionCount: Int,
code: String): String = {
changeState(SessionState.Busy)
changeState(SessionState.Busy())

def transitToIdle() = {
val executingLastStatement = executionCount == newStatementId.intValue() - 1
if (_statements.isEmpty || executingLastStatement) {
changeState(SessionState.Idle)
changeState(SessionState.Idle())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ abstract class BaseSessionSpec(kind: Kind)
// Session's constructor should fire an initial state change event.
stateChangedCalled.intValue() shouldBe 1
Await.ready(session.start(), 30 seconds)
assert(session.state === SessionState.Idle)
assert(session.state === SessionState.Idle())
// There should be at least 1 state change event fired when session transits to idle.
stateChangedCalled.intValue() should (be > 1)
testCode(session)
Expand All @@ -74,14 +74,14 @@ abstract class BaseSessionSpec(kind: Kind)
val future = session.start()
try {
Await.ready(future, 60 seconds)
session.state should (equal (SessionState.Starting) or equal (SessionState.Idle))
session.state should (equal (SessionState.Starting()) or equal (SessionState.Idle()))
} finally {
session.close()
}
}

it should "eventually become the idle state" in withSession { session =>
session.state should equal (SessionState.Idle)
session.state should equal (SessionState.Idle())
}

}
10 changes: 8 additions & 2 deletions rsc/src/main/java/org/apache/livy/rsc/driver/StatementState.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.*;

import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -73,13 +74,18 @@ static void put(StatementState key,
PREDECESSORS = Collections.unmodifiableMap(predecessors);
}

static boolean isValid(StatementState from, StatementState to) {
static boolean isAllowed(StatementState from, StatementState to) {
return PREDECESSORS.get(to).contains(from);
}

public static boolean isValid(String state) {
return Arrays.stream(values())
.map(x -> StringUtils.capitalize(x.state)).anyMatch(state::equals);

@o-shevchenko o-shevchenko Aug 28, 2019

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we change naming (upper case instead of Pascal case) for enum values. For example, WAITING instead of Waiting. This doesn't fit Java code style.
Also, in this case, we can't use StatementState.valueOf("waiting".toUpperCase()) as usually but need to capitalize first letter

StatementState.valueOf(state.toLowerCase().capitalize).

}

static void validate(StatementState from, StatementState to) {
LOG.debug("{} -> {}", from, to);
if (!isValid(from, to)) {
if (!isAllowed(from, to)) {
throw new IllegalStateException("Illegal Transition: " + from + " -> " + to);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ object BatchSession extends Logging {
id,
name,
appTag,
SessionState.Starting,
SessionState.Starting(),
livyConf,
owner,
impersonatedUser,
Expand All @@ -132,7 +132,7 @@ object BatchSession extends Logging {
m.id,
m.name,
m.appTag,
SessionState.Recovering,
SessionState.Recovering(),
livyConf,
m.owner,
m.proxyUser,
Expand Down Expand Up @@ -184,7 +184,7 @@ class BatchSession(
debug(s"$this state changed from $oldState to $newState")
newState match {
case SparkApp.State.RUNNING =>
_state = SessionState.Running
_state = SessionState.Running()
info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " +
s"info: ${appInfo.asJavaMap}]")
case SparkApp.State.FINISHED => _state = SessionState.Success()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ object InteractiveSession extends Logging {
None,
appTag,
client,
SessionState.Starting,
SessionState.Starting(),
request.kind,
request.heartbeatTimeoutInSecond,
livyConf,
Expand All @@ -144,7 +144,7 @@ object InteractiveSession extends Logging {
metadata.appId,
metadata.appTag,
client,
SessionState.Recovering,
SessionState.Recovering(),
metadata.kind,
metadata.heartbeatTimeoutS,
livyConf,
Expand Down Expand Up @@ -429,7 +429,7 @@ class InteractiveSession(
override def onJobFailed(job: JobHandle[Void], cause: Throwable): Unit = errorOut()

override def onJobSucceeded(job: JobHandle[Void], result: Void): Unit = {
transition(SessionState.Running)
transition(SessionState.Running())
info(s"Interactive session $id created [appid: ${appId.orNull}, " +
s"owner: $owner, proxyUser:" +
s" $proxyUser, state: ${state.toString}, kind: ${kind.toString}, " +
Expand All @@ -440,7 +440,7 @@ class InteractiveSession(
// Other code might call stop() to close the RPC channel. When RPC channel is closing,
// this callback might be triggered. Check and don't call stop() to avoid nested called
// if the session is already shutting down.
if (serverSideState != SessionState.ShuttingDown) {
if (serverSideState != SessionState.ShuttingDown()) {
transition(SessionState.Error())
stop()
app.foreach { a =>
Expand All @@ -460,18 +460,18 @@ class InteractiveSession(
heartbeatTimeout.toSeconds.toInt, owner, proxyUser, rscDriverUri)

override def state: SessionState = {
if (serverSideState == SessionState.Running) {
if (serverSideState == SessionState.Running()) {
// If session is in running state, return the repl state from RSCClient.
client
.flatMap(s => Option(s.getReplState))
.map(SessionState(_))
.getOrElse(SessionState.Busy) // If repl state is unknown, assume repl is busy.
.getOrElse(SessionState.Busy()) // If repl state is unknown, assume repl is busy.
} else serverSideState
}

override def stopSession(): Unit = {
try {
transition(SessionState.ShuttingDown)
transition(SessionState.ShuttingDown())
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
client.foreach { _.stop(true) }
} catch {
Expand Down Expand Up @@ -591,7 +591,7 @@ class InteractiveSession(

private def ensureRunning(): Unit = synchronized {
serverSideState match {
case SessionState.Running =>
case SessionState.Running() =>
case _ =>
throw new IllegalStateException("Session is in state %s" format serverSideState)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class SessionManager[S <: Session, R <: RecoveryMetadata : ClassTag](
case _ =>
if (!sessionTimeoutCheck) {
false
} else if (session.state == SessionState.Busy && sessionTimeoutCheckSkipBusy) {
} else if (session.state == SessionState.Busy() && sessionTimeoutCheckSkipBusy) {
false
} else if (session.isInstanceOf[BatchSession]) {
false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ object SessionServletSpec {

override def recoveryMetadata: RecoveryMetadata = MockRecoveryMetadata(0)

override def state: SessionState = SessionState.Idle
override def state: SessionState = SessionState.Idle()

override def start(): Unit = ()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class BatchServletSpec extends BaseSessionServletSpec[BatchSession, BatchRecover

def testShowSessionProperties(name: Option[String]): Unit = {
val id = 0
val state = SessionState.Running
val state = SessionState.Running()
val appId = "appid"
val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
val log = IndexedSeq[String]("log1", "log2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ class BatchSessionSpec
val m = BatchRecoveryMetadata(99, name, None, "appTag", null, None)
val batch = BatchSession.recover(m, conf, sessionStore, Some(mockApp))

batch.state shouldBe (SessionState.Recovering)
batch.state shouldBe (SessionState.Recovering())
batch.name shouldBe (name)

batch.appIdKnown("appId")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
when(session.appId).thenReturn(None)
when(session.appInfo).thenReturn(AppInfo())
when(session.logLines()).thenReturn(IndexedSeq())
when(session.state).thenReturn(SessionState.Idle)
when(session.state).thenReturn(SessionState.Idle())
when(session.stop()).thenReturn(Future.successful(()))
when(session.proxyUser).thenReturn(None)
when(session.heartbeatExpired).thenReturn(false)
Expand Down Expand Up @@ -165,7 +165,7 @@ class InteractiveSessionServletSpec extends BaseInteractiveServletSpec {
val appId = "appid"
val owner = "owner"
val proxyUser = "proxyUser"
val state = SessionState.Running
val state = SessionState.Running()
val kind = Spark
val appInfo = AppInfo(Some("DRIVER LOG URL"), Some("SPARK UI URL"))
val log = IndexedSeq[String]("log1", "log2")
Expand Down
Loading