From da4a5730b5b82128291ffb26598b2b2677048c4f Mon Sep 17 00:00:00 2001
From: Jamie Diprose <5715104+jdddog@users.noreply.github.com>
Date: Wed, 2 Aug 2023 12:15:34 +1200
Subject: [PATCH] Airflow 2.6.3 upgrade

---
 .github/workflows/unit-tests.yml              |   8 +-
 academic_observatory_workflows/config.py      |  52 ------
 .../tests/test_config.py                      |  28 ---
 .../workflows/crossref_events_telescope.py    |   8 +-
 .../workflows/crossref_fundref_telescope.py   |   6 +-
 .../workflows/crossref_metadata_telescope.py  |   6 +-
 .../workflows/doi_workflow.py                 |   6 +-
 .../workflows/geonames_telescope.py           |   6 +-
 .../workflows/oa_web_workflow.py              |   6 +-
 .../workflows/open_citations_telescope.py     |   6 +-
 .../workflows/openalex_telescope.py           |   8 +-
 .../workflows/ror_telescope.py                |   6 +-
 .../workflows/scopus_telescope.py             |   6 +-
 .../tests/test_elastic_import_workflow.py     | 160 ------------------
 .../workflows/unpaywall_telescope.py          |   8 +-
 .../workflows/web_of_science_telescope.py     |   6 +-
 requirements.txt                              |  16 +-
 setup.cfg                                     |  14 +-
 18 files changed, 56 insertions(+), 300 deletions(-)
 delete mode 100644 academic_observatory_workflows/tests/test_config.py
 delete mode 100644 academic_observatory_workflows/workflows/tests/test_elastic_import_workflow.py

diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml
index d3b00bfc6..1e59ef5a6 100644
--- a/.github/workflows/unit-tests.yml
+++ b/.github/workflows/unit-tests.yml
@@ -27,14 +27,14 @@ jobs:
           python -m pip install --upgrade pip
 
           cd ..
-          git clone https://github.com/The-Academic-Observatory/observatory-platform.git
+          git clone -b feature/airflow2.6.3 https://github.com/The-Academic-Observatory/observatory-platform.git
           cd observatory-platform
-          pip install -e observatory-api
-          pip install -e observatory-platform
+          pip install -e observatory-api --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
+          pip install -e observatory-platform --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
           cd ..
 
           cd academic-observatory-workflows
-          pip install -e .[tests]
+          pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
 
       - name: Check licenses
         run: |
diff --git a/academic_observatory_workflows/config.py b/academic_observatory_workflows/config.py
index d67e0af4b..1496b1710 100644
--- a/academic_observatory_workflows/config.py
+++ b/academic_observatory_workflows/config.py
@@ -14,15 +14,9 @@
 # Author: James Diprose
 
-import json
 import os
-from typing import List
 
 from observatory.platform.config import module_file_path
-from observatory.platform.elastic.elastic import KeepInfo, KeepOrder
-from observatory.platform.elastic.kibana import TimeField
-from observatory.platform.utils.jinja2_utils import render_template
-from observatory.platform.workflows.elastic_import_workflow import load_elastic_mappings_simple, ElasticImportConfig
 
 
 class Tag:
 
@@ -57,49 +51,3 @@ def sql_folder() -> str:
     """
 
     return module_file_path("academic_observatory_workflows.database.sql")
-
-
-def elastic_mappings_folder() -> str:
-    """Get the Elasticsearch mappings path.
-
-    :return: the elastic search schema path.
- """ - - return module_file_path("academic_observatory_workflows.database.mappings") - - -def load_elastic_mappings_ao(path: str, table_prefix: str, simple_prefixes: List = None): - """For the Observatory project, load the Elastic mappings for a given table_prefix. - :param path: the path to the mappings files. - :param table_prefix: the table_id prefix (without shard date). - :param simple_prefixes: the prefixes of mappings to load with the load_elastic_mappings_simple function. - :return: the rendered mapping as a Dict. - """ - - # Set default simple_prefixes - if simple_prefixes is None: - simple_prefixes = ["ao_doi"] - - if not table_prefix.startswith("ao"): - raise ValueError("Table must begin with 'ao'") - elif any([table_prefix.startswith(prefix) for prefix in simple_prefixes]): - return load_elastic_mappings_simple(path, table_prefix) - else: - prefix, aggregate, facet = table_prefix.split("_", 2) - mappings_file_name = "ao-relations-mappings.json.jinja2" - is_fixed_facet = facet in ["unique_list", "access_types", "disciplines", "output_types", "events", "metrics"] - if is_fixed_facet: - mappings_file_name = f"ao-{facet.replace('_', '-')}-mappings.json.jinja2" - mappings_path = os.path.join(path, mappings_file_name) - return json.loads(render_template(mappings_path, aggregate=aggregate, facet=facet)) - - -ELASTIC_IMPORT_CONFIG = ElasticImportConfig( - elastic_mappings_path=elastic_mappings_folder(), - elastic_mappings_func=load_elastic_mappings_ao, - kibana_time_fields=[TimeField("^.*$", "published_year")], - index_keep_info={ - "": KeepInfo(ordering=KeepOrder.newest, num=2), - "ao": KeepInfo(ordering=KeepOrder.newest, num=2), - }, -) diff --git a/academic_observatory_workflows/tests/test_config.py b/academic_observatory_workflows/tests/test_config.py deleted file mode 100644 index 275e61937..000000000 --- a/academic_observatory_workflows/tests/test_config.py +++ /dev/null @@ -1,28 +0,0 @@ -# Copyright 2020-2021 Curtin University. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-
-
-import unittest
-
-from academic_observatory_workflows.config import elastic_mappings_folder
-from observatory.platform.config import module_file_path
-
-
-class AcademicObservatoryWorkflowsConfig(unittest.TestCase):
-    def test_elastic_schema_path(self):
-        """Test that the Elasticsearch schema path is correct"""
-
-        expected_path = module_file_path("academic_observatory_workflows.database.mappings")
-        actual_path = elastic_mappings_folder()
-        self.assertEqual(expected_path, actual_path)
diff --git a/academic_observatory_workflows/workflows/crossref_events_telescope.py b/academic_observatory_workflows/workflows/crossref_events_telescope.py
index 8b7646a08..44f004d4c 100644
--- a/academic_observatory_workflows/workflows/crossref_events_telescope.py
+++ b/academic_observatory_workflows/workflows/crossref_events_telescope.py
@@ -179,7 +179,7 @@ def __init__(
         primary_key: str = "id",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 12),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         queue: str = "remote_queue",
     ):
         """Construct a CrossrefEventsTelescope instance.
@@ -202,14 +202,14 @@ def __init__(
         :param primary_key: the primary key to use when merging files.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param queue: what queue to run the DAG on.
         """
 
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
@@ -240,7 +240,7 @@ def __init__(
             PreviousDagRunSensor(
                 dag_id=self.dag_id,
                 external_task_id=external_task_id,
-                execution_delta=timedelta(days=7),  # To match the @weekly schedule_interval
+                execution_delta=timedelta(days=7),  # To match the @weekly schedule
             )
         )
 
diff --git a/academic_observatory_workflows/workflows/crossref_fundref_telescope.py b/academic_observatory_workflows/workflows/crossref_fundref_telescope.py
index fe30dac1a..6dd6c0425 100644
--- a/academic_observatory_workflows/workflows/crossref_fundref_telescope.py
+++ b/academic_observatory_workflows/workflows/crossref_fundref_telescope.py
@@ -84,7 +84,7 @@ def __init__(
         table_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2014, 2, 23),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         catchup: bool = True,
         gitlab_pool_name: str = "gitlab_pool",
         gitlab_pool_slots: int = 2,
@@ -102,7 +102,7 @@ def __init__(
         :param table_description: description for the BigQuery table.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param catchup: whether to catchup the DAG or not.
         :param gitlab_pool_name: name of the Gitlab Pool.
         :param gitlab_pool_slots: number of slots for the Gitlab Pool.
@@ -112,7 +112,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=catchup,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/crossref_metadata_telescope.py b/academic_observatory_workflows/workflows/crossref_metadata_telescope.py
index 6b3ba81c8..4792ad67c 100644
--- a/academic_observatory_workflows/workflows/crossref_metadata_telescope.py
+++ b/academic_observatory_workflows/workflows/crossref_metadata_telescope.py
@@ -101,7 +101,7 @@ def __init__(
         max_processes: int = os.cpu_count(),
         batch_size: int = 20,
         start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7),
-        schedule_interval: str = "0 0 7 * *",
+        schedule: str = "0 0 7 * *",
         catchup: bool = True,
         queue: str = "remote_queue",
         max_active_runs: int = 1,
@@ -121,7 +121,7 @@ def __init__(
         :param max_processes: the number of processes used with ProcessPoolExecutor to transform files in parallel.
         :param batch_size: the number of files to send to ProcessPoolExecutor at one time.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param catchup: whether to catchup the DAG or not.
         :param queue: what Airflow queue this job runs on.
         :param max_active_runs: the maximum number of DAG runs that can be run at once.
@@ -130,7 +130,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=catchup,
             airflow_conns=[observatory_api_conn_id, crossref_metadata_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/doi_workflow.py b/academic_observatory_workflows/workflows/doi_workflow.py
index 005d2f0f1..426a9879b 100644
--- a/academic_observatory_workflows/workflows/doi_workflow.py
+++ b/academic_observatory_workflows/workflows/doi_workflow.py
@@ -406,7 +406,7 @@ def __init__(
         max_fetch_threads: int = 4,
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: Optional[pendulum.DateTime] = pendulum.datetime(2020, 8, 30),
-        schedule_interval: Optional[str] = "@weekly",
+        schedule: Optional[str] = "@weekly",
         sensor_dag_ids: List[str] = None,
     ):
         """Create the DoiWorkflow.
@@ -420,13 +420,13 @@ def __init__(
         :param api_dataset_id: the DOI dataset id.
         :param max_fetch_threads: maximum number of threads to use when fetching.
         :param start_date: the start date.
-        :param schedule_interval: the schedule interval.
+        :param schedule: the schedule interval.
         """
 
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/geonames_telescope.py b/academic_observatory_workflows/workflows/geonames_telescope.py
index 54a18e7d7..c1ca0f816 100644
--- a/academic_observatory_workflows/workflows/geonames_telescope.py
+++ b/academic_observatory_workflows/workflows/geonames_telescope.py
@@ -103,7 +103,7 @@ def __init__(
         table_description: str = "The GeoNames geographical database: https://www.geonames.org/",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2020, 9, 1),
-        schedule_interval: str = "@monthly",
+        schedule: str = "@monthly",
     ):
         """The Geonames telescope.
 
@@ -117,13 +117,13 @@ def __init__(
         :param table_description: description for the BigQuery table.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         """
 
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/oa_web_workflow.py b/academic_observatory_workflows/workflows/oa_web_workflow.py
index 386f1f6b3..36f687c17 100644
--- a/academic_observatory_workflows/workflows/oa_web_workflow.py
+++ b/academic_observatory_workflows/workflows/oa_web_workflow.py
@@ -288,7 +288,7 @@ def __init__(
         github_conn_id="oa_web_github_token",
         zenodo_conn_id="oa_web_zenodo_token",
         start_date: Optional[pendulum.DateTime] = pendulum.datetime(2021, 5, 2),
-        schedule_interval: Optional[str] = "@weekly",
+        schedule: Optional[str] = "@weekly",
     ):
         """Create the OaWebWorkflow.
 
@@ -309,7 +309,7 @@ def __init__(
         :param github_conn_id: the Github Token Airflow Connection ID.
         :param zenodo_conn_id: the Zenodo Token Airflow Connection ID.
         :param start_date: the start date.
-        :param schedule_interval: the schedule interval.
+        :param schedule: the schedule interval.
         """
 
         if table_names is None:
@@ -318,7 +318,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[github_conn_id, zenodo_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/open_citations_telescope.py b/academic_observatory_workflows/workflows/open_citations_telescope.py
index 142c97bfc..2e25c3d29 100644
--- a/academic_observatory_workflows/workflows/open_citations_telescope.py
+++ b/academic_observatory_workflows/workflows/open_citations_telescope.py
@@ -75,7 +75,7 @@ def __init__(
         table_description: str = "The OpenCitations COCI CSV table: http://opencitations.net/",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2018, 7, 1),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         catchup: bool = True,
         queue: str = "remote_queue",
     ):
@@ -92,14 +92,14 @@ def __init__(
         :param observatory_api_conn_id: the Observatory API connection key.
         :param catchup: whether to catchup the DAG or not.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param queue: what Airflow queue to use.
""" super().__init__( dag_id=dag_id, start_date=start_date, - schedule_interval=schedule_interval, + schedule=schedule, catchup=catchup, airflow_conns=[observatory_api_conn_id], queue=queue, diff --git a/academic_observatory_workflows/workflows/openalex_telescope.py b/academic_observatory_workflows/workflows/openalex_telescope.py index 35bac8d1d..92ee080f2 100644 --- a/academic_observatory_workflows/workflows/openalex_telescope.py +++ b/academic_observatory_workflows/workflows/openalex_telescope.py @@ -299,7 +299,7 @@ def __init__( aws_openalex_bucket: str = "openalex", observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1), - schedule_interval: str = "@weekly", + schedule: str = "@weekly", queue: str = "remote_queue", ): """Construct an OpenAlexTelescope instance. @@ -320,7 +320,7 @@ def __init__( :param aws_openalex_bucket: the OpenAlex AWS bucket name. :param observatory_api_conn_id: the Observatory API Airflow Connection ID. :param start_date: the Apache Airflow DAG start date. - :param schedule_interval: the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly, + :param schedule: the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly, they are not released on any particular day of the month, so we instead simply run the workflow weekly on a Sunday as this will pickup new updates regularly. See here for past release dates: https://openalex.s3.amazonaws.com/RELEASE_NOTES.txt :param queue: @@ -340,7 +340,7 @@ def __init__( super().__init__( dag_id=dag_id, start_date=start_date, - schedule_interval=schedule_interval, + schedule=schedule, catchup=False, airflow_conns=[observatory_api_conn_id, aws_conn_id], queue=queue, @@ -366,7 +366,7 @@ def __init__( PreviousDagRunSensor( dag_id=self.dag_id, external_task_id=external_task_id, - execution_delta=timedelta(days=7), # To match the @weekly schedule_interval + execution_delta=timedelta(days=7), # To match the @weekly schedule ) ) self.add_setup_task(self.check_dependencies) diff --git a/academic_observatory_workflows/workflows/ror_telescope.py b/academic_observatory_workflows/workflows/ror_telescope.py index fb9221996..a3018d245 100644 --- a/academic_observatory_workflows/workflows/ror_telescope.py +++ b/academic_observatory_workflows/workflows/ror_telescope.py @@ -90,7 +90,7 @@ def __init__( observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, ror_conceptrecid: int = 6347574, start_date: pendulum.DateTime = pendulum.datetime(2021, 9, 1), - schedule_interval: str = "@weekly", + schedule: str = "@weekly", catchup: bool = True, ): """Construct a RorTelescope instance. @@ -106,14 +106,14 @@ def __init__( :param observatory_api_conn_id: the Observatory API connection key. :param ror_conceptrecid: the Zenodo conceptrecid for the ROR dataset. :param start_date: the start date of the DAG. - :param schedule_interval: the schedule interval of the DAG. + :param schedule: the schedule interval of the DAG. :param catchup: whether to catchup the DAG or not. 
""" super().__init__( dag_id=dag_id, start_date=start_date, - schedule_interval=schedule_interval, + schedule=schedule, catchup=catchup, airflow_conns=[observatory_api_conn_id], tags=[Tag.academic_observatory], diff --git a/academic_observatory_workflows/workflows/scopus_telescope.py b/academic_observatory_workflows/workflows/scopus_telescope.py index 0a686cdc2..835f764dd 100644 --- a/academic_observatory_workflows/workflows/scopus_telescope.py +++ b/academic_observatory_workflows/workflows/scopus_telescope.py @@ -109,7 +109,7 @@ def __init__( table_description: str = "The Scopus citation database: https://www.scopus.com", observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), - schedule_interval: str = "@monthly", + schedule: str = "@monthly", ): """Scopus telescope. :param dag_id: the id of the DAG. @@ -126,13 +126,13 @@ def __init__( :param table_description: description for the BigQuery table. :param observatory_api_conn_id: the Observatory API connection key. :param start_date: the start date of the DAG. - :param schedule_interval: the schedule interval of the DAG. + :param schedule: the schedule interval of the DAG. """ super().__init__( dag_id=dag_id, start_date=start_date, - schedule_interval=schedule_interval, + schedule=schedule, catchup=False, airflow_conns=[observatory_api_conn_id] + scopus_conn_ids, tags=[Tag.academic_observatory], diff --git a/academic_observatory_workflows/workflows/tests/test_elastic_import_workflow.py b/academic_observatory_workflows/workflows/tests/test_elastic_import_workflow.py deleted file mode 100644 index 9a5879f2f..000000000 --- a/academic_observatory_workflows/workflows/tests/test_elastic_import_workflow.py +++ /dev/null @@ -1,160 +0,0 @@ -# Copyright 2021 Curtin University -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-
-# Author: James Diprose
-
-from __future__ import annotations
-
-import json
-import logging
-import os
-
-from academic_observatory_workflows.config import elastic_mappings_folder, load_elastic_mappings_ao
-from observatory.platform.files import load_file
-from observatory.platform.observatory_config import Workflow
-from observatory.platform.observatory_environment import ObservatoryEnvironment, ObservatoryTestCase
-from observatory.platform.utils.jinja2_utils import render_template
-
-
-class TestElasticImportWorkflow(ObservatoryTestCase):
-    """Tests for the Elastic Import Workflow"""
-
-    def __init__(self, *args, **kwargs):
-        super().__init__(*args, **kwargs)
-
-        self.dag_id = "elastic_import_observatory"
-        self.project_id = os.getenv("TEST_GCP_PROJECT_ID")
-        self.data_location = os.getenv("TEST_GCP_DATA_LOCATION")
-
-    def test_load_elastic_mappings_ao(self):
-        """Test load_elastic_mappings_ao"""
-
-        path = elastic_mappings_folder()
-        aggregate = "author"
-        expected = [
-            ("ao_dois", load_file(os.path.join(path, "ao-dois-mappings.json"))),
-            (
-                "ao_author_access_types",
-                render_template(
-                    os.path.join(path, "ao-access-types-mappings.json.jinja2"),
-                    aggregate=aggregate,
-                    facet="access_types",
-                ),
-            ),
-            (
-                "ao_author_disciplines",
-                render_template(
-                    os.path.join(path, "ao-disciplines-mappings.json.jinja2"), aggregate=aggregate, facet="disciplines"
-                ),
-            ),
-            (
-                "ao_author_events",
-                render_template(
-                    os.path.join(path, "ao-events-mappings.json.jinja2"), aggregate=aggregate, facet="events"
-                ),
-            ),
-            (
-                "ao_author_metrics",
-                render_template(
-                    os.path.join(path, "ao-metrics-mappings.json.jinja2"), aggregate=aggregate, facet="metrics"
-                ),
-            ),
-            (
-                "ao_author_output_types",
-                render_template(
-                    os.path.join(path, "ao-output-types-mappings.json.jinja2"),
-                    aggregate=aggregate,
-                    facet="output_types",
-                ),
-            ),
-            (
-                "ao_author_unique_list",
-                render_template(
-                    os.path.join(path, "ao-unique-list-mappings.json.jinja2"), aggregate=aggregate, facet="unique_list"
-                ),
-            ),
-            (
-                "ao_author_output_types",
-                render_template(
-                    os.path.join(path, "ao-output-types-mappings.json.jinja2"),
-                    aggregate=aggregate,
-                    facet="output_types",
-                ),
-            ),
-            (
-                "ao_author_countries",
-                render_template(
-                    os.path.join(path, "ao-relations-mappings.json.jinja2"), aggregate=aggregate, facet="countries"
-                ),
-            ),
-            (
-                "ao_author_funders",
-                render_template(
-                    os.path.join(path, "ao-relations-mappings.json.jinja2"), aggregate=aggregate, facet="funders"
-                ),
-            ),
-            (
-                "ao_author_groupings",
-                render_template(
-                    os.path.join(path, "ao-relations-mappings.json.jinja2"), aggregate=aggregate, facet="groupings"
-                ),
-            ),
-            (
-                "ao_author_institutions",
-                render_template(
-                    os.path.join(path, "ao-relations-mappings.json.jinja2"), aggregate=aggregate, facet="institutions"
-                ),
-            ),
-            (
-                "ao_author_journals",
-                render_template(
-                    os.path.join(path, "ao-relations-mappings.json.jinja2"), aggregate=aggregate, facet="journals"
-                ),
-            ),
-            (
-                "ao_author_publishers",
-                render_template(
-                    os.path.join(path, "ao-relations-mappings.json.jinja2"), aggregate=aggregate, facet="publishers"
-                ),
-            ),
-        ]
-
-        for table_id, expected_mappings_str in expected:
-            logging.info(table_id)
-            expected_mappings = json.loads(expected_mappings_str)
-            actual_mappings = load_elastic_mappings_ao(path, table_id)
-            self.assertEqual(expected_mappings, actual_mappings)
-
-    def test_dag_load(self):
-        """Test that workflow can be loaded from a DAG bag."""
-
-        env = ObservatoryEnvironment(
-            workflows=[
-                Workflow(
-                    dag_id=self.dag_id,
- name="Elastic Import Observatory Workflow", - class_name="observatory.platform.workflows.elastic_import_workflow.ElasticImportWorkflow", - cloud_workspace=self.fake_cloud_workspace, - kwargs=dict( - sensor_dag_ids=["doi"], - kibana_spaces=["test-space"], - elastic_import_config="academic_observatory_workflows.config.ELASTIC_IMPORT_CONFIG", - tags=["academic-observatory"], - ), - ) - ] - ) - - with env.create(): - self.assert_dag_load_from_config(self.dag_id) diff --git a/academic_observatory_workflows/workflows/unpaywall_telescope.py b/academic_observatory_workflows/workflows/unpaywall_telescope.py index 94eec490b..96a112c19 100644 --- a/academic_observatory_workflows/workflows/unpaywall_telescope.py +++ b/academic_observatory_workflows/workflows/unpaywall_telescope.py @@ -215,7 +215,7 @@ def __init__( unpaywall_conn_id: str = "unpaywall", observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2021, 7, 2), - schedule_interval: str = "@daily", + schedule: str = "@daily", ): """The Unpaywall Data Feed Telescope. @@ -233,13 +233,13 @@ def __init__( :param unpaywall_conn_id: Unpaywall connection key. :param observatory_api_conn_id: the Observatory API connection key. :param start_date: the start date of the DAG. - :param schedule_interval: the schedule interval of the DAG. + :param schedule: the schedule interval of the DAG. """ super().__init__( dag_id=dag_id, start_date=start_date, - schedule_interval=schedule_interval, + schedule=schedule, catchup=False, airflow_conns=[observatory_api_conn_id, unpaywall_conn_id], tags=[Tag.academic_observatory], @@ -269,7 +269,7 @@ def __init__( PreviousDagRunSensor( dag_id=self.dag_id, external_task_id=external_task_id, - execution_delta=timedelta(days=1), # To match the @daily schedule_interval + execution_delta=timedelta(days=1), # To match the @daily schedule ) ) self.add_setup_task(self.check_dependencies) diff --git a/academic_observatory_workflows/workflows/web_of_science_telescope.py b/academic_observatory_workflows/workflows/web_of_science_telescope.py index b256ee552..c9d6ab2d9 100644 --- a/academic_observatory_workflows/workflows/web_of_science_telescope.py +++ b/academic_observatory_workflows/workflows/web_of_science_telescope.py @@ -107,7 +107,7 @@ def __init__( table_description: str = "The Web of Science citation database: https://clarivate.com/webofsciencegroup/solutions/web-of-science", observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14), - schedule_interval: str = "@monthly", + schedule: str = "@monthly", ): """Web of Science telescope. @@ -124,13 +124,13 @@ def __init__( :param table_description: description for the BigQuery table. :param observatory_api_conn_id: the Observatory API connection key. :param start_date: the start date of the DAG. - :param schedule_interval: the schedule interval of the DAG. + :param schedule: the schedule interval of the DAG. 
""" super().__init__( dag_id=dag_id, start_date=start_date, - schedule_interval=schedule_interval, + schedule=schedule, catchup=False, airflow_conns=[observatory_api_conn_id, wos_conn_id], tags=[Tag.academic_observatory], diff --git a/requirements.txt b/requirements.txt index e8434dc9c..f39de8a0c 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,10 +1,10 @@ -xmltodict==0.12.* -backoff<2,>=1.11.0 -ratelimit==2.2.* -wos==0.2.* -pandas>=1.3,<2 +xmltodict>=0.12.0,<1 +backoff>=2,<3 +ratelimit>=2.2.0,<3 +wos>=0.2.5,<1 +pandas>=1.3,<3 beautifulsoup4>=4.9.3,<5 boto3>=1.15.0,<2 -nltk==3.* -Deprecated>1,<2 -limits>3,<4 \ No newline at end of file +nltk>=3,<4 +Deprecated>=1,<2 +limits>=3,<4 \ No newline at end of file diff --git a/setup.cfg b/setup.cfg index 1ddbed76e..8bc71d52a 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,16 +1,16 @@ [metadata] name = academic-observatory-workflows author = Curtin University -author-email = agent@observatory.academy +author_email = agent@observatory.academy summary = Academic Observatory Workflows provides Apache Airflow Workflows for fetching, processing and analysing data about academic institutions. -description-file = README.md -description-content-type = text/markdown; charset=UTF-8 -home-page = https://github.com/The-Academic-Observatory/academic-observatory-workflows +description_file = README.md +description_content_type = text/markdown; charset=UTF-8 +home_page = https://github.com/The-Academic-Observatory/academic-observatory-workflows project_urls = Bug Tracker = https://github.com/The-Academic-Observatory/academic-observatory-workflows/issues Documentation = https://academic-observatory-workflows.readthedocs.io/en/latest/ Source Code = https://github.com/The-Academic-Observatory/academic-observatory-workflows -python-requires = >=3.7 +python_requires = >=3.7 license = Apache License Version 2.0 classifier = Development Status :: 2 - Pre-Alpha @@ -43,10 +43,6 @@ data_files = academic_observatory_workflows/database = academic_observatory_workflows/database/* academic_observatory_workflows/workflows/data = academic_observatory_workflows/workflows/data/* -[entry_points] -console_scripts = - academic-observatory-workflows-seed = academic_observatory_workflows.seed.seed:seed - [extras] tests = liccheck==0.4.*