SQLTableCheckOperator fails for Postgres #25815

@magdagultekin

Apache Airflow version

2.3.3

What happened

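SQLTableCheckOperator fails when used with Postgres. The query it generates selects from a subquery that has no alias, which Postgres rejects with a syntax error.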

What you think should happen instead

From the logs:

[2022-08-19, 09:28:14 UTC] {taskinstance.py:1910} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/airflow/providers/common/sql/operators/sql.py", line 296, in execute
    records = hook.get_first(self.sql)
  File "/usr/local/lib/python3.9/site-packages/airflow/hooks/dbapi.py", line 178, in get_first
    cur.execute(sql)
psycopg2.errors.SyntaxError: subquery in FROM must have an alias
LINE 1: SELECT MIN(row_count_check) FROM (SELECT CASE WHEN COUNT(*) ...
                                         ^
HINT:  For example, FROM (SELECT ...) [AS] foo.
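
To illustrate what the HINT is asking for, here is a minimal sketch of a query with the same general shape as the one in the traceback, but with an alias on the derived table. The helper name `build_aliased_check_sql`, the alias `check_table`, and the exact `CASE` expression are assumptions for illustration; this is not the provider's actual SQL template (the log line above is truncated). The standard-library `sqlite3` module is used only as a stand-in so the snippet runs without a database server. SQLite also accepts the un-aliased form, so this shows that the aliased query is valid and does not reproduce the Postgres error.

```python
import sqlite3


def build_aliased_check_sql(table, check_name, check_statement, alias="check_table"):
    """Hypothetical helper: wrap a table check in a subquery that HAS an alias.

    Not the provider's real template; it only mirrors the shape seen in the log.
    """
    return (
        f"SELECT MIN({check_name}) FROM ("
        f"SELECT CASE WHEN {check_statement} THEN 1 ELSE 0 END AS {check_name} "
        f"FROM {table}"
        f") AS {alias}"
    )


def run_demo():
    # In-memory SQLite database as a stand-in for the Postgres connection.
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE employees ("
        "employee_name VARCHAR NOT NULL, employment_year INT NOT NULL)"
    )
    # Same rows as in the reproduction DAG.
    conn.executemany(
        "INSERT INTO employees VALUES (?, ?)",
        [
            ("Adam", 2021),
            ("Chris", 2021),
            ("Frank", 2021),
            ("Fritz", 2021),
            ("Magda", 2022),
            ("Phil", 2021),
        ],
    )
    sql = build_aliased_check_sql("employees", "row_count_check", "COUNT(*) >= 3")
    # A result of 1 means the check passed, 0 means it failed.
    result = conn.execute(sql).fetchone()[0]
    conn.close()
    return sql, result


if __name__ == "__main__":
    sql, result = run_demo()
    print(sql)
    print(result)
```

Whether the fix belongs in the operator's template or in a Postgres-specific override is a separate question; the sketch only shows that adding `AS <alias>` after the closing parenthesis is enough to satisfy the alias requirement.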

How to reproduce

import pendulum
from datetime import timedelta

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLTableCheckOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

_POSTGRES_CONN = "postgresdb"
_TABLE_NAME = "employees"

default_args = {
    "owner": "cs",
    "retries": 3,
    "retry_delay": timedelta(seconds=15),
}

with DAG(
    dag_id="sql_data_quality",
    start_date=pendulum.datetime(2022, 8, 1, tz="UTC"),
    schedule_interval=None,
    default_args=default_args,
) as dag:

    create_table = PostgresOperator(
        task_id="create_table",
        postgres_conn_id=_POSTGRES_CONN,
        sql=f"""
        CREATE TABLE IF NOT EXISTS {_TABLE_NAME} (
            employee_name VARCHAR NOT NULL,
            employment_year INT NOT NULL
        );
        """
    )

    populate_data = PostgresOperator(
        task_id="populate_data",
        postgres_conn_id=_POSTGRES_CONN,
        sql=f"""
            INSERT INTO {_TABLE_NAME} VALUES ('Adam', 2021);
            INSERT INTO {_TABLE_NAME} VALUES ('Chris', 2021);
            INSERT INTO {_TABLE_NAME} VALUES ('Frank', 2021);
            INSERT INTO {_TABLE_NAME} VALUES ('Fritz', 2021);
            INSERT INTO {_TABLE_NAME} VALUES ('Magda', 2022);
            INSERT INTO {_TABLE_NAME} VALUES ('Phil', 2021);
        """,
    )

    check_row_count = SQLTableCheckOperator(
        task_id="check_row_count",
        conn_id=_POSTGRES_CONN,
        table=_TABLE_NAME,
        checks={
            "row_count_check": {"check_statement": "COUNT(*) >= 3"}
        },
    )

    drop_table = PostgresOperator(
        task_id="drop_table",
        trigger_rule="all_done",
        postgres_conn_id=_POSTGRES_CONN,
        sql=f"DROP TABLE {_TABLE_NAME};",
    )

    create_table >> populate_data >> check_row_count >> drop_table

Operating System

macOS

Versions of Apache Airflow Providers

apache-airflow-providers-common-sql==1.0.0

Deployment

Astronomer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Labels

kind:bug (This is clearly a bug)
