|
27 | 27 | from plain import postgres |
28 | 28 | from plain.postgres import transaction |
29 | 29 | from plain.utils import timezone |
30 | | -from plain.utils.otel import format_exception_type |
31 | 30 |
|
32 | 31 | from .locks import postgres_advisory_lock |
33 | | -from .otel import operation_duration_histogram, sent_messages_counter, tracer |
| 32 | +from .otel import ( |
| 33 | + operation_duration_histogram, |
| 34 | + record_span_error, |
| 35 | + sent_messages_counter, |
| 36 | + tracer, |
| 37 | +) |
34 | 38 | from .registry import JobParameters, jobs_registry |
35 | 39 |
|
36 | 40 | if TYPE_CHECKING: |
@@ -89,12 +93,16 @@ def run_in_worker( |
89 | 93 | } |
90 | 94 | start_time = time.perf_counter() |
91 | 95 | skipped = False |
92 | | - try: |
93 | | - with tracer.start_as_current_span( |
94 | | - f"send {queue}", |
95 | | - kind=SpanKind.PRODUCER, |
96 | | - attributes={**metric_attributes, MESSAGING_OPERATION_NAME: "send"}, |
97 | | - ) as span: |
| 96 | + with tracer.start_as_current_span( |
| 97 | + f"send {queue}", |
| 98 | + kind=SpanKind.PRODUCER, |
| 99 | + attributes={**metric_attributes, MESSAGING_OPERATION_NAME: "send"}, |
| 100 | + # We record manually via record_span_error (escaped=True) at the |
| 101 | + # workflow boundary; suppress the SDK's escaped=False auto-record |
| 102 | + # so failed sends carry a single, correctly-marked event. |
| 103 | + record_exception=False, |
| 104 | + ) as span: |
| 105 | + try: |
98 | 106 | try: |
99 | 107 | frame = sys._getframe(1) |
100 | 108 | filename = frame.f_code.co_filename |
@@ -176,30 +184,33 @@ def run_in_worker( |
176 | 184 | ) |
177 | 185 |
|
178 | 186 | return job_request |
179 | | - except Exception as e: |
180 | | - metric_attributes[ERROR_TYPE] = format_exception_type(e) |
181 | | - raise |
182 | | - finally: |
183 | | - # Skipped enqueues are visible on the span (`job.enqueue.skipped`) |
184 | | - # but do not fire the messaging counter — no message was sent, so |
185 | | - # there's nothing for `messaging.client.sent.messages` to count. |
186 | | - if not skipped: |
187 | | - duration = time.perf_counter() - start_time |
188 | | - if ERROR_TYPE in metric_attributes: |
189 | | - # No commit is coming — record now so failed sends are visible. |
190 | | - sent_messages_counter.add(1, metric_attributes) |
191 | | - operation_duration_histogram.record(duration, metric_attributes) |
192 | | - else: |
193 | | - # Defer to the outer commit so a caller-level rollback |
194 | | - # doesn't leave a phantom send. Runs immediately if not |
195 | | - # inside a transaction. |
196 | | - attrs = metric_attributes |
197 | | - |
198 | | - def _emit() -> None: |
199 | | - sent_messages_counter.add(1, attrs) |
200 | | - operation_duration_histogram.record(duration, attrs) |
201 | | - |
202 | | - transaction.on_commit(_emit) |
| 187 | + except Exception as e: |
| 188 | + # Stamps escaped=True on the span event, ERROR_TYPE on both |
| 189 | + # the span and `metric_attributes` (so the finally below |
| 190 | + # picks up the failed-send branch). |
| 191 | + record_span_error(span, e, metric_attributes) |
| 192 | + raise |
| 193 | + finally: |
| 194 | + # Skipped enqueues are visible on the span (`job.enqueue.skipped`) |
| 195 | + # but do not fire the messaging counter — no message was sent, so |
| 196 | + # there's nothing for `messaging.client.sent.messages` to count. |
| 197 | + if not skipped: |
| 198 | + duration = time.perf_counter() - start_time |
| 199 | + if ERROR_TYPE in metric_attributes: |
| 200 | + # No commit is coming — record now so failed sends are visible. |
| 201 | + sent_messages_counter.add(1, metric_attributes) |
| 202 | + operation_duration_histogram.record(duration, metric_attributes) |
| 203 | + else: |
| 204 | + # Defer to the outer commit so a caller-level rollback |
| 205 | + # doesn't leave a phantom send. Runs immediately if not |
| 206 | + # inside a transaction. |
| 207 | + attrs = metric_attributes |
| 208 | + |
| 209 | + def _emit() -> None: |
| 210 | + sent_messages_counter.add(1, attrs) |
| 211 | + operation_duration_histogram.record(duration, attrs) |
| 212 | + |
| 213 | + transaction.on_commit(_emit) |
203 | 214 |
|
204 | 215 | def get_requested_jobs( |
205 | 216 | self, *, concurrency_key: str | None = None, include_retries: bool = False |
|
0 commit comments