Skip to content

Commit 2a106fb

Browse files
committed
Add plain-jobs OTel tests and document metric contract
Tests cover the `Job.run_in_worker()` enqueue path: send-span semconv attributes, the skipped-enqueue branch, and failure-path metric recording. README and a code comment document the two metric contracts that the tests can't directly assert under the test rollback: - Successful enqueues defer to `transaction.on_commit`, so a rolled-back caller produces no metric — matching OTel's "MUST NOT count messages that were created but haven't yet been sent." - Skipped enqueues are visible on the span (`job.enqueue.skipped`) but intentionally do not increment `messaging.client.sent.messages` — no message was sent, so there's nothing to count.
1 parent db810e1 commit 2a106fb

3 files changed

Lines changed: 115 additions & 4 deletions

File tree

plain-jobs/plain/jobs/README.md

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,21 @@ plain jobs worker --stats-every 60
181181

182182
The worker integrates with OpenTelemetry for distributed tracing. Spans are created for:
183183

184-
- Job scheduling (`run_in_worker`)
185-
- Job execution
186-
- Job completion/failure
184+
- Job scheduling (`run_in_worker`) — emits a `send {queue}` PRODUCER span with the OTel `messaging.*` semconv attributes
185+
- Job execution — emits a `process {queue}` CONSUMER span linked back to the originating send span
186+
- Job completion/failure — recorded as the span's status and `error.type` attribute on failure
187187

188-
Jobs can be linked to the originating trace context, allowing you to track jobs initiated from web requests.
188+
Jobs are linked to the originating trace context, allowing you to follow jobs initiated from web requests.
189+
190+
Two messaging metrics are recorded:
191+
192+
- `messaging.client.sent.messages` — counter incremented for each enqueue
193+
- `messaging.client.operation.duration` — histogram of enqueue/process durations
194+
195+
Two contract details to be aware of:
196+
197+
- **Successful enqueues record metrics on transaction commit.** If you call `run_in_worker` inside a transaction that later rolls back, the message was never actually persisted — so the counter and histogram do not fire. This matches the OTel semconv: "MUST NOT count messages that were created but haven't yet been sent." Failed enqueues record immediately so transient errors are still visible.
198+
- **Skipped enqueues are visible in spans, not in metrics.** When `should_enqueue` returns `False` (e.g., a concurrency-key collision), the span gets `job.enqueue.skipped=True` but no metric is recorded — there was no send to count.
189199

190200
## Settings
191201

plain-jobs/plain/jobs/jobs.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,9 @@ def run_in_worker(
180180
metric_attributes[ERROR_TYPE] = format_exception_type(e)
181181
raise
182182
finally:
183+
# Skipped enqueues are visible on the span (`job.enqueue.skipped`)
184+
# but do not fire the messaging counter — no message was sent, so
185+
# there's nothing for `messaging.client.sent.messages` to count.
183186
if not skipped:
184187
duration = time.perf_counter() - start_time
185188
if ERROR_TYPE in metric_attributes:

plain-jobs/tests/test_otel.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
"""OTel instrumentation tests for the job enqueue path.
2+
3+
The process (consumer) side runs through `JobProcess.convert_to_result()`
4+
and is exercised by the worker; tests for it would need worker setup and
5+
are deferred. These tests cover `Job.run_in_worker()`, which is the
6+
hottest user-facing path.
7+
"""
8+
9+
from __future__ import annotations
10+
11+
import pytest
12+
from opentelemetry.sdk.metrics.export import InMemoryMetricReader
13+
from opentelemetry.sdk.trace.export.in_memory_span_exporter import (
14+
InMemorySpanExporter,
15+
)
16+
from opentelemetry.trace import SpanKind
17+
18+
from plain.jobs import Job
19+
20+
21+
class _NoopJob(Job):
22+
def run(self) -> None:
23+
pass
24+
25+
26+
class _ExclusiveJob(Job):
27+
"""Job that always reports `should_enqueue=False` to exercise the
28+
skipped-enqueue branch without needing pre-existing rows."""
29+
30+
def run(self) -> None:
31+
pass
32+
33+
def should_enqueue(self, concurrency_key: str) -> bool:
34+
return False
35+
36+
37+
@pytest.mark.usefixtures("db")
38+
def test_enqueue_emits_send_span(otel_spans: InMemorySpanExporter) -> None:
39+
_NoopJob().run_in_worker()
40+
41+
spans = [s for s in otel_spans.get_finished_spans() if s.name == "send default"]
42+
assert spans, "expected a `send default` PRODUCER span"
43+
span = spans[-1]
44+
attrs = span.attributes
45+
assert attrs is not None
46+
assert span.kind == SpanKind.PRODUCER
47+
assert attrs["messaging.system"] == "plain.jobs"
48+
assert attrs["messaging.operation.type"] == "send"
49+
assert attrs["messaging.operation.name"] == "send"
50+
assert attrs["messaging.destination.name"] == "default"
51+
assert "messaging.message.id" in attrs
52+
assert "code.function.name" in attrs
53+
54+
55+
@pytest.mark.usefixtures("db")
56+
def test_enqueue_skipped_marks_span(otel_spans: InMemorySpanExporter) -> None:
57+
result = _ExclusiveJob().run_in_worker(concurrency_key="busy")
58+
59+
assert result is None
60+
span = next(s for s in otel_spans.get_finished_spans() if s.name == "send default")
61+
assert span.attributes is not None
62+
assert span.attributes["job.enqueue.skipped"] is True
63+
64+
65+
@pytest.mark.usefixtures("db")
66+
def test_enqueue_failure_records_error_type_on_metric(
67+
monkeypatch: pytest.MonkeyPatch,
68+
otel_spans: InMemorySpanExporter,
69+
otel_metrics: InMemoryMetricReader,
70+
) -> None:
71+
# The success path defers metric recording to `transaction.on_commit`,
72+
# which never fires under the test rollback. The failure path records
73+
# immediately, so it's the one we can assert on here.
74+
def _boom(*args, **kwargs):
75+
raise RuntimeError("save failed")
76+
77+
from plain.jobs.models import JobRequest
78+
79+
monkeypatch.setattr(JobRequest, "save", _boom)
80+
81+
with pytest.raises(RuntimeError):
82+
_NoopJob().run_in_worker()
83+
84+
data = otel_metrics.get_metrics_data()
85+
assert data is not None
86+
sent_points = [
87+
p
88+
for rm in data.resource_metrics
89+
for sm in rm.scope_metrics
90+
for m in sm.metrics
91+
if m.name == "messaging.client.sent.messages"
92+
for p in m.data.data_points
93+
]
94+
assert sent_points, "expected sent_messages counter point on failure"
95+
assert all(p.attributes.get("error.type") == "RuntimeError" for p in sent_points)
96+
assert all(
97+
p.attributes.get("messaging.system") == "plain.jobs" for p in sent_points
98+
)

0 commit comments

Comments
 (0)