Skip to content

Commit c4551d1

Browse files
committed
Run setup in job worker spawned processes and use lazy model imports in Worker file
1 parent 51461b9 commit c4551d1

1 file changed

Lines changed: 50 additions & 2 deletions

File tree

plain-jobs/plain/jobs/workers.py

Lines changed: 50 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import time
88
from concurrent.futures import Future, ProcessPoolExecutor
99
from functools import partial
10-
from typing import Any
10+
from typing import TYPE_CHECKING, Any
1111

1212
from plain import models
1313
from plain.models import transaction
@@ -16,12 +16,44 @@
1616
from plain.utils import timezone
1717
from plain.utils.module_loading import import_string
1818

19-
from .models import JobProcess, JobRequest, JobResult, JobResultStatuses
2019
from .registry import jobs_registry
2120

21+
if TYPE_CHECKING:
22+
from .models import JobResult
23+
24+
# Models are NOT imported at the top of this file!
25+
# See comment on _worker_process_initializer() for explanation.
26+
2227
logger = logging.getLogger("plain.jobs")
2328

2429

30+
def _worker_process_initializer() -> None:
    """Bootstrap the Plain runtime inside a freshly spawned worker process.

    Background:
        The process pool uses the multiprocessing 'spawn' context (not
        'fork'), so each worker is a brand-new Python process rather than a
        forked copy of the parent. A spawned process re-imports this module
        BEFORE the initializer is invoked. If models were imported at module
        level, model registration would run ahead of plain.runtime.setup()
        and fail with PackageRegistryNotReady.

    Approach:
        - This initializer calls plain.runtime.setup() first thing in every
          worker process.
        - Model imports are done lazily inside the functions that need them,
          so they only execute once setup has completed and the packages
          registry is ready.

    Resulting order of events in a spawned worker:
        1. workers.py is re-imported (no model imports are triggered).
        2. This initializer runs plain.runtime.setup().
        3. process_job() executes and can now import models safely.
    """
    from plain.runtime import setup as setup_plain_runtime

    # A spawned worker starts from a fresh process (nothing is carried over
    # the way a fork would), so Plain has to be set up again in each one.
    setup_plain_runtime()
55+
56+
2557
class Worker:
2658
def __init__(
2759
self,
@@ -39,6 +71,7 @@ def __init__(
3971
max_workers=max_processes,
4072
max_tasks_per_child=max_jobs_per_process,
4173
mp_context=multiprocessing.get_context("spawn"),
74+
initializer=_worker_process_initializer,
4275
)
4376

4477
self.queues = queues
@@ -56,6 +89,9 @@ def __init__(
5689
self._is_shutting_down = False
5790

5891
def run(self) -> None:
92+
# Lazy import - see _worker_process_initializer() comment for why
93+
from .models import JobRequest
94+
5995
logger.info(
6096
"⬣ Starting Plain worker\n Registered jobs: %s\n Queues: %s\n Jobs schedule: %s\n Stats every: %s seconds\n Max processes: %s\n Max jobs per process: %s\n Max pending per process: %s\n PID: %s",
6197
"\n ".join(
@@ -211,6 +247,9 @@ def maybe_schedule_jobs(self) -> None:
211247
self._jobs_schedule_checked_at = now
212248

213249
def log_stats(self) -> None:
250+
# Lazy import - see _worker_process_initializer() comment for why
251+
from .models import JobProcess, JobRequest
252+
214253
try:
215254
num_proccesses = len(self.executor._processes)
216255
except (AttributeError, TypeError):
@@ -232,12 +271,18 @@ def log_stats(self) -> None:
232271

233272
def rescue_job_results(self) -> None:
    """Handle lost or failed jobs on the queues this worker serves.

    First marks lost jobs among the JobProcess rows for ``self.queues``,
    then retries failed jobs among the JobResult rows for the same queues.
    """
    # Lazy import - see _worker_process_initializer() comment for why
    from .models import JobProcess, JobResult

    # TODO return results and log them if there are any?
    processes_on_queues = JobProcess.query.filter(queue__in=self.queues)
    processes_on_queues.mark_lost_jobs()

    results_on_queues = JobResult.query.filter(queue__in=self.queues)
    results_on_queues.retry_failed_jobs()
238280

239281

240282
def future_finished_callback(job_process_uuid: str, future: Future) -> None:
283+
# Lazy import - see _worker_process_initializer() comment for why
284+
from .models import JobProcess, JobResultStatuses
285+
241286
if future.cancelled():
242287
logger.warning("Job cancelled job_process_uuid=%s", job_process_uuid)
243288
try:
@@ -264,6 +309,9 @@ def future_finished_callback(job_process_uuid: str, future: Future) -> None:
264309

265310

266311
def process_job(job_process_uuid: str) -> None:
312+
# Lazy import - see _worker_process_initializer() comment for why
313+
from .models import JobProcess
314+
267315
try:
268316
worker_pid = os.getpid()
269317

0 commit comments

Comments
 (0)