Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[LIVY-729] Fix livy recover the killed session #266

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -188,8 +188,14 @@ class BatchSession(
info(s"Batch session $id created [appid: ${appId.orNull}, state: ${state.toString}, " +
s"info: ${appInfo.asJavaMap}]")
case SparkApp.State.FINISHED => _state = SessionState.Success()
case SparkApp.State.KILLED => _state = SessionState.Killed()
case SparkApp.State.FAILED => _state = SessionState.Dead()
case SparkApp.State.KILLED => {
_state = SessionState.Killed()
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
}
case SparkApp.State.FAILED => {
_state = SessionState.Dead()
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
}
case _ =>
}
}
Expand Down
Expand Up @@ -577,6 +577,17 @@ class InteractiveSession(
// Since these 2 transitions are triggered by different threads, there's a race condition.
// Make sure we won't transit from dead to error state.
val areSameStates = serverSideState.getClass() == newState.getClass()

if (!areSameStates) {
newState match {
case _: SessionState.Killed | _: SessionState.Dead =>
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
case SessionState.ShuttingDown =>
sessionStore.remove(RECOVERY_SESSION_TYPE, id)
case _ =>
}
}

val transitFromInactiveToActive = !serverSideState.isActive && newState.isActive
if (!areSameStates && !transitFromInactiveToActive) {
debug(s"$this session state change from ${serverSideState} to $newState")
Expand Down
Expand Up @@ -125,7 +125,11 @@ class FileSystemStateStore(
}

/**
 * Removes the entry stored under `key` from the backing file system.
 *
 * The delete is attempted exactly once. A missing file is not treated as an
 * error: the resulting `FileNotFoundException` is caught and logged as a
 * warning, so removing a key that is already gone does not fail the caller.
 *
 * @param key store-relative key; resolved to a path via `absPath`
 */
override def remove(key: String): Unit = {
  try {
    // NOTE(review): the `false` argument is presumably the "recursive" flag of
    // the underlying delete call, i.e. only a single file is removed - confirm.
    fileContext.delete(absPath(key), false)
  } catch {
    case _: FileNotFoundException => warn(s"Failed to remove non-existed file: ${key}")
  }
}

private def absPath(key: String): Path = new Path(fsUri.getPath(), key)
Expand Down
Expand Up @@ -110,7 +110,7 @@ class ZooKeeperStateStore(
try {
curatorClient.delete().guaranteed().forPath(prefixKey(key))
} catch {
case _: NoNodeException =>
case _: NoNodeException => warn(s"Fail to remove non-existed zookeeper node: ${key}")
}
}

Expand Down