Skip to content

Commit 49fa195

Browse files
committed
Split plain-jobs consumed counter by terminal outcome and add workers gauge
Move messaging.client.consumed.messages from JobProcess.run()'s finally block to record_consumed(), called from convert_to_result() and defer(). Adds a plain.jobs.outcome attribute (successful/errored/lost/cancelled/deferred) so the rescue and cancellation paths show up in throughput dashboards, and forwards error.type from the live exception path. A new plain.jobs.workers observable gauge splits WorkerHeartbeat rows by plain.jobs.worker.state (active/stale) using a single cutoff snapshot. heartbeat_cutoff() is extracted to models.py so rescue, admin, and OTel agree on which workers are alive.
1 parent 8c37d88 commit 49fa195

5 files changed

Lines changed: 330 additions & 44 deletions

File tree

plain-jobs/plain/jobs/README.md

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -228,15 +228,30 @@ The worker integrates with OpenTelemetry for distributed tracing. Spans are crea
228228

229229
Jobs are linked to the originating trace context, allowing you to follow jobs initiated from web requests.
230230

231-
Two messaging metrics are recorded:
231+
Messaging metrics:
232232

233233
- `messaging.client.sent.messages` — counter incremented for each enqueue
234+
- `messaging.client.consumed.messages` — counter incremented for every terminal `JobResult`. Carries a `plain.jobs.outcome` attribute (`successful`, `errored`, `lost`, `cancelled`, `deferred`) so dashboards can split throughput by outcome.
234235
- `messaging.client.operation.duration` — histogram of enqueue/process durations
236+
- `plain.jobs.queue.wait.duration` — histogram of how long a job waited in queue before a worker picked it up
237+
238+
Per-worker observable gauges (queryable per `messaging.destination.name` where applicable):
239+
240+
- `plain.jobs.worker.processes` — OS processes spawned by this worker
241+
- `plain.jobs.queue.depth` — pending `JobRequest`s ready to run
242+
- `plain.jobs.queue.scheduled` — `JobRequest`s with `start_at` in the future
243+
- `plain.jobs.queue.oldest.age` — age in seconds of the oldest ready-to-run `JobRequest`
244+
- `plain.jobs.running` — `JobProcess` rows currently running
245+
246+
Worker-liveness gauge (global, no per-queue dimension):
247+
248+
- `plain.jobs.workers` — `WorkerHeartbeat` row count, split by a `plain.jobs.worker.state` attribute taking `active` (within `JOBS_HEARTBEAT_TIMEOUT`) or `stale` (past it, eligible for rescue on the next tick)
235249

236250
Two contract details to be aware of:
237251

238252
- **Successful enqueues record metrics on transaction commit.** If you call `run_in_worker` inside a transaction that later rolls back, the message was never actually persisted — so the counter and histogram do not fire. This matches the OTel semconv: "MUST NOT count messages that were created but haven't yet been sent." Failed enqueues record immediately so transient errors are still visible.
239253
- **Skipped enqueues are visible in spans, not in metrics.** When `should_enqueue` returns `False` (e.g., a concurrency-key collision), the span gets `job.enqueue.skipped=True` but no metric is recorded — there was no send to count.
254+
- **Observable gauges emit once per worker process.** When two workers cover the same queue, the per-queue gauges emit identical values from each. `plain.jobs.workers` is global (no per-queue dimension) and likewise emits the full table count from every worker. Aggregate these gauges with `last_value`/`max`, never `sum`.
240255

241256
## Settings
242257

plain-jobs/plain/jobs/admin.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import datetime
43
from datetime import timedelta
54

65
from plain import postgres
@@ -14,14 +13,14 @@
1413
from plain.http import RedirectResponse
1514
from plain.postgres.expressions import Case, When
1615
from plain.runtime import settings
17-
from plain.utils import timezone
1816

1917
from .models import (
2018
JobProcess,
2119
JobRequest,
2220
JobResult,
2321
JobResultQuerySet,
2422
WorkerHeartbeat,
23+
heartbeat_cutoff,
2524
)
2625

2726

@@ -133,10 +132,6 @@ def get_metric(self) -> int:
133132
return JobProcess.query.running().count()
134133

135134

136-
def _heartbeat_cutoff() -> datetime.datetime:
137-
return timezone.now() - timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
138-
139-
140135
class ActiveWorkersCard(Card):
141136
title = "Active workers"
142137
text = "View"
@@ -147,7 +142,7 @@ def get_description(self) -> str:
147142

148143
def get_metric(self) -> int:
149144
return WorkerHeartbeat.query.filter(
150-
last_heartbeat_at__gte=_heartbeat_cutoff()
145+
last_heartbeat_at__gte=heartbeat_cutoff()
151146
).count()
152147

153148
def get_link(self) -> str:
@@ -167,7 +162,7 @@ def get_description(self) -> str:
167162

168163
def get_metric(self) -> int:
169164
return WorkerHeartbeat.query.filter(
170-
last_heartbeat_at__lt=_heartbeat_cutoff()
165+
last_heartbeat_at__lt=heartbeat_cutoff()
171166
).count()
172167

173168
def get_link(self) -> str:
@@ -364,7 +359,7 @@ def get_initial_queryset(self) -> postgres.QuerySet[WorkerHeartbeat]:
364359
queryset = super().get_initial_queryset()
365360
return queryset.annotate(
366361
stale=Case(
367-
When(last_heartbeat_at__lt=_heartbeat_cutoff(), then=True),
362+
When(last_heartbeat_at__lt=heartbeat_cutoff(), then=True),
368363
default=False,
369364
output_field=postgres.BooleanField(),
370365
),
@@ -373,7 +368,7 @@ def get_initial_queryset(self) -> postgres.QuerySet[WorkerHeartbeat]:
373368
def filter_queryset(
374369
self, queryset: postgres.QuerySet[WorkerHeartbeat]
375370
) -> postgres.QuerySet[WorkerHeartbeat]:
376-
cutoff = _heartbeat_cutoff()
371+
cutoff = heartbeat_cutoff()
377372
if self.filter == "Active":
378373
return queryset.filter(last_heartbeat_at__gte=cutoff)
379374
if self.filter == "Stale":

plain-jobs/plain/jobs/models.py

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,8 @@
88

99
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
1010
MESSAGING_CONSUMER_GROUP_NAME,
11-
MESSAGING_DESTINATION_NAME,
1211
MESSAGING_MESSAGE_ID,
1312
MESSAGING_OPERATION_NAME,
14-
MESSAGING_OPERATION_TYPE,
15-
MESSAGING_SYSTEM,
16-
MessagingOperationTypeValues,
17-
)
18-
from opentelemetry.semconv.attributes.code_attributes import (
19-
CODE_FUNCTION_NAME,
2013
)
2114
from opentelemetry.trace import Link, SpanContext, SpanKind, TraceFlags
2215

@@ -29,9 +22,10 @@
2922

3023
from .exceptions import DeferError, DeferJob
3124
from .otel import (
32-
consumed_messages_counter,
3325
operation_duration_histogram,
26+
process_metric_attributes,
3427
queue_wait_duration_histogram,
28+
record_consumed,
3529
record_span_error,
3630
tracer,
3731
)
@@ -293,12 +287,9 @@ def run(self) -> JobResult:
293287
extra={"job_uuid": self.uuid},
294288
)
295289

296-
metric_attributes: dict[str, Any] = {
297-
MESSAGING_SYSTEM: "plain.jobs",
298-
MESSAGING_OPERATION_TYPE: MessagingOperationTypeValues.PROCESS.value,
299-
MESSAGING_DESTINATION_NAME: self.queue,
300-
CODE_FUNCTION_NAME: f"{self.job_class}.run",
301-
}
290+
metric_attributes: dict[str, Any] = process_metric_attributes(
291+
self.queue, self.job_class
292+
)
302293
start_time = time.perf_counter()
303294
try:
304295
with tracer.start_as_current_span(
@@ -349,10 +340,11 @@ def run(self) -> JobResult:
349340
"Defer failed",
350341
extra={"job_class": self.job_class, "error": str(e)},
351342
)
352-
record_span_error(span, e, metric_attributes)
343+
error_type = record_span_error(span, e, metric_attributes)
353344
return self.convert_to_result(
354345
status=JobResultStatuses.ERRORED,
355346
error=str(e),
347+
error_type=error_type,
356348
)
357349

358350
except Exception as e:
@@ -363,14 +355,14 @@ def run(self) -> JobResult:
363355
# second log line. Rare; correct outcome; not worth
364356
# pre-checking on every successful job.
365357
logger.exception(e)
366-
record_span_error(span, e, metric_attributes)
358+
error_type = record_span_error(span, e, metric_attributes)
367359
return self.convert_to_result(
368360
status=JobResultStatuses.ERRORED,
369361
error="".join(traceback.format_tb(e.__traceback__)),
362+
error_type=error_type,
370363
)
371364
finally:
372365
duration = time.perf_counter() - start_time
373-
consumed_messages_counter.add(1, metric_attributes)
374366
operation_duration_histogram.record(duration, metric_attributes)
375367

376368
def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
@@ -443,14 +435,29 @@ def defer(self, *, job: Job, defer_exception: DeferJob) -> JobResult:
443435
span_id=self.span_id,
444436
)
445437

446-
return result
438+
# Counter ticks for the DEFERRED outcome too — defer() bypasses
439+
# convert_to_result, so without this the deferred path would not
440+
# show up in the consumed counter.
441+
record_consumed(result)
442+
return result
447443

448444
def convert_to_result(
449-
self, *, status: str, error: str = "", fire_hook: bool = True
445+
self,
446+
*,
447+
status: str,
448+
error: str = "",
449+
error_type: str | None = None,
450+
fire_hook: bool = True,
450451
) -> JobResult:
451452
"""
452453
Convert this JobProcess to a JobResult.
453454
455+
error_type, when supplied, is the OTel-style exception name (matching
456+
the spec's `error.type` attribute). It rides along to the consumed
457+
counter so dashboards can group ERRORED jobs by exception class. Only
458+
the live exception-driven paths supply it — rescue (LOST) and direct
459+
cancellations have no exception object to derive it from.
460+
454461
fire_hook controls whether on_aborted dispatches synchronously. The
455462
rescue path passes fire_hook=False so it can dispatch hooks AFTER its
456463
outer transaction commits — otherwise a hook DB error would mark the
@@ -483,6 +490,13 @@ def convert_to_result(
483490
# Delete the JobProcess now
484491
self.delete()
485492

493+
# Counter ticks for every terminal status — the live SUCCESSFUL/ERRORED
494+
# paths plus the LOST/CANCELLED paths that don't flow through
495+
# JobProcess.run()'s finally. The outcome attribute lets dashboards
496+
# split throughput by final status; error_type is forwarded for ERRORED
497+
# jobs caught by the live path.
498+
record_consumed(result, error_type=error_type)
499+
486500
# Fire Job.on_aborted outside the atomic block so a raise in user code
487501
# can't roll back the framework's bookkeeping. Only for terminal
488502
# statuses run() couldn't observe.
@@ -768,6 +782,15 @@ def __str__(self) -> str:
768782
return f"WorkerHeartbeat({self.worker_id} on {self.hostname}:{self.pid})"
769783

770784

785+
def heartbeat_cutoff() -> datetime.datetime:
    """Return the boundary timestamp separating live from stale heartbeats.

    The boundary is the current time minus ``settings.JOBS_HEARTBEAT_TIMEOUT``
    seconds. A WorkerHeartbeat whose ``last_heartbeat_at`` is earlier than
    this value is considered stale.

    Rescue, the admin views, and the OTel gauges all call this one helper so
    they agree on which workers are alive.
    """
    now = timezone.now()
    timeout = datetime.timedelta(seconds=settings.JOBS_HEARTBEAT_TIMEOUT)
    return now - timeout
792+
793+
771794
def rescue_stale_workers() -> list[JobResult]:
772795
"""
773796
Convert in-flight JobProcess rows from dead workers to JobResult(LOST).
@@ -786,9 +809,7 @@ def rescue_stale_workers() -> list[JobResult]:
786809
inherently global: filtering would let one rescuer claim a dead heartbeat
787810
without converting all of that worker's jobs, stranding the rest forever.
788811
"""
789-
cutoff = timezone.now() - datetime.timedelta(
790-
seconds=settings.JOBS_HEARTBEAT_TIMEOUT
791-
)
812+
cutoff = heartbeat_cutoff()
792813
dead_workers = WorkerHeartbeat.query.filter(last_heartbeat_at__lt=cutoff)
793814

794815
pending_hooks: list[JobResult] = []

plain-jobs/plain/jobs/otel.py

Lines changed: 77 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,14 @@
66

77
from opentelemetry import metrics, trace
88
from opentelemetry.metrics import CallbackOptions, Observation
9+
from opentelemetry.semconv._incubating.attributes.code_attributes import (
10+
CODE_FUNCTION_NAME,
11+
)
912
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
1013
MESSAGING_DESTINATION_NAME,
14+
MESSAGING_OPERATION_TYPE,
15+
MESSAGING_SYSTEM,
16+
MessagingOperationTypeValues,
1117
)
1218
from opentelemetry.semconv._incubating.metrics.messaging_metrics import (
1319
create_messaging_client_consumed_messages,
@@ -16,13 +22,21 @@
1622
)
1723
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
1824

25+
from plain.postgres import Q
1926
from plain.postgres.aggregates import Count, Min
2027
from plain.utils import timezone
2128
from plain.utils.otel import format_exception_type
2229

2330
if TYPE_CHECKING:
31+
from .models import JobResult
2432
from .workers import Worker
2533

34+
# Attribute key for the terminal-status dimension on the consumed counter.
35+
PLAIN_JOBS_OUTCOME = "plain.jobs.outcome"
36+
37+
# Attribute key for the worker-liveness dimension on plain.jobs.workers.
38+
PLAIN_JOBS_WORKER_STATE = "plain.jobs.worker.state"
39+
2640
try:
2741
_package_version = importlib.metadata.version("plain.jobs")
2842
except importlib.metadata.PackageNotFoundError:
@@ -46,12 +60,46 @@ def record_span_error(
4660
span: trace.Span,
4761
exc: BaseException,
4862
metric_attributes: dict[str, Any],
49-
) -> None:
63+
) -> str:
64+
"""Mark the span as failed, stamp error.type on it and on the per-call
65+
metric attribute dict, and return the error.type string so the caller
66+
can forward it to other instruments."""
5067
error_type = format_exception_type(exc)
5168
span.record_exception(exc)
5269
span.set_status(trace.StatusCode.ERROR)
5370
span.set_attribute(ERROR_TYPE, error_type)
5471
metric_attributes[ERROR_TYPE] = error_type
72+
return error_type
73+
74+
75+
def process_metric_attributes(queue: str, job_class: str) -> dict[str, Any]:
    """Build the base attributes for process-side messaging.client.* metrics.

    JobProcess.run() starts from this dict (adding error.type on failure) and
    record_consumed starts from it too (adding the outcome dimension), so both
    call sites emit the same keys and values.

    Args:
        queue: Queue name, recorded as the messaging destination name.
        job_class: Job class identifier; ".run" is appended to form the
            code function name.

    Returns:
        A fresh dict the caller is free to mutate.
    """
    function_name = f"{job_class}.run"
    attributes: dict[str, Any] = {}
    attributes[MESSAGING_SYSTEM] = "plain.jobs"
    attributes[MESSAGING_OPERATION_TYPE] = MessagingOperationTypeValues.PROCESS.value
    attributes[MESSAGING_DESTINATION_NAME] = queue
    attributes[CODE_FUNCTION_NAME] = function_name
    return attributes
88+
89+
90+
def record_consumed(result: JobResult, *, error_type: str | None = None) -> None:
    """Add one point to the consumed-messages counter for a JobResult.

    The point carries the shared process-side attributes for the result's
    queue and job class, plus `plain.jobs.outcome` set to the lowercased
    result status (successful/errored/lost/cancelled/deferred).

    Args:
        result: The JobResult being counted.
        error_type: Optional `error.type` value. Attached only when not None,
            i.e. when the live path caught an exception and forwarded it.
            The rescue path (LOST) and direct cancellations have no exception
            object to derive it from, so they omit it.
    """
    attributes = {
        **process_metric_attributes(result.queue, result.job_class),
        PLAIN_JOBS_OUTCOME: result.status.lower(),
    }
    if error_type is not None:
        attributes[ERROR_TYPE] = error_type
    consumed_messages_counter.add(1, attributes)
55103

56104

57105
class WorkerMetrics:
@@ -117,6 +165,15 @@ def _register_instruments(cls) -> None:
117165
unit="{job}",
118166
description="JobProcess rows currently running, per queue.",
119167
)
168+
meter.create_observable_gauge(
169+
name="plain.jobs.workers",
170+
callbacks=[cls._gauge_workers],
171+
unit="{worker}",
172+
description=(
173+
"WorkerHeartbeat row count, split by liveness state "
174+
"(active=within JOBS_HEARTBEAT_TIMEOUT, stale=past it)."
175+
),
176+
)
120177

121178
# --- Callbacks ----------------------------------------------------------
122179

@@ -191,6 +248,25 @@ def _gauge_running(cls, options: CallbackOptions) -> Iterable[Observation]:
191248

192249
return _count_per_queue(JobProcess.query.running(), active.worker.queues)
193250

251+
# The worker-liveness gauge observes the global WorkerHeartbeat table and
252+
# doesn't need a calling Worker — emit unconditionally so dashboards keep
253+
# reporting even during a full worker drain. One snapshot of the cutoff
254+
# is shared across both observations so a row landing exactly at the
255+
# boundary can't be counted in both states (or neither).
256+
@classmethod
def _gauge_workers(cls, options: CallbackOptions) -> Iterable[Observation]:
    """Observe WorkerHeartbeat row counts, split into active and stale.

    The cutoff is computed once and reused for both filters, so a row is
    counted in exactly one state. Both counts come from a single aggregate
    query over the whole WorkerHeartbeat table.
    """
    # Imported here rather than at module top, as in the original.
    from .models import WorkerHeartbeat, heartbeat_cutoff

    boundary = heartbeat_cutoff()
    is_active = Q(last_heartbeat_at__gte=boundary)
    is_stale = Q(last_heartbeat_at__lt=boundary)
    totals = WorkerHeartbeat.query.aggregate(
        active=Count("id", filter=is_active),
        stale=Count("id", filter=is_stale),
    )
    return [
        Observation(totals[state], {PLAIN_JOBS_WORKER_STATE: state})
        for state in ("active", "stale")
    ]
269+
194270

195271
def _count_per_queue(queryset: Any, queues: list[str]) -> list[Observation]:
196272
rows = queryset.filter(queue__in=queues).values("queue").annotate(c=Count("*"))

0 commit comments

Comments
 (0)