Skip to content

Commit

Permalink
fix(psycopg): support async iteration (#9940)
Browse files Browse the repository at this point in the history
Motivation:

Previously, using async iteration with `psycopg` (`async for`) would
throw an exception.


Example:

```
def connect_to_db():
    # db connection logic...

async def testfunc():
    connectstr = get_conn_str()
    async with AsyncConnectionPool(
        conninfo=connectstr, min_size=4, max_size=30
    ) as async_pool:
        async with async_pool.connection() as conn:
            async with conn.cursor() as cur:
                await cur.execute(
                    sql.SQL(
                        """
                        SELECT
                            *
                        FROM table1
                        LIMIT 5
                    """
                    )
                )
                async for row in cur:
                    print(row)
```

Running `ddtrace-run python test.py` would return `TypeError: 'async
for' requires an object with __aiter__ method, got
Psycopg3TracedAsyncCursor`.

This fix implements the `__aiter__` method for the `TracedAsyncCursor`
parent class, and has `Psycopg3TracedAsyncCursor` inherit from it.

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met 
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

---------

Co-authored-by: Munir Abdinur <munir.abdinur@datadoghq.com>
  • Loading branch information
quinna-h and mabdinur authored Jul 30, 2024
1 parent 07f0752 commit d51fa8d
Show file tree
Hide file tree
Showing 4 changed files with 46 additions and 0 deletions.
3 changes: 3 additions & 0 deletions ddtrace/contrib/dbapi_async/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ async def __aenter__(self):

return self

def __aiter__(self):
return self.__wrapped__.__aiter__()

async def __aexit__(self, exc_type, exc_val, exc_tb):
# previous versions of the dbapi didn't support context managers. let's
# reference the func that would be called to ensure that error
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
---
fixes:
- |
psycopg: Ensures traced async cursors return an asynchronous iterator object.
21 changes: 21 additions & 0 deletions tests/contrib/dbapi_async/test_dbapi_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,27 @@ async def test_fetchone_wrapped_is_called_and_returned(self):
assert "__result__" == await traced_cursor.fetchone("arg_1", kwarg1="kwarg1")
cursor.fetchone.assert_called_once_with("arg_1", kwarg1="kwarg1")

@mark_asyncio
async def test_cursor_async_connection(self):
"""Checks whether connection can execute operations with async iteration."""

def method():
pass

async with TracedAsyncCursor(self.cursor, Pin("dbapi_service", tracer=self.tracer), {}) as cursor:
await cursor.execute("""select 'one' as x""")
await cursor.execute("""select 'blah'""")

async for row in cursor:
spans = self.get_spans()
assert len(spans) == 2
assert spans[0].name == "postgres.query"
assert spans[0].resource == "select ?"
assert spans[0].service == "dbapi_service"
assert spans[1].name == "postgres.query"
assert spans[1].resource == "select ?"
assert spans[1].service == "dbapi_service"

@mark_asyncio
async def test_fetchall_wrapped_is_called_and_returned(self):
cursor = self.cursor
Expand Down
18 changes: 18 additions & 0 deletions tests/contrib/psycopg/test_psycopg_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -337,3 +337,21 @@ async def test_cursor_from_connection_shortcut(self):
rows = await cur.fetchall()
assert len(rows) == 1, rows
assert rows[0][0] == "one"

async def test_cursor_async_connect_execute(self):
"""Checks whether connection can execute operations with async iteration."""

async with psycopg.AsyncConnection.connect(**POSTGRES_CONFIG) as conn:
async with conn.cursor() as cur:
await cur.execute("""select 'one' as x""")
await cur.execute("""select 'blah'""")

async for row in cur:
spans = self.get_spans()
assert len(spans) == 2
assert spans[0].name == "postgres.query"
assert spans[0].resource == "select ?"
assert spans[0].service == "postgres"
assert spans[1].name == "postgres.query"
assert spans[1].resource == "select ?"
assert spans[1].service == "postgres"

0 comments on commit d51fa8d

Please sign in to comment.