diff --git a/academic_observatory_workflows/database/schema/orcid/orcid_lambda.json b/academic_observatory_workflows/database/schema/orcid/orcid_lambda.json
new file mode 100644
index 000000000..d1a2aae2b
--- /dev/null
+++ b/academic_observatory_workflows/database/schema/orcid/orcid_lambda.json
@@ -0,0 +1,26 @@
+[
+  {
+    "mode": "NULLABLE",
+    "name": "orcid",
+    "type": "STRING",
+    "description": "ORCID iD."
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "path",
+    "type": "STRING",
+    "description": "ORCID iD in URI form."
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "date_created",
+    "type": "DATE",
+    "description": "The date this ORCID iD was created."
+  },
+  {
+    "mode": "NULLABLE",
+    "name": "last_modified",
+    "type": "DATE",
+    "description": "The date this ORCID iD was last modified."
+  }
+]
\ No newline at end of file
diff --git a/academic_observatory_workflows/workflows/orcid_telescope.py b/academic_observatory_workflows/workflows/orcid_telescope.py
new file mode 100644
index 000000000..1187760ed
--- /dev/null
+++ b/academic_observatory_workflows/workflows/orcid_telescope.py
@@ -0,0 +1,284 @@
+# Copyright 2023 Curtin University
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# Author: Keegan Smith
+
+from __future__ import annotations
+
+import datetime
+import logging
+import os
+import pathlib
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed, ThreadPoolExecutor
+from datetime import timedelta
+from typing import List, Dict, Tuple
+import tarfile
+
+import jsonlines
+import pendulum
+import requests
+from airflow.exceptions import AirflowSkipException
+from airflow.models.taskinstance import TaskInstance
+from airflow.operators.dummy import DummyOperator
+from airflow.hooks.base import BaseHook
+import boto3
+from google.cloud import bigquery
+from google.cloud.bigquery import SourceFormat
+
+from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag
+from observatory.api.client.model.dataset_release import DatasetRelease
+from observatory.platform.airflow import PreviousDagRunSensor, is_first_dag_run
+from observatory.platform.api import get_dataset_releases, get_latest_dataset_release
+from observatory.platform.api import make_observatory_api
+from observatory.platform.bigquery import (
+    bq_table_id,
+    bq_find_schema,
+    bq_load_table,
+    bq_upsert_records,
+    bq_snapshot,
+    bq_sharded_table_id,
+    bq_create_dataset,
+    bq_delete_records,
+)
+from observatory.platform.config import AirflowConns
+from observatory.platform.files import list_files, yield_jsonl, merge_update_files, save_jsonl_gz, load_csv
+from observatory.platform.gcs import gcs_upload_files, gcs_blob_uri, gcs_blob_name_from_path
+from observatory.platform.observatory_config import CloudWorkspace
+from observatory.platform.utils.url_utils import get_user_agent, retry_get_url
+from observatory.platform.workflows.workflow import Workflow, ChangefileRelease, cleanup, set_task_state
+
+ORCID_AWS_SUMMARIES_BUCKET = "v2.0-summaries"
+ORCID_AWS_LAMBDA_BUCKET = "orcid-lambda-file"
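+# The Lambda manifest object is a tar archive containing a CSV that lists ORCID iDs along with their
+# creation and last-modified dates (see the orcid_lambda.json schema).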
"orcid-lambda-file" +ORCID_LAMBDA_OBJECT = "last_modified.csv.tar" + + +class OrcidRelease(ChangefileRelease): + def __init__( + self, + *, + dag_id: str, + run_id: str, + start_date: pendulum.DateTime, + end_date: pendulum.DateTime, + ): + """Construct a CrossrefEventsRelease instance + + :param dag_id: the id of the DAG. + :param start_date: the start_date of the release. Inclusive. + :param end_date: the end_date of the release. Exclusive. + """ + + super().__init__( + dag_id=dag_id, + run_id=run_id, + start_date=start_date, + end_date=end_date, + ) + self.upsert_table_file_path = os.path.join(self.transform_folder, "upsert_table.jsonl") + self.delete_table_file_path = os.path.join(self.transform_folder, "delete_table.jsonl") + + self.lambda_download_file_path = os.path.join(self.download_folder, "lambda", "orcid_lambda.csv.tar") + self.lambda_transform_file_path = os.path.join(self.transform_folder, "lambda", "orcid_lambda.csv") + + @property + def lambda_extract_files(self): + return list_files(self.extract_folder, r"^.*\.csv$") # Match files ending with .csv + + +class CrossrefEventsTelescope(Workflow): + """Crossref Events telescope""" + + def __init__( + self, + dag_id: str, + cloud_workspace: CloudWorkspace, + bq_dataset_id: str = "orcid", + bq_summary_table_name: str = "orcid", + bq_lambda_table_name: str = "orcid_lambda", + schema_folder: str = os.path.join(default_schema_folder(), "orcid"), + dataset_description: str = "The ORCID dataset", + table_description: str = "The ORCID dataset", + max_processes: int = os.cpu_count(), + api_dataset_id: str = "orcid", + observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, + aws_orcid_conn_id: str = "aws_orcid", + start_date: pendulum.DateTime = pendulum.datetime(2023, 6, 1), + schedule_interval: str = "@weekly", + queue: str = "remote_queue", + catchup: bool = False, + ): + """Construct an ORCID telescope instance""" + + self.dag_id = dag_id + self.cloud_workspace = cloud_workspace + self.bq_dataset_id = bq_dataset_id + self.bq_summary_table_name = bq_summary_table_name + self.bq_lambda_table_name = bq_lambda_table_name + self.api_dataset_id = api_dataset_id + self.schema_folder = schema_folder + self.dataset_description = dataset_description + self.table_description = table_description + self.max_processes = max_processes + self.observatory_api_conn_id = observatory_api_conn_id + self.aws_orcid_conn_id = aws_orcid_conn_id + self.start_date = start_date + self.schedule_interval = schedule_interval + self.queue = queue + self.catchup = catchup + + external_task_id = "dag_run_complete" + self.add_operator( + PreviousDagRunSensor( + dag_id=self.dag_id, + external_task_id=external_task_id, + execution_delta=timedelta(days=7), # To match the @weekly schedule_interval + ) + ) + self.add_task(self.check_dependencies) + self.add_task(self.transfer_orcid) + self.add_task(self.process_lambda) + self.add_task(self.bq_load_lambda) + # self.add_task(self.upsert_files) + + # self.add_task(self.add_new_dataset_release) + # self.add_task(self.cleanup) + + # The last task that the next DAG run's ExternalTaskSensor waits for. 
+        self.add_operator(
+            DummyOperator(
+                task_id=external_task_id,
+            )
+        )
+
+    @property
+    def aws_orcid_key(self) -> Tuple[str, str]:
+        """Return the AWS access key ID and secret access key for the ORCID S3 buckets."""
+        connection = BaseHook.get_connection(self.aws_orcid_conn_id)
+        return connection.login, connection.password
+
+    def make_release(self, **kwargs) -> OrcidRelease:
+        """Create an OrcidRelease instance for this DAG run."""
+        is_first_run = is_first_dag_run(kwargs["dag_run"])
+        releases = get_dataset_releases(dag_id=self.dag_id, dataset_id=self.api_dataset_id)
+
+        # Get start date
+        if is_first_run:
+            assert (
+                len(releases) == 0
+            ), "make_release: there should be no DatasetReleases stored in the Observatory API on the first DAG run."
+            start_date = pendulum.instance(datetime.datetime.min)
+        else:
+            assert (
+                len(releases) >= 1
+            ), "make_release: there should be at least 1 DatasetRelease in the Observatory API after the first DAG run."
+            start_date = kwargs["data_interval_start"]
+
+        return OrcidRelease(
+            dag_id=self.dag_id,
+            run_id=kwargs["run_id"],
+            start_date=start_date,
+            end_date=kwargs["data_interval_end"],
+        )
+
+    def transfer_orcid(self, release: OrcidRelease, **kwargs):
+        """Transfer ORCID data from AWS S3. Not yet implemented."""
+        pass
+
+    def process_lambda(self, release: OrcidRelease, **kwargs):
+        """Downloads, extracts, transforms and uploads the ORCID Lambda manifest file."""
+        aws_key_id, aws_key = self.aws_orcid_key
+        s3client = boto3.client("s3", aws_access_key_id=aws_key_id, aws_secret_access_key=aws_key)
+
+        # Create the "lambda" subfolders used for the download and transform files
+        os.makedirs(os.path.dirname(release.lambda_download_file_path), exist_ok=True)
+        os.makedirs(os.path.dirname(release.lambda_transform_file_path), exist_ok=True)
+
+        # Download from S3 bucket
+        s3client.download_file(ORCID_AWS_LAMBDA_BUCKET, ORCID_LAMBDA_OBJECT, release.lambda_download_file_path)
+
+        # Extract
+        with tarfile.open(release.lambda_download_file_path) as lambda_tar:
+            lambda_tar.extractall(release.extract_folder)
+        assert len(release.lambda_extract_files) == 1, "Unexpected number of files in extract folder"
+
+        # Transform
+        orcid_lambda = load_csv(release.lambda_extract_files[0])
+        with ProcessPoolExecutor(max_workers=self.max_processes) as executor:
+            futures = []
+            for row in orcid_lambda:
+                future = executor.submit(transform_item, row)
+                futures.append(future)
+
+            transformed = []
+            finished = 0
+            for future in as_completed(futures):
+                transformed.append(future.result())
+                finished += 1
+                if finished % 100000 == 0:
+                    logging.info(f"Transformed {finished}/{len(orcid_lambda)} rows")
+        save_jsonl_gz(release.lambda_transform_file_path, transformed)
+        gcs_upload_files(
+            bucket_name=self.cloud_workspace.transform_bucket, file_paths=[release.lambda_transform_file_path]
+        )
+
+    def bq_load_lambda(self, release: OrcidRelease, **kwargs):
+        """Load the transformed ORCID Lambda manifest into BigQuery."""
+        bq_create_dataset(
+            project_id=self.cloud_workspace.project_id,
+            dataset_id=self.bq_dataset_id,
+            location=self.cloud_workspace.data_location,
+            description=self.dataset_description,
+        )
+
+        # Load the transformed Lambda manifest file from the Google Cloud Storage transform bucket
+        uri = gcs_blob_uri(
+            self.cloud_workspace.transform_bucket,
+            gcs_blob_name_from_path(release.lambda_transform_file_path),
+        )
+        table_id = bq_sharded_table_id(
+            self.cloud_workspace.project_id, self.bq_dataset_id, self.bq_lambda_table_name, release.end_date
+        )
+        schema_file_path = bq_find_schema(path=self.schema_folder, table_name=self.bq_lambda_table_name)
+        success = bq_load_table(
+            uri=uri,
+            table_id=table_id,
+            schema_file_path=schema_file_path,
+            source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
+            table_description="The ORCID Lambda Manifest",
+            ignore_unknown_values=True,
+        )
+        set_task_state(success, self.bq_load_lambda.__name__, release)
+
+
+def transform_item(item):
+    """Recursively transform a single ORCID Lambda value so that it can be loaded into BigQuery.
+
+    :param item: a JSON value.
+    :return: the transformed item.
+    """
+
+    if isinstance(item, dict):
+        new = {}
+        for k, v in item.items():
+            # Replace hyphens with underscores for BigQuery compatibility
+            k = k.replace("-", "_")
+
+            # Blank out date values that are not in the expected format
+            if k == "date_created" or k == "last_modified":
+                try:
+                    datetime.datetime.strptime(v, "%Y-%m-%dT%H:%M:%SZ")
+                except ValueError:
+                    v = ""
+
+            new[k] = transform_item(v)
+        return new
+    elif isinstance(item, list):
+        return [transform_item(i) for i in item]
+    else:
+        return item