Skip to content

Commit

Permalink
[KYUUBI #4026] [PYSPARK] Fail if the session python worker process ha…
Browse files Browse the repository at this point in the history
…s been exited

### _Why are the changes needed?_

Before, if the pyspark environment is not set up correctly,the python response was always `None`.
In this pr, fail if the session python worker process has been exited.

BTW: Filter the empty log.
<img width="1422" alt="image" src="https://user-images.githubusercontent.com/6757692/209502683-49aa9088-8686-4a54-b88c-85881a3fb089.png">

### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before make a pull request

Closes #4026 from turboFei/python_exec.

Closes #4026

499e19b [fwang12] more insights
17cefc0 [fwang12] Fail if the session python worker has been exited

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
  • Loading branch information
turboFei committed Dec 27, 2022
1 parent 7909133 commit a5a3e20
Showing 1 changed file with 5 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ case class SessionPythonWorker(
* @return the python response
*/
def runCode(code: String, internal: Boolean = false): Option[PythonResponse] = withLockRequired {
if (!workerProcess.isAlive) {
throw KyuubiSQLException("Python worker process has been exited, please check the error log" +
" and re-create the session to run python code.")
}
val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code"))
// scalastyle:off println
stdin.println(input)
Expand Down Expand Up @@ -337,7 +341,7 @@ object ExecutePython extends Logging {
val stderrThread = new Thread("process stderr thread") {
override def run(): Unit = {
val lines = scala.io.Source.fromInputStream(process.getErrorStream).getLines()
lines.foreach(logger.error)
lines.filter(_.trim.nonEmpty).foreach(logger.error)
}
}
stderrThread.setDaemon(true)
Expand Down

0 comments on commit a5a3e20

Please sign in to comment.