diff --git a/src/cc_catalog_airflow/dags/science_museum_workflow.py b/src/cc_catalog_airflow/dags/science_museum_workflow.py new file mode 100644 index 00000000..99279545 --- /dev/null +++ b/src/cc_catalog_airflow/dags/science_museum_workflow.py @@ -0,0 +1,31 @@ +""" +This file configures the Apache Airflow DAG to ingest Science museum data. + +We do this by running `provider_api_scripts.science_museum.main` +""" +# airflow DAG (necessary for Airflow to find this file) +from datetime import datetime, timedelta +import logging + +from provider_api_scripts import science_museum +from util.dag_factory import create_provider_api_workflow + +logging.basicConfig( + format='%(asctime)s - %(name)s - %(levelname)s: %(message)s', + level=logging.INFO +) + +logger = logging.getLogger(__name__) + +DAG_ID = 'science_museum_workflow' +START_DATE = datetime(2020, 1, 1) +DAGRUN_TIMEOUT = timedelta(hours=24) + +globals()[DAG_ID] = create_provider_api_workflow( + DAG_ID, + science_museum.main, + start_date=START_DATE, + schedule_string='@monthly', + dated=False, + dagrun_timeout=DAGRUN_TIMEOUT +) diff --git a/src/cc_catalog_airflow/dags/test_science_museum_workflow.py b/src/cc_catalog_airflow/dags/test_science_museum_workflow.py new file mode 100644 index 00000000..ca6f2ac5 --- /dev/null +++ b/src/cc_catalog_airflow/dags/test_science_museum_workflow.py @@ -0,0 +1,16 @@ +import os +from airflow.models import DagBag + +FILE_DIR = os.path.abspath(os.path.dirname(__file__)) + + +def test_dag_loads_with_no_errors(tmpdir): + tmp_directory = str(tmpdir) + print(tmp_directory) + dag_bag = DagBag(dag_folder=tmp_directory, include_examples=False) + dag_bag.process_file( + os.path.join(FILE_DIR, 'science_museum_workflow.py') + ) + print(dag_bag.dags) + assert len(dag_bag.import_errors) == 0 + assert len(dag_bag.dags) == 1