From 4b26f80e2169dbdf26e6dcaf5250d56949ffa8e4 Mon Sep 17 00:00:00 2001 From: keegansmith21 Date: Tue, 9 Jul 2024 08:56:53 +0000 Subject: [PATCH] Added option to load dags with Params class --- observatory_platform/airflow/workflow.py | 38 ++++++++++++++++++++---- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/observatory_platform/airflow/workflow.py b/observatory_platform/airflow/workflow.py index f83617a8d..70969b3d2 100644 --- a/observatory_platform/airflow/workflow.py +++ b/observatory_platform/airflow/workflow.py @@ -73,30 +73,58 @@ def fetch_workflows() -> List[Workflow]: return workflows -def load_dags_from_config(): +def load_dags_from_config(use_params: bool = False): """Loads DAGs from a workflow config file, stored in the WORKFLOWS Airflow Variable. + :param use_params: whether the workflow has a Params class with which to load with :return: None. """ for workflow in fetch_workflows(): dag_id = workflow.dag_id logging.info(f"Making Workflow: {workflow.name}, dag_id={dag_id}") - dag = make_dag(workflow) + if use_params: + dag = make_dag_from_params(workflow) + else: + dag = make_dag(workflow) logging.info(f"Adding DAG: dag_id={dag_id}, dag={dag}") globals()[dag_id] = dag +def make_dag_from_params(workflow: Workflow): + """Make a DAG instance from a Workflow config. Will attempt to built the dag with its Params class + + :param workflow: the workflow configuration. + :return: the workflow instance. + """ + param_cls = locate(workflow.class_name + ".Params") + if param_cls is None: + raise ModuleNotFoundError( + f"dag_id={workflow.dag_id}: could not locate Param class_name={workflow.class_name}.Params" + ) + + cls = locate(workflow.class_name + ".create_dag") + if cls is None: + raise ModuleNotFoundError( + f"dag_id={workflow.dag_id}: could not locate class_name={workflow.class_name}.create_dag" + ) + + return cls(param_cls(dag_id=workflow.dag_id, cloud_workspace=workflow.cloud_workspace, **workflow.kwargs)) + + def make_dag(workflow: Workflow): """Make a DAG instance from a Workflow config. + :param workflow: the workflow configuration. :return: the workflow instance. """ - cls = locate(workflow.class_name) + cls = locate(workflow.class_name + ".create_dag") if cls is None: - raise ModuleNotFoundError(f"dag_id={workflow.dag_id}: could not locate class_name={workflow.class_name}") + raise ModuleNotFoundError( + f"dag_id={workflow.dag_id}: could not locate class_name={workflow.class_name}.create_dag" + ) return cls(dag_id=workflow.dag_id, cloud_workspace=workflow.cloud_workspace, **workflow.kwargs) @@ -285,7 +313,7 @@ class Workflow: Attributes: dag_id: the Airflow DAG identifier for the workflow. name: a user-friendly name for the workflow. - class_name: the fully qualified class name for the workflow class. + class_name: the fully qualified class name for the workflow class. e.g. myproject.myworkflows.my_workflow cloud_workspace: the Cloud Workspace to use when running the workflow. kwargs: a dictionary containing optional keyword arguments that are injected into the workflow constructor. """