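"""Export upcoming Airflow DAG runs as Google Calendar events.

The script estimates future DAG run times from recent run history in the
Airflow DB, optionally filters out short-running DAGs, and syncs the
remaining runs to the given Google Calendar via aircal.
"""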
import os
import sys
import argparse
import logging
from pathlib import Path
from datetime import timedelta

from sqlalchemy import create_engine

from aircal.events import DagRunEventsExtractor
from aircal.dao.airflow import AirflowDb
from aircal.dao.gcal import GCalClient
from aircal.export import GCalExporter

logger = logging.getLogger('aircal')
logger.setLevel(logging.INFO)
cli_handler = logging.StreamHandler()
cli_format = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
cli_handler.setFormatter(cli_format)
logger.addHandler(cli_handler)

def do_continue(df_events):
    """Ask for confirmation before exporting an unusually large number of events."""
    if df_events.shape[0] <= 500:
        return
    logger.info('Number of events to manage is high: %d', df_events.shape[0])
    logger.info('You might want to consider filtering them to reduce clutter.')
    yn = input('Are you sure you want to export all of them (y/n): ')
    if yn != 'y':
        logger.info('Too many events, exiting.')
        sys.exit(1)

def main(args):
    logger.info('Extracting dag run events.')
    airflow_db = AirflowDb(create_engine(args.sqlalchemy_conn_string))
    extractor = DagRunEventsExtractor(airflow_db, n_last_runs=args.n_last_runs)

    # extract all future DAG runs as calendar events
    df_events = extractor.get_future_dag_runs(n_horizon_days=args.n_horizon_days)
    # filter out the runs that are of no interest to you;
    # in this case we only keep the ones running longer than x minutes
    df_events = df_events[df_events.mean_exec_time > timedelta(minutes=args.min_dag_exec_time)]
    do_continue(df_events)

    logger.info('Syncing to GCal.')
    gcal_client = GCalClient(calendar_id=args.calendar_id, creds_dir=Path(args.creds_path), logger=logger)
    exporter = GCalExporter(gcal_client)

    # identify the DAG runs that need a sync (insert, update, delete)
    df_to_sync = exporter.mark_for_sync(df_events)
    logger.info('DAG runs needing sync: %d', df_to_sync.shape[0])

    df_updated = exporter.sync(df_to_sync)
    if not df_updated.empty:
        # save the data frame for inspection
        df_updated.to_csv('event_ops.csv', index=False)
    logger.info('%d events synced.', df_updated.shape[0])

if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument('--sqlalchemy-conn-string', required=True,
        help='SQLAlchemy connection string used to read the dag and dag_run tables in the Airflow DB')
    parser.add_argument('--calendar-id', required=True,
        help='ID of the calendar where the DAG run events will be created')
    parser.add_argument('--creds-path', default=os.getcwd(),
        help='directory holding credentials.json; also used to store the token received from the Google API')
    parser.add_argument('--n-horizon-days', type=int, default=10,
        help='how many days in advance the events should be created')
    parser.add_argument('--n-last-runs', type=int, default=5,
        help='number of recent runs used to estimate the execution time of each DAG')
    parser.add_argument('--min-dag-exec-time', type=int, default=0,
        help='minimum mean execution time (in minutes) for a DAG to be exported to the calendar')
    args = parser.parse_args()
    main(args)
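
# Example invocation (the connection string and calendar ID below are
# placeholders; substitute the values for your own Airflow DB and Google
# account):
#
#   python example.py \
#       --sqlalchemy-conn-string postgresql://user:pass@localhost:5432/airflow \
#       --calendar-id your-calendar-id@group.calendar.google.com \
#       --n-horizon-days 14 \
#       --min-dag-exec-time 10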