-
Notifications
You must be signed in to change notification settings - Fork 612
feat(asyncpg): Add span streaming support #6215
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
fd3ee4e
f21ad94
5e8fbcb
2869f76
6bb9b6c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,17 +1,23 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import contextlib | ||
| import re | ||
| from typing import Any, TypeVar, Callable, Awaitable, Iterator | ||
| from typing import Any, Awaitable, Callable, Iterator, TypeVar, Union | ||
|
|
||
| import sentry_sdk | ||
| from sentry_sdk.consts import OP, SPANDATA | ||
| from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable | ||
| from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version | ||
| from sentry_sdk.traces import StreamedSpan | ||
| from sentry_sdk.tracing import Span | ||
| from sentry_sdk.tracing_utils import add_query_source, record_sql_queries | ||
| from sentry_sdk.tracing_utils import ( | ||
| add_query_source, | ||
| has_span_streaming_enabled, | ||
| record_sql_queries_supporting_streaming, | ||
| ) | ||
| from sentry_sdk.utils import ( | ||
| capture_internal_exceptions, | ||
| ensure_integration_enabled, | ||
| parse_version, | ||
| capture_internal_exceptions, | ||
| ) | ||
|
|
||
| try: | ||
|
|
@@ -62,7 +68,8 @@ def _normalize_query(query: str) -> str: | |
|
|
||
| def _wrap_execute(f: "Callable[..., Awaitable[T]]") -> "Callable[..., Awaitable[T]]": | ||
| async def _inner(*args: "Any", **kwargs: "Any") -> "T": | ||
| if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: | ||
| client = sentry_sdk.get_client() | ||
| if client.get_integration(AsyncPGIntegration) is None: | ||
| return await f(*args, **kwargs) | ||
|
|
||
| # Avoid recording calls to _execute twice. | ||
|
|
@@ -73,7 +80,7 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": | |
| return await f(*args, **kwargs) | ||
|
|
||
| query = _normalize_query(args[1]) | ||
| with record_sql_queries( | ||
| with record_sql_queries_supporting_streaming( | ||
| cursor=None, | ||
| query=query, | ||
| params_list=None, | ||
|
|
@@ -82,9 +89,13 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": | |
| span_origin=AsyncPGIntegration.origin, | ||
| ) as span: | ||
| res = await f(*args, **kwargs) | ||
| if isinstance(span, StreamedSpan): | ||
| with capture_internal_exceptions(): | ||
| add_query_source(span) | ||
|
|
||
| with capture_internal_exceptions(): | ||
| add_query_source(span) | ||
| if not isinstance(span, StreamedSpan): | ||
|
ericapisani marked this conversation as resolved.
|
||
| with capture_internal_exceptions(): | ||
| add_query_source(span) | ||
|
|
||
| return res | ||
|
|
||
|
|
@@ -101,15 +112,16 @@ def _record( | |
| params_list: "tuple[Any, ...] | None", | ||
| *, | ||
| executemany: bool = False, | ||
| ) -> "Iterator[Span]": | ||
| integration = sentry_sdk.get_client().get_integration(AsyncPGIntegration) | ||
| ) -> "Iterator[Union[Span, StreamedSpan]]": | ||
| client = sentry_sdk.get_client() | ||
| integration = client.get_integration(AsyncPGIntegration) | ||
| if integration is not None and not integration._record_params: | ||
| params_list = None | ||
|
|
||
| param_style = "pyformat" if params_list else None | ||
|
|
||
| query = _normalize_query(query) | ||
| with record_sql_queries( | ||
| with record_sql_queries_supporting_streaming( | ||
|
ericapisani marked this conversation as resolved.
|
||
| cursor=cursor, | ||
| query=query, | ||
| params_list=params_list, | ||
|
ericapisani marked this conversation as resolved.
|
||
|
|
@@ -152,7 +164,6 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807 | |
| ) as span: | ||
| _set_db_data(span, args[0]) | ||
| res = f(*args, **kwargs) | ||
| span.set_data("db.cursor", res) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I know we're not porting this to streamed spans, which is 💯 the right call, but are we removing it for legacy spans, too? I'd keep it in the old path, to reduce the odds of breaking a user. We'll anyway get rid of this in the next major when span streaming is the default.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
We aren't 👍🏻 It's non-obvious from this diff, but because we're calling |
||
|
|
||
| return res | ||
|
|
||
|
|
@@ -163,56 +174,85 @@ def _wrap_connect_addr( | |
| f: "Callable[..., Awaitable[T]]", | ||
| ) -> "Callable[..., Awaitable[T]]": | ||
| async def _inner(*args: "Any", **kwargs: "Any") -> "T": | ||
| if sentry_sdk.get_client().get_integration(AsyncPGIntegration) is None: | ||
| client = sentry_sdk.get_client() | ||
| if client.get_integration(AsyncPGIntegration) is None: | ||
| return await f(*args, **kwargs) | ||
|
|
||
| user = kwargs["params"].user | ||
| database = kwargs["params"].database | ||
|
|
||
| with sentry_sdk.start_span( | ||
| op=OP.DB, | ||
| name="connect", | ||
| origin=AsyncPGIntegration.origin, | ||
| ) as span: | ||
| span.set_data(SPANDATA.DB_SYSTEM, "postgresql") | ||
| addr = kwargs.get("addr") | ||
| addr = kwargs.get("addr") | ||
|
|
||
| if has_span_streaming_enabled(client.options): | ||
| span_attributes = { | ||
| "sentry.op": OP.DB, | ||
| "sentry.origin": AsyncPGIntegration.origin, | ||
| SPANDATA.DB_SYSTEM: "postgresql", | ||
| SPANDATA.DB_USER: user, | ||
| SPANDATA.DB_NAME: database, | ||
| SPANDATA.DB_DRIVER_NAME: "asyncpg", | ||
| } | ||
| if addr: | ||
| try: | ||
| span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) | ||
| span.set_data(SPANDATA.SERVER_PORT, addr[1]) | ||
| span_attributes[SPANDATA.SERVER_ADDRESS] = addr[0] | ||
| span_attributes[SPANDATA.SERVER_PORT] = addr[1] | ||
| except IndexError: | ||
| pass | ||
| span.set_data(SPANDATA.DB_NAME, database) | ||
| span.set_data(SPANDATA.DB_USER, user) | ||
| span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") | ||
|
|
||
| with capture_internal_exceptions(): | ||
| sentry_sdk.add_breadcrumb( | ||
| message="connect", category="query", data=span._data | ||
| ) | ||
| res = await f(*args, **kwargs) | ||
| with sentry_sdk.traces.start_span( | ||
| name="connect", attributes=span_attributes | ||
| ) as span: | ||
| with capture_internal_exceptions(): | ||
| sentry_sdk.add_breadcrumb( | ||
| message="connect", category="query", data=span_attributes | ||
| ) | ||
| res = await f(*args, **kwargs) | ||
|
|
||
| else: | ||
| with sentry_sdk.start_span( | ||
| op=OP.DB, | ||
| name="connect", | ||
| origin=AsyncPGIntegration.origin, | ||
| ) as span: | ||
| span.set_data(SPANDATA.DB_SYSTEM, "postgresql") | ||
| if addr: | ||
| try: | ||
| span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) | ||
| span.set_data(SPANDATA.SERVER_PORT, addr[1]) | ||
| except IndexError: | ||
| pass | ||
| span.set_data(SPANDATA.DB_NAME, database) | ||
| span.set_data(SPANDATA.DB_USER, user) | ||
| span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") | ||
|
|
||
| with capture_internal_exceptions(): | ||
| sentry_sdk.add_breadcrumb( | ||
| message="connect", category="query", data=span._data | ||
| ) | ||
| res = await f(*args, **kwargs) | ||
|
|
||
| return res | ||
|
|
||
| return _inner | ||
|
|
||
|
|
||
| def _set_db_data(span: "Span", conn: "Any") -> None: | ||
| span.set_data(SPANDATA.DB_SYSTEM, "postgresql") | ||
| span.set_data(SPANDATA.DB_DRIVER_NAME, "asyncpg") | ||
| def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None: | ||
| set_value = span.set_attribute if isinstance(span, StreamedSpan) else span.set_data | ||
|
|
||
| set_value(SPANDATA.DB_SYSTEM, "postgresql") | ||
| set_value(SPANDATA.DB_DRIVER_NAME, "asyncpg") | ||
|
|
||
| addr = conn._addr | ||
| if addr: | ||
| try: | ||
| span.set_data(SPANDATA.SERVER_ADDRESS, addr[0]) | ||
| span.set_data(SPANDATA.SERVER_PORT, addr[1]) | ||
| set_value(SPANDATA.SERVER_ADDRESS, addr[0]) | ||
| set_value(SPANDATA.SERVER_PORT, addr[1]) | ||
| except IndexError: | ||
| pass | ||
|
|
||
| database = conn._params.database | ||
| if database: | ||
| span.set_data(SPANDATA.DB_NAME, database) | ||
| set_value(SPANDATA.DB_NAME, database) | ||
|
|
||
| user = conn._params.user | ||
| if user: | ||
| span.set_data(SPANDATA.DB_USER, user) | ||
| set_value(SPANDATA.DB_USER, user) | ||
Uh oh!
There was an error while loading. Please reload this page.