[KYUUBI #4031] [PYSPARK] Refactor the python process watcher
### _Why are the changes needed?_

Name the Python worker's stderr-reader and watcher threads with the session id, and have the watcher catch the `InterruptedException` it receives when interrupted (for example, when the session closes), logging a single warning instead of dying with an error stack trace.

Before:
<img width="1096" alt="image" src="https://user-images.githubusercontent.com/6757692/209627576-1eb2759c-48cd-446a-bbec-4dd219a8bec9.png">

After:
<img width="956" alt="image" src="https://user-images.githubusercontent.com/6757692/209627658-2c9c09e6-a9b0-478e-ae13-f1c6e7538a3e.png">

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly, including negative and positive cases if possible

- [x] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4031 from turboFei/inter_exception.

c551c45 [fwang12] refactor
c490a40 [fwang12] ignore interrupted exception for watcher

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
turboFei committed Dec 27, 2022
1 parent a5a3e20 commit 70bd55d
Showing 1 changed file with 17 additions and 9 deletions.
@@ -251,6 +251,7 @@ object ExecutePython extends Logging {
   }
 
   def createSessionPythonWorker(spark: SparkSession, session: Session): SessionPythonWorker = {
+    val sessionId = session.handle.identifier.toString
     val pythonExec = StringUtils.firstNonBlank(
       spark.conf.getOption("spark.pyspark.driver.python").orNull,
       spark.conf.getOption("spark.pyspark.python").orNull,
@@ -278,7 +279,7 @@ object ExecutePython extends Logging {
         "SPARK_HOME",
         getSparkPythonHomeFromArchive(spark, session).getOrElse(defaultSparkHome)))
     }
-    env.put("KYUUBI_SPARK_SESSION_UUID", session.handle.identifier.toString)
+    env.put("KYUUBI_SPARK_SESSION_UUID", sessionId)
     env.put("PYTHON_GATEWAY_CONNECTION_INFO", KyuubiPythonGatewayServer.CONNECTION_FILE_PATH)
     logger.info(
       s"""
@@ -288,7 +289,10 @@ object ExecutePython extends Logging {
          |""".stripMargin)
     builder.redirectError(Redirect.PIPE)
     val process = builder.start()
-    SessionPythonWorker(startStderrSteamReader(process), startWatcher(process), process)
+    SessionPythonWorker(
+      startStderrSteamReader(process, sessionId),
+      startWatcher(process, sessionId),
+      process)
   }
 
   def getSparkPythonExecFromArchive(spark: SparkSession, session: Session): Option[String] = {
@@ -337,8 +341,8 @@ object ExecutePython extends Logging {
     }
   }
 
-  private def startStderrSteamReader(process: Process): Thread = {
-    val stderrThread = new Thread("process stderr thread") {
+  private def startStderrSteamReader(process: Process, sessionId: String): Thread = {
+    val stderrThread = new Thread(s"session[$sessionId] process stderr thread") {
       override def run(): Unit = {
         val lines = scala.io.Source.fromInputStream(process.getErrorStream).getLines()
         lines.filter(_.trim.nonEmpty).foreach(logger.error)
@@ -349,12 +353,16 @@ object ExecutePython extends Logging {
     stderrThread
   }
 
-  def startWatcher(process: Process): Thread = {
-    val processWatcherThread = new Thread("process watcher thread") {
+  def startWatcher(process: Process, sessionId: String): Thread = {
+    val processWatcherThread = new Thread(s"session[$sessionId] process watcher thread") {
       override def run(): Unit = {
-        val exitCode = process.waitFor()
-        if (exitCode != 0) {
-          logger.error(f"Process has died with $exitCode")
+        try {
+          val exitCode = process.waitFor()
+          if (exitCode != 0) {
+            logger.error(f"Process has died with $exitCode")
+          }
+        } catch {
+          case _: InterruptedException => logger.warn("Process has been interrupted")
         }
       }
     }
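
The diff stops before the code that interrupts the watcher; presumably the worker's shutdown path does so when the session closes. A hypothetical sketch of that interaction follows (the `close` method and field names are illustrative, not the actual `SessionPythonWorker` API):

```scala
// Illustrative only: a worker owning the two threads and the child process.
// Interrupting the watcher while it blocks in waitFor() raises exactly the
// InterruptedException that the patched catch block downgrades to a warning.
final case class PythonWorkerSketch(
    stderrReader: Thread,
    watcher: Thread,
    process: Process) {

  def close(): Unit = {
    watcher.interrupt() // unblocks waitFor(); now one WARN line, not an error stack trace
    process.destroy()   // ask the Python worker process to terminate
  }
}
```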
