51 changes: 51 additions & 0 deletions providers/snowflake/docs/operators/snowflake.rst
@@ -59,6 +59,57 @@ An example usage of the SQLExecuteQueryOperator to connect to Snowflake is as fo
Parameters that can be passed onto the operator will be given priority over the parameters already given
in the Airflow connection metadata (such as ``schema``, ``role``, ``database`` and so forth).

.. _howto/operator:SnowflakeCheckOperator:

SnowflakeCheckOperator
^^^^^^^^^^^^^^^^^^^^^^

To perform checks against Snowflake, use
:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeCheckOperator`.

This operator expects a SQL query that returns a single row. Each value in
that first row is evaluated using Python ``bool`` casting. If any value
evaluates to ``False``, the check fails and the task errors out.

.. exampleinclude:: /../../snowflake/tests/system/snowflake/example_snowflake.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_snowflake_check]
    :end-before: [END howto_operator_snowflake_check]
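
The pass/fail rule described above can be sketched in plain Python. This is only an illustration of the ``bool``-casting rule, not the operator's implementation: the function name ``row_passes_check`` is hypothetical, and treating a missing row as a failure is an assumption of this sketch.

```python
def row_passes_check(row):
    """Return True only if the row exists and every value in it is truthy.

    Hypothetical helper: it mirrors the documented rule that each value
    in the first row is cast with bool(). Treating a missing or empty
    row as a failure is an assumption made for this sketch.
    """
    if not row:
        return False
    return all(bool(value) for value in row)


# A COUNT(*) of 3 is truthy, so the check would pass.
print(row_passes_check((3,)))        # True
# A COUNT(*) of 0 is falsy, so the check would fail.
print(row_passes_check((0,)))        # False
# One falsy value (here NULL, seen as None) fails the whole row.
print(row_passes_check((1, None)))   # False
```

This is why a query such as ``SELECT COUNT(*) ...`` works as a check: it passes only when at least one matching row exists.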

.. _howto/operator:SnowflakeValueCheckOperator:

SnowflakeValueCheckOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^

To perform a simple value check using SQL code, use
:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeValueCheckOperator`.

This operator expects a SQL query that returns a single row. Each value in that
row is compared against ``pass_value``, which can be either a string or a
numeric value. If ``pass_value`` is numeric, you can also specify ``tolerance``.

.. exampleinclude:: /../../snowflake/tests/system/snowflake/example_snowflake.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_snowflake_value_check]
    :end-before: [END howto_operator_snowflake_value_check]
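
To make the role of ``tolerance`` concrete, here is a minimal sketch of a numeric comparison. It assumes that ``tolerance`` is interpreted as a fraction of ``pass_value`` (so ``0.1`` means plus or minus 10%) and that ``pass_value`` is positive; check both assumptions against the provider's API reference. The helper name ``value_matches`` is hypothetical.

```python
def value_matches(record, pass_value, tolerance=None):
    """Compare one numeric query result against pass_value.

    Hypothetical helper. Assumes tolerance is a fraction of pass_value
    and that pass_value is positive (a negative pass_value would swap
    the bounds in this simple sketch).
    """
    if tolerance is None:
        # Without a tolerance, the value must match exactly.
        return record == pass_value
    lower = pass_value * (1 - tolerance)
    upper = pass_value * (1 + tolerance)
    return lower <= record <= upper


print(value_matches(4, 4))                    # True: exact match
print(value_matches(4.3, 4, tolerance=0.1))   # True: within 3.6..4.4
print(value_matches(5, 4, tolerance=0.1))     # False: outside 3.6..4.4
```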

.. _howto/operator:SnowflakeIntervalCheckOperator:

SnowflakeIntervalCheckOperator
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

To check that the values of metrics, given as SQL expressions, are within a
certain tolerance of the values from ``days_back`` days earlier, use
:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeIntervalCheckOperator`.

.. exampleinclude:: /../../snowflake/tests/system/snowflake/example_snowflake.py
    :language: python
    :dedent: 4
    :start-after: [START howto_operator_snowflake_interval_check]
    :end-before: [END howto_operator_snowflake_interval_check]
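
The comparison between the current metric and the one from ``days_back`` days earlier can be sketched as a ratio test. This is an assumption-laden illustration: it supposes the metric passes when the ratio of the two values is strictly below the threshold from ``metrics_thresholds``, using either a max-over-min ratio or a relative difference. The function names are hypothetical, and the handling of zero values is left out.

```python
def max_over_min(current, reference):
    """Ratio of the larger value to the smaller one (always >= 1)."""
    return max(current, reference) / min(current, reference)


def relative_diff(current, reference):
    """Absolute change relative to the reference value."""
    return abs(current - reference) / reference


def metric_passes(current, reference, threshold, ratio_formula=max_over_min):
    """Return True if the ratio is strictly below the threshold.

    Hypothetical helper. Zero values are rejected here because the
    ratio is undefined; how the real operator treats them is not
    covered by this sketch.
    """
    if current == 0 or reference == 0:
        raise ValueError("ratio is undefined when a value is zero")
    return ratio_formula(current, reference) < threshold


# One row today and one row yesterday: ratio 1.0, below the 1.5 threshold.
print(metric_passes(1, 1, threshold=1.5))   # True
# Row count doubled: ratio 2.0, not below 1.5.
print(metric_passes(4, 2, threshold=1.5))   # False
```

With a threshold of ``1.5`` for ``COUNT(*)`` and ``days_back=1``, a table that holds the same number of rows for both dates would pass under these assumptions.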


SnowflakeSqlApiOperator
=======================
@@ -26,18 +26,36 @@

from airflow import DAG
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator
from airflow.providers.snowflake.operators.snowflake import (
    SnowflakeCheckOperator,
    SnowflakeIntervalCheckOperator,
    SnowflakeSqlApiOperator,
    SnowflakeValueCheckOperator,
)

SNOWFLAKE_CONN_ID = "my_snowflake_conn"
SNOWFLAKE_SAMPLE_TABLE = "sample_table"
SNOWFLAKE_CHECK_TABLE = "sample_check_table"

# SQL commands
CREATE_TABLE_SQL_STRING = (
    f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);"
)
CREATE_CHECK_TABLE_SQL_STRING = f"""
    CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_CHECK_TABLE} (
        ds DATE,
        value INT
    );
"""
SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"
SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(10)]
SQL_MULTIPLE_STMTS = "; ".join(SQL_LIST)
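# Inside an f-string, "{{{{ ds }}}}" collapses to the Jinja expression "{{ ds }}",
# which Airflow renders when the task runs.
SQL_CHECK_TABLE_INSERT = f"""
    INSERT INTO {SNOWFLAKE_CHECK_TABLE} (ds, value)
    VALUES
        (TO_DATE('{{{{ ds }}}}'), 4),
        (TO_DATE('{{{{ macros.ds_add(ds, -1) }}}}'), 4)
"""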
ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID")
DAG_ID = "example_snowflake"

@@ -74,8 +92,44 @@
sql="example_snowflake_snowflake_op_template_file.sql",
)

# Create and populate a small dataset for the data quality operator examples.
# We insert one row for `ds` and one for `ds - 1`, each with value = 4.
create_check_table = SQLExecuteQueryOperator(
    task_id="create_check_table",
    sql=CREATE_CHECK_TABLE_SQL_STRING,
)

populate_check_table = SQLExecuteQueryOperator(
    task_id="populate_check_table",
    sql=SQL_CHECK_TABLE_INSERT,
)

# [END howto_operator_snowflake]

# [START howto_operator_snowflake_check]
snowflake_check = SnowflakeCheckOperator(
    task_id="snowflake_check",
    sql=f"SELECT COUNT(*) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
)
# [END howto_operator_snowflake_check]

# [START howto_operator_snowflake_value_check]
snowflake_value_check = SnowflakeValueCheckOperator(
    task_id="snowflake_value_check",
    sql=f"SELECT SUM(value) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')",
    pass_value=4,
)
# [END howto_operator_snowflake_value_check]

# [START howto_operator_snowflake_interval_check]
snowflake_interval_check = SnowflakeIntervalCheckOperator(
    task_id="snowflake_interval_check",
    table=SNOWFLAKE_CHECK_TABLE,
    metrics_thresholds={"COUNT(*)": 1.5},
    days_back=1,
)
# [END howto_operator_snowflake_interval_check]

# [START howto_snowflake_sql_api_operator]
snowflake_sql_api_op_sql_multiple_stmt = SnowflakeSqlApiOperator(
task_id="snowflake_op_sql_multiple_stmt",
@@ -95,6 +149,17 @@
]
)

snowflake_op_sql_str >> create_check_table
(
    create_check_table
    >> populate_check_table
    >> [
        snowflake_check,
        snowflake_value_check,
        snowflake_interval_check,
    ]
)


from tests_common.test_utils.system_tests import get_test_run # noqa: E402
