From b68e13ccb4df48edb0a2be6e7ed929d8d2b122e6 Mon Sep 17 00:00:00 2001 From: Tian Gao Date: Thu, 20 Nov 2025 13:33:30 -0800 Subject: [PATCH 1/2] Enable coverage for workers --- python/.coveragerc | 5 +++++ python/test_coverage/sitecustomize.py | 27 ++++++++++++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) diff --git a/python/.coveragerc b/python/.coveragerc index 111f791b04ba..cba7f5481af8 100644 --- a/python/.coveragerc +++ b/python/.coveragerc @@ -35,3 +35,8 @@ omit = */pyspark/streaming/tests/* */pyspark/tests/* */pyspark/testing/tests/* + +[paths] +source = + pyspark/ + lib/pyspark.zip/pyspark diff --git a/python/test_coverage/sitecustomize.py b/python/test_coverage/sitecustomize.py index 1f31860d2ff4..2e4dbf22dd9a 100644 --- a/python/test_coverage/sitecustomize.py +++ b/python/test_coverage/sitecustomize.py @@ -21,6 +21,31 @@ # variable is set or not. If set, it starts to run the coverage. try: import coverage - coverage.process_startup() + cov = coverage.process_startup() + if cov: + import os + + def patch_worker(): + import sys + frame = sys._getframe(1) + if ( + frame.f_code.co_name == "manager" and + "daemon.py" in frame.f_code.co_filename and + "worker" in frame.f_globals + ): + with open(f"./coverage_{os.getpid()}.txt", "a") as f: + f.write(f"{frame}") + + def save_when_exit(func): + def wrapper(*args, **kwargs): + result = func(*args, **kwargs) + cov.save() + return result + return wrapper + + frame.f_globals["worker"] = save_when_exit(frame.f_globals["worker"]) + + os.register_at_fork(after_in_child=patch_worker) + except ImportError: pass From 331ced67e471de4ba723e151a049c36981de96b1 Mon Sep 17 00:00:00 2001 From: Tian Gao Date: Thu, 20 Nov 2025 13:36:32 -0800 Subject: [PATCH 2/2] Polish the customize file --- python/test_coverage/sitecustomize.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/python/test_coverage/sitecustomize.py b/python/test_coverage/sitecustomize.py index 2e4dbf22dd9a..6bda12db883d 100644 --- a/python/test_coverage/sitecustomize.py +++ b/python/test_coverage/sitecustomize.py @@ -26,6 +26,9 @@ import os def patch_worker(): + # If it's a worker forked from the daemon, we need to patch it to save + # the coverage data. Otherwise the worker will be killed by a signal and + # the coverage data will not be saved. import sys frame = sys._getframe(1) if ( @@ -33,8 +36,6 @@ def patch_worker(): "daemon.py" in frame.f_code.co_filename and "worker" in frame.f_globals ): - with open(f"./coverage_{os.getpid()}.txt", "a") as f: - f.write(f"{frame}") def save_when_exit(func): def wrapper(*args, **kwargs):