Skip to content

Commit c72ebe7

Browse files
committed
Wrap job worker maintenance in an INTERNAL span
Currently DB transients during maintenance (heartbeat, rescue, schedule) get caught by the outer except-Exception and swallowed via logger.exception with no surrounding span — so they're invisible to OTel-based exception tooling even though they're real failures. Wrapping the maintenance work in a `worker loop` INTERNAL span stamps the canonical status=ERROR plus error.type signal on each failed iteration, keeping the existing log-and- continue behavior intact.
1 parent bb9251f commit c72ebe7

2 files changed

Lines changed: 98 additions & 10 deletions

File tree

plain-jobs/plain/jobs/workers.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,19 @@
1414
from functools import partial
1515
from typing import TYPE_CHECKING, Any
1616

17+
from opentelemetry import trace
18+
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
19+
1720
from plain.logs import get_framework_logger
1821
from plain.postgres import transaction
1922
from plain.postgres.db import return_database_connection
2023
from plain.runtime import settings
2124
from plain.utils import timezone
2225
from plain.utils.module_loading import import_string
2326
from plain.utils.os import get_cpu_count
27+
from plain.utils.otel import format_exception_type
2428

25-
from .otel import WorkerMetrics
29+
from .otel import WorkerMetrics, tracer
2630
from .registry import jobs_registry
2731

2832
if TYPE_CHECKING:
@@ -184,15 +188,23 @@ def _run_loop(self) -> None:
184188
from .models import JobRequest
185189

186190
while not self._is_shutting_down:
187-
try:
188-
self.maybe_heartbeat()
189-
self.maybe_log_stats()
190-
self.maybe_check_job_results()
191-
self.maybe_schedule_jobs()
192-
except Exception as e:
193-
# Log the issue, but don't stop the worker
194-
# (these tasks are kind of ancilarry to the main job processing)
195-
logger.exception(e)
191+
with tracer.start_as_current_span(
192+
"worker loop", kind=trace.SpanKind.INTERNAL
193+
) as span:
194+
try:
195+
self.maybe_heartbeat()
196+
self.maybe_log_stats()
197+
self.maybe_check_job_results()
198+
self.maybe_schedule_jobs()
199+
except Exception as e:
200+
# The catch is inside the span, so the SDK's auto-record
201+
# on context exit won't fire — stamp the canonical
202+
# failure signal explicitly. Log and continue: these
203+
# tasks are ancillary to the main job processing.
204+
span.record_exception(e)
205+
span.set_status(trace.StatusCode.ERROR)
206+
span.set_attribute(ERROR_TYPE, format_exception_type(e))
207+
logger.exception(e)
196208

197209
# Re-check shutdown after maintenance — a signal may have arrived
198210
# between the loop condition and now. Don't pick up new work.

plain-jobs/tests/internal/test_otel.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
from __future__ import annotations
1010

11+
import threading
1112
import uuid
1213

1314
import pytest
@@ -164,6 +165,81 @@ def _boom(*args, **kwargs):
164165
)
165166

166167

168+
# --- Worker run-loop span -----------------------------------------------
169+
170+
171+
def _build_worker_for_loop_test() -> Worker:
172+
"""Construct a Worker bypassing __init__ so `_run_loop` can run a single
173+
iteration without a real ProcessPoolExecutor. Tests override the maintenance
174+
methods to control when the loop exits."""
175+
worker = Worker.__new__(Worker)
176+
worker.queues = ["default"]
177+
worker._is_shutting_down = False
178+
worker._heartbeat_registered = True
179+
worker._inflight_futures = {}
180+
worker._inflight_lock = threading.Lock()
181+
worker.max_processes = 1
182+
worker.max_pending_per_process = 1
183+
worker.maybe_heartbeat = lambda: None
184+
worker.maybe_log_stats = lambda: None
185+
worker.maybe_check_job_results = lambda: None
186+
worker.maybe_schedule_jobs = lambda: None
187+
return worker
188+
189+
190+
@pytest.mark.usefixtures("db")
191+
def test_worker_loop_emits_internal_span_per_iteration(
192+
otel_spans: InMemorySpanExporter,
193+
) -> None:
194+
"""Each worker loop iteration wraps the maintenance work in a `worker loop`
195+
INTERNAL span so DB transients during maintenance land on a span."""
196+
from opentelemetry.trace import StatusCode
197+
198+
worker = _build_worker_for_loop_test()
199+
200+
def shutdown_during_heartbeat() -> None:
201+
worker._is_shutting_down = True
202+
203+
worker.maybe_heartbeat = shutdown_during_heartbeat # ty: ignore[invalid-assignment]
204+
worker._run_loop()
205+
206+
loop_spans = [s for s in otel_spans.get_finished_spans() if s.name == "worker loop"]
207+
assert len(loop_spans) == 1
208+
span = loop_spans[0]
209+
assert span.kind == SpanKind.INTERNAL
210+
assert span.status.status_code == StatusCode.UNSET
211+
212+
213+
@pytest.mark.usefixtures("db")
214+
def test_worker_loop_records_error_when_maintenance_fails(
215+
otel_spans: InMemorySpanExporter,
216+
) -> None:
217+
"""A maintenance exception leaves the loop running and stamps the canonical
218+
failure signal (status=ERROR + error.type) on the `worker loop` span. This
219+
is the path that previously swallowed DB transients like the production
220+
`psycopg.OperationalError` we saw escaping `rescue_job_results`."""
221+
from opentelemetry.trace import StatusCode
222+
223+
worker = _build_worker_for_loop_test()
224+
225+
def boom_then_shutdown() -> None:
226+
worker._is_shutting_down = True
227+
raise RuntimeError("db transient")
228+
229+
worker.maybe_heartbeat = boom_then_shutdown # ty: ignore[invalid-assignment]
230+
# Must NOT raise — the loop catches and continues, just like in production.
231+
worker._run_loop()
232+
233+
loop_spans = [s for s in otel_spans.get_finished_spans() if s.name == "worker loop"]
234+
assert len(loop_spans) == 1
235+
span = loop_spans[0]
236+
assert span.status.status_code == StatusCode.ERROR
237+
assert span.attributes is not None
238+
assert span.attributes["error.type"] == "RuntimeError"
239+
exception_events = [e for e in span.events if e.name == "exception"]
240+
assert exception_events
241+
242+
167243
# --- Worker-state observable gauges -------------------------------------
168244
#
169245
# Each Worker owns a WorkerMetrics; instantiating one swaps it in as the

0 commit comments

Comments
 (0)