This repository was archived by the owner on May 23, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathflow.py
More file actions
105 lines (91 loc) · 3.46 KB
/
flow.py
File metadata and controls
105 lines (91 loc) · 3.46 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from datetime import timedelta
from prefect import Flow, Parameter, artifacts, task
from prefect.run_configs.local import LocalRun
from prefect.storage.local import Local
from prefect.tasks.airbyte.airbyte import AirbyteConnectionTask
from prefect.tasks.dbt.dbt import DbtShellTask
from prefect.tasks.secrets.base import PrefectSecret
from prefect.tasks.snowflake.snowflake import SnowflakeQuery
# Retry policy shared by every task instance below: up to 3 retries, 10 seconds
# apart. Defined once so the three tasks cannot silently drift apart.
_RETRY_POLICY = {"max_retries": 3, "retry_delay": timedelta(seconds=10)}

# Triggers an Airbyte connection sync. The server location and the connection
# ID are not fixed here; they are passed when the task is called.
sync_airbyte_connection = AirbyteConnectionTask(**_RETRY_POLICY)

# Runs `dbt run` for the "github_common_contributors" profile. The helper script
# changes into the dbt project directory before the command runs.
run_dbt = DbtShellTask(
    command="dbt run",
    # NOTE(review): presumably the dbt target to run against — confirm.
    environment="dev",
    profile_name="github_common_contributors",
    helper_script="cd github_common_contributors",
    # NOTE(review): presumably relies on an existing dbt profiles setup instead
    # of having the task point DBT_PROFILES_DIR elsewhere — confirm.
    set_profiles_envar=False,
    **_RETRY_POLICY,
)

# Snowflake query task. The account, password and SQL are not fixed here; they
# are passed when the task is called.
# NOTE(review): the values below look like placeholders — confirm they match the
# Snowflake user/database/schema/role/warehouse that Airbyte loads into.
query_snowflake = SnowflakeQuery(
    user="AIRBYTE_USER",
    database="AIRBYTE_DATABASE",
    schema="AIRBYTE_SCHEMA",
    role="AIRBYTE_ROLE",
    warehouse="AIRBYTE_WAREHOUSE",
    **_RETRY_POLICY,
)
def _markdown_section(title, rows):
    """Return the Markdown lines for one titled section listing ``row[0]`` of each row.

    Each value becomes a bullet item. Plain consecutive lines would be joined
    into a single paragraph when rendered, running all the names together.
    A missing or empty result set produces an explicit placeholder instead of
    a bare heading.
    """
    lines = [f"# {title}", ""]
    if rows:
        lines.extend(f"- {row[0]}" for row in rows)
    else:
        lines.append("_None found._")
    return lines


@task
def generate_result_markdown(common_committers, common_issue_submitters):
    """Publish a Markdown artifact listing the common committers and issue submitters.

    Args:
        common_committers: iterable of rows; the first element of each row is
            rendered as one bullet item (presumably a GitHub login — confirm
            against the query that produces it).
        common_issue_submitters: iterable of rows, rendered the same way.

    Returns:
        The Markdown text that was passed to ``artifacts.create_markdown``.
    """
    markdown_lines = _markdown_section(
        "Committers common between Prefect, Airbyte, and dbt", common_committers
    )
    # Blank line so the second heading is clearly separated from the first list.
    markdown_lines.append("")
    markdown_lines += _markdown_section(
        "Issue submitters common between Prefect, Airbyte, and dbt",
        common_issue_submitters,
    )
    markdown = "\n".join(markdown_lines)
    artifacts.create_markdown(markdown)
    return markdown
# SQL run against Snowflake after dbt has built its models.
COMMON_COMMITTERS_QUERY = """
select login from commit_authors
"""
COMMON_ISSUE_SUBMITTERS_QUERY = """
select login from issue_submitters
"""

# Location of the Airbyte server that owns the three GitHub connections. Every
# sync in the flow uses it, so it is defined once instead of repeated per call.
AIRBYTE_SERVER_SETTINGS = {
    "airbyte_server_host": "localhost",
    "airbyte_server_port": 8000,
    "airbyte_api_version": "v1",
}

with Flow(
    "Determine common contributors flow", storage=Local(), run_config=LocalRun()
) as flow:
    # Airbyte connection IDs, one per project's GitHub source, supplied at run time.
    airbyte_github_connection_id = Parameter("AIRBYTE_GITHUB_CONNECTION_ID")
    dbt_github_connection_id = Parameter("DBT_GITHUB_CONNECTION_ID")
    prefect_github_connection_id = Parameter("PREFECT_GITHUB_CONNECTION_ID")

    # Snowflake configuration: the account is a flow parameter and the password
    # comes from the SNOWFLAKE_PASSWORD Prefect secret. Both queries share them.
    snowflake_account = Parameter("SNOWFLAKE_ACCOUNT")
    snowflake_password = PrefectSecret("SNOWFLAKE_PASSWORD")
    snowflake_credentials = {
        "account": snowflake_account,
        "password": snowflake_password,
    }

    # Sync each project's GitHub data through its own Airbyte connection.
    airbyte_github_sync = sync_airbyte_connection(
        connection_id=airbyte_github_connection_id, **AIRBYTE_SERVER_SETTINGS
    )
    dbt_github_sync = sync_airbyte_connection(
        connection_id=dbt_github_connection_id, **AIRBYTE_SERVER_SETTINGS
    )
    prefect_github_sync = sync_airbyte_connection(
        connection_id=prefect_github_connection_id, **AIRBYTE_SERVER_SETTINGS
    )

    # Run dbt to create views; it depends on all three syncs.
    dbt_run = run_dbt(
        upstream_tasks=[airbyte_github_sync, dbt_github_sync, prefect_github_sync]
    )

    # Query the results from Snowflake; both queries depend on the dbt run.
    common_committers = query_snowflake(
        query=COMMON_COMMITTERS_QUERY,
        upstream_tasks=[dbt_run],
        **snowflake_credentials,
    )
    common_issue_submitters = query_snowflake(
        query=COMMON_ISSUE_SUBMITTERS_QUERY,
        upstream_tasks=[dbt_run],
        **snowflake_credentials,
    )

    # Publish both result sets as a Markdown artifact.
    generate_result_markdown(common_committers, common_issue_submitters)