-
Notifications
You must be signed in to change notification settings - Fork 5
/
extract_openlineage.py
77 lines (62 loc) · 2.17 KB
/
extract_openlineage.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import json
import os
from pendulum import datetime
from airflow import DAG
from airflow.decorators import task
from openlineage.client import OpenLineageClient
from snowflake.connector import connect
# Snowflake login settings, taken from the process environment when this DAG
# file is parsed; each is None if the corresponding variable is unset.
SNOWFLAKE_USER = os.environ.get('SNOWFLAKE_USER')
SNOWFLAKE_PASSWORD = os.environ.get('SNOWFLAKE_PASSWORD')
SNOWFLAKE_ACCOUNT = os.environ.get('SNOWFLAKE_ACCOUNT')
@task
def send_ol_events():
    """Forward new OpenLineage events stored in Snowflake to the lineage backend.

    Selects rows from the ``OPENLINEAGE_ACCESS_HISTORY`` view whose
    ``EVENT:eventTime`` is later than the value held in the
    ``OL_LATEST_EVENT_TIME`` tag on that view (or all rows if the tag has
    never been set). Each event is parsed from JSON and emitted, oldest
    first, through an ``OpenLineageClient`` configured from the environment.
    Afterwards the tag is advanced to the newest emitted event time.

    Credentials come from the module-level ``SNOWFLAKE_USER``,
    ``SNOWFLAKE_PASSWORD`` and ``SNOWFLAKE_ACCOUNT`` settings. The warehouse is
    read from the ``SNOWFLAKE_WAREHOUSE`` environment variable at run time.
    """
    client = OpenLineageClient.from_environment()
    ol_view = 'OPENLINEAGE_ACCESS_HISTORY'
    ol_event_time_tag = 'OL_LATEST_EVENT_TIME'
    # Read alongside the other SNOWFLAKE_* settings. If unset (None), the
    # session falls back to the Snowflake user's default warehouse.
    warehouse = os.getenv('SNOWFLAKE_WAREHOUSE')
    with connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=warehouse,
        database='OPENLINEAGE',
        schema='PUBLIC',
    ) as conn:
        with conn.cursor() as cursor:
            # Session variable; presumably referenced by the view definition
            # (not visible here) -- TODO confirm.
            cursor.execute(f'''
            set current_organization='{SNOWFLAKE_ACCOUNT}';
            ''')
            last_sent = f"system$get_tag('{ol_event_time_tag}', '{ol_view}', 'table')"
            # An unset tag yields NULL, and `x > NULL` is never true, which
            # would block every event forever. Treat NULL as "nothing sent yet".
            cursor.execute(f'''
            SELECT * FROM {ol_view}
            WHERE {last_sent} IS NULL OR EVENT:eventTime > {last_sent}
            ORDER BY EVENT:eventTime ASC;
            ''')
            ol_events = [json.loads(row[0]) for row in cursor.fetchall()]
            for ol_event in ol_events:
                client.emit(ol_event)
            if ol_events:
                # Rows were ordered ascending, so the last event is the newest.
                # Escape single quotes, since the value is embedded in a SQL
                # string literal.
                latest_event_time = str(ol_events[-1]['eventTime']).replace("'", "''")
                cursor.execute(f'''
                ALTER VIEW {ol_view} SET TAG {ol_event_time_tag} = '{latest_event_time}';
                ''')
# Hourly DAG with a single task that forwards new OpenLineage events from
# Snowflake. catchup=False, so missed past intervals are not backfilled.
# NOTE(review): 'snowflake_conn_id' in default_args is not read by
# send_ol_events, which connects via snowflake.connector directly.
with DAG(
    'etl_openlineage',
    start_date=datetime(2022, 4, 12),
    schedule_interval='@hourly',
    catchup=False,
    default_args={
        'owner': 'openlineage',
        'depends_on_past': False,
        'email_on_failure': False,
        'email_on_retry': False,
        'email': ['demo@openlineage.io'],
        'snowflake_conn_id': 'openlineage_snowflake'
    },
    # Must match schedule_interval above ('@hourly').
    description='Send OL events every hour.',
    tags=["extract"],
) as dag:
    send_ol_events()