Skip to content

Commit

Permalink
Fix failing tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog committed Apr 22, 2024
1 parent 6a15282 commit 63e6e5a
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 4 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
# Copyright 2023 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# The keywords airflow and DAG are required to load the DAGs from this file, see bullet 2 in the Apache Airflow FAQ:
# https://airflow.apache.org/docs/stable/faq.html

# Author: James Diprose

import logging
from typing import List

from observatory.platform.airflow import fetch_workflows, make_workflow
from observatory.platform.observatory_config import Workflow
from observatory.platform.workflows.workflow import Workflow as ObservatoryWorkflow

# Load DAGs
workflows: List[Workflow] = fetch_workflows()
for config in workflows:
logging.info(f"Making Workflow: {config.name}, dag_id={config.dag_id}")
workflow = make_workflow(config)

if isinstance(workflow, ObservatoryWorkflow):
dag = workflow.make_dag()
logging.info(f"Adding DAG: dag_id={workflow.dag_id}, dag={dag}")
globals()[workflow.dag_id] = dag
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ def assert_dag_load_from_config(self, dag_id: str):
:return: None.
"""

self.assert_dag_load(dag_id, os.path.join(module_file_path("observatory.platform.dags"), "load_workflows.py"))
self.assert_dag_load(dag_id, os.path.join(module_file_path("observatory.platform.dags"), "load_dags_legacy.py"))

def assert_blob_exists(self, bucket_id: str, blob_name: str):
"""Assert whether a blob exists or not.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

import pendulum
from airflow import AirflowException
from airflow.models import DagBag, Variable
from airflow.models import DAG, DagBag, Variable

from observatory.platform.airflow import delete_old_xcoms
from observatory.platform.config import AirflowVars
Expand Down Expand Up @@ -84,8 +84,9 @@ def load_dags_from_config():
logging.info(f"Making Workflow: {workflow.name}, dag_id={dag_id}")
dag = make_dag(workflow)

logging.info(f"Adding DAG: dag_id={dag_id}, dag={dag}")
globals()[dag_id] = dag
if isinstance(dag, DAG):
logging.info(f"Adding DAG: dag_id={dag_id}, dag={dag}")
globals()[dag_id] = dag


def make_dag(workflow: Workflow):
Expand Down

0 comments on commit 63e6e5a

Please sign in to comment.