[SPARK-54055][CONNECT][PYTHON] Clean up per-session PythonWorkerFactory #55131
kumbham wants to merge 1 commit into apache:master
Conversation
Force-pushed from 16b9db1 to 8d4843c
All of the failed tests are failing with this exception: https://github.com/kumbham/spark/actions/runs/23860864341/job/69567033253#step:11:8167. None of the failing tests are related to the changes in this PR (PythonWorkerFactory, …)
Force-pushed from 8d4843c to 8ce7463
holdenk left a comment:
Thanks for working on this! I've got some questions, it's been a hot minute since I thought about how we spawn Python workers.
```scala
private val idleFactoryReaper =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("idle-python-factory-reaper")
idleFactoryReaper.scheduleAtFixedRate(
  () => evictIdlePythonWorkerFactories(),
  PythonWorkerFactory.IDLE_FACTORY_CHECK_INTERVAL_MS,
  PythonWorkerFactory.IDLE_FACTORY_CHECK_INTERVAL_MS,
  TimeUnit.MILLISECONDS)
```
Would it make sense to only launch this if a Python job is present? Or is the overhead low enough (and the complexity of doing that high enough) that it doesn't matter? (I can probably be convinced either way, just looking for the thinking here.)
```diff
@@ -120,6 +120,14 @@ class SparkEnv (
     pythonExec: String, workerModule: String, daemonModule: String, envVars: Map[String, String])
   private val pythonWorkers = mutable.HashMap[PythonWorkersKey, PythonWorkerFactory]()
```
So are we just depending on the env var flow through here to make the cache work? I know this isn't your OG code but it feels funky as is.
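For reference, here is a plain-Python sketch of how that env-var flow-through makes the cache work (the `workers_key` helper and tuple layout are illustrative, not Spark's actual `PythonWorkersKey`): because the env vars are folded into the key, a session-unique `SPARK_JOB_ARTIFACT_UUID` hashes to a distinct cache entry per session.

```python
def workers_key(python_exec, worker_module, daemon_module, env_vars):
    # Env vars are part of the key, so differing SPARK_JOB_ARTIFACT_UUID
    # values produce distinct (hashable) keys.
    return (python_exec, worker_module, daemon_module,
            tuple(sorted(env_vars.items())))

cache = {}
for session_uuid in ("uuid-a", "uuid-b"):
    key = workers_key("python3", "pyspark.worker", "pyspark.daemon",
                      {"SPARK_JOB_ARTIFACT_UUID": session_uuid})
    cache.setdefault(key, f"factory-for-{session_uuid}")

print(len(cache))  # 2: one factory per session
```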
```scala
// Create and start the daemon
val command = Arrays.asList(pythonExec, "-m", daemonModule, workerModule)
val pb = new ProcessBuilder(command)
val jobArtifactUUID = envVars.getOrElse("SPARK_JOB_ARTIFACT_UUID", "default")
```
This seems like a redef of L124
What changes were proposed in this pull request?
Each Spark Connect session creates its own `PythonWorkerFactory`, keyed by `SPARK_JOB_ARTIFACT_UUID`. These factories (and their daemon processes) were never cleaned up until `SparkContext` shutdown, causing unbounded process and thread leaks on long-running servers. This change adds two cleanup mechanisms:
1. **Eager cleanup (driver-side):** `SessionHolder.close()` now calls `SparkSession.cleanupPythonWorkers()`, which finds and stops all `PythonWorkerFactory` instances matching the session's artifact UUID in `SparkEnv`'s cache. This follows the same pattern as the existing `cleanupPythonWorkerLogs()` in the same lifecycle hook.
2. **Idle-timeout eviction (executor-side safety net):** A `ScheduledExecutorService` in `SparkEnv` periodically scans for `PythonWorkerFactory` instances with a non-default `SPARK_JOB_ARTIFACT_UUID` that have no active/idle workers and have been idle for more than 5 minutes, and evicts them. This handles executor-side cleanup, where session-close notifications from the driver cannot reach. The scheduler follows the same pattern used by `ContextCleaner`, `Heartbeater`, and other Spark core components.

Factories with a default artifact UUID (i.e., non-Connect workloads) are never evicted by the idle-timeout mechanism.
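The eviction criteria for the idle-timeout mechanism can be sketched in plain Python (field names such as `last_used_ms` and `active_workers` are illustrative, not the actual Scala member names):

```python
IDLE_TIMEOUT_MS = 5 * 60 * 1000  # evict only after more than 5 minutes idle

def is_idle_factory(job_artifact_uuid, last_used_ms, active_workers, now_ms):
    """A factory is evictable only if it belongs to a Connect session
    (non-default UUID), has no workers, and is past the idle timeout."""
    return (
        job_artifact_uuid != "default"
        and active_workers == 0
        and (now_ms - last_used_ms) > IDLE_TIMEOUT_MS
    )

now_ms = 10 * 60 * 1000
print(is_idle_factory("default", 0, 0, now_ms))             # False: non-Connect, never evicted
print(is_idle_factory("uuid-1", now_ms - 1000, 0, now_ms))  # False: recent activity
print(is_idle_factory("uuid-2", 0, 0, now_ms))              # True: idle past timeout
```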
Closes SPARK-54055
Why are the changes needed?
With Spark Connect, each session always has a unique `SPARK_JOB_ARTIFACT_UUID`, even if there are no artifacts. This makes the UDF environment built by `BasePythonRunner.compute` unique per session, so each session gets its own `PythonWorkerFactory` and daemon process.

`PythonWorkerFactory` has a `stop` method, but nothing called it except `SparkEnv.stop` (which only runs at full shutdown). On a long-running Spark Connect server, this causes unbounded accumulation of daemon processes, `MonitorThread`s, and stderr/stdout reader threads, eventually leading to OOM.

Reproduction (from the JIRA reporter):
After 200 sessions, 200+ daemon processes and ~1000 threads are leaked.
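The shape of the leak can be simulated stand-alone in plain Python (no Spark dependency; `open_connect_session` and the placeholder cache are hypothetical): each session's unique UUID creates a cache entry that nothing ever removes.

```python
import uuid

python_workers = {}  # stands in for SparkEnv's factory cache

def open_connect_session():
    """Each Connect session gets a fresh SPARK_JOB_ARTIFACT_UUID, hence a
    fresh cache entry that (before this PR) nothing ever removed."""
    session_uuid = str(uuid.uuid4())
    python_workers[session_uuid] = object()  # placeholder for a factory
    return session_uuid

for _ in range(200):
    open_connect_session()

print(len(python_workers))  # 200: every "factory" is still cached
```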
Does this PR introduce any user-facing change?
No. This is a resource leak fix. Python UDF behavior is unchanged.
How was this patch tested?
Added 4 new unit tests in `PythonWorkerFactoryIdleSuite`:
- `isIdleFactory` returns false for default artifact UUID: factories without a session UUID are never evicted
- `isIdleFactory` returns false for a session factory with recent activity: active factories are not evicted
- `isIdleFactory` returns true for a session factory past the timeout: idle session factories are correctly identified
- `destroyPythonWorkersByArtifactUUID` removes only matching factories: validates selective cleanup by UUID
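The selective-cleanup behavior the last test validates can be sketched as follows (hypothetical function name and key layout; the real Scala code would also stop each removed factory):

```python
def destroy_by_artifact_uuid(cache, session_uuid):
    """Remove only the factories keyed by this session's UUID."""
    victims = [key for key in cache if key[0] == session_uuid]
    for key in victims:
        cache.pop(key)  # the real code would also call factory.stop()
    return len(victims)

cache = {
    ("session-a", "python3"): "factory-a",
    ("session-b", "python3"): "factory-b",
}
removed = destroy_by_artifact_uuid(cache, "session-a")
print(removed)                    # 1
print([key[0] for key in cache])  # ['session-b']
```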
Also verified no regressions in existing test suites:
PythonWorkerFactorySuite (3/3 passed)
SparkConnectSessionHolderSuite (18/18 passed)
SparkConnectSessionManagerSuite (10/10 passed)
All api.python core tests (14/14 passed)
Was this patch authored or co-authored using generative AI tooling?
Cursor (Claude claude-4.6-opus-high-thinking)