diff --git a/sentry_sdk/integrations/asyncpg.py b/sentry_sdk/integrations/asyncpg.py index 9b37119564..5b0d75ee1a 100644 --- a/sentry_sdk/integrations/asyncpg.py +++ b/sentry_sdk/integrations/asyncpg.py @@ -143,8 +143,17 @@ async def _inner(*args: "Any", **kwargs: "Any") -> "T": params_list = args[2] if len(args) > 2 else None with _record(None, query, params_list, executemany=executemany) as span: _set_db_data(span, args[0]) + res = await f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + return res return _inner @@ -163,8 +172,17 @@ def _inner(*args: "Any", **kwargs: "Any") -> "T": # noqa: N807 executemany=False, ) as span: _set_db_data(span, args[0]) + res = f(*args, **kwargs) + if isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + + if not isinstance(span, StreamedSpan): + with capture_internal_exceptions(): + add_query_source(span) + return res return _inner diff --git a/tests/integrations/asyncpg/test_asyncpg.py b/tests/integrations/asyncpg/test_asyncpg.py index 35195e757b..289f794d5d 100644 --- a/tests/integrations/asyncpg/test_asyncpg.py +++ b/tests/integrations/asyncpg/test_asyncpg.py @@ -731,7 +731,10 @@ async def test_query_source(sentry_init, capture_events, capture_items, span_str @pytest.mark.asyncio -async def test_query_source_with_module_in_search_path(sentry_init, capture_events): +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_with_module_in_search_path( + sentry_init, capture_events, capture_items, span_streaming +): """ Test that query source is relative to the path of the module it ran in """ @@ -740,40 +743,71 @@ async def test_query_source_with_module_in_search_path(sentry_init, capture_even traces_sample_rate=1.0, enable_db_query_source=True, db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, ) - events = capture_events() - from asyncpg_helpers.helpers import execute_query_in_connection - with start_transaction(name="test_transaction", sampled=True): - conn: Connection = await connect(PG_CONNECTION_URI) + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) - await execute_query_in_connection( - "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", - conn, - ) + await execute_query_in_connection( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + conn, + ) - await conn.close() + await conn.close() + sentry_sdk.flush() - (event,) = events + spans = [item.payload for item in items] - span = event["spans"][-1] - assert span["description"].startswith("INSERT INTO") + assert len(spans) == 3 - data = span.get("data", {}) + connect_span = spans[0] + insert_span = spans[1] + segment = spans[2] - assert SPANDATA.CODE_LINENO in data + assert segment["name"] == "test_transaction" + assert insert_span["name"].startswith("INSERT INTO") + assert connect_span["name"] == "connect" + data = insert_span.get("attributes", {}) + else: + events = capture_events() + + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + + await execute_query_in_connection( + "INSERT INTO users(name, password, dob) VALUES ('Alice', 'secret', '1990-12-25')", + conn, + ) + + await conn.close() + + (event,) = events + + span = event["spans"][-1] + assert span["description"].startswith("INSERT INTO") + data = span.get("data", {}) + + lineno_key = "code.line.number" if span_streaming else SPANDATA.CODE_LINENO + filepath_key = "code.file.path" if span_streaming else SPANDATA.CODE_FILEPATH + + assert lineno_key in data + assert filepath_key in data assert SPANDATA.CODE_NAMESPACE in data - assert SPANDATA.CODE_FILEPATH in data assert SPANDATA.CODE_FUNCTION in data - assert type(data.get(SPANDATA.CODE_LINENO)) == int - assert data.get(SPANDATA.CODE_LINENO) > 0 + assert type(data.get(lineno_key)) == int + assert data.get(lineno_key) > 0 + assert data.get(filepath_key) == "asyncpg_helpers/helpers.py" assert data.get(SPANDATA.CODE_NAMESPACE) == "asyncpg_helpers.helpers" - assert data.get(SPANDATA.CODE_FILEPATH) == "asyncpg_helpers/helpers.py" - is_relative_path = data.get(SPANDATA.CODE_FILEPATH)[0] != os.sep + is_relative_path = data.get(filepath_key)[0] != os.sep assert is_relative_path assert data.get(SPANDATA.CODE_FUNCTION) == "execute_query_in_connection" @@ -1102,3 +1136,260 @@ def before_send_transaction(event, hint): assert len(spans) == 1 assert spans[0]["description"] == "filtered" + + +def _assert_query_source(span, span_streaming, expected_function): + if span_streaming: + data = span.get("attributes", {}) + lineno_key = "code.line.number" + filepath_key = "code.file.path" + else: + data = span.get("data", {}) + lineno_key = SPANDATA.CODE_LINENO + filepath_key = SPANDATA.CODE_FILEPATH + + assert lineno_key in data + assert filepath_key in data + assert SPANDATA.CODE_NAMESPACE in data + assert SPANDATA.CODE_FUNCTION in data + + assert type(data.get(lineno_key)) == int + assert data.get(lineno_key) > 0 + assert data[SPANDATA.CODE_NAMESPACE] == "tests.integrations.asyncpg.test_asyncpg" + assert data.get(filepath_key).endswith("tests/integrations/asyncpg/test_asyncpg.py") + assert data.get(filepath_key)[0] != os.sep + assert data[SPANDATA.CODE_FUNCTION] == expected_function + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_execute( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + "Alice", + "pw", + datetime.date(1990, 12, 25), + ) + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + + connect_span = spans[0] + query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"].startswith("INSERT INTO") + assert segment["name"] == "test_transaction" + assert segment["is_segment"] is True + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.execute( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + "Alice", + "pw", + datetime.date(1990, 12, 25), + ) + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"].startswith("INSERT INTO") + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_execute") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_executemany( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [("Bob", "secret_pw", datetime.date(1984, 3, 1))], + ) + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + + connect_span = spans[0] + query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"].startswith("INSERT INTO") + assert segment["name"] == "test_transaction" + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.executemany( + "INSERT INTO users(name, password, dob) VALUES($1, $2, $3)", + [("Bob", "secret_pw", datetime.date(1984, 3, 1))], + ) + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"].startswith("INSERT INTO") + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_executemany") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_prepare( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.prepare("SELECT * FROM users WHERE name = $1") + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 3 + connect_span = spans[0] + query_span = spans[1] + segment = spans[2] + + assert connect_span["name"] == "connect" + assert query_span["name"] == "SELECT * FROM users WHERE name = $1" + assert segment["name"] == "test_transaction" + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + await conn.prepare("SELECT * FROM users WHERE name = $1") + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 2 + assert spans[0]["description"] == "connect" + assert spans[1]["description"] == "SELECT * FROM users WHERE name = $1" + query_span = spans[1] + + _assert_query_source(query_span, span_streaming, "test_query_source_prepare") + + +@pytest.mark.asyncio +@pytest.mark.parametrize("span_streaming", [True, False]) +async def test_query_source_cursor( + sentry_init, capture_events, capture_items, span_streaming +): + sentry_init( + integrations=[AsyncPGIntegration()], + traces_sample_rate=1.0, + enable_db_query_source=True, + db_query_source_threshold_ms=0, + _experiments={ + "trace_lifecycle": "stream" if span_streaming else "static", + }, + ) + + if span_streaming: + items = capture_items("span") + with sentry_sdk.traces.start_span(name="test_transaction"): + conn: Connection = await connect(PG_CONNECTION_URI) + async with conn.transaction(): + async for _ in conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ): + pass + await conn.close() + sentry_sdk.flush() + + spans = [item.payload for item in items] + assert len(spans) == 5 + connect_span = spans[0] + begin_span = spans[1] + query_span = spans[2] + commit_span = spans[3] + segment = spans[4] + + assert connect_span["name"] == "connect" + assert begin_span["name"] == "BEGIN;" + assert query_span["name"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["name"] == "COMMIT;" + assert segment["name"] == "test_transaction" + else: + events = capture_events() + with start_transaction(name="test_transaction", sampled=True): + conn: Connection = await connect(PG_CONNECTION_URI) + async with conn.transaction(): + async for _ in conn.cursor( + "SELECT * FROM users WHERE dob > $1", datetime.date(1970, 1, 1) + ): + pass + await conn.close() + + (event,) = events + spans = event["spans"] + assert len(spans) == 4 + + connect_span = spans[0] + begin_span = spans[1] + query_span = spans[2] + commit_span = spans[3] + + assert connect_span["description"] == "connect" + assert begin_span["description"] == "BEGIN;" + assert query_span["description"] == "SELECT * FROM users WHERE dob > $1" + assert commit_span["description"] == "COMMIT;" + + _assert_query_source(query_span, span_streaming, "test_query_source_cursor")