[KYUUBI #4028] [PYSPARK] Fix internal python code issue
### _Why are the changes needed?_

1. Wrap the generated code with the correct delimiters

Before:
```
{"code":"spark.sparkContext.setJobGroup(07753dd9-804e-478f-b84f-bf0735732334, ..., true)","cmd":"run_code"}
```

After:
```
{"code":"spark.sparkContext.setJobGroup('07753dd9-804e-478f-b84f-bf0735732334', '...', True)","cmd":"run_code"}
```
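
The fix quotes the string arguments and capitalizes the boolean so the generated code is valid python. A minimal sketch (PySparkCodeGen is an illustrative name; the quoting and True/False handling match the patch):

```
object PySparkCodeGen {
  // quote string arguments and capitalize the boolean so the generated code is valid python
  def setJobGroupCode(statementId: String, jobDesc: String, forceCancel: Boolean): String = {
    val pythonForceCancel = if (forceCancel) "True" else "False"
    s"spark.sparkContext.setJobGroup('$statementId', '$jobDesc', $pythonForceCancel)"
  }
}
```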

2. Use cancelJobGroup for PySpark

Before:
```
'SparkContext' object has no attribute 'clearJobGroup'
```
After:
Use SparkContext.cancelJobGroup instead
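
A runnable sketch of the two generated calls (CancelJobGroupSketch is an illustrative name; the statement id is the one from the example above):

```
object CancelJobGroupSketch extends App {
  val statementId = "07753dd9-804e-478f-b84f-bf0735732334"
  // before (broken): pyspark's SparkContext defines no clearJobGroup method,
  // so this generated line fails inside the python worker
  val before = "spark.sparkContext.clearJobGroup()"
  // after: cancel the statement's own job group, which pyspark does provide
  val after = s"spark.sparkContext.cancelJobGroup('$statementId')"
  println(s"before: $before\nafter:  $after")
}
```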

3. Simplify the internal python code and throw an exception on failure
We cannot trust user code to be formatted correctly, so the internal python code must stay simple and correct; otherwise a malformed statement can break correctness and even cause responses to arrive out of sequence.
For example, the user code might look like the following (e.g. when the user invokes the executePython API):
```
spark.sql('123\'\n\b\t'
```

It is difficult to escape arbitrary user code well enough to use the statement itself as the job description.
So, in this PR, the job description is simplified to record only the statementId; users can still find the original code in the logs or on the UI.
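
To make the failure concrete, a runnable sketch (JobDescSketch is an illustrative name; the jobDesc line matches the patch):

```
object JobDescSketch extends App {
  val statementId = "07753dd9-804e-478f-b84f-bf0735732334"
  // the malformed user code from the example above
  val userCode = "spark.sql('123\\'\n\b\t'"
  // naive: interpolating the statement as the description yields broken python,
  // because the raw quote, newline and control characters end up unescaped
  val broken = s"spark.sparkContext.setJobGroup('$statementId', '$userCode', True)"
  // the fix: a fixed, escape-free description that records only the statement id
  val jobDesc = s"Python statement: $statementId"
  val safe = s"spark.sparkContext.setJobGroup('$statementId', '$jobDesc', True)"
  println(broken) // not a valid python statement
  println(safe) // always valid
}
```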
### _How was this patch tested?_
- [x] Add some test cases that check the changes thoroughly including negative and positive cases if possible

- [ ] Add screenshots for manual tests if appropriate

- [x] [Run test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests) locally before making a pull request

Closes #4028 from turboFei/python_async_debug.

Closes #4028

51a4c5e [fwang12] typo
6da88d1 [fwang12] code style
83f5a48 [fwang12] fail the internal python code
5f2db04 [fwang12] remove debug code
3a798cf [fwang12] Simplify the statement
c3b4640 [fwang12] do not lock for close
009f66a [fwang12] add ReentrantLock for SessionPythonWorker run python code
39bd861 [fwang12] fix
4116dab [fwang12] job desc
f16c656 [fwang12] escape
81db20c [fwang12] fix 'SparkContext' object has no attribute 'clearJobGroup'
985118e [fwang12] escape for python
f7250c1 [fwang12] revert withLocalProperties
13228f9 [fwang12] debug
e318c69 [fwang12] Revert "prevent timeout"
f81c605 [fwang12] prevent timeout
2ca5339 [fwang12] test
1390b0f [fwang12] remove not needed
26ee602 [fwang12] remove not needed
93c08ff [fwang12] debug

Authored-by: fwang12 <fwang12@ebay.com>
Signed-off-by: fwang12 <fwang12@ebay.com>
turboFei committed Dec 27, 2022
1 parent a0a52be commit 7909133
Showing 2 changed files with 40 additions and 13 deletions.
ExecutePython.scala
```
@@ -23,6 +23,7 @@ import java.net.URI
 import java.nio.file.{Files, Path, Paths}
 import java.util.concurrent.RejectedExecutionException
 import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
 import javax.ws.rs.core.UriBuilder
 
 import scala.collection.JavaConverters._
```
```
@@ -99,7 +100,7 @@ class ExecutePython(
     }
   }
 
-  override protected def runInternal(): Unit = withLocalProperties {
+  override protected def runInternal(): Unit = {
     addTimeoutMonitor(queryTimeout)
     if (shouldRunAsync) {
       val asyncOperation = new Runnable {
```
```
@@ -129,14 +130,20 @@
   override def setSparkLocalProperty: (String, String) => Unit =
     (key: String, value: String) => {
       val valueStr = if (value == null) "None" else s"'$value'"
-      worker.runCode(s"spark.sparkContext.setLocalProperty('$key', $valueStr)")
+      worker.runCode(s"spark.sparkContext.setLocalProperty('$key', $valueStr)", internal = true)
       ()
     }
 
   override protected def withLocalProperties[T](f: => T): T = {
     try {
-      worker.runCode("spark.sparkContext.setJobGroup" +
-        s"($statementId, $redactedStatement, $forceCancel)")
+      // use a simple job description so the generated setJobGroup python code cannot be broken
+      val jobDesc = s"Python statement: $statementId"
+      // in python, boolean literals are capitalized
+      val pythonForceCancel = if (forceCancel) "True" else "False"
+      worker.runCode(
+        "spark.sparkContext.setJobGroup" +
+          s"('$statementId', '$jobDesc', $pythonForceCancel)",
+        internal = true)
       setSparkLocalProperty(KYUUBI_SESSION_USER_KEY, session.user)
       setSparkLocalProperty(KYUUBI_STATEMENT_ID_KEY, statementId)
       schedulerPool match {
```
```
@@ -153,7 +160,8 @@
       setSparkLocalProperty(KYUUBI_SESSION_USER_KEY, "")
       setSparkLocalProperty(KYUUBI_STATEMENT_ID_KEY, "")
       setSparkLocalProperty(SPARK_SCHEDULER_POOL_KEY, "")
-      worker.runCode("spark.sparkContext.clearJobGroup()")
+      // use cancelJobGroup for pyspark; see pyspark/context.py for details
+      worker.runCode(s"spark.sparkContext.cancelJobGroup('$statementId')", internal = true)
       if (isSessionUserSignEnabled) {
         clearSessionUserSign()
       }
```
```
@@ -168,15 +176,38 @@ case class SessionPythonWorker(
   private val stdin: PrintWriter = new PrintWriter(workerProcess.getOutputStream)
   private val stdout: BufferedReader =
     new BufferedReader(new InputStreamReader(workerProcess.getInputStream), 1)
+  private val lock = new ReentrantLock()
 
-  def runCode(code: String): Option[PythonResponse] = {
+  private def withLockRequired[T](block: => T): T = {
+    try {
+      lock.lock()
+      block
+    } finally lock.unlock()
+  }
+
+  /**
+   * Run the python code and return the response. This method may also be invoked
+   * internally, e.g. for setJobGroup and cancelJobGroup. If the internal python code
+   * is not formatted correctly, it can break correctness and even cause responses to
+   * arrive out of sequence. To prevent that, keep the internal python code simple and
+   * set the internal flag, so that a failure is surfaced as an exception.
+   *
+   * @param code the python code
+   * @param internal whether this is internal python code
+   * @return the python response
+   */
+  def runCode(code: String, internal: Boolean = false): Option[PythonResponse] = withLockRequired {
     val input = ExecutePython.toJson(Map("code" -> code, "cmd" -> "run_code"))
     // scalastyle:off println
     stdin.println(input)
     // scalastyle:on
     stdin.flush()
-    Option(stdout.readLine())
-      .map(ExecutePython.fromJson[PythonResponse](_))
+    val pythonResponse = Option(stdout.readLine()).map(ExecutePython.fromJson[PythonResponse](_))
+    // throw an exception if the internal python code fails
+    if (internal && pythonResponse.map(_.content.status) != Some(PythonResponse.OK_STATUS)) {
+      throw KyuubiSQLException(s"Internal python code $code failure: $pythonResponse")
+    }
+    pythonResponse
   }
 
   def close(): Unit = {
```
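For context on the ReentrantLock introduced above: the lock serializes concurrent callers of runCode, so their writes and reads over the worker's single stdin/stdout pipe cannot interleave. A standalone sketch of the same serialization pattern (SerializedPipe and roundTrip are illustrative names; this sketch uses the conventional lock-before-try ordering):

```
import java.util.concurrent.locks.ReentrantLock

object SerializedPipe {
  private val lock = new ReentrantLock()

  private def withLockRequired[T](block: => T): T = {
    lock.lock()
    try block
    finally lock.unlock()
  }

  // one request/response round trip, atomic with respect to other callers
  def roundTrip(request: String): String = withLockRequired {
    // a real implementation would write to stdin and read one line from stdout here
    s"response-for: $request"
  }
}

object SerializedPipeDemo extends App {
  println(SerializedPipe.roundTrip("spark.sparkContext.setJobGroup('id', 'desc', True)"))
}
```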
PySparkTests.scala
```
@@ -72,11 +72,7 @@ class PySparkTests extends WithSparkSQLEngine with HiveJDBCTestHelper {
     val statement = connection.createStatement().asInstanceOf[KyuubiStatement]
     statement.setQueryTimeout(5)
     try {
-      var code =
-        """
-          |import time
-          |time.sleep(10)
-          |""".stripMargin
+      var code = "spark.sql(\"select java_method('java.lang.Thread', 'sleep', 10000L)\").show()"
       var e = intercept[SQLTimeoutException] {
         statement.executePython(code)
       }.getMessage
```
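The test now sleeps inside a Spark job instead of calling time.sleep in the python worker, presumably because time.sleep blocks only the local worker process and launches no Spark job for the job-group cancellation to act on. A usage sketch against an assumed local endpoint (the host/port and import path are assumptions; KyuubiStatement.executePython is the API the test exercises):

```
import java.sql.DriverManager
import org.apache.kyuubi.jdbc.hive.KyuubiStatement

object TimeoutSketch extends App {
  val conn = DriverManager.getConnection("jdbc:hive2://localhost:10009/")
  val stmt = conn.createStatement().asInstanceOf[KyuubiStatement]
  stmt.setQueryTimeout(5) // seconds; fires SQLTimeoutException and cancels the job group
  stmt.executePython(
    "spark.sql(\"select java_method('java.lang.Thread', 'sleep', 10000L)\").show()")
}
```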
