diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0001-5000-5000.xml b/academic_observatory_workflows/fixtures/orcid/0000-0001-5000-5000.xml new file mode 100644 index 000000000..a61b24ffc --- /dev/null +++ b/academic_observatory_workflows/fixtures/orcid/0000-0001-5000-5000.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d6db9eef727c70f54d364b0482d8f3fb1444303bba7bee822657d8c72c565145 +size 20874 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0001-5001-3000.xml b/academic_observatory_workflows/fixtures/orcid/0000-0001-5001-3000.xml new file mode 100644 index 000000000..df06dc7d6 --- /dev/null +++ b/academic_observatory_workflows/fixtures/orcid/0000-0001-5001-3000.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:cf12d51a2b02f02cf982273005749cc3736c1697a138f1b464a0054552bebfac +size 11379 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0001-5002-1000.xml b/academic_observatory_workflows/fixtures/orcid/0000-0001-5002-1000.xml new file mode 100644 index 000000000..82feb6460 --- /dev/null +++ b/academic_observatory_workflows/fixtures/orcid/0000-0001-5002-1000.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:75da336df829a7e55d9b1f98afc7c5317ba5338edcb6a64111bba9fc39d8befd +size 4990 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0001-5007-2000.xml b/academic_observatory_workflows/fixtures/orcid/0000-0001-5007-2000.xml new file mode 100644 index 000000000..75717254c --- /dev/null +++ b/academic_observatory_workflows/fixtures/orcid/0000-0001-5007-2000.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:3e34602207d107ac6ce42edde3839fbea3fc91c419bcc68f496d57cbb6cbf3c5 +size 7704 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0001-5010-1000.xml b/academic_observatory_workflows/fixtures/orcid/0000-0001-5010-1000.xml new file mode 100644 index 000000000..eabd6bd24 --- /dev/null +++ b/academic_observatory_workflows/fixtures/orcid/0000-0001-5010-1000.xml @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:f48d49d096a100d209a695346af2987eb316813639251e8266445bc91a0be16f +size 13480 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0002-9227-8610.xml b/academic_observatory_workflows/fixtures/orcid/0000-0002-9227-8610.xml deleted file mode 100644 index f0961c1de..000000000 --- a/academic_observatory_workflows/fixtures/orcid/0000-0002-9227-8610.xml +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:90a7231cb5b6430309ed689e4fec81d4817fb64022c15d63fbbe0af2538e3dac -size 56556 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0002-9228-8514.xml b/academic_observatory_workflows/fixtures/orcid/0000-0002-9228-8514.xml deleted file mode 100644 index 3c111c21e..000000000 --- a/academic_observatory_workflows/fixtures/orcid/0000-0002-9228-8514.xml +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:32c313f12bd8c6fc88e3b5df2b8bcc8337521077f7bced00c0bededcbac7217e -size 56556 diff --git a/academic_observatory_workflows/fixtures/orcid/0000-0002-9229-8514.xml b/academic_observatory_workflows/fixtures/orcid/0000-0002-9229-8514.xml deleted file mode 100644 index 538c1485d..000000000 --- a/academic_observatory_workflows/fixtures/orcid/0000-0002-9229-8514.xml +++ /dev/null @@ -1,3 +0,0 @@ -version https://git-lfs.github.com/spec/v1 -oid sha256:8a9b18e22a6705c15cae380b0c933f8a0c5afae8d5950b17ed7e3efdcb625714 -size 56556 diff 
--git a/academic_observatory_workflows/fixtures/orcid/test_manifest.csv b/academic_observatory_workflows/fixtures/orcid/test_manifest.csv new file mode 100644 index 000000000..88e822dc4 --- /dev/null +++ b/academic_observatory_workflows/fixtures/orcid/test_manifest.csv @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:7b215128be5b4b5909c3eb08df69906425053cbd7defcd88ea5aecaf82fe3e03 +size 554 diff --git a/academic_observatory_workflows/workflows/orcid_telescope.py b/academic_observatory_workflows/workflows/orcid_telescope.py index 86f22ce0e..563298bbf 100644 --- a/academic_observatory_workflows/workflows/orcid_telescope.py +++ b/academic_observatory_workflows/workflows/orcid_telescope.py @@ -18,24 +18,24 @@ from __future__ import annotations import datetime +import time import logging import os from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import timedelta from typing import List, Dict, Tuple, Union import itertools -import math import csv import re +import subprocess import pendulum import xmltodict -from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.exceptions import AirflowException from airflow.operators.dummy import DummyOperator from airflow.hooks.base import BaseHook from google.cloud import bigquery, storage from google.cloud.bigquery import SourceFormat -from tenacity import retry, wait_exponential, stop_after_attempt from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag @@ -56,7 +56,6 @@ from observatory.platform.files import list_files, save_jsonl_gz from observatory.platform.gcs import ( gcs_upload_files, - gcs_download_blob, gcs_blob_uri, gcs_blob_name_from_path, gcs_create_aws_transfer, @@ -71,6 +70,54 @@ ORCID_AWS_SUMMARIES_BUCKET = "v2.0-summaries" ORCID_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\b" +ORCID_RECORD_REGEX = r"\d{4}-\d{4}-\d{4}-\d{3}(\d|X)\.xml$" + + +class OrcidBatch: + """Describes a single ORCID batch and its related files/folders""" + + BATCH_REGEX = r"^\d{2}(\d|X)$" + + def __init__(self, download_dir: str, transform_dir: str, batch_str: str): + self.download_dir = download_dir + self.transform_dir = transform_dir + self.batch_str = batch_str + self.download_batch_dir = os.path.join(self.download_dir, batch_str) + self.download_log_file = os.path.join(self.download_dir, f"{self.batch_str}_log.txt") + self.download_error_file = os.path.join(self.download_dir, f"{self.batch_str}_error.txt") + self.manifest_file = os.path.join(self.download_dir, f"{self.batch_str}_manifest.csv") + self.transform_upsert_file = os.path.join(self.transform_dir, f"{self.batch_str}_upsert.jsonl.gz") + self.transform_delete_file = os.path.join(self.transform_dir, f"{self.batch_str}_delete.jsonl.gz") + + assert os.path.exists(self.download_dir), f"Directory {self.download_dir} does not exist." + assert os.path.exists(self.transform_dir), f"Directory {self.transform_dir} does not exist." + assert re.match(self.BATCH_REGEX, self.batch_str), f"Batch string {self.batch_str} is not valid." + + os.makedirs(self.download_batch_dir, exist_ok=True) + + @property + def expected_records(self) -> List[str]: + """List of expected ORCID records for this ORCID directory. 
Derived from the manifest file""" + with open(self.manifest_file, "r") as f: + reader = csv.reader(f) + return [os.path.basename(row[1]) for row in reader] + + @property + def existing_records(self) -> List[str]: + """List of existing ORCID records on disk for this ORCID directory.""" + return [os.path.basename(path) for path in list_files(self.download_batch_dir, ORCID_RECORD_REGEX)] + + @property + def missing_records(self) -> List[str]: + """List of missing ORCID records on disk for this ORCID directory.""" + return list(set(self.expected_records) - set(self.existing_records)) + + @property + def blob_uris(self) -> List[str]: + """List of blob URIs from the manifest this ORCID directory.""" + with open(self.manifest_file, "r") as f: + reader = csv.reader(f) + return [gcs_blob_uri(bucket_name=row[0], blob_name=row[1]) for row in reader] class OrcidRelease(ChangefileRelease): @@ -121,17 +168,13 @@ def __init__( self.is_first_run = is_first_run # Files/folders - self.local_manifest_file_path = os.path.join(self.workflow_folder, "local_manifest.csv") - self.delete_file_path = os.path.join(self.transform_folder, "delete.jsonl.gz") - self.batch_folder = os.path.join(self.workflow_folder, "batch") - self.transfer_manifest_folder = os.path.join(self.workflow_folder, "transfer_manifest") - os.makedirs(self.batch_folder, exist_ok=True) - os.makedirs(self.transfer_manifest_folder, exist_ok=True) + self.master_manifest_file = os.path.join(self.workflow_folder, "manifest.csv") # Table names and URIs - self.table_uri = gcs_blob_uri( - self.cloud_workspace.transform_bucket, f"{gcs_blob_name_from_path(self.transform_folder)}/*.jsonl.gz" - ) + self.upsert_blob_glob = f"{gcs_blob_name_from_path(self.transform_folder)}/*_upsert.jsonl.gz" + self.upsert_table_uri = gcs_blob_uri(self.cloud_workspace.transform_bucket, self.upsert_blob_glob) + self.delete_blob_glob = f"{gcs_blob_name_from_path(self.transform_folder)}/*_delete.jsonl.gz" + self.delete_table_uri = gcs_blob_uri(self.cloud_workspace.transform_bucket, self.delete_blob_glob) self.bq_main_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_main_table_name) self.bq_upsert_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_upsert_table_name) self.bq_delete_table_id = bq_table_id(cloud_workspace.project_id, bq_dataset_id, bq_delete_table_name) @@ -140,22 +183,28 @@ def __init__( ) @property - def batch_files(self): # Batch files in the batch_folder - return list_files(self.batch_folder, r"^batch_\d+.txt") + def upsert_files(self): # Upsert files in the transform_folder + return list_files(self.transform_folder, r"^.*\d{2}(\d|X)_upsert\.jsonl\.gz$") + + @property + def delete_files(self): # Delete files in the transform_folder + return list_files(self.transform_folder, r"^.*\d{2}(\d|X)_delete\.jsonl\.gz$") @property - def transform_files(self): # Transform files in the transform folder - return list_files(self.transform_folder, r"^transformed_batch_\d+.jsonl.gz$") + def orcid_directory_paths(self) -> List[str]: + """Generates the paths to the orcid directories in the download folder""" + return [os.path.join(self.download_folder, folder) for folder in orcid_batch_names()] - def make_download_folders(self) -> None: - """Creates the orcid directories in the download folder if they don't exist""" - for folder in orcid_directories(): - os.makedirs(os.path.join(self.download_folder, folder), exist_ok=True) + def orcid_batches(self) -> List[OrcidBatch]: + """Creates the orcid directories in the download folder if they don't 
exist and returns them""" + return [OrcidBatch(self.download_folder, self.transform_folder, batch) for batch in orcid_batch_names()] class OrcidTelescope(Workflow): """ORCID telescope""" + # MANIFEST_FILEDS = ["blob", "orcid", "updated"] + def __init__( self, dag_id: str, @@ -171,9 +220,8 @@ def __init__( snapshot_expiry_days: int = 31, schema_file_path: str = os.path.join(default_schema_folder(), "orcid"), transfer_attempts: int = 5, - batch_size: int = 20000, - transfer_size: int = 250000, - max_workers: int = os.cpu_count(), + batch_size: int = 25000, + max_workers: int = os.cpu_count() * 2, api_dataset_id: str = "orcid", observatory_api_conn_id: str = AirflowConns.OBSERVATORY_API, aws_orcid_conn_id: str = "aws_orcid", @@ -204,7 +252,6 @@ def __init__( self.schema_file_path = schema_file_path self.transfer_attempts = transfer_attempts self.batch_size = batch_size - self.transfer_size = transfer_size self.dataset_description = dataset_description self.table_description = table_description self.snapshot_expiry_days = snapshot_expiry_days @@ -230,9 +277,7 @@ def __init__( self.add_task(self.bq_create_main_table_snapshot) # Scour the data for updates - self.add_task(self.create_local_manifest) - self.add_task(self.create_gcs_transfer_manifests) - self.add_task(self.create_batches) + self.add_task(self.create_manifests) # Download and transform updated files self.add_task(self.download) @@ -249,11 +294,7 @@ def __init__( self.add_task(self.cleanup) # The last task that the next DAG run's ExternalTaskSensor waits for. - self.add_operator( - DummyOperator( - task_id=external_task_id, - ) - ) + self.add_operator(DummyOperator(task_id=external_task_id)) @property def aws_orcid_key(self) -> Tuple[str, str]: @@ -342,149 +383,123 @@ def bq_create_main_table_snapshot(self, release: OrcidRelease, **kwargs): ) set_task_state(success, self.bq_create_main_table_snapshot.__name__, release) - def create_local_manifest(self, release: OrcidRelease, **kwargs): + def create_manifests(self, release: OrcidRelease, **kwargs): """Create a manifest of all the modified files in the orcid bucket.""" logging.info("Creating manifest") + orcid_batches = release.orcid_batches() + with ThreadPoolExecutor(max_workers=os.cpu_count()) as executor: futures = [] - for orcid_dir in orcid_directories(): - dir_name = f"{self.orcid_summaries_prefix}/{orcid_dir}/" + for orcid_batch in orcid_batches: futures.append( executor.submit( - create_manifest_batch, - bucket=self.orcid_bucket, - dir_name=dir_name, + create_orcid_batch_manifest, + orcid_directory=orcid_batch, reference_date=release.prev_latest_modified_record, - save_path=os.path.join(release.download_folder, f"{orcid_dir}_manifest.csv"), + bucket=self.orcid_bucket, + bucket_prefix=self.orcid_summaries_prefix, ) ) - manifest_files = [future.result() for future in futures] + for future in as_completed(futures): + future.result() logging.info("Joining manifest files") - with open(release.local_manifest_file_path, "w") as f: - f.write("path,orcid,size,updated") - for batch_file in manifest_files: - with open(batch_file, "r") as bf: - f.write("\n") - for line in bf: - f.write(line) - - def create_gcs_transfer_manifests(self, release: OrcidRelease, **kwargs): - """Creates manifest file(s) for the GCS transfer job(s). 
One file per 'transfer_size' records.""" - # Count the lines in the manifest file - with open(release.local_manifest_file_path, "r") as f: - f.readline() # Skip the header - num_lines = sum(1 for _ in f) - manifests = math.ceil(num_lines / self.transfer_size) - if manifests == 0: - # This should never realistically happen unless ORCID goes down for a week - raise AirflowSkipException("No files found to process. Skipping remaining tasks.") - - lines_read = 0 - with open(release.local_manifest_file_path, "r") as f: - f.readline() # Skip the header - for man_n in range(manifests): - blobs = [] - # Read 'transfer_size' lines - for _ in range(self.transfer_size): - line = f.readline() - if not line: # End of file gives empty string - break - lines_read += 1 - blobs.append(f.readline().strip().split(",")[0]) - # Write the manifest file - with open(os.path.join(release.transfer_manifest_folder, f"manifest_{man_n}.csv"), "w") as bf: - bf.write(",\n".join(blobs)) - logging.info(f"Read {lines_read} of {num_lines} lines in the manifest file") - assert lines_read == num_lines, "Not all lines in the manifest file were read. Aborting." - - def create_batches(self, release: OrcidRelease, **kwargs): - """Create batches of files to be processed""" - # Count the lines in the manifest file - with open(release.local_manifest_file_path, "r") as f: - f.readline() # Skip the header - num_lines = sum(1 for _ in f) - batches = math.ceil(num_lines / self.batch_size) - - lines_read = 0 - with open(release.local_manifest_file_path, "r") as f: - f.readline() # Skip the header - for batch_n in range(batches): - batch_blobs = [] - # Read 'batch_size' lines - for _ in range(self.batch_size): - line = f.readline() - if not line: # End of file gives empty string - break - lines_read += 1 - batch_blobs.append(f.readline().strip().split(",")[0]) - # Write the batch file - with open(os.path.join(release.batch_folder, f"batch_{batch_n}.txt"), "w") as bf: - bf.write("\n".join(batch_blobs)) - logging.info(f"Read {lines_read} of {num_lines} lines in the manifest file") - assert lines_read == num_lines, "Not all lines in the manifest file were read. Aborting." + with open(release.master_manifest_file, "w") as f: + # Open and write each directory manifest to the main manifest file + for orcid_batch in orcid_batches: + with open(orcid_batch.manifest_file, "r") as df: + for line in df: + f.write(f"{line}\n") def download(self, release: OrcidRelease, **kwargs): - """Reads the batch files and downloads the files from the gcs bucket.""" - logging.info(f"Number of batches: {len(release.batch_files)}") + """Reads each batch's manifest and downloads the files from the gcs bucket.""" + start_time = time.time() + total_files = 0 + for orcid_batch in release.orcid_batches(): + if not orcid_batch.missing_records: + logging.info(f"All files present for {orcid_batch.batch_str}. 
Skipping download.") + continue + all_files_present = False + + # Loop - download and assert all files downloaded + batch_start = time.time() + logging.info(f"Downloading files for ORCID directory: {orcid_batch.batch_str}") + for i in range(3): # Try up to 3 times + returncode = gsutil_download(orcid_batch=orcid_batch) + if returncode != 0: + logging.warn( + f"Attempt {i+1} for '{orcid_batch.batch_str}': returned non-zero exit code: {returncode}" + ) + continue + if orcid_batch.missing_records: + logging.warn( + f"Attempt {i+1} for '{orcid_batch.batch_str}': {len(orcid_batch.missing_records)} files missing" + ) + continue + else: + all_files_present = True + break - release.make_download_folders() - for i, batch_file in enumerate(release.batch_files): - logging.info(f"Downloading batch {i+1} of {len(release.batch_files)}") - with open(batch_file, "r") as f: - orcid_blobs = [line.strip() for line in f.readlines()] + if not all_files_present: + raise AirflowException(f"All files were not downloaded for {orcid_batch.batch_str}. Aborting.") - # Download the blobs in parallel - with ThreadPoolExecutor(max_workers=os.cpu_count() * 2) as executor: - futures = [] - logging.disable(logging.INFO) # Turn off logging to avoid spamming the logs - for blob in orcid_blobs: - # Get path components of the blob (path / to / summaries / 123 / blob_name.xml) - components = os.path.normpath(blob).split(os.sep) - file_path = os.path.join(release.download_folder, components[-2], components[-1]) - future = executor.submit( - gcs_download_blob, bucket_name=self.orcid_bucket, blob_name=blob, file_path=file_path - ) - futures.append(future) - for future in as_completed(futures): - future.result() - logging.disable(logging.NOTSET) + total_files += len(orcid_batch.expected_records) + logging.info(f"Download for '{orcid_batch.batch_str}' completed successfully.") + logging.info(f"Downloaded {len(orcid_batch.expected_records)} in {time.time() - batch_start} seconds") + logging.info(f"Completed download for {total_files} files in {(time.time() - start_time)/3600} hours") def transform(self, release: OrcidRelease, **kwargs): """Transforms the downloaded files into serveral bigquery-compatible .jsonl files""" - logging.info(f"Number of batches: {len(release.batch_files)}") - delete_paths = [] - for i, batch_file in enumerate(os.listdir(release.batch_folder)): - with open(os.path.join(release.batch_folder, batch_file), "r") as f: - orcid_records = [line.strip() for line in f.readlines()] - - # Transfrom the files in parallel + total_upsert_records = 0 + total_delete_records = 0 + start_time = time.time() + for orcid_batch in release.orcid_batches(): + batch_start = time.time() + logging.info(f"Transforming ORCID batch {orcid_batch.batch_str}") transformed_data = [] with ThreadPoolExecutor(max_workers=self.max_workers) as executor: futures = [] - for record in orcid_records: - future = executor.submit(transform_orcid_record, record) + for record in orcid_batch.existing_records: + future = executor.submit( + transform_orcid_record, os.path.join(orcid_batch.download_batch_dir, record) + ) futures.append(future) for future in futures: transformed_data.append(future.result()) - # Save transformed records - transformed_data = [record for record in transformed_data if isinstance(record, dict)] - transform_file = os.path.join(release.transform_folder, f"transformed_batch_{i}.jsonl.gz") - save_jsonl_gz(transformed_data, transform_file) - - # Keep track of reccords to delete - delete_paths.extend([{"id": record} for record in 
transformed_data if isinstance(record, str)]) - - # Save the delete paths if there are any - if delete_paths: - save_jsonl_gz(delete_paths, release.delete_file_path) + # Save records to upsert + batch_upserts = [record for record in transformed_data if isinstance(record, dict)] + n_batch_upserts = len(batch_upserts) + if n_batch_upserts > 0: + save_jsonl_gz(orcid_batch.transform_upsert_file, batch_upserts) + + # Save records to delete + batch_deletes = [{"id": record} for record in transformed_data if isinstance(record, str)] + n_batch_deletes = len(batch_deletes) + if n_batch_deletes > 0: + save_jsonl_gz(orcid_batch.transform_delete_file, batch_deletes) + + # Check that the number of records matches the expected number of records + total_records = n_batch_upserts + n_batch_deletes + assert total_records == len( + orcid_batch.expected_records + ), f"Expected {len(orcid_batch.expected_records)} records but got {total_records} records ({n_batch_upserts} upserts | {n_batch_deletes} deletes)" + + # Record keeping + total_upsert_records += n_batch_upserts + total_delete_records += n_batch_deletes + logging.info( + f"Transformed {n_batch_upserts} upserts and {n_batch_deletes} deletes for batch {orcid_batch.batch_str}" + ) + logging.info(f"Time taken for batch: {time.time() - batch_start} seconds") + logging.info(f"Transformed {total_upsert_records} upserts and {total_delete_records} deletes in total") + logging.info(f"Time taken for all batches: {(time.time() - start_time)/3600} hours") def upload_transformed(self, release: OrcidRelease, **kwargs): - """Uploads the transformed files to the transform bucket.""" - success = gcs_upload_files( - bucket_name=self.cloud_workspace.transform_bucket, file_paths=release.transform_files - ) + """Uploads the upsert and delete files to the transform bucket.""" + success = gcs_upload_files(bucket_name=self.cloud_workspace.transform_bucket, file_paths=release.upsert_files) + set_task_state(success, self.upload_transformed.__name__, release) + success = gcs_upload_files(bucket_name=self.cloud_workspace.transform_bucket, file_paths=release.delete_files) set_task_state(success, self.upload_transformed.__name__, release) def bq_load_main_table(self, release: OrcidRelease, **kwargs): @@ -492,8 +507,15 @@ def bq_load_main_table(self, release: OrcidRelease, **kwargs): if not release.is_first_run: logging.info(f"bq_load_main_table: skipping as the main table is only created on the first run") return + + # Check that the number of files matches the number of blobs + storage_client = storage.Client() + blobs = list(storage_client.list_blobs(self.cloud_workspace.transform_bucket, prefix=release.upsert_blob_glob)) + assert len(blobs) == len( + release.upsert_files + ), f"Number of blobs {len(blobs)} does not match number of files {len(release.upsert_files)}" - assert len(release.batch_files), "No batch files found. Batch files must exist before loading the main table."
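Reviewer note on the blob-count checks added in this hunk and the two below it: google-cloud-storage's Client.list_blobs treats `prefix` as a literal string prefix, not a glob, so passing `release.upsert_blob_glob` (which embeds `*_upsert.jsonl.gz`) will match no blobs and the assertion will compare an empty listing against the local file count. A minimal sketch of the intended check, under the assumption that the transform folder's blob name is used as the literal prefix and the suffix is filtered client-side; the helper name and suffix argument are illustrative, not part of the existing code:

from google.cloud import storage

def count_blobs_with_suffix(bucket_name: str, prefix: str, suffix: str) -> int:
    """Count blobs under a literal prefix whose names end with the given suffix."""
    client = storage.Client()
    blobs = client.list_blobs(bucket_name, prefix=prefix)  # prefix is literal, not a glob
    return sum(1 for blob in blobs if blob.name.endswith(suffix))

# Illustrative usage, assuming the transform folder's blob name is available:
# n_blobs = count_blobs_with_suffix(transform_bucket, transform_folder_blob_name, "_upsert.jsonl.gz")
# assert n_blobs == len(release.upsert_files), f"{n_blobs} blobs != {len(release.upsert_files)} files"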
success = bq_load_table( uri=release.table_uri, table_id=release.main_table_id, @@ -510,6 +532,13 @@ def bq_load_upsert_table(self, release: OrcidRelease, **kwargs): logging.info(f"bq_load_upsert_table: skipping as no records are upserted on the first run") return + # Check that the number of files matches the number of blobs + storage_client = storage.Client() + blobs = list(storage_client.list_blobs(self.cloud_workspace.transform_bucket, prefix=release.upsert_blob_glob)) + assert len(blobs) == len( + release.upsert_files + ), f"Number of blobs {len(blobs)} does not match number of files {len(release.upsert_files)}" + success = bq_load_table( uri=release.table_uri, table_id=release.upsert_table_id, @@ -525,10 +554,17 @@ def bq_load_delete_table(self, release: OrcidRelease, **kwargs): if release.is_first_run: logging.info(f"bq_load_delete_table: skipping as no records are deleted on the first run") return - if not os.path.exists(release.delete_file_path): + if not release.delete_files: logging.info(f"bq_load_delete_table: skipping as no delete file exists") return + # Check that the number of files matches the number of blobs + storage_client = storage.Client() + blobs = list(storage_client.list_blobs(self.cloud_workspace.transform_bucket, prefix=release.delete_blob_glob)) + assert len(blobs) == len( + release.delete_files + ), f"Number of blobs {len(blobs)} does not match number of files {len(release.delete_files)}" + success = bq_load_table( uri=release.table_uri, table_id=release.delete_table_id, @@ -557,7 +593,7 @@ def bq_delete_records(self, release: OrcidRelease, **kwargs): if release.is_first_run: logging.info("bq_delete_records: skipping as no records are deleted on the first run") return - if not os.path.exists(release.delete_file_path): + if not release.delete_files: logging.info(f"bq_delete_records: skipping as no delete file exists") return @@ -578,7 +614,7 @@ def add_new_dataset_release(self, release: OrcidRelease, **kwargs) -> None: dag_run_id=release.run_id, changefile_start_date=release.start_date, changefile_end_date=release.end_date, - extra={"latest_modified_record_date": latest_modified_record_date(release.local_manifest_file_path)}, + extra={"latest_modified_record_date": latest_modified_record_date(release.master_manifest_file)}, ) api = make_observatory_api(observatory_api_conn_id=self.observatory_api_conn_id) api.post_dataset_release(dataset_release) @@ -588,31 +624,52 @@ def cleanup(self, release: OrcidRelease, **kwargs) -> None: cleanup(dag_id=self.dag_id, execution_date=kwargs["logical_date"], workflow_folder=release.workflow_folder) -def create_manifest_batch(bucket: str, dir_name: str, save_path: str, reference_date: pendulum.DateTime) -> str: - """Create a manifest of all the modified files in the orcid bucket for a subfolder +def create_orcid_batch_manifest( + orcid_batch: OrcidBatch, reference_date: pendulum.DateTime, bucket: str, bucket_prefix: str = None +) -> None: + """Create a manifest (.csv) for each orcid batch containing blob names and modification dates of all the + modified files in the bucket's orcid directory. Only blobs modified after the reference date are included. - :param bucket: The name of the bucket - :param dir_name: The name of the subfolder. e.g.
025 or 94X - see orcid_directories() - :param save_path: The path to save the manifest to + :param orcid_batch: The OrcidBatch instance for this orcid directory :param reference_date: The date to use as a reference for the manifest - :return: The path to the manifest + :param bucket: The name of the bucket + :param bucket_prefix: The prefix to use when listing blobs in the bucket. i.e. where the orcid directories are located + :return: None """ - logging.info(f"Creating manifest for {dir_name}") - if not save_path: - save_path = f"{dir_name}_manifest.csv" - blobs = gcs_list_blobs(bucket, prefix=dir_name) + prefix = f"{bucket_prefix}/{orcid_batch.batch_str}/" if bucket_prefix else f"{orcid_batch.batch_str}/" + + logging.info(f"Creating manifest for {orcid_batch.batch_str}") + blobs = gcs_list_blobs(bucket, prefix=prefix) manifest = [] for blob in blobs: if pendulum.instance(blob.updated) > reference_date: - # Extract the orcid ID from the blob name - orcid = re.search(ORCID_REGEX, blob.name).group(0) - manifest.append((blob.name, orcid, blob.size, blob.updated)) + orcid = re.search(ORCID_REGEX, blob.name).group(0) # Extract the orcid ID from the blob name + manifest.append([blob.bucket.name, blob.name, orcid, blob.updated]) + + with open(orcid_batch.manifest_file, "w", newline="") as csvfile: + writer = csv.writer(csvfile) + writer.writerows(manifest) + + logging.info(f"Manifest saved to {orcid_batch.manifest_file}") - if manifest: - with open(save_path, "w") as f: - f.write("\n".join([",".join([str(i) for i in row]) for row in manifest])) - return save_path +def gsutil_download(orcid_batch: OrcidBatch) -> int: + """Download the ORCID files from GCS to the local machine + + :param orcid_batch: The OrcidBatch instance for this orcid directory + :return: The return code of the gsutil command's subprocess + """ + download_script = "gsutil -m -q cp -I -L {log_file} {download_folder}" + + blob_stdin = "\n".join(orcid_batch.blob_uris) + download_command = download_script.format( + log_file=orcid_batch.download_log_file, download_folder=orcid_batch.download_batch_dir + ) + with open(orcid_batch.download_error_file, "w") as f: + download_process = subprocess.Popen(download_command.split(" "), stdin=subprocess.PIPE, stderr=f, stdout=f) + download_process.communicate(input=blob_stdin.encode()) + returncode = download_process.wait() + return returncode def latest_modified_record_date(manifest_file_path) -> pendulum.DateTime: @@ -630,12 +687,12 @@ def latest_modified_record_date(manifest_file_path) -> pendulum.DateTime: return modified_dates[-1] -def orcid_directories() -> List[str]: +def orcid_batch_names() -> List[str]: """Create a list of all the possible ORCID directories :return: A list of all the possible ORCID directories """ - n_1_2 = [str(i) for i in range(4)] # 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 + n_1_2 = [str(i) for i in range(10)] # 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 n_3 = n_1_2.copy() + ["X"] # 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, X combinations = list(itertools.product(n_1_2, n_1_2, n_3)) # Creates the 000 to 99X directory structure return ["".join(i) for i in combinations] @@ -664,7 +721,7 @@ def transform_orcid_record(record_path: str) -> Union(Dict[str, any], str): # Get the orcid from the record path expected_orcid = re.search(ORCID_REGEX, record_path).group(0) - with open(record_path, "r") as f: + with open(record_path, "rb") as f: orcid_dict = xmltodict.parse(f) orcid_record = orcid_dict.get("record:record") @@ -677,7 +734,7 @@ def transform_orcid_record(record_path: str)
-> Union(Dict[str, any], str): # Check that the ORCID in the file name matches the ORCID in the record assert ( - orcid_record["record:record"]["common:orcid-identifier"]["common:path"] == expected_orcid + orcid_record["common:orcid-identifier"]["common:path"] == expected_orcid ), f"Expected ORCID {expected_orcid} does not match ORCID in record {orcid_record['common:path']}" # Transform the keys of the dictionary so they are valid BigQuery fields @@ -714,8 +771,7 @@ def convert(k: str) -> str: :param k: Key :return: The converted key """ - if len(k.split(":")) > 1: - k = k.split(":")[1] + k = k.split(":")[-1] if k.startswith("@") or k.startswith("#"): k = k[1:] k = k.replace("-", "_") diff --git a/academic_observatory_workflows/workflows/tests/test_orcid_telescope.py b/academic_observatory_workflows/workflows/tests/test_orcid_telescope.py new file mode 100644 index 000000000..8df270bf1 --- /dev/null +++ b/academic_observatory_workflows/workflows/tests/test_orcid_telescope.py @@ -0,0 +1,192 @@ +# Copyright 2023 Curtin University +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Author: Keegan Smith + +from __future__ import annotations + +import copy +import datetime +import gzip +import json +import os +import pathlib +import csv +import shutil +import tempfile +from typing import Dict +from unittest.mock import patch + +import pendulum +from airflow.models import Connection +from airflow.utils.state import State +from google.cloud import storage + +from academic_observatory_workflows.config import test_fixtures_folder +from academic_observatory_workflows.workflows.orcid_telescope import ( + OrcidBatch, + OrcidRelease, + OrcidTelescope, + create_orcid_batch_manifest, + gsutil_download, + latest_modified_record_date, + orcid_batch_names, + gcs_list_blobs, + transform_orcid_record, +) +from observatory.platform.api import get_dataset_releases +from observatory.platform.bigquery import bq_table_id, bq_sharded_table_id +from observatory.platform.files import save_jsonl_gz, load_file +from observatory.platform.gcs import gcs_blob_name_from_path +from observatory.platform.observatory_config import Workflow, CloudWorkspace +from observatory.platform.observatory_environment import ( + ObservatoryEnvironment, + ObservatoryTestCase, + aws_bucket_test_env, + find_free_port, + load_and_parse_json, + random_id, +) + + +class TestOrcidUtils(ObservatoryTestCase): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.dag_id = "orcid" + self.aws_key = (os.getenv("AWS_ACCESS_KEY_ID"), os.getenv("AWS_SECRET_ACCESS_KEY")) + self.aws_region_name = os.getenv("AWS_DEFAULT_REGION") + self.fixtures_folder = test_fixtures_folder("orcid") + + def test_orcid_batch(self): + """Test that the orcid batches are correctly constructed""" + with tempfile.TemporaryDirectory() as tmp_dir: + download_dir = os.path.join(tmp_dir, "download") + transform_dir = os.path.join(tmp_dir, "transform") + test_batch_str = "12X" + + # Download/transform dirs don't exist + with self.assertRaises(AssertionError): 
+ OrcidBatch(download_dir, transform_dir, test_batch_str) + os.makedirs(download_dir) + with self.assertRaises(AssertionError): + OrcidBatch(download_dir, transform_dir, test_batch_str) + os.makedirs(transform_dir) + + # Invalid batch string + for batch_str in ["0000", "12C", "99", "XXX"]: + with self.assertRaises(AssertionError): + OrcidBatch(download_dir, transform_dir, batch_str) + + # Create a batch for testing + test_batch = OrcidBatch(download_dir, transform_dir, test_batch_str) + + # Check that expected folders exist + self.assertTrue(os.path.isdir(test_batch.download_dir)) + + # Check that file names are as expected + self.assertEqual(test_batch.download_batch_dir, os.path.join(download_dir, test_batch_str)) + self.assertEqual(test_batch.download_log_file, os.path.join(download_dir, f"{test_batch_str}_log.txt")) + self.assertEqual(test_batch.download_error_file, os.path.join(download_dir, f"{test_batch_str}_error.txt")) + self.assertEqual(test_batch.manifest_file, os.path.join(download_dir, f"{test_batch_str}_manifest.csv")) + self.assertEqual( + test_batch.transform_upsert_file, os.path.join(download_dir, f"{test_batch_str}_upsert.jsonl.gz") + ) + self.assertEqual( + test_batch.transform_delete_file, os.path.join(download_dir, f"{test_batch_str}_delete.jsonl.gz") + ) + + # Make the manifest file + shutil.copy(os.path.join(self.fixtures_folder, "test_manifest.csv"), test_batch.manifest_file) + + # Check that missing, expected and existing records are correctly identified + records = [ + "0000-0001-5000-5000.xml", + "0000-0001-5001-3000.xml", + "0000-0001-5002-1000.xml", + "0000-0001-5007-2000.xml", + "0000-0001-5010-1000.xml", + ] + self.assertEqual(set(test_batch.expected_records), set(records)) + self.assertEqual(test_batch.existing_records, []) + self.assertEqual(set(test_batch.missing_records), set(records)) + for record in records: + shutil.copy( + os.path.join(self.fixtures_folder, record), os.path.join(test_batch.download_batch_dir, record) + ) + self.assertEqual(set(test_batch.expected_records), set(records)) + self.assertEqual(set(test_batch.existing_records), set(records)) + self.assertEqual(test_batch.missing_records, []) + + # Check that the blob uris are correctly generated + expected_blob_uris = [ + "gs://orcid-testing/orcid_summaries/000/0000-0001-5000-5000.xml", + "gs://orcid-testing/orcid_summaries/000/0000-0001-5001-3000.xml", + "gs://orcid-testing/orcid_summaries/000/0000-0001-5002-1000.xml", + "gs://orcid-testing/orcid_summaries/000/0000-0001-5007-2000.xml", + "gs://orcid-testing/orcid_summaries/000/0000-0001-5010-1000.xml", + ] + self.assertEqual(set(test_batch.blob_uris), set(expected_blob_uris)) + + def test_create_orcid_batch_manifest(self): + """Tests the create_orcid_batch_manifest function""" + with tempfile.TemporaryDirectory() as tmp_dir: + download_dir = os.path.join(tmp_dir, "download") + transform_dir = os.path.join(tmp_dir, "transform") + test_batch_str = "12X" + os.makedirs(download_dir) + os.makedirs(transform_dir) + # Create a batch for testing + test_batch = OrcidBatch(download_dir, transform_dir, test_batch_str) + + # Upload the .xml files to the test bucket + client = storage.Client() + bucket_id = f"orcid_test_{random_id()}" + bucket = client.create_bucket(bucket_id) + + blob1 = storage.Blob(f"{test_batch_str}/0000-0001-5000-1000.xml", bucket) + blob1.upload_from_string("Test data 1") + # Make now the reference time - blob1 should be ignored + reference_time = pendulum.now() + blob2 = storage.Blob(f"{test_batch_str}/0000-0001-5000-2000.xml", bucket) + blob2.upload_from_string("Test data 2") + blob3 =
storage.Blob(f"{test_batch_str}/0000-0001-5000-3000.xml", bucket) + blob3.upload_from_string("Test data 3") + # Put a blob in a different folder - should be ignored + blob4 = storage.Blob(f"somewhere_else/{test_batch_str}/0000-0001-5000-4000.xml", bucket) + blob4.upload_from_string("Test data 4") + + create_orcid_batch_manifest(orcid_batch=test_batch, reference_time=reference_time, bucket=bucket_id) + with open(test_batch.manifest_file, "w", newline="") as csvfile: + reader = csv.reader(csvfile) + manifest_rows = [row for row in reader] + bucket = [row[0] for row in manifest_rows] + blobs = [row[1] for row in manifest_rows] + orcid = [row[2] for row in manifest_rows] + modification_times = [row[3] for row in manifest_rows] + self.assertEqual(len(manifest_rows), 2) + self.assertEqual(set(blobs), set([blob2.name, blob3.name])) + self.assertEqual(set(orcid), set(["0000-0001-5000-2000", "0000-0001-5000-3000"])) + self.assertEqual(set(modification_times), set([blob2.updated.isoformat(), blob3.updated.isoformat()])) + + +class TestOrcidTelescope(ObservatoryTestCase): + """Tests for the OpenAlex telescope""" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.dag_id = "orcid" + self.project_id = os.getenv("TEST_GCP_PROJECT_ID") + self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") + self.aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID") + self.aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY") + self.aws_region_name = os.getenv("AWS_DEFAULT_REGION")