Skip to content

Surface cursor.rowcount in PostgresOperator #9834

@cjsekl

Description

@cjsekl

Description

The Cursor object in psycopg2 provides a useful feature of logging how many rows were affected by the last execute.
https://www.psycopg.org/docs/cursor.html#cursor.rowcount
This can be added as a parameter to PostgresOperator and passed through to DbApiHook.run to log affected row after each query.

The operator parameter could be a verbose flag that when enabled logs the SQL statement before running it and the rows affected after.

Example of conn.cursor logging:

>>> sql_statements = ['select 1;',
...                   'insert into jeklund.rowtest (player_id) select 1;',
...                   'select now()::date - generate_series(0, 59);']
>>> with closing(conn.cursor()) as cur:
...     for sql_statement in sql_statements:
...             print('Running {}'.format(sql_statement))
...             cur.execute(sql_statement)
...             print('Affected rows: {}'.format(cur.rowcount))
...
Running select 1;
Affected rows: 1
Running insert into jeklund.rowtest (player_id) select 1;
Affected rows: 1
Running select now()::date - generate_series(0, 59);
Affected rows: 60

Use case / motivation

Having rows affected in Airflow task logs is useful for monitoring and debugging multi step ETLs.
Making it easy to spot cases where e.g. an INSERT query unexpectedly inserted 0 rows.

Related Issues

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions