Skip to content

Commit e08b25d

Browse files
committed
Add per-Worker observable gauges via WorkerMetrics
Five new plain.jobs.* observable gauges report queue and worker state per queue: queue.depth, queue.oldest.age, queue.scheduled, running, and worker.processes. Each Worker emits one observation per queue it handles, including zero for empty queues so last_value dashboards don't show stale readings. The OTel SDK keeps the first callback registered for a given instrument name. To survive reload paths that shut a Worker down and construct a new one in the same process, callbacks dispatch through a class-level current slot rather than binding to a Worker at registration time. Each Worker owns a WorkerMetrics; constructing one swaps in as the active target. Adds JobRequestQuerySet.ready_to_run() and .scheduled() and reuses them in Worker.run() and the gauge callbacks. Reuses JobProcess.query.running() for the running gauge so the metric and the queryset agree on what "running" means.
1 parent 146103c commit e08b25d

5 files changed

Lines changed: 335 additions & 16 deletions

File tree

plain-jobs/plain/jobs/models.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,18 @@
4545
logger = get_framework_logger()
4646

4747

48+
class JobRequestQuerySet(postgres.QuerySet["JobRequest"]):
49+
def ready_to_run(self) -> Self:
50+
"""JobRequests with no scheduling constraint or whose `start_at` is past."""
51+
return self.filter(
52+
postgres.Q(start_at__isnull=True) | postgres.Q(start_at__lte=timezone.now())
53+
)
54+
55+
def scheduled(self) -> Self:
56+
"""JobRequests scheduled to start in the future."""
57+
return self.filter(start_at__gt=timezone.now())
58+
59+
4860
@postgres.register_model
4961
class JobRequest(postgres.Model):
5062
"""
@@ -79,7 +91,7 @@ class JobRequest(postgres.Model):
7991

8092
# expires_at = postgres.DateTimeField(required=False, allow_null=True)
8193

82-
query: postgres.QuerySet[JobRequest] = postgres.QuerySet()
94+
query: JobRequestQuerySet = JobRequestQuerySet()
8395

8496
model_options = postgres.Options(
8597
ordering=["-priority", "-created_at"],

plain-jobs/plain/jobs/otel.py

Lines changed: 158 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,28 @@
11
from __future__ import annotations
22

33
import importlib.metadata
4-
from typing import Any
4+
from collections.abc import Iterable
5+
from typing import TYPE_CHECKING, Any, ClassVar
56

67
from opentelemetry import metrics, trace
8+
from opentelemetry.metrics import CallbackOptions, Observation
9+
from opentelemetry.semconv._incubating.attributes.messaging_attributes import (
10+
MESSAGING_DESTINATION_NAME,
11+
)
712
from opentelemetry.semconv._incubating.metrics.messaging_metrics import (
813
create_messaging_client_consumed_messages,
914
create_messaging_client_operation_duration,
1015
create_messaging_client_sent_messages,
1116
)
1217
from opentelemetry.semconv.attributes.error_attributes import ERROR_TYPE
1318

19+
from plain.postgres.aggregates import Count, Min
20+
from plain.utils import timezone
1421
from plain.utils.otel import format_exception_type
1522

23+
if TYPE_CHECKING:
24+
from .workers import Worker
25+
1626
try:
1727
_package_version = importlib.metadata.version("plain.jobs")
1828
except importlib.metadata.PackageNotFoundError:
@@ -21,6 +31,7 @@
2131
tracer = trace.get_tracer("plain.jobs", _package_version)
2232
meter = metrics.get_meter("plain.jobs", version=_package_version)
2333

34+
# Per-event instruments (semconv messaging metrics + plain.jobs queue.wait.duration).
2435
sent_messages_counter = create_messaging_client_sent_messages(meter)
2536
consumed_messages_counter = create_messaging_client_consumed_messages(meter)
2637
operation_duration_histogram = create_messaging_client_operation_duration(meter)
@@ -41,3 +52,149 @@ def record_span_error(
4152
span.set_status(trace.StatusCode.ERROR)
4253
span.set_attribute(ERROR_TYPE, error_type)
4354
metric_attributes[ERROR_TYPE] = error_type
55+
56+
57+
class WorkerMetrics:
58+
"""Per-Worker observable gauges (queue depth/age/scheduled, running count,
59+
worker process count).
60+
61+
The OTel SDK keeps the *first* callback registered for a given instrument
62+
name, so instruments are registered once per process. The Worker they
63+
observe may change across reload paths, so each Worker owns a
64+
WorkerMetrics; constructing one swaps it in as the active target for the
65+
(process-singleton) callbacks. The new instance simply replaces the old
66+
one in the class-level `_current` slot — no explicit teardown is needed
67+
because either a successor swaps in (reload) or the process exits
68+
(signal shutdown).
69+
70+
Each callback emits one observation per queue this Worker handles, every
71+
export interval, including zero for empty queues so `last_value`
72+
dashboards don't show stale readings after a drain. When two Workers
73+
handle the same queue they emit identical values; aggregate with
74+
`last_value`/`max`, never `sum`.
75+
"""
76+
77+
_current: ClassVar[WorkerMetrics | None] = None
78+
_registered: ClassVar[bool] = False
79+
80+
def __init__(self, worker: Worker) -> None:
81+
self.worker = worker
82+
type(self)._register_instruments()
83+
type(self)._current = self
84+
85+
@classmethod
86+
def _register_instruments(cls) -> None:
87+
if cls._registered:
88+
return
89+
cls._registered = True
90+
meter.create_observable_gauge(
91+
name="plain.jobs.worker.processes",
92+
callbacks=[cls._gauge_worker_processes],
93+
unit="{process}",
94+
description="OS processes spawned by this worker.",
95+
)
96+
meter.create_observable_gauge(
97+
name="plain.jobs.queue.depth",
98+
callbacks=[cls._gauge_queue_depth],
99+
unit="{job}",
100+
description="Pending JobRequests ready to run, per queue.",
101+
)
102+
meter.create_observable_gauge(
103+
name="plain.jobs.queue.oldest.age",
104+
callbacks=[cls._gauge_queue_oldest_age],
105+
unit="s",
106+
description="Age of the oldest ready-to-run JobRequest, per queue.",
107+
)
108+
meter.create_observable_gauge(
109+
name="plain.jobs.queue.scheduled",
110+
callbacks=[cls._gauge_queue_scheduled],
111+
unit="{job}",
112+
description="JobRequests with start_at in the future, per queue.",
113+
)
114+
meter.create_observable_gauge(
115+
name="plain.jobs.running",
116+
callbacks=[cls._gauge_running],
117+
unit="{job}",
118+
description="JobProcess rows currently running, per queue.",
119+
)
120+
121+
# --- Callbacks ----------------------------------------------------------
122+
123+
# Each callback snapshots `cls._current` to a local — `deactivate()` can
124+
# null the class var on another thread mid-callback (PeriodicExporting
125+
# MetricReader runs callbacks off the main thread).
126+
127+
@classmethod
128+
def _gauge_worker_processes(cls, options: CallbackOptions) -> Iterable[Observation]:
129+
active = cls._current
130+
if active is None:
131+
return []
132+
try:
133+
n = len(active.worker.executor._processes)
134+
except (AttributeError, TypeError):
135+
# Pool may be mid-shutdown; report 0 rather than crashing the export.
136+
n = 0
137+
return [Observation(n)]
138+
139+
@classmethod
140+
def _gauge_queue_depth(cls, options: CallbackOptions) -> Iterable[Observation]:
141+
active = cls._current
142+
if active is None:
143+
return []
144+
# Lazy import - see Worker._worker_process_initializer() comment for why.
145+
from .models import JobRequest
146+
147+
return _count_per_queue(JobRequest.query.ready_to_run(), active.worker.queues)
148+
149+
@classmethod
150+
def _gauge_queue_oldest_age(cls, options: CallbackOptions) -> Iterable[Observation]:
151+
active = cls._current
152+
if active is None:
153+
return []
154+
from .models import JobRequest
155+
156+
queues = active.worker.queues
157+
rows = (
158+
JobRequest.query.ready_to_run()
159+
.filter(queue__in=queues)
160+
.values("queue")
161+
.annotate(oldest=Min("created_at"))
162+
)
163+
now = timezone.now()
164+
# `max(0, ...)` defends against Python/Postgres clock skew producing
165+
# a negative age. Empty queues fall through to 0.0 below.
166+
ages = {
167+
row["queue"]: max(0.0, (now - row["oldest"]).total_seconds())
168+
for row in rows
169+
if row["oldest"] is not None
170+
}
171+
return [
172+
Observation(ages.get(q, 0.0), {MESSAGING_DESTINATION_NAME: q})
173+
for q in queues
174+
]
175+
176+
@classmethod
177+
def _gauge_queue_scheduled(cls, options: CallbackOptions) -> Iterable[Observation]:
178+
active = cls._current
179+
if active is None:
180+
return []
181+
from .models import JobRequest
182+
183+
return _count_per_queue(JobRequest.query.scheduled(), active.worker.queues)
184+
185+
@classmethod
186+
def _gauge_running(cls, options: CallbackOptions) -> Iterable[Observation]:
187+
active = cls._current
188+
if active is None:
189+
return []
190+
from .models import JobProcess
191+
192+
return _count_per_queue(JobProcess.query.running(), active.worker.queues)
193+
194+
195+
def _count_per_queue(queryset: Any, queues: list[str]) -> list[Observation]:
196+
rows = queryset.filter(queue__in=queues).values("queue").annotate(c=Count("*"))
197+
counts = {row["queue"]: row["c"] for row in rows}
198+
return [
199+
Observation(counts.get(q, 0), {MESSAGING_DESTINATION_NAME: q}) for q in queues
200+
]

plain-jobs/plain/jobs/workers.py

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,15 +9,14 @@
99
from functools import partial
1010
from typing import TYPE_CHECKING, Any
1111

12-
from plain import postgres
1312
from plain.logs import get_framework_logger
1413
from plain.postgres import transaction
1514
from plain.postgres.db import return_database_connection
1615
from plain.runtime import settings
17-
from plain.utils import timezone
1816
from plain.utils.module_loading import import_string
1917
from plain.utils.os import get_cpu_count
2018

19+
from .otel import WorkerMetrics
2120
from .registry import jobs_registry
2221

2322
if TYPE_CHECKING:
@@ -95,6 +94,8 @@ def __init__(
9594

9695
self._is_shutting_down = False
9796

97+
self.metrics = WorkerMetrics(self)
98+
9899
def run(self) -> None:
99100
# Lazy import - see _worker_process_initializer() comment for why
100101
from .models import JobRequest
@@ -134,14 +135,9 @@ def run(self) -> None:
134135

135136
with transaction.atomic():
136137
job_request = (
137-
JobRequest.query.select_for_update(skip_locked=True)
138-
.filter(
139-
queue__in=self.queues,
140-
)
141-
.filter(
142-
postgres.Q(start_at__isnull=True)
143-
| postgres.Q(start_at__lte=timezone.now())
144-
)
138+
JobRequest.query.ready_to_run()
139+
.filter(queue__in=self.queues)
140+
.select_for_update(skip_locked=True)
145141
.order_by("-priority", "-start_at", "-created_at")
146142
.first()
147143
)

0 commit comments

Comments
 (0)