From d5806052af5fb73e7eee87673e471305564e3be6 Mon Sep 17 00:00:00 2001
From: Tian Gao
Date: Fri, 21 Nov 2025 15:47:46 -0800
Subject: [PATCH] Import worker module after fork to avoid deadlock

---
 python/pyspark/daemon.py | 21 +++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)

diff --git a/python/pyspark/daemon.py b/python/pyspark/daemon.py
index e75eca68fd0e..409c33d1727a 100644
--- a/python/pyspark/daemon.py
+++ b/python/pyspark/daemon.py
@@ -31,14 +31,6 @@
 
 from pyspark.serializers import read_int, write_int, write_with_length, UTF8Deserializer
 
-if len(sys.argv) > 1 and sys.argv[1].startswith("pyspark"):
-    import importlib
-
-    worker_module = importlib.import_module(sys.argv[1])
-    worker_main = worker_module.main
-else:
-    from pyspark.worker import main as worker_main
-
 
 def compute_real_exit_code(exit_code):
     # SystemExit's code can be integer or string, but os._exit only accepts integers
@@ -78,6 +70,19 @@ def worker(sock, authenticated):
             return 1
 
     exit_code = 0
+
+    # We don't know what could happen when we import the worker module. We have to
+    # guarantee that no thread is spawned before we fork, so we have to import the
+    # worker module after the fork. For example, both pandas and pyarrow start some
+    # threads when they are imported.
+    if len(sys.argv) > 1 and sys.argv[1].startswith("pyspark"):
+        import importlib
+
+        worker_module = importlib.import_module(sys.argv[1])
+        worker_main = worker_module.main
+    else:
+        from pyspark.worker import main as worker_main
+
     try:
         worker_main(infile, outfile)
     except SystemExit as exc:
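
For context on why the import is deferred until after fork(): the sketch below is not part of the patch, and names such as background_worker and HOLD_SECONDS are illustrative only. It reproduces the failure mode the new comment describes: a thread started as an import side effect can hold a lock at the moment the daemon forks, the child inherits that lock in its locked state with no thread left to release it, and the next acquire blocks forever. Importing thread-spawning libraries such as pandas or pyarrow only after the fork avoids opening that window.

# Minimal sketch (not from PySpark) of the fork/threads deadlock hazard.
import os
import threading
import time

lock = threading.Lock()
HOLD_SECONDS = 5


def background_worker():
    # Stands in for a helper thread that a library import might start
    # (e.g. pandas or pyarrow); it holds a lock while doing its work.
    with lock:
        time.sleep(HOLD_SECONDS)


threading.Thread(target=background_worker, daemon=True).start()
time.sleep(0.1)  # let the thread grab the lock before we fork

pid = os.fork()  # Unix only, like PySpark's daemon
if pid == 0:
    # Child: only the forking thread exists here, but the lock was copied in its
    # locked state. A plain lock.acquire() would block forever; the timeout just
    # makes the failure visible instead of hanging this demo.
    print("child acquired lock:", lock.acquire(timeout=1))
    os._exit(0)
else:
    os.waitpid(pid, 0)

The trade-off of the patch is that each forked worker now performs the worker-module import itself instead of inheriting already-imported modules from the daemon process, in exchange for guaranteeing that no extra threads exist before the fork.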