-
Notifications
You must be signed in to change notification settings - Fork 25
/
example_snowflake.py
101 lines (84 loc) · 3.67 KB
/
example_snowflake.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
"""Example use of SnowflakeAsync related providers."""
import os
from datetime import timedelta
from airflow import DAG
from airflow.utils.timezone import datetime
from astronomer.providers.snowflake.operators.snowflake import SnowflakeOperatorAsync
# --- Configuration ----------------------------------------------------------
# Connection id and table names come from the environment (with fallbacks) so the
# example can target a different Snowflake setup without editing the file.
SNOWFLAKE_CONN_ID = os.getenv("ASTRO_SNOWFLAKE_CONN_ID", "snowflake_default")
SNOWFLAKE_SAMPLE_TABLE = os.getenv("SNOWFLAKE_SAMPLE_TABLE", "sample_table")
# Used as ``timedelta(hours=...)`` in default_args, i.e. a per-task timeout in hours.
EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", "6"))

# --- SQL used by the tasks ----------------------------------------------------
CREATE_TABLE_SQL_STRING = (
    f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE} (name VARCHAR(250), id INT);"
)
# ``%(id)s`` is a pyformat placeholder. The snowflake_op_with_params task supplies
# its value via ``parameters``; SQL_LIST below fills it in locally with ``%``.
SQL_INSERT_STATEMENT = f"INSERT INTO {SNOWFLAKE_SAMPLE_TABLE} VALUES ('name', %(id)s)"
# Ten fully rendered INSERTs (ids 0 through 9): once as a list, once as one string.
SQL_LIST = [SQL_INSERT_STATEMENT % {"id": row_id} for row_id in range(10)]
SQL_MULTIPLE_STMTS = "; ".join(SQL_LIST)
SNOWFLAKE_SLACK_SQL = f"SELECT name, id FROM {SNOWFLAKE_SAMPLE_TABLE} LIMIT 10;"
# Plain (non-f) string: the doubled braces are kept literally, as template syntax.
# Not referenced by any task in this file.
SNOWFLAKE_SLACK_MESSAGE = (
    "Results in an ASCII table:\n```{{ results_df | tabulate(tablefmt='pretty', headers='keys') }}```"
)

# Three tables <prefix>_1.._3, created and dropped with all statements packed into
# a single SQL string (statements concatenated with no separator beyond ";").
SNOWFLAKE_SAMPLE_TABLE_MULTI = os.getenv("SNOWFLAKE_SAMPLE_TABLE_MULTI", "sample_table_multi")
MULTIPLE_QUERY_IN_ONE_RUN_CREATE_TABLES = "".join(
    f"CREATE OR REPLACE TRANSIENT TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_{suffix} (name VARCHAR(250), id INT);"
    for suffix in (1, 2, 3)
)
MULTIPLE_QUERY_IN_ONE_RUN_DROP_TABLES = "".join(
    f"DROP TABLE {SNOWFLAKE_SAMPLE_TABLE_MULTI}_{suffix};" for suffix in (1, 2, 3)
)

# Handed to the DAG as ``default_args``. No task sets ``snowflake_conn_id`` itself,
# so they all pick it up (along with timeout and retry policy) from here.
default_args = dict(
    execution_timeout=timedelta(hours=EXECUTION_TIMEOUT),
    snowflake_conn_id=SNOWFLAKE_CONN_ID,
    retries=int(os.getenv("DEFAULT_TASK_RETRIES", "2")),
    retry_delay=timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", "60"))),
)
# The DAG runs SnowflakeOperatorAsync with several ways of passing SQL: a single
# string, a statement plus ``parameters``, a list of statements, and several
# statements packed into one string. None of the tasks sets ``snowflake_conn_id``
# itself; it and the timeout/retry settings come from ``default_args``.
with DAG(
    dag_id="example_snowflake",
    schedule=None,
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["example", "async", "snowflake"],
    default_args=default_args,
) as dag:
    # [START howto_operator_snowflake_async]
    snowflake_op_sql_str = SnowflakeOperatorAsync(
        task_id="snowflake_op_sql_str",
        sql=CREATE_TABLE_SQL_STRING,
    )
    # [END howto_operator_snowflake_async]

    # One INSERT whose ``%(id)s`` placeholder gets its value through ``parameters``.
    snowflake_op_with_params = SnowflakeOperatorAsync(
        task_id="snowflake_op_with_params", sql=SQL_INSERT_STATEMENT, parameters={"id": 56}
    )
    # Ten pre-rendered INSERTs handed over as a Python list.
    snowflake_op_sql_list = SnowflakeOperatorAsync(task_id="snowflake_op_sql_list", sql=SQL_LIST)
    # The same ten INSERTs joined into a single "; "-separated string.
    snowflake_op_sql_multiple_stmts = SnowflakeOperatorAsync(
        task_id="snowflake_op_sql_multiple_stmts", sql=SQL_MULTIPLE_STMTS
    )

    # Read rows back. This task and the two below pass ``return_last=False`` explicitly.
    snowflake_op_sql_select_stmts = SnowflakeOperatorAsync(
        task_id="snowflake_op_sql_select_stmts",
        sql=SNOWFLAKE_SLACK_SQL,
        return_last=False,
    )
    # Create and then drop three tables, each step sending all statements in one SQL string.
    snowflake_op_multiple_query_in_one_run_create_tables = SnowflakeOperatorAsync(
        task_id="snowflake_op_multiple_query_in_one_run_create_tables",
        sql=MULTIPLE_QUERY_IN_ONE_RUN_CREATE_TABLES,
        return_last=False,
    )
    snowflake_op_multiple_query_in_one_run_drop_tables = SnowflakeOperatorAsync(
        task_id="snowflake_op_multiple_query_in_one_run_drop_tables",
        sql=MULTIPLE_QUERY_IN_ONE_RUN_DROP_TABLES,
        return_last=False,
    )

    # Wiring: create table -> three independent inserts -> select
    #         -> create extra tables -> drop extra tables.
    snowflake_op_sql_str.set_downstream(
        [snowflake_op_with_params, snowflake_op_sql_list, snowflake_op_sql_multiple_stmts]
    )
    snowflake_op_sql_select_stmts.set_upstream(
        [snowflake_op_with_params, snowflake_op_sql_list, snowflake_op_sql_multiple_stmts]
    )
    snowflake_op_sql_select_stmts.set_downstream(snowflake_op_multiple_query_in_one_run_create_tables)
    snowflake_op_multiple_query_in_one_run_create_tables.set_downstream(
        snowflake_op_multiple_query_in_one_run_drop_tables
    )