Skip to content

Commit

Permalink
Added option to load dags with Params class
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Jul 9, 2024
1 parent 4ad5118 commit 4b26f80
Showing 1 changed file with 33 additions and 5 deletions.
38 changes: 33 additions & 5 deletions observatory_platform/airflow/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,30 +73,58 @@ def fetch_workflows() -> List[Workflow]:
return workflows


def load_dags_from_config():
def load_dags_from_config(use_params: bool = False):
"""Loads DAGs from a workflow config file, stored in the WORKFLOWS Airflow Variable.
:param use_params: if True, build each DAG through the workflow's Params class (via make_dag_from_params) instead of make_dag.
:return: None.
"""

for workflow in fetch_workflows():
dag_id = workflow.dag_id
logging.info(f"Making Workflow: {workflow.name}, dag_id={dag_id}")
dag = make_dag(workflow)
if use_params:
dag = make_dag_from_params(workflow)
else:
dag = make_dag(workflow)

logging.info(f"Adding DAG: dag_id={dag_id}, dag={dag}")
globals()[dag_id] = dag


def make_dag_from_params(workflow: Workflow):
    """Build a DAG for a Workflow config by way of the workflow's Params class.

    Both ``Params`` and ``create_dag`` are looked up as attributes of
    ``workflow.class_name``. ``Params`` is instantiated with the dag_id, the
    cloud workspace and the workflow kwargs; the resulting object is then
    passed as the only argument to ``create_dag``.

    :param workflow: the workflow configuration.
    :raises ModuleNotFoundError: if either Params or create_dag cannot be located.
    :return: whatever create_dag returns for the constructed Params object.
    """

    # Resolve the Params class first so that a missing Params is reported
    # before create_dag is even looked up.
    params_path = workflow.class_name + ".Params"
    params_type = locate(params_path)
    if params_type is None:
        raise ModuleNotFoundError(
            f"dag_id={workflow.dag_id}: could not locate Param class_name={workflow.class_name}.Params"
        )

    factory_path = workflow.class_name + ".create_dag"
    dag_factory = locate(factory_path)
    if dag_factory is None:
        raise ModuleNotFoundError(
            f"dag_id={workflow.dag_id}: could not locate class_name={workflow.class_name}.create_dag"
        )

    # The Params object bundles every argument; create_dag receives nothing else.
    params = params_type(
        dag_id=workflow.dag_id,
        cloud_workspace=workflow.cloud_workspace,
        **workflow.kwargs,
    )
    return dag_factory(params)


def make_dag(workflow: Workflow):
"""Make a DAG instance from a Workflow config.
:param workflow: the workflow configuration.
:return: the workflow instance.
"""

cls = locate(workflow.class_name)
cls = locate(workflow.class_name + ".create_dag")
if cls is None:
raise ModuleNotFoundError(f"dag_id={workflow.dag_id}: could not locate class_name={workflow.class_name}")
raise ModuleNotFoundError(
f"dag_id={workflow.dag_id}: could not locate class_name={workflow.class_name}.create_dag"
)

return cls(dag_id=workflow.dag_id, cloud_workspace=workflow.cloud_workspace, **workflow.kwargs)

Expand Down Expand Up @@ -285,7 +313,7 @@ class Workflow:
Attributes:
dag_id: the Airflow DAG identifier for the workflow.
name: a user-friendly name for the workflow.
class_name: the fully qualified class name for the workflow class.
class_name: the fully qualified name of the workflow module, on which create_dag (and optionally Params) are looked up, e.g. myproject.myworkflows.my_workflow
cloud_workspace: the Cloud Workspace to use when running the workflow.
kwargs: a dictionary containing optional keyword arguments that are injected into the workflow constructor.
"""
Expand Down

0 comments on commit 4b26f80

Please sign in to comment.