diff --git a/providers/snowflake/docs/operators/snowflake.rst b/providers/snowflake/docs/operators/snowflake.rst index b831fabc718ae..b48930c85a200 100644 --- a/providers/snowflake/docs/operators/snowflake.rst +++ b/providers/snowflake/docs/operators/snowflake.rst @@ -59,6 +59,57 @@ An example usage of the SQLExecuteQueryOperator to connect to Snowflake is as fo Parameters that can be passed onto the operator will be given priority over the parameters already given in the Airflow connection metadata (such as ``schema``, ``role``, ``database`` and so forth). +.. _howto/operator:SnowflakeCheckOperator: + +SnowflakeCheckOperator +^^^^^^^^^^^^^^^^^^^^^^ + +To perform checks against Snowflake you can use +:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeCheckOperator` + +This operator expects a SQL query that will return a single row. Each value on +that first row is evaluated using Python ``bool`` casting. If any of the values +return ``False`` the check fails and errors out. + +.. exampleinclude:: /../../snowflake/tests/system/snowflake/example_snowflake.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_snowflake_check] + :end-before: [END howto_operator_snowflake_check] + +.. _howto/operator:SnowflakeValueCheckOperator: + +SnowflakeValueCheckOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To perform a simple value check using SQL code you can use +:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeValueCheckOperator` + +This operator expects a SQL query that will return a single row. That value is +evaluated against ``pass_value``, which can be either a string or numeric value. +If numeric, you can also specify ``tolerance``. + +.. exampleinclude:: /../../snowflake/tests/system/snowflake/example_snowflake.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_snowflake_value_check] + :end-before: [END howto_operator_snowflake_value_check] + +.. _howto/operator:SnowflakeIntervalCheckOperator: + +SnowflakeIntervalCheckOperator +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +To check that the values of metrics given as SQL expressions are within a certain +tolerance of the ones from ``days_back`` before you can use +:class:`~airflow.providers.snowflake.operators.snowflake.SnowflakeIntervalCheckOperator` + +.. exampleinclude:: /../../snowflake/tests/system/snowflake/example_snowflake.py + :language: python + :dedent: 4 + :start-after: [START howto_operator_snowflake_interval_check] + :end-before: [END howto_operator_snowflake_interval_check] + SnowflakeSqlApiOperator ======================= diff --git a/providers/snowflake/tests/system/snowflake/example_snowflake.py b/providers/snowflake/tests/system/snowflake/example_snowflake.py index c6c123b8a37c5..304e0c06b20bf 100644 --- a/providers/snowflake/tests/system/snowflake/example_snowflake.py +++ b/providers/snowflake/tests/system/snowflake/example_snowflake.py @@ -26,18 +26,36 @@ from airflow import DAG from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator -from airflow.providers.snowflake.operators.snowflake import SnowflakeSqlApiOperator +from airflow.providers.snowflake.operators.snowflake import ( + SnowflakeCheckOperator, + SnowflakeIntervalCheckOperator, + SnowflakeSqlApiOperator, + SnowflakeValueCheckOperator, +) SNOWFLAKE_CONN_ID = "my_snowflake_conn" SNOWFLAKE_SAMPLE_TABLE = "sample_table" +SNOWFLAKE_CHECK_TABLE = "sample_check_table" # SQL commands CREATE_TABLE_SQL_STRING = ( f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);" ) +CREATE_CHECK_TABLE_SQL_STRING = f""" +CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_CHECK_TABLE} ( + ds DATE, + value INT +); +""" SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)" SQL_LIST = [SQL_INSERT_STATEMENT % {"id": n} for n in range(10)] SQL_MULTIPLE_STMTS = "; ".join(SQL_LIST) +SQL_CHECK_TABLE_INSERT = f""" +INSERT INTO {SNOWFLAKE_CHECK_TABLE} (ds, value) +VALUES + (TO_DATE('{{{{ ds }}}}'), 4), + (TO_DATE('{{{{ macros.ds_add(ds, -1) }}}}'), 4) +""" ENV_ID = os.environ.get("SYSTEM_TESTS_ENV_ID") DAG_ID = "example_snowflake" @@ -74,8 +92,44 @@ sql="example_snowflake_snowflake_op_template_file.sql", ) + # Create and populate a small dataset for the data quality operator examples. + # We insert one row for `ds` and one for `ds - 1`, each with value = 4. + create_check_table = SQLExecuteQueryOperator( + task_id="create_check_table", + sql=CREATE_CHECK_TABLE_SQL_STRING, + ) + + populate_check_table = SQLExecuteQueryOperator( + task_id="populate_check_table", + sql=SQL_CHECK_TABLE_INSERT, + ) + # [END howto_operator_snowflake] + # [START howto_operator_snowflake_check] + snowflake_check = SnowflakeCheckOperator( + task_id="snowflake_check", + sql=f"SELECT COUNT(*) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')", + ) + # [END howto_operator_snowflake_check] + + # [START howto_operator_snowflake_value_check] + snowflake_value_check = SnowflakeValueCheckOperator( + task_id="snowflake_value_check", + sql=f"SELECT SUM(value) FROM {SNOWFLAKE_CHECK_TABLE} WHERE ds = TO_DATE('{{{{ ds }}}}')", + pass_value=4, + ) + # [END howto_operator_snowflake_value_check] + + # [START howto_operator_snowflake_interval_check] + snowflake_interval_check = SnowflakeIntervalCheckOperator( + task_id="snowflake_interval_check", + table=SNOWFLAKE_CHECK_TABLE, + metrics_thresholds={"COUNT(*)": 1.5}, + days_back=1, + ) + # [END howto_operator_snowflake_interval_check] + # [START howto_snowflake_sql_api_operator] snowflake_sql_api_op_sql_multiple_stmt = SnowflakeSqlApiOperator( task_id="snowflake_op_sql_multiple_stmt", @@ -95,6 +149,17 @@ ] ) + snowflake_op_sql_str >> create_check_table + ( + create_check_table + >> populate_check_table + >> [ + snowflake_check, + snowflake_value_check, + snowflake_interval_check, + ] + ) + from tests_common.test_utils.system_tests import get_test_run # noqa: E402