Skip to content

Commit

Permalink
Fix (and test) SQLTableCheckOperator on postgresql (#25821)
Browse files Browse the repository at this point in the history
  • Loading branch information
ashb committed Aug 19, 2022
1 parent b535262 commit dd72e67
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 3 deletions.
8 changes: 5 additions & 3 deletions airflow/providers/common/sql/operators/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ class SQLTableCheckOperator(BaseSQLOperator):

sql_check_template = """
SELECT '_check_name' AS check_name, MIN(_check_name) AS check_result
FROM(SELECT CASE WHEN check_statement THEN 1 ELSE 0 END AS _check_name FROM table)
FROM (SELECT CASE WHEN check_statement THEN 1 ELSE 0 END AS _check_name FROM table) AS sq
"""

def __init__(
Expand Down Expand Up @@ -382,8 +382,10 @@ def execute(self, context: 'Context'):
]
)
partition_clause_statement = f"WHERE {self.partition_clause}" if self.partition_clause else ""
self.sql = f"SELECT check_name, check_result FROM ({checks_sql}) "
f"AS check_table {partition_clause_statement};"
self.sql = f"""
SELECT check_name, check_result FROM ({checks_sql})
AS check_table {partition_clause_statement}
"""

records = hook.get_records(self.sql)

Expand Down
37 changes: 37 additions & 0 deletions tests/providers/common/sql/operators/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,43 @@ def get_records(*arg):
monkeypatch.setattr(MockHook, "get_records", get_records)
return operator

@pytest.mark.parametrize(
['conn_id'],
[
pytest.param('postgres_default', marks=[pytest.mark.backend('postgres')]),
pytest.param('mysql_default', marks=[pytest.mark.backend('mysql')]),
],
)
def test_sql_check(self, conn_id):
operator = SQLTableCheckOperator(
task_id="test_task",
table="employees",
checks={"row_count_check": {"check_statement": "COUNT(*) >= 3"}},
conn_id=conn_id,
)

hook = operator.get_db_hook()
hook.run(
[
"""
CREATE TABLE IF NOT EXISTS employees (
employee_name VARCHAR(50) NOT NULL,
employment_year INT NOT NULL
);
""",
"INSERT INTO employees VALUES ('Adam', 2021)",
"INSERT INTO employees VALUES ('Chris', 2021)",
"INSERT INTO employees VALUES ('Frank', 2021)",
"INSERT INTO employees VALUES ('Fritz', 2021)",
"INSERT INTO employees VALUES ('Magda', 2022)",
"INSERT INTO employees VALUES ('Phil', 2021)",
]
)
try:
operator.execute({})
finally:
hook.run(["DROP TABLE employees"])

def test_pass_all_checks_check(self, monkeypatch):
records = [('row_count_check', 1), ('column_sum_check', 'y')]
operator = self._construct_operator(monkeypatch, self.checks, records)
Expand Down

0 comments on commit dd72e67

Please sign in to comment.