-
-
Notifications
You must be signed in to change notification settings - Fork 167
/
dagfactory.py
121 lines (97 loc) · 4.41 KB
/
dagfactory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
"""Module contains code for loading a DagFactory config and generating DAGs"""
import os
from typing import Any, Dict, Iterator, List, Tuple, Union

import yaml
from airflow.models import DAG

from dagfactory.dagbuilder import DagBuilder
class DagFactory:
    """
    Takes a YAML config and generates DAGs.

    The config path is validated and the file is loaded eagerly in ``__init__``.
    Every top-level key of the config except ``default`` is treated as a DAG
    name; the ``default`` section (if any) is handed to every ``DagBuilder`` as
    ``default_config``.

    :param config_filepath: the filepath of the DAG factory YAML config file.
        Must be absolute path to file.
    :raises Exception: if the path is not absolute or the file cannot be
        loaded as YAML.
    """

    def __init__(self, config_filepath: str) -> None:
        DagFactory._validate_config_filepath(config_filepath=config_filepath)
        self.config_filepath: str = config_filepath
        self.config: Dict[str, Any] = DagFactory._load_config(
            config_filepath=config_filepath
        )

    @staticmethod
    def _validate_config_filepath(config_filepath: str) -> None:
        """
        Validates that the config file path is absolute.

        :raises Exception: if ``config_filepath`` is not an absolute path.
        """
        if not os.path.isabs(config_filepath):
            raise Exception("DAG Factory `config_filepath` must be absolute path")

    @staticmethod
    def _load_config(config_filepath: str) -> Dict[str, Any]:
        """
        Loads the YAML config file into a dictionary.

        :returns: dict from YAML config file
        :raises Exception: if the file cannot be opened or parsed; the
            underlying error is chained as ``__cause__``.
        """
        try:
            # Context manager guarantees the file handle is closed.
            with open(config_filepath, "r") as stream:
                # NOTE(review): FullLoader can construct more than plain YAML
                # types. Fine for operator-controlled config files; do not use
                # this on untrusted input.
                config: Dict[str, Any] = yaml.load(
                    stream=stream, Loader=yaml.FullLoader
                )
        except Exception as err:
            # Must raise an exception instance: raising a bare string is itself
            # a TypeError in Python 3 and would hide this message.
            raise Exception("Invalid DAG Factory config file") from err
        return config

    def get_dag_configs(self) -> Dict[str, Dict[str, Any]]:
        """
        Returns the configuration for each DAG in the factory, i.e. every
        top-level config entry except ``default``.

        :returns: dict with configuration for dags
        """
        return {
            dag_name: dag_config
            for dag_name, dag_config in self.config.items()
            if dag_name != "default"
        }

    def get_default_config(self) -> Dict[str, Any]:
        """
        Returns defaults for the DAG factory. If no defaults exist, returns empty dict.

        :returns: dict with default configuration
        """
        return self.config.get("default", {})

    def _build_dags(self) -> Iterator[Tuple[str, "DAG"]]:
        """
        Builds each configured DAG in turn with ``DagBuilder``.

        Lazily yields so callers can act on each DAG as soon as it is built.

        :returns: iterator of ``(dag_id, dag)`` pairs, in config order
        :raises Exception: if a DAG fails to build; the builder's error is
            chained as ``__cause__``.
        """
        dag_configs: Dict[str, Dict[str, Any]] = self.get_dag_configs()
        default_config: Dict[str, Any] = self.get_default_config()
        for dag_name, dag_config in dag_configs.items():
            dag_builder = DagBuilder(
                dag_name=dag_name, dag_config=dag_config, default_config=default_config
            )
            try:
                dag: Dict[str, Union[str, DAG]] = dag_builder.build()
            except Exception as err:
                # Raise an exception instance (a bare string raise is a TypeError).
                raise Exception(
                    f"Failed to generate dag {dag_name}. verify config is correct"
                ) from err
            yield dag["dag_id"], dag["dag"]

    # pylint: disable=redefined-builtin
    def generate_dags(self, globals: Dict[str, Any]) -> None:
        """
        Generates DAGs from YAML config.

        :param globals: The globals() from the file used to generate DAGs. The dag_id
            must be passed into globals() for Airflow to import
        :raises Exception: if any DAG fails to build. DAGs built before the
            failing one have already been added to ``globals``.
        """
        for dag_id, dag in self._build_dags():
            globals[dag_id] = dag

    # pylint: disable=redefined-builtin
    def clean_dags(self, globals: Dict[str, Any]) -> None:
        """
        Clean old DAGs that are not on YAML config but were auto-generated through dag-factory.

        Only ``DAG`` objects in ``globals`` that carry the
        ``is_dagfactory_auto_generated`` attribute are candidates for removal;
        any other DAG in ``globals`` is left untouched.

        :param globals: The globals() from the file used to generate DAGs. The dag_id
            must be passed into globals() for Airflow to import
        :raises Exception: if any DAG in the current config fails to build.
        """
        configured_dag_ids: Dict[str, Any] = dict(self._build_dags())
        # Materialize the names first: deleting while iterating a dict is an error.
        dags_to_remove: List[str] = [
            name
            for name, obj in globals.items()
            if isinstance(obj, DAG)
            and hasattr(obj, "is_dagfactory_auto_generated")
            and name not in configured_dag_ids
        ]
        for dag_to_remove in dags_to_remove:
            del globals[dag_to_remove]