Skip to content

Commit

Permalink
Feature/airflow2.6.3 (#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
jdddog authored Aug 7, 2023
1 parent deab587 commit c59e770
Show file tree
Hide file tree
Showing 18 changed files with 56 additions and 300 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,12 @@ jobs:
cd ..
git clone https://github.com/The-Academic-Observatory/observatory-platform.git
cd observatory-platform
pip install -e observatory-api
pip install -e observatory-platform
pip install -e observatory-api --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
pip install -e observatory-platform --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
cd ..
cd academic-observatory-workflows
pip install -e .[tests]
pip install -e .[tests] --constraint https://raw.githubusercontent.com/apache/airflow/constraints-2.6.3/constraints-no-providers-${{ matrix.python-version }}.txt
- name: Check licenses
run: |
Expand Down
52 changes: 0 additions & 52 deletions academic_observatory_workflows/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,9 @@

# Author: James Diprose

import json
import os
from typing import List

from observatory.platform.config import module_file_path
from observatory.platform.elastic.elastic import KeepInfo, KeepOrder
from observatory.platform.elastic.kibana import TimeField
from observatory.platform.utils.jinja2_utils import render_template
from observatory.platform.workflows.elastic_import_workflow import load_elastic_mappings_simple, ElasticImportConfig


class Tag:
Expand Down Expand Up @@ -57,49 +51,3 @@ def sql_folder() -> str:
"""

return module_file_path("academic_observatory_workflows.database.sql")


def elastic_mappings_folder() -> str:
    """Return the location of the Elasticsearch mappings bundled with this package.

    :return: the path that module_file_path resolves for the mappings module.
    """

    mappings_module = "academic_observatory_workflows.database.mappings"
    return module_file_path(mappings_module)


def load_elastic_mappings_ao(path: str, table_prefix: str, simple_prefixes: List = None):
    """For the Observatory project, load the Elastic mappings for a given table_prefix.

    Table prefixes that start with one of ``simple_prefixes`` are delegated to
    load_elastic_mappings_simple. All other prefixes must have the form
    ``ao_<aggregate>_<facet>``; the mappings are then rendered from a Jinja2 template chosen
    by the facet.

    :param path: the path to the mappings files.
    :param table_prefix: the table_id prefix (without shard date).
    :param simple_prefixes: the prefixes of mappings to load with the load_elastic_mappings_simple function.
    Defaults to ["ao_doi"] when None.
    :return: the rendered mapping as a Dict.
    :raises ValueError: if table_prefix does not begin with 'ao', or is not a simple prefix and
    does not contain an aggregate and a facet.
    """

    # Set default simple_prefixes
    if simple_prefixes is None:
        simple_prefixes = ["ao_doi"]

    if not table_prefix.startswith("ao"):
        raise ValueError("Table must begin with 'ao'")

    if any(table_prefix.startswith(prefix) for prefix in simple_prefixes):
        return load_elastic_mappings_simple(path, table_prefix)

    # maxsplit=2 keeps underscores inside the facet (e.g. "unique_list") intact
    parts = table_prefix.split("_", 2)
    if len(parts) != 3:
        # Previously surfaced as an opaque tuple-unpacking ValueError
        raise ValueError(f"Table prefix must have the form 'ao_<aggregate>_<facet>', got: '{table_prefix}'")
    _, aggregate, facet = parts

    # Facets with their own dedicated template; every other facet uses the relations template
    fixed_facets = ("unique_list", "access_types", "disciplines", "output_types", "events", "metrics")
    if facet in fixed_facets:
        mappings_file_name = f"ao-{facet.replace('_', '-')}-mappings.json.jinja2"
    else:
        mappings_file_name = "ao-relations-mappings.json.jinja2"

    mappings_path = os.path.join(path, mappings_file_name)
    return json.loads(render_template(mappings_path, aggregate=aggregate, facet=facet))


# Elasticsearch import settings for this project: wires the mappings folder and the
# project-specific mappings loader defined in this module into an ElasticImportConfig.
ELASTIC_IMPORT_CONFIG = ElasticImportConfig(
    elastic_mappings_path=elastic_mappings_folder(),
    elastic_mappings_func=load_elastic_mappings_ao,
    # "^.*$" matches any string; presumably an index-name pattern mapped to its time field
    # ("published_year") — TODO confirm against TimeField.
    kibana_time_fields=[TimeField("^.*$", "published_year")],
    # NOTE(review): keys look like index-name prefixes ("" as a catch-all), each keeping the
    # two newest entries — confirm against ElasticImportConfig / KeepInfo.
    index_keep_info={
        "": KeepInfo(ordering=KeepOrder.newest, num=2),
        "ao": KeepInfo(ordering=KeepOrder.newest, num=2),
    },
)
28 changes: 0 additions & 28 deletions academic_observatory_workflows/tests/test_config.py

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ def __init__(
primary_key: str = "id",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2017, 2, 12),
schedule_interval: str = "@weekly",
schedule: str = "@weekly",
queue: str = "remote_queue",
):
"""Construct a CrossrefEventsTelescope instance.
Expand All @@ -202,14 +202,14 @@ def __init__(
:param primary_key: the primary key to use when merging files.
:param observatory_api_conn_id: the Observatory API connection key.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
:param queue: what queue to run the DAG on.
"""

super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=False,
airflow_conns=[observatory_api_conn_id],
tags=[Tag.academic_observatory],
Expand Down Expand Up @@ -240,7 +240,7 @@ def __init__(
PreviousDagRunSensor(
dag_id=self.dag_id,
external_task_id=external_task_id,
execution_delta=timedelta(days=7), # To match the @weekly schedule_interval
execution_delta=timedelta(days=7), # To match the @weekly schedule
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def __init__(
table_description: str = "The Crossref Funder Registry dataset: https://www.crossref.org/services/funder-registry/",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2014, 2, 23),
schedule_interval: str = "@weekly",
schedule: str = "@weekly",
catchup: bool = True,
gitlab_pool_name: str = "gitlab_pool",
gitlab_pool_slots: int = 2,
Expand All @@ -102,7 +102,7 @@ def __init__(
:param table_description: description for the BigQuery table.
:param observatory_api_conn_id: the Observatory API connection key.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
:param catchup: whether to catchup the DAG or not.
:param gitlab_pool_name: name of the Gitlab Pool.
:param gitlab_pool_slots: number of slots for the Gitlab Pool.
Expand All @@ -112,7 +112,7 @@ def __init__(
super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=catchup,
airflow_conns=[observatory_api_conn_id],
tags=[Tag.academic_observatory],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def __init__(
max_processes: int = os.cpu_count(),
batch_size: int = 20,
start_date: pendulum.DateTime = pendulum.datetime(2020, 6, 7),
schedule_interval: str = "0 0 7 * *",
schedule: str = "0 0 7 * *",
catchup: bool = True,
queue: str = "remote_queue",
max_active_runs: int = 1,
Expand All @@ -121,7 +121,7 @@ def __init__(
:param max_processes: the number of processes used with ProcessPoolExecutor to transform files in parallel.
:param batch_size: the number of files to send to ProcessPoolExecutor at one time.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
:param catchup: whether to catchup the DAG or not.
:param queue: what Airflow queue this job runs on.
:param max_active_runs: the maximum number of DAG runs that can be run at once.
Expand All @@ -130,7 +130,7 @@ def __init__(
super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=catchup,
airflow_conns=[observatory_api_conn_id, crossref_metadata_conn_id],
tags=[Tag.academic_observatory],
Expand Down
6 changes: 3 additions & 3 deletions academic_observatory_workflows/workflows/doi_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ def __init__(
max_fetch_threads: int = 4,
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: Optional[pendulum.DateTime] = pendulum.datetime(2020, 8, 30),
schedule_interval: Optional[str] = "@weekly",
schedule: Optional[str] = "@weekly",
sensor_dag_ids: List[str] = None,
):
"""Create the DoiWorkflow.
Expand All @@ -420,13 +420,13 @@ def __init__(
:param api_dataset_id: the DOI dataset id.
:param max_fetch_threads: maximum number of threads to use when fetching.
:param start_date: the start date.
:param schedule_interval: the schedule interval.
:param schedule: the schedule interval.
"""

super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=False,
airflow_conns=[observatory_api_conn_id],
tags=[Tag.academic_observatory],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def __init__(
table_description: str = "The GeoNames geographical database: https://www.geonames.org/",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2020, 9, 1),
schedule_interval: str = "@monthly",
schedule: str = "@monthly",
):
"""The Geonames telescope.
Expand All @@ -117,13 +117,13 @@ def __init__(
:param table_description: description for the BigQuery table.
:param observatory_api_conn_id: the Observatory API connection key.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
"""

super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=False,
airflow_conns=[observatory_api_conn_id],
tags=[Tag.academic_observatory],
Expand Down
8 changes: 4 additions & 4 deletions academic_observatory_workflows/workflows/oa_web_workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def __init__(
github_conn_id="oa_web_github_token",
zenodo_conn_id="oa_web_zenodo_token",
start_date: Optional[pendulum.DateTime] = pendulum.datetime(2021, 5, 2),
schedule_interval: Optional[str] = "@weekly",
schedule: Optional[str] = "@weekly",
):
"""Create the OaWebWorkflow.
Expand All @@ -309,7 +309,7 @@ def __init__(
:param github_conn_id: the Github Token Airflow Connection ID.
:param zenodo_conn_id: the Zenodo Token Airflow Connection ID.
:param start_date: the start date.
:param schedule_interval: the schedule interval.
:param schedule: the schedule interval.
"""

if table_names is None:
Expand All @@ -318,7 +318,7 @@ def __init__(
super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=False,
airflow_conns=[github_conn_id, zenodo_conn_id],
tags=[Tag.academic_observatory],
Expand Down Expand Up @@ -1472,7 +1472,7 @@ def make_entities(entity_type: str, df_index: pd.DataFrame, df_data: pd.DataFram
total = len(df_index)

logging.info(f"Making entities: {entity_type}")
ts_groups = df_data.groupby([key_id])
ts_groups = df_data.groupby(key_id)

for entity_id, df_group in ts_groups:
# Exclude countries and institutions with small num outputs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ def __init__(
table_description: str = "The OpenCitations COCI CSV table: http://opencitations.net/",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2018, 7, 1),
schedule_interval: str = "@weekly",
schedule: str = "@weekly",
catchup: bool = True,
queue: str = "remote_queue",
):
Expand All @@ -92,14 +92,14 @@ def __init__(
:param observatory_api_conn_id: the Observatory API connection key.
:param catchup: whether to catchup the DAG or not.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
:param queue: what Airflow queue to use.
"""

super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=catchup,
airflow_conns=[observatory_api_conn_id],
queue=queue,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ def __init__(
aws_openalex_bucket: str = "openalex",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2021, 12, 1),
schedule_interval: str = "@weekly",
schedule: str = "@weekly",
queue: str = "remote_queue",
):
"""Construct an OpenAlexTelescope instance.
Expand All @@ -320,7 +320,7 @@ def __init__(
:param aws_openalex_bucket: the OpenAlex AWS bucket name.
:param observatory_api_conn_id: the Observatory API Airflow Connection ID.
:param start_date: the Apache Airflow DAG start date.
:param schedule_interval: the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly,
:param schedule: the Apache Airflow schedule interval. Whilst OpenAlex snapshots are released monthly,
they are not released on any particular day of the month, so we instead simply run the workflow weekly on a
Sunday as this will pickup new updates regularly. See here for past release dates: https://openalex.s3.amazonaws.com/RELEASE_NOTES.txt
:param queue:
Expand All @@ -340,7 +340,7 @@ def __init__(
super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=False,
airflow_conns=[observatory_api_conn_id, aws_conn_id],
queue=queue,
Expand All @@ -366,7 +366,7 @@ def __init__(
PreviousDagRunSensor(
dag_id=self.dag_id,
external_task_id=external_task_id,
execution_delta=timedelta(days=7), # To match the @weekly schedule_interval
execution_delta=timedelta(days=7), # To match the @weekly schedule
)
)
self.add_setup_task(self.check_dependencies)
Expand Down
6 changes: 3 additions & 3 deletions academic_observatory_workflows/workflows/ror_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ def __init__(
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
ror_conceptrecid: int = 6347574,
start_date: pendulum.DateTime = pendulum.datetime(2021, 9, 1),
schedule_interval: str = "@weekly",
schedule: str = "@weekly",
catchup: bool = True,
):
"""Construct a RorTelescope instance.
Expand All @@ -106,14 +106,14 @@ def __init__(
:param observatory_api_conn_id: the Observatory API connection key.
:param ror_conceptrecid: the Zenodo conceptrecid for the ROR dataset.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
:param catchup: whether to catchup the DAG or not.
"""

super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=catchup,
airflow_conns=[observatory_api_conn_id],
tags=[Tag.academic_observatory],
Expand Down
6 changes: 3 additions & 3 deletions academic_observatory_workflows/workflows/scopus_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def __init__(
table_description: str = "The Scopus citation database: https://www.scopus.com",
observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API,
start_date: pendulum.DateTime = pendulum.datetime(2018, 5, 14),
schedule_interval: str = "@monthly",
schedule: str = "@monthly",
):
"""Scopus telescope.
:param dag_id: the id of the DAG.
Expand All @@ -126,13 +126,13 @@ def __init__(
:param table_description: description for the BigQuery table.
:param observatory_api_conn_id: the Observatory API connection key.
:param start_date: the start date of the DAG.
:param schedule_interval: the schedule interval of the DAG.
:param schedule: the schedule interval of the DAG.
"""

super().__init__(
dag_id=dag_id,
start_date=start_date,
schedule_interval=schedule_interval,
schedule=schedule,
catchup=False,
airflow_conns=[observatory_api_conn_id] + scopus_conn_ids,
tags=[Tag.academic_observatory],
Expand Down
Loading

0 comments on commit c59e770

Please sign in to comment.