Skip to content

Commit 3171bb4

Browse files
committed
plain-jobs: skip silently when defer re-enqueue is blocked
Previously `JobProcess.defer()` raised `DeferError` when the re-enqueue's `should_enqueue()` returned False, surfacing as an `ERRORED` JobResult and a per-job exception on the consumer span. That contradicted the framework convention everywhere else: `run_in_worker()` and `retry_job()` both return None silently in the same situation. Honor `should_enqueue=False` the same way in the defer path — record `DEFERRED` with `retry_job_request_uuid=NULL` and stamp `plain.jobs.defer.skipped=True` on the consumer span. Remove the now-unraised `DeferError`.
1 parent 2f32cb4 commit 3171bb4

4 files changed

Lines changed: 63 additions & 46 deletions

File tree

plain-jobs/plain/jobs/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
__version__ = version("plain.jobs")
44

5-
from .exceptions import DeferError, DeferJob
5+
from .exceptions import DeferJob
66
from .jobs import Job
77
from .middleware import JobMiddleware
88
from .registry import register_job
99

10-
__all__ = ["Job", "DeferJob", "DeferError", "JobMiddleware", "register_job"]
10+
__all__ = ["Job", "DeferJob", "JobMiddleware", "register_job"]

plain-jobs/plain/jobs/exceptions.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,14 +21,3 @@ def __init__(self, *, delay: int, increment_retries: bool = False):
2121
self.delay = delay
2222
self.increment_retries = increment_retries
2323
super().__init__(f"Job deferred for {delay} seconds")
24-
25-
26-
class DeferError(Exception):
27-
"""Raised when a deferred job cannot be re-enqueued.
28-
29-
This typically happens when concurrency limits prevent the job from being
30-
re-queued. The transaction will be rolled back and the job will remain
31-
in its current state, then be converted to ERRORED status for retry.
32-
"""
33-
34-
pass

plain-jobs/plain/jobs/models.py

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from plain.runtime import settings
2121
from plain.utils import timezone
2222

23-
from .exceptions import DeferError, DeferJob
23+
from .exceptions import DeferJob
2424
from .otel import (
2525
operation_duration_histogram,
2626
process_metric_attributes,
@@ -326,25 +326,19 @@ def run(self) -> JobResult:
326326
"job_process_uuid": self.uuid,
327327
},
328328
)
329-
return self.defer(job=job, defer_exception=e)
329+
result = self.defer(job=job, defer_exception=e)
330+
if result.retry_job_request_uuid is None:
331+
# Re-enqueue was blocked by should_enqueue() —
332+
# either the default uniqueness rule (a peer
333+
# exists) or a user override (rate limit, custom
334+
# rule). Same treatment as the initial-enqueue
335+
# path's `job.enqueue.skipped`: not an error,
336+
# just visibility on the consumer span.
337+
span.set_attribute("plain.jobs.defer.skipped", True)
338+
return result
330339

331340
return self.convert_to_result(status=JobResultStatuses.SUCCESSFUL)
332341

333-
except DeferError as e:
334-
# Defer failed (e.g., concurrency limit reached during re-enqueue)
335-
# The transaction was rolled back, so the JobProcess still exists in DB.
336-
# The pk was restored in defer() before raising, so we can proceed normally.
337-
logger.warning(
338-
"Defer failed",
339-
extra={"job_class": self.job_class, "error": str(e)},
340-
)
341-
error_type = record_span_error(span, e, metric_attributes)
342-
return self.convert_to_result(
343-
status=JobResultStatuses.ERRORED,
344-
error=str(e),
345-
error_type=error_type,
346-
)
347-
348342
except Exception as e:
349343
# Note: if a rescuer already wrote JobResult(LOST) for this
350344
# row (heartbeat went stale during a long job, then the job
@@ -367,12 +361,17 @@ def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
367361
"""Defer this job by re-enqueueing it for later execution.
368362
369363
Atomically deletes the JobProcess, re-enqueues the job, and creates
370-
a JobResult linking to the new request. This ensures the concurrency
371-
slot is released before attempting to re-enqueue.
372-
373-
Raises:
374-
DeferError: If the job cannot be re-enqueued (e.g., due to concurrency limits).
375-
The transaction will be rolled back and the JobProcess will remain.
364+
a JobResult. The concurrency slot is released before re-enqueue so
365+
the new request's own `should_enqueue()` check can pass.
366+
367+
If `should_enqueue()` blocks the re-enqueue, the framework honors
368+
that signal — same convention as `run_in_worker()` and `retry_job()`,
369+
which both return `None` silently in the same situation. The
370+
JobResult is still `DEFERRED` but `retry_job_request_uuid` is
371+
`None`, the error message records that the re-enqueue was skipped,
372+
and the caller stamps `plain.jobs.defer.skipped=True` on the
373+
consumer span so this case is queryable in APM without surfacing
374+
as an exception.
376375
"""
377376
# Calculate new retry_attempt based on increment_retries
378377
retry_attempt = (
@@ -383,7 +382,6 @@ def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
383382

384383
with transaction.atomic():
385384
# 1. Save JobProcess state and delete (releases concurrency slot)
386-
saved_id = self.id
387385
job_process_uuid = self.uuid
388386
job_request_uuid = self.job_request_uuid
389387
requested_at = self.requested_at
@@ -400,21 +398,23 @@ def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
400398
concurrency_key=self.concurrency_key,
401399
)
402400

403-
# Check if re-enqueue failed
404401
if new_job_request is None:
405-
# Restore id since transaction will roll back and object still exists
406-
self.id = saved_id
407-
raise DeferError(
408-
f"Failed to re-enqueue deferred job {self.job_class}: "
409-
f"concurrency limit reached for key '{self.concurrency_key}'"
402+
error = (
403+
f"Deferred for {defer_exception.delay} seconds "
404+
f"(re-enqueue skipped: should_enqueue() returned False "
405+
f"for concurrency_key '{self.concurrency_key}')"
410406
)
407+
retry_job_request_uuid = None
408+
else:
409+
error = f"Deferred for {defer_exception.delay} seconds"
410+
retry_job_request_uuid = new_job_request.uuid
411411

412-
# 3. Create JobResult linking to new request
412+
# 3. Create JobResult (linking to new request if one was created)
413413
result = JobResult.query.create(
414414
ended_at=timezone.now(),
415-
error=f"Deferred for {defer_exception.delay} seconds",
415+
error=error,
416416
status=JobResultStatuses.DEFERRED,
417-
retry_job_request_uuid=new_job_request.uuid,
417+
retry_job_request_uuid=retry_job_request_uuid,
418418
# From the JobProcess
419419
job_process_uuid=job_process_uuid,
420420
started_at=started_at,

plain-jobs/tests/internal/test_otel.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -544,6 +544,34 @@ def test_consumed_counter_records_outcome_for_deferred(
544544
assert deferred, "expected a consumed counter point with outcome=deferred"
545545

546546

547+
@pytest.mark.usefixtures("db")
548+
def test_defer_skipped_when_reenqueue_blocked() -> None:
549+
"""When defer()'s re-enqueue is blocked by should_enqueue() returning
550+
False, the framework honors the signal silently — same convention as
551+
run_in_worker() and retry_job(), which both return None in the same
552+
situation. The result is recorded as DEFERRED with no retry uuid so
553+
the case is visible in admin without surfacing as an exception."""
554+
from plain.jobs.exceptions import DeferJob
555+
from plain.jobs.models import JobResultStatuses
556+
557+
# Seed a JobProcess via a job whose should_enqueue allows it through.
558+
request = _NoopJob().run_in_worker(concurrency_key="busy")
559+
assert request is not None
560+
process = request.convert_to_job_process(worker_id=uuid.uuid4())
561+
562+
# Defer using a job that always says should_enqueue=False. In
563+
# production these would be the same class; using two here lets the
564+
# initial enqueue succeed and only the re-enqueue get blocked.
565+
result = process.defer(
566+
job=_ExclusiveJob(),
567+
defer_exception=DeferJob(delay=60),
568+
)
569+
570+
assert result.status == JobResultStatuses.DEFERRED
571+
assert result.retry_job_request_uuid is None
572+
assert "re-enqueue skipped" in result.error
573+
574+
547575
@pytest.mark.usefixtures("db")
548576
def test_workers_gauge_splits_by_state_attribute(metrics, settings) -> None:
549577
"""One `plain.jobs.workers` gauge with `plain.jobs.worker.state` attribute

0 commit comments

Comments
 (0)