From 90347472d850596cf0257008eb7edbe465d44e3a Mon Sep 17 00:00:00 2001
From: Jamie Diprose <5715104+jdddog@users.noreply.github.com>
Date: Wed, 2 Aug 2023 14:00:17 +1200
Subject: [PATCH] schedule_interval -> schedule and remove elastic import
 workflow

---
 academic_observatory_workflows/config.py      | 52 -------------------
 .../workflows/crossref_events_telescope.py    |  8 +--
 .../workflows/crossref_fundref_telescope.py   |  6 +--
 .../workflows/crossref_metadata_telescope.py  |  6 +--
 .../workflows/doi_workflow.py                 |  6 +--
 .../workflows/geonames_telescope.py           |  6 +--
 .../workflows/oa_web_workflow.py              |  6 +--
 .../workflows/open_citations_telescope.py     |  6 +--
 .../workflows/openalex_telescope.py           |  8 +--
 .../workflows/ror_telescope.py                |  6 +--
 .../workflows/scopus_telescope.py             |  6 +--
 .../workflows/unpaywall_telescope.py          |  8 +--
 .../workflows/web_of_science_telescope.py     |  6 +--
 13 files changed, 39 insertions(+), 91 deletions(-)

diff --git a/academic_observatory_workflows/config.py b/academic_observatory_workflows/config.py
index d67e0af4b..1496b1710 100644
--- a/academic_observatory_workflows/config.py
+++ b/academic_observatory_workflows/config.py
@@ -14,15 +14,9 @@

 # Author: James Diprose

-import json
 import os
-from typing import List

 from observatory.platform.config import module_file_path
-from observatory.platform.elastic.elastic import KeepInfo, KeepOrder
-from observatory.platform.elastic.kibana import TimeField
-from observatory.platform.utils.jinja2_utils import render_template
-from observatory.platform.workflows.elastic_import_workflow import load_elastic_mappings_simple, ElasticImportConfig


 class Tag:
@@ -57,49 +51,3 @@ def sql_folder() -> str:
     """

     return module_file_path("academic_observatory_workflows.database.sql")
-
-
-def elastic_mappings_folder() -> str:
-    """Get the Elasticsearch mappings path.
-
-    :return: the elastic search schema path.
-    """
-
-    return module_file_path("academic_observatory_workflows.database.mappings")
-
-
-def load_elastic_mappings_ao(path: str, table_prefix: str, simple_prefixes: List = None):
-    """For the Observatory project, load the Elastic mappings for a given table_prefix.
-    :param path: the path to the mappings files.
-    :param table_prefix: the table_id prefix (without shard date).
-    :param simple_prefixes: the prefixes of mappings to load with the load_elastic_mappings_simple function.
-    :return: the rendered mapping as a Dict.
-    """
-
-    # Set default simple_prefixes
-    if simple_prefixes is None:
-        simple_prefixes = ["ao_doi"]
-
-    if not table_prefix.startswith("ao"):
-        raise ValueError("Table must begin with 'ao'")
-    elif any([table_prefix.startswith(prefix) for prefix in simple_prefixes]):
-        return load_elastic_mappings_simple(path, table_prefix)
-    else:
-        prefix, aggregate, facet = table_prefix.split("_", 2)
-        mappings_file_name = "ao-relations-mappings.json.jinja2"
-        is_fixed_facet = facet in ["unique_list", "access_types", "disciplines", "output_types", "events", "metrics"]
-        if is_fixed_facet:
-            mappings_file_name = f"ao-{facet.replace('_', '-')}-mappings.json.jinja2"
-        mappings_path = os.path.join(path, mappings_file_name)
-        return json.loads(render_template(mappings_path, aggregate=aggregate, facet=facet))
-
-
-ELASTIC_IMPORT_CONFIG = ElasticImportConfig(
-    elastic_mappings_path=elastic_mappings_folder(),
-    elastic_mappings_func=load_elastic_mappings_ao,
-    kibana_time_fields=[TimeField("^.*$", "published_year")],
-    index_keep_info={
-        "": KeepInfo(ordering=KeepOrder.newest, num=2),
-        "ao": KeepInfo(ordering=KeepOrder.newest, num=2),
-    },
-)
diff --git a/academic_observatory_workflows/workflows/crossref_events_telescope.py b/academic_observatory_workflows/workflows/crossref_events_telescope.py
index 8b7646a08..44f004d4c 100644
--- a/academic_observatory_workflows/workflows/crossref_events_telescope.py
+++ b/academic_observatory_workflows/workflows/crossref_events_telescope.py
@@ -179,7 +179,7 @@ def __init__(
         primary_key: str = "id",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 12),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         queue: str = "remote_queue",
     ):
         """Construct a CrossrefEventsTelescope instance.
@@ -202,14 +202,14 @@ def __init__(
         :param primary_key: the primary key to use when merging files.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param queue: what queue to run the DAG on.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
@@ -240,7 +240,7 @@ def __init__(
             PreviousDagRunSensor(
                 dag_id=self.dag_id,
                 external_task_id=external_task_id,
-                execution_delta=timedelta(days=7),  # To match the @weekly schedule_interval
+                execution_delta=timedelta(days=7),  # To match the @weekly schedule
             )
         )

diff --git a/academic_observatory_workflows/workflows/crossref_fundref_telescope.py b/academic_observatory_workflows/workflows/crossref_fundref_telescope.py
index fe30dac1a..6dd6c0425 100644
--- a/academic_observatory_workflows/workflows/crossref_fundref_telescope.py
+++ b/academic_observatory_workflows/workflows/crossref_fundref_telescope.py
@@ -84,7 +84,7 @@ def __init__(
         table_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2014, 2, 23),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         catchup: bool = True,
         gitlab_pool_name: str = "gitlab_pool",
         gitlab_pool_slots: int = 2,
@@ -102,7 +102,7 @@ def __init__(
         :param table_description: description for the BigQuery table.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param catchup: whether to catchup the DAG or not.
         :param gitlab_pool_name: name of the Gitlab Pool.
         :param gitlab_pool_slots: number of slots for the Gitlab Pool.
@@ -112,7 +112,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=catchup,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/crossref_metadata_telescope.py b/academic_observatory_workflows/workflows/crossref_metadata_telescope.py
index 6b3ba81c8..4792ad67c 100644
--- a/academic_observatory_workflows/workflows/crossref_metadata_telescope.py
+++ b/academic_observatory_workflows/workflows/crossref_metadata_telescope.py
@@ -101,7 +101,7 @@ def __init__(
         max_processes: int = os.cpu_count(),
         batch_size: int = 20,
         start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7),
-        schedule_interval: str = "0 0 7 * *",
+        schedule: str = "0 0 7 * *",
         catchup: bool = True,
         queue: str = "remote_queue",
         max_active_runs: int = 1,
@@ -121,7 +121,7 @@ def __init__(
         :param max_processes: the number of processes used with ProcessPoolExecutor to transform files in parallel.
         :param batch_size: the number of files to send to ProcessPoolExecutor at one time.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param catchup: whether to catchup the DAG or not.
         :param queue: what Airflow queue this job runs on.
         :param max_active_runs: the maximum number of DAG runs that can be run at once.
@@ -130,7 +130,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=catchup,
             airflow_conns=[observatory_api_conn_id, crossref_metadata_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/doi_workflow.py b/academic_observatory_workflows/workflows/doi_workflow.py
index b8eb27eb2..7e68be5b9 100644
--- a/academic_observatory_workflows/workflows/doi_workflow.py
+++ b/academic_observatory_workflows/workflows/doi_workflow.py
@@ -522,7 +522,7 @@ def __init__(
         max_fetch_threads: int = 4,
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: Optional[pendulum.DateTime] = pendulum.datetime(2020, 8, 30),
-        schedule_interval: Optional[str] = "@weekly",
+        schedule: Optional[str] = "@weekly",
         sensor_dag_ids: List[str] = None,
     ):
         """Create the DoiWorkflow.
@@ -537,13 +537,13 @@ def __init__(
         :param api_dataset_id: the DOI dataset id.
         :param max_fetch_threads: maximum number of threads to use when fetching.
         :param start_date: the start date.
-        :param schedule_interval: the schedule interval.
+        :param schedule: the schedule interval.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/geonames_telescope.py b/academic_observatory_workflows/workflows/geonames_telescope.py
index 54a18e7d7..c1ca0f816 100644
--- a/academic_observatory_workflows/workflows/geonames_telescope.py
+++ b/academic_observatory_workflows/workflows/geonames_telescope.py
@@ -103,7 +103,7 @@ def __init__(
         table_description: str = "The GeoNames geographical database: https://www.geonames.org/",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2020, 9, 1),
-        schedule_interval: str = "@monthly",
+        schedule: str = "@monthly",
     ):
         """The Geonames telescope.

@@ -117,13 +117,13 @@ def __init__(
         :param table_description: description for the BigQuery table.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/oa_web_workflow.py b/academic_observatory_workflows/workflows/oa_web_workflow.py
index ce077c8e7..f42e08dae 100644
--- a/academic_observatory_workflows/workflows/oa_web_workflow.py
+++ b/academic_observatory_workflows/workflows/oa_web_workflow.py
@@ -288,7 +288,7 @@ def __init__(
         github_conn_id="oa_web_github_token",
         zenodo_conn_id="oa_web_zenodo_token",
         start_date: Optional[pendulum.DateTime] = pendulum.datetime(2021, 5, 2),
-        schedule_interval: Optional[str] = "@weekly",
+        schedule: Optional[str] = "@weekly",
     ):
         """Create the OaWebWorkflow.

@@ -309,7 +309,7 @@ def __init__(
         :param github_conn_id: the Github Token Airflow Connection ID.
         :param zenodo_conn_id: the Zenodo Token Airflow Connection ID.
         :param start_date: the start date.
-        :param schedule_interval: the schedule interval.
+        :param schedule: the schedule interval.
         """

         if table_names is None:
@@ -318,7 +318,7 @@
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[github_conn_id, zenodo_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/open_citations_telescope.py b/academic_observatory_workflows/workflows/open_citations_telescope.py
index 142c97bfc..2e25c3d29 100644
--- a/academic_observatory_workflows/workflows/open_citations_telescope.py
+++ b/academic_observatory_workflows/workflows/open_citations_telescope.py
@@ -75,7 +75,7 @@ def __init__(
         table_description: str = "The OpenCitations COCI CSV table: http://opencitations.net/",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2018, 7, 1),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         catchup: bool = True,
         queue: str = "remote_queue",
     ):
@@ -92,14 +92,14 @@ def __init__(
         :param observatory_api_conn_id: the Observatory API connection key.
         :param catchup: whether to catchup the DAG or not.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param queue: what Airflow queue to use.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=catchup,
             airflow_conns=[observatory_api_conn_id],
             queue=queue,
diff --git a/academic_observatory_workflows/workflows/openalex_telescope.py b/academic_observatory_workflows/workflows/openalex_telescope.py
index 35bac8d1d..92ee080f2 100644
--- a/academic_observatory_workflows/workflows/openalex_telescope.py
+++ b/academic_observatory_workflows/workflows/openalex_telescope.py
@@ -299,7 +299,7 @@ def __init__(
         aws_openalex_bucket: str = "openalex",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         queue: str = "remote_queue",
     ):
         """Construct an OpenAlexTelescope instance.
@@ -320,7 +320,7 @@ def __init__(
         :param aws_openalex_bucket: the OpenAlex AWS bucket name.
         :param observatory_api_conn_id: the Observatory API Airflow Connection ID.
         :param start_date: the Apache Airflow DAG start date.
-        :param schedule_interval: the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly,
+        :param schedule: the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly,
         they are not released on any particular day of the month, so we instead simply run the workflow weekly on a
         Sunday as this will pickup new updates regularly. See here for past release dates: https://openalex.s3.amazonaws.com/RELEASE_NOTES.txt
         :param queue:
@@ -340,7 +340,7 @@ def __init__(
         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id, aws_conn_id],
             queue=queue,
@@ -366,7 +366,7 @@ def __init__(
             PreviousDagRunSensor(
                 dag_id=self.dag_id,
                 external_task_id=external_task_id,
-                execution_delta=timedelta(days=7),  # To match the @weekly schedule_interval
+                execution_delta=timedelta(days=7),  # To match the @weekly schedule
             )
         )
         self.add_setup_task(self.check_dependencies)
diff --git a/academic_observatory_workflows/workflows/ror_telescope.py b/academic_observatory_workflows/workflows/ror_telescope.py
index fb9221996..a3018d245 100644
--- a/academic_observatory_workflows/workflows/ror_telescope.py
+++ b/academic_observatory_workflows/workflows/ror_telescope.py
@@ -90,7 +90,7 @@ def __init__(
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         ror_conceptrecid: int = 6347574,
         start_date: pendulum.DateTime = pendulum.datetime(2021, 9, 1),
-        schedule_interval: str = "@weekly",
+        schedule: str = "@weekly",
         catchup: bool = True,
     ):
         """Construct a RorTelescope instance.
@@ -106,14 +106,14 @@ def __init__(
         :param observatory_api_conn_id: the Observatory API connection key.
         :param ror_conceptrecid: the Zenodo conceptrecid for the ROR dataset.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         :param catchup: whether to catchup the DAG or not.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=catchup,
             airflow_conns=[observatory_api_conn_id],
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/scopus_telescope.py b/academic_observatory_workflows/workflows/scopus_telescope.py
index 0a686cdc2..835f764dd 100644
--- a/academic_observatory_workflows/workflows/scopus_telescope.py
+++ b/academic_observatory_workflows/workflows/scopus_telescope.py
@@ -109,7 +109,7 @@ def __init__(
         table_description: str = "The Scopus citation database: https://www.scopus.com",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14),
-        schedule_interval: str = "@monthly",
+        schedule: str = "@monthly",
     ):
         """Scopus telescope.
         :param dag_id: the id of the DAG.
@@ -126,13 +126,13 @@ def __init__(
         :param table_description: description for the BigQuery table.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id] + scopus_conn_ids,
             tags=[Tag.academic_observatory],
diff --git a/academic_observatory_workflows/workflows/unpaywall_telescope.py b/academic_observatory_workflows/workflows/unpaywall_telescope.py
index 94eec490b..96a112c19 100644
--- a/academic_observatory_workflows/workflows/unpaywall_telescope.py
+++ b/academic_observatory_workflows/workflows/unpaywall_telescope.py
@@ -215,7 +215,7 @@ def __init__(
         unpaywall_conn_id: str = "unpaywall",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2021, 7, 2),
-        schedule_interval: str = "@daily",
+        schedule: str = "@daily",
     ):
         """The Unpaywall Data Feed Telescope.

@@ -233,13 +233,13 @@ def __init__(
         :param unpaywall_conn_id: Unpaywall connection key.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id, unpaywall_conn_id],
             tags=[Tag.academic_observatory],
@@ -269,7 +269,7 @@ def __init__(
             PreviousDagRunSensor(
                 dag_id=self.dag_id,
                 external_task_id=external_task_id,
-                execution_delta=timedelta(days=1),  # To match the @daily schedule_interval
+                execution_delta=timedelta(days=1),  # To match the @daily schedule
             )
         )
         self.add_setup_task(self.check_dependencies)
diff --git a/academic_observatory_workflows/workflows/web_of_science_telescope.py b/academic_observatory_workflows/workflows/web_of_science_telescope.py
index b256ee552..c9d6ab2d9 100644
--- a/academic_observatory_workflows/workflows/web_of_science_telescope.py
+++ b/academic_observatory_workflows/workflows/web_of_science_telescope.py
@@ -107,7 +107,7 @@ def __init__(
         table_description: str = "The Web of Science citation database: https://clarivate.com/webofsciencegroup/solutions/web-of-science",
         observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
         start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14),
-        schedule_interval: str = "@monthly",
+        schedule: str = "@monthly",
     ):
         """Web of Science telescope.

@@ -124,13 +124,13 @@ def __init__(
         :param table_description: description for the BigQuery table.
         :param observatory_api_conn_id: the Observatory API connection key.
         :param start_date: the start date of the DAG.
-        :param schedule_interval: the schedule interval of the DAG.
+        :param schedule: the schedule interval of the DAG.
         """

         super().__init__(
             dag_id=dag_id,
             start_date=start_date,
-            schedule_interval=schedule_interval,
+            schedule=schedule,
             catchup=False,
             airflow_conns=[observatory_api_conn_id, wos_conn_id],
             tags=[Tag.academic_observatory],