|
1 | 1 | import sentry_sdk |
| 2 | +from collections import deque |
| 3 | +from contextlib import contextmanager |
2 | 4 | from sentry_sdk import start_span |
3 | 5 | from sentry_sdk.consts import OP, SPANDATA |
4 | 6 | from sentry_sdk.integrations import Integration, DidNotEnable |
|
16 | 18 | ) |
17 | 19 |
|
18 | 20 | from typing import TYPE_CHECKING |
| 21 | +from weakref import WeakSet |
19 | 22 |
|
20 | 23 | if TYPE_CHECKING: |
21 | 24 | from typing import Any |
| 25 | + from typing import Iterator |
22 | 26 |
|
23 | 27 |
|
24 | 28 | import importlib.util |
25 | 29 |
|
26 | 30 | if importlib.util.find_spec("pyreqwest") is None: |
27 | 31 | raise DidNotEnable("pyreqwest is not installed") |
28 | 32 |
|
| 33 | +_MAX_TRACKED_NON_WEAKREF_BUILDERS = 2048 |
| 34 | +_instrumented_builders = WeakSet() # type: "WeakSet[Any]" |
| 35 | +_instrumented_non_weakref_builder_ids = set() # type: "set[int]" |
| 36 | +_instrumented_non_weakref_builder_ids_order = deque() # type: "deque[int]" |
| 37 | + |
29 | 38 |
|
30 | 39 | class PyreqwestIntegration(Integration): |
31 | 40 | identifier = "pyreqwest" |
@@ -74,24 +83,52 @@ def _patch_builder_method(cls: type, method_name: str, middleware: "Any") -> Non |
74 | 83 | original_method = getattr(cls, method_name) |
75 | 84 |
|
76 | 85 | def sentry_patched_method(self: "Any", *args: "Any", **kwargs: "Any") -> "Any": |
77 | | - if not getattr(self, "_sentry_instrumented", False): |
| 86 | + if not _builder_is_instrumented(self): |
78 | 87 | integration = sentry_sdk.get_client().get_integration(PyreqwestIntegration) |
79 | 88 | if integration is not None: |
80 | 89 | self.with_middleware(middleware) |
81 | | - try: |
82 | | - self._sentry_instrumented = True |
83 | | - except (TypeError, AttributeError): |
84 | | - # In case the instance itself is immutable or doesn't allow extra attributes |
85 | | - pass |
| 90 | + _mark_builder_instrumented(self) |
86 | 91 | return original_method(self, *args, **kwargs) |
87 | 92 |
|
88 | 93 | setattr(cls, method_name, sentry_patched_method) |
89 | 94 |
|
90 | 95 |
|
91 | | -async def sentry_async_middleware(request: "Any", next_handler: "Any") -> "Any": |
92 | | - if sentry_sdk.get_client().get_integration(PyreqwestIntegration) is None: |
93 | | - return await next_handler.run(request) |
| 96 | +def _builder_is_instrumented(builder: "Any") -> bool: |
| 97 | + if getattr(builder, "_sentry_instrumented", False): |
| 98 | + return True |
| 99 | + |
| 100 | + if id(builder) in _instrumented_non_weakref_builder_ids: |
| 101 | + return True |
| 102 | + |
| 103 | + try: |
| 104 | + return builder in _instrumented_builders |
| 105 | + except TypeError: |
| 106 | + return False |
| 107 | + |
94 | 108 |
|
| 109 | +def _mark_builder_instrumented(builder: "Any") -> None: |
| 110 | + try: |
| 111 | + _instrumented_builders.add(builder) |
| 112 | + except TypeError: |
| 113 | + builder_id = id(builder) |
| 114 | + if builder_id not in _instrumented_non_weakref_builder_ids: |
| 115 | + _instrumented_non_weakref_builder_ids.add(builder_id) |
| 116 | + _instrumented_non_weakref_builder_ids_order.append(builder_id) |
| 117 | + if ( |
| 118 | + len(_instrumented_non_weakref_builder_ids_order) |
| 119 | + > _MAX_TRACKED_NON_WEAKREF_BUILDERS |
| 120 | + ): |
| 121 | + old_builder_id = _instrumented_non_weakref_builder_ids_order.popleft() |
| 122 | + _instrumented_non_weakref_builder_ids.discard(old_builder_id) |
| 123 | + |
| 124 | + try: |
| 125 | + builder._sentry_instrumented = True |
| 126 | + except (TypeError, AttributeError): |
| 127 | + pass |
| 128 | + |
| 129 | + |
| 130 | +@contextmanager |
| 131 | +def _sentry_middleware_span(request: "Any") -> "Iterator[Any]": |
95 | 132 | parsed_url = None |
96 | 133 | with capture_internal_exceptions(): |
97 | 134 | parsed_url = parse_url(str(request.url), sanitize=False) |
@@ -127,60 +164,29 @@ async def sentry_async_middleware(request: "Any", next_handler: "Any") -> "Any": |
127 | 164 | else: |
128 | 165 | request.headers[key] = value |
129 | 166 |
|
130 | | - response = await next_handler.run(request) |
131 | | - |
132 | | - span.set_http_status(response.status) |
| 167 | + yield span |
133 | 168 |
|
134 | 169 | with capture_internal_exceptions(): |
135 | 170 | add_http_request_source(span) |
136 | 171 |
|
137 | | - return response |
138 | | - |
139 | 172 |
|
140 | | -def sentry_sync_middleware(request: "Any", next_handler: "Any") -> "Any": |
| 173 | +async def sentry_async_middleware(request: "Any", next_handler: "Any") -> "Any": |
141 | 174 | if sentry_sdk.get_client().get_integration(PyreqwestIntegration) is None: |
142 | | - return next_handler.run(request) |
| 175 | + return await next_handler.run(request) |
143 | 176 |
|
144 | | - parsed_url = None |
145 | | - with capture_internal_exceptions(): |
146 | | - parsed_url = parse_url(str(request.url), sanitize=False) |
| 177 | + with _sentry_middleware_span(request) as span: |
| 178 | + response = await next_handler.run(request) |
| 179 | + span.set_http_status(response.status) |
147 | 180 |
|
148 | | - with start_span( |
149 | | - op=OP.HTTP_CLIENT, |
150 | | - name="%s %s" |
151 | | - % ( |
152 | | - request.method, |
153 | | - parsed_url.url if parsed_url else SENSITIVE_DATA_SUBSTITUTE, |
154 | | - ), |
155 | | - origin=PyreqwestIntegration.origin, |
156 | | - ) as span: |
157 | | - span.set_data(SPANDATA.HTTP_METHOD, request.method) |
158 | | - if parsed_url is not None: |
159 | | - span.set_data("url", parsed_url.url) |
160 | | - span.set_data(SPANDATA.HTTP_QUERY, parsed_url.query) |
161 | | - span.set_data(SPANDATA.HTTP_FRAGMENT, parsed_url.fragment) |
| 181 | + return response |
162 | 182 |
|
163 | | - if should_propagate_trace(sentry_sdk.get_client(), str(request.url)): |
164 | | - for ( |
165 | | - key, |
166 | | - value, |
167 | | - ) in sentry_sdk.get_current_scope().iter_trace_propagation_headers(): |
168 | | - logger.debug( |
169 | | - "[Tracing] Adding `{key}` header {value} to outgoing request to {url}.".format( |
170 | | - key=key, value=value, url=request.url |
171 | | - ) |
172 | | - ) |
173 | 183 |
|
174 | | - if key == BAGGAGE_HEADER_NAME: |
175 | | - add_sentry_baggage_to_headers(request.headers, value) |
176 | | - else: |
177 | | - request.headers[key] = value |
| 184 | +def sentry_sync_middleware(request: "Any", next_handler: "Any") -> "Any": |
| 185 | + if sentry_sdk.get_client().get_integration(PyreqwestIntegration) is None: |
| 186 | + return next_handler.run(request) |
178 | 187 |
|
| 188 | + with _sentry_middleware_span(request) as span: |
179 | 189 | response = next_handler.run(request) |
180 | | - |
181 | 190 | span.set_http_status(response.status) |
182 | 191 |
|
183 | | - with capture_internal_exceptions(): |
184 | | - add_http_request_source(span) |
185 | | - |
186 | 192 | return response |
0 commit comments