
[SPARK-45739][PYTHON] Catch IOException instead of EOFException alone for faulthandler #43600

Closed · wants to merge 1 commit

Conversation

@HyukjinKwon (Member) commented Oct 31, 2023

What changes were proposed in this pull request?

This PR improves the `spark.python.worker.faulthandler.enabled` feature by catching `IOException` instead of the narrower `EOFException`.

Why are the changes needed?

Exceptions such as `java.net.SocketException: Connection reset` can happen when the worker dies unexpectedly. We should catch all IO exceptions there, not just EOF.
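The rationale mirrors Python's own exception hierarchy: `ConnectionResetError` is a subclass of `OSError`, just as `java.net.SocketException` and `java.io.EOFException` are both subclasses of `java.io.IOException` on the JVM side. A minimal Python sketch of the same "catch the broader class" idea (the `read_length`/`safe_read` helpers are hypothetical, for illustration only):

```python
# ConnectionResetError and BrokenPipeError both extend OSError, just as
# SocketException and EOFException both extend java.io.IOException.
assert issubclass(ConnectionResetError, OSError)
assert issubclass(BrokenPipeError, OSError)


def raise_reset():
    # Simulates what the worker socket raises when the peer crashes.
    raise ConnectionResetError("Connection reset")


def safe_read(raiser):
    """Hypothetical read of a length prefix from a worker socket."""
    try:
        raiser()
        return "ok"
    except OSError as e:  # broad catch, analogous to catching IOException
        return f"worker died: {type(e).__name__}"


print(safe_read(lambda: None))      # normal path
print(safe_read(raise_reset))       # connection-reset path is now handled
```

Catching only the narrow class (here, only `BrokenPipeError`) would let the connection-reset case escape as an unhandled error, which is exactly the failure mode the PR fixes.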

Does this PR introduce any user-facing change?

Yes, but only in special cases: the change takes effect when the worker dies unexpectedly during its initialization.

How was this patch tested?

I tested this with Spark Connect:

$ cat <<EOT >> malformed_daemon.py
import ctypes

from pyspark import daemon
from pyspark import TaskContext


def raise_segfault():
    ctypes.string_at(0)


# Raise a segmentation fault during worker initialization.
TaskContext._getOrCreate = raise_segfault


if __name__ == '__main__':
    daemon.manager()
EOT
./sbin/start-connect-server.sh --conf spark.python.daemon.module=malformed_daemon --conf spark.python.worker.faulthandler.enabled=true --jars `ls connector/connect/server/target/**/spark-connect*SNAPSHOT.jar`
./bin/pyspark --remote "sc://localhost:15002"
from pyspark.sql.functions import udf
spark.addArtifact("malformed_daemon.py", pyfile=True)
spark.range(1).select(udf(lambda x: x)("id")).collect()
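The feature being tested builds on Python's standard `faulthandler` module, which installs low-level signal handlers so that even a hard crash like the segfault above dumps a Python traceback. A minimal standalone sketch (writing an on-demand dump to a temporary file rather than actually crashing the process):

```python
import faulthandler
import tempfile

# Enable the handler: on SIGSEGV, SIGFPE, etc., a C-level handler
# writes every thread's Python traceback before the process dies.
faulthandler.enable()
assert faulthandler.is_enabled()

# dump_traceback() produces the same output on demand, without a crash.
with tempfile.TemporaryFile(mode="w+") as f:
    faulthandler.dump_traceback(file=f)
    f.seek(0)
    dump = f.read()

print(dump.splitlines()[0])  # e.g. "Current thread 0x... (most recent call first):"
```

This is why the "After" output below includes the `Current thread ... (most recent call first)` block: the worker's faulthandler dump is forwarded to the JVM and attached to the error message.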

Before

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
    table, schema = self._to_table()
    ...
  File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
	at ...
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Driver stacktrace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 8 in stage 0.0 failed 1 times, most recent failure: Lost task 8.0 in stage 0.0 (TID 8) (192.168.123.102 executor driver): java.net.SocketException: Connection reset
	at java.base/sun.nio.ch.SocketChannelImpl.throwConnectionReset(SocketChannelImpl.java:394)
	at ...
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.lang.Thread.run(Thread.java:833)

After

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/connect/dataframe.py", line 1710, in collect
    table, schema = self._to_table()
    ...
  File "/.../spark/python/pyspark/sql/connect/client/core.py", line 1575, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007ff85d338700 (most recent call first):
  File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", line 525 in string_at
  File "/private/var/folders/0c/q8y15ybd3tn7sr2_jmbmftr80000gp/T/spark-397ac42b-c05b-4f50-a6b8-ede30254edc9/userFiles-fd70c41e-46b9-44ed-b781-f8dea10bcb4a/5ce3da24-912a-4207-af82-5dfc8a845714/malformed_daemon.py", line 8 in raise_segfault
  File "/.../spark/python/lib/pyspark.zip/pyspark/worker.py", line 1450 in main
  ...
  File "/.../miniconda3/envs/python3.9/lib/python3.9/runpy.py", line 197 in _run_module_as_main

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator$$anonfun$1.applyOrElse(PythonRunner.scala:550)
	at ...
	at java.base/java.io.DataInputStream.readInt(DataInputStream.java:393)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$2.read(PythonUDFRunner.scala:92)
	... 30 more

Driver stacktrace:

JVM stacktrace:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 4 in stage 0.0 failed 1 times, most recent failure: Lost task 4.0 in stage 0.0 (TID 4) (192.168.123.102 executor driver): org.apache.spark.SparkException: Python worker exited unexpectedly (crashed): Fatal Python error: Segmentation fault

Current thread 0x00007ff85d338700 (most recent call first):
  File "/.../miniconda3/envs/python3.9/lib/python3.9/ctypes/__init__.py", line 525 in string_at
...

Was this patch authored or co-authored using generative AI tooling?

No.

@HyukjinKwon (Member, Author):

cc @ueshin

@LuciferYang (Contributor) left a comment:

+1, LGTM

@dongjoon-hyun (Member) left a comment:

+1, LGTM.

@dongjoon-hyun (Member):

Although it looks unrelated, could you re-trigger the failed PySpark test, @HyukjinKwon?

Test: https://github.com/HyukjinKwon/spark/actions/runs/6705864326/job/18221201806

@HyukjinKwon (Member, Author):

Merged to master.

@ueshin (Member) commented Nov 1, 2023

Late LGTM.

HyukjinKwon added a commit that referenced this pull request Nov 3, 2023
…for Python execution in SQL

### What changes were proposed in this pull request?

This PR proposes to make `faulthandler` a runtime configuration so it can be turned on and off at runtime.

### Why are the changes needed?

The `faulthandler` feature in PySpark is especially useful for debugging errors that the regular Python interpreter cannot catch out of the box, such as segmentation faults; see also #43600. It would be very useful to make this a runtime configuration so it can be changed without restarting the shell.

### Does this PR introduce _any_ user-facing change?

Yes, users can now set `spark.sql.execution.pyspark.udf.faulthandler.enabled` at runtime to enable the `faulthandler` feature.
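A sketch of how a user would toggle this configuration at runtime; this is a config fragment that assumes an existing `SparkSession` named `spark` (e.g. in a pyspark shell), so it is shown for illustration only:

```python
# Assumes a live SparkSession bound to `spark`.
# The config key is the one introduced by this follow-up PR.
spark.conf.set("spark.sql.execution.pyspark.udf.faulthandler.enabled", "true")
# ... run a UDF; a crashing worker now dumps its Python traceback ...
spark.conf.set("spark.sql.execution.pyspark.udf.faulthandler.enabled", "false")
```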

### How was this patch tested?

Unit test added.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43635 from HyukjinKwon/runtime-conf.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
@HyukjinKwon HyukjinKwon deleted the more-segfault branch January 15, 2024 00:47