From 427fbefb1d263af5f6816ed4797b62f1004f5b32 Mon Sep 17 00:00:00 2001 From: Alex Massen-Hane Date: Mon, 7 Aug 2023 12:32:53 +0800 Subject: [PATCH] Made requested changes --- .../pubmed/{deletes.json => delete.json} | 0 .../pubmed/{upserts.json => pubmed.json} | 0 .../workflows/pubmed_telescope.py | 759 ++++++++---------- .../workflows/tests/test_pubmed_telescope.py | 458 +++++++---- 4 files changed, 644 insertions(+), 573 deletions(-) rename academic_observatory_workflows/database/schema/pubmed/{deletes.json => delete.json} (100%) rename academic_observatory_workflows/database/schema/pubmed/{upserts.json => pubmed.json} (100%) diff --git a/academic_observatory_workflows/database/schema/pubmed/deletes.json b/academic_observatory_workflows/database/schema/pubmed/delete.json similarity index 100% rename from academic_observatory_workflows/database/schema/pubmed/deletes.json rename to academic_observatory_workflows/database/schema/pubmed/delete.json diff --git a/academic_observatory_workflows/database/schema/pubmed/upserts.json b/academic_observatory_workflows/database/schema/pubmed/pubmed.json similarity index 100% rename from academic_observatory_workflows/database/schema/pubmed/upserts.json rename to academic_observatory_workflows/database/schema/pubmed/pubmed.json diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py index 0d34eb2c1..11768f805 100644 --- a/academic_observatory_workflows/workflows/pubmed_telescope.py +++ b/academic_observatory_workflows/workflows/pubmed_telescope.py @@ -18,6 +18,7 @@ import os import re +import ast import gzip import json import time @@ -29,8 +30,8 @@ from datetime import timedelta from google.cloud import bigquery from ftplib import FTP, error_reply -from typing import Any, Union, Dict, List, Optional from google.cloud.bigquery import SourceFormat +from typing import Tuple, Union, Dict, List, Optional from concurrent.futures import ProcessPoolExecutor, as_completed from Bio.Entrez.Parser import StringElement, ListElement, DictionaryElement, OrderedListElement, ValidationError @@ -38,19 +39,16 @@ from airflow.operators.dummy import DummyOperator from airflow.models.taskinstance import TaskInstance -from observatory.platform.airflow import PreviousDagRunSensor, is_first_dag_run -from observatory.platform.files import get_chunks +from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag + +from observatory.platform.config import AirflowConns +from observatory.platform.observatory_config import CloudWorkspace from observatory.api.client.model.dataset_release import DatasetRelease -from observatory.platform.api import get_latest_dataset_release, get_dataset_releases, make_observatory_api +from observatory.platform.files import get_chunks, load_jsonl, save_jsonl_gz from observatory.platform.gcs import gcs_upload_files, gcs_blob_name_from_path -from observatory.platform.observatory_config import CloudWorkspace -from observatory.platform.config import AirflowConns -from observatory.platform.workflows.workflow import ( - Workflow, - set_task_state, - cleanup, -) - +from observatory.platform.airflow import PreviousDagRunSensor, is_first_dag_run +from observatory.platform.workflows.workflow import Workflow, set_task_state, cleanup +from observatory.platform.api import get_latest_dataset_release, get_dataset_releases, make_observatory_api from observatory.platform.workflows.workflow import ChangefileRelease as DatafileRelease from 
observatory.platform.bigquery import ( bq_sharded_table_id, @@ -60,7 +58,6 @@ bq_upsert_records, bq_delete_records, ) -from academic_observatory_workflows.config import schema_folder as default_schema_folder, Tag class Datafile: @@ -142,100 +139,29 @@ def download_file_path(self): return os.path.join(self.datafile_release.download_folder, self.filename) @property - def transform_path(self): - assert self.datafile_release is not None, "Datafile.transform_path: self.datafile_release is None" - return self.datafile_release.transform_folder - - def transform_file_path(self, entity_type: str) -> str: - """ - Give path to the transform file depending on entity type. - - :param entity_type: Type of the record, either "upsert" or "delete". - :return: Retuns the path to the transformed file. - """ - - assert self.datafile_release is not None, "Datafile.transform_file_path: self.datafile_release is None" - - # Remove the .gz compression if it's not a baseline file. - if self.baseline: - file_type = self.file_type - else: - file_type = self.file_type[:-3] - - return os.path.join( - self.datafile_release.transform_folder, - f"{entity_type}_{self.filename[:-7]}.{file_type}", - ) - - def transform_file_pattern(self, entity_type: str) -> str: - """Return a glob pattern for the transform files. - - :param entity_type: Type of the record, either "upsert" or "delete". - :return: Retuns the path to the transformed file. - """ - - assert self.datafile_release is not None, "Datafile.transform_file_path: self.datafile_release is None" - return os.path.join( - self.datafile_release.transform_folder, - f"{entity_type}_*.{self.file_type}", - ) - - def merged_transform_file_path( - self, entity_type: str, first_file_index: int, last_file_index: int, part_num: int - ) -> str: - """ - Give path to the merged transform file. - - :param entity_type: Type of the record, either "upsert" or "delete". - :return: Retuns the path to the transformed file. - """ - - assert self.datafile_release is not None, "Datafile.transform_file_path: self.datafile_release is None" - return os.path.join( - self.datafile_release.transform_folder, - f"merged_{entity_type}_{first_file_index}-{last_file_index}_part_{part_num}.{self.file_type}", - ) - - def merged_tranform_file_pattern(self, entity_type: str) -> str: - """ - Return a glob pattern for the merged transform files. - - :param entity_type: Type of the record, either either "upsert" or "delete". - :return: Retuns the path to the transformed file. - """ - assert self.datafile_release is not None, "Datafile.transform_file_path: self.datafile_release is None" - return os.path.join( - self.datafile_release.transform_folder, - f"merged_{entity_type}_*-*_part_*.{self.file_type}", - ) + def transform_baseline_file_path(self): + assert self.datafile_release is not None, "Datafile.transform_baseline_path: self.datafile_release is None" + return os.path.join(self.datafile_release.transform_folder, f"baseline_{self.filename[:-7]}.jsonl.gz") + @property + def transform_upsert_file_path(self): + assert self.datafile_release is not None, "Datafile.transform_upsert_path: self.datafile_release is None" + return os.path.join(self.datafile_release.transform_folder, f"upserts_{self.filename[:-7]}.jsonl") -class PubmedEntity: - def __init__( - self, - type: str, - sub_key: str, - set_key: str, - pmid_location: Optional[str], - table_name: Optional[str], - table_description: str, - ): - """ - Holds the metadata about the records stored in Pubmed. 
- :param type: Either a set of 'upsert' or 'deletion' records. - :param sub_key: Sub key location of Pubmed data. - :param set_key: Key location of Pubmed. - :param pmid_location: Location of PMID values - :param table_description: Description for the table on Bigquery. - """ + @property + def transform_delete_file_path(self): + assert self.datafile_release is not None, "Datafile.delete_file_path: self.datafile_release is None" + return os.path.join(self.datafile_release.transform_folder, f"deletes_{self.filename[:-7]}.jsonl") - self.type = type - self.sub_key = sub_key - self.set_key = set_key - self.pmid_location = pmid_location - self.table_description = table_description + @property + def merged_upsert_file_path(self): + assert self.datafile_release is not None, "Datafile.merged_upsert_path: self.datafile_release is None" + return os.path.join(self.datafile_release.transform_folder, f"upsert_merged_{self.filename[:-7]}.jsonl.gz") - self.schema_file_path = os.path.join(default_schema_folder(), "pubmed", f"{table_name}.json") + @property + def merged_delete_file_path(self): + assert self.datafile_release is not None, "Datafile.merged_delete_file_path: self.datafile_release is None" + return os.path.join(self.datafile_release.transform_folder, f"delete_merged_{self.filename[:-7]}.jsonl.gz") class PubMedRelease(DatafileRelease): @@ -287,27 +213,30 @@ def __init__( for datafile in datafile_list: datafile.datafile_release = self.datafile_release - def transfer_blob_pattern(self, entity_type: str) -> str: + def schema_file_path(self, record_type: str) -> str: + return os.path.join(default_schema_folder(), "pubmed", f"{record_type}.json") + + def transfer_blob_pattern(self, table_type: str) -> str: """ Create a blob pattern for importing the transformed unmerged records from GCS into Bigquery. - :param entity_type: Type of the entity. + :param table_type: Type of the record. :return: Uri pattern for transformed files. """ - return f"gs://{self.cloud_workspace.transform_bucket}/{gcs_blob_name_from_path(self.datafile_release.transform_folder)}/{entity_type}_*.{self.datafile_list[0].file_type}" + return f"gs://{self.cloud_workspace.transform_bucket}/{gcs_blob_name_from_path(self.datafile_release.transform_folder)}/{table_type}_*.{self.datafile_list[0].file_type}" - def merged_transfer_blob_pattern(self, entity_type: str) -> str: + def merged_transfer_blob_pattern(self, table_type: str) -> str: """ Create a blob pattern for importing the transformed merged records from GCS into Bigquery. Only the upserts and delete records are merged and will use this glob pattern. - :param entity_type: Type of the entity. + :param table_type: Type of the record. :return: Uri pattern for merged transform files. 
""" - return f"gs://{self.cloud_workspace.transform_bucket}/{gcs_blob_name_from_path(self.datafile_release.transform_folder)}/merged_{entity_type}_*.{self.datafile_list[0].file_type}" + return f"gs://{self.cloud_workspace.transform_bucket}/{gcs_blob_name_from_path(self.datafile_release.transform_folder)}/{table_type}_merged_*.{self.datafile_list[0].file_type}" class PubMedTelescope(Workflow): @@ -330,7 +259,6 @@ def __init__( bq_dataset_description: str = "Pubmed Medline database, only PubmedArticle records: https://pubmed.ncbi.nlm.nih.gov/about/", start_date: pendulum.DateTime = pendulum.datetime(year=2022, month=12, day=8), schedule_interval: str = "@weekly", - catchup: bool = False, ftp_server_url: str = "ftp.ncbi.nlm.nih.gov", ftp_port: int = 21, reset_ftp_counter: int = 40, @@ -350,7 +278,6 @@ def __init__( :param bq_dataset_description: Description of the Pubmed dataset. :param start_date: The start date of the DAG. :param schedule_interval: tTe schedule interval of the DAG, how often it should run. - :param catchup: Turn catch up on or not. :param ftp_server_url: Server address of Pubmed's FTP server. :param ftp_port: Port for connectiong to Pubmed's FTP server. :param reset_ftp_counter: Resets FTP connection after downloading x number of files. @@ -358,7 +285,7 @@ def __init__( :param num_merged_records: Number of records to write to file when merging the upsert and delete records, before import into BQ. :param queue: The queue that the tasks should run on, "default" or "remote_queue". :param snapshot_expiry_days: How long until the backup snapshot (before this release's upserts and deletes) of the Pubmed table exist in BQ. - :param max_processes: Max number of parallel processors. + :param max_processes: Max number of parallel processes. """ self.observatory_api_conn_id = observatory_api_conn_id @@ -367,7 +294,7 @@ def __init__( dag_id=dag_id, start_date=start_date, schedule_interval=schedule_interval, - catchup=catchup, + catchup=False, airflow_conns=[observatory_api_conn_id], queue=queue, tags=[Tag.academic_observatory], @@ -378,44 +305,15 @@ def __init__( self.bq_dataset_id = bq_dataset_id self.bq_table_id = bq_table_id self.main_table_id = f"{cloud_workspace.project_id}.{bq_dataset_id}.{bq_table_id}" + self.upsert_table_id = f"{self.main_table_id}_upsert" + self.delete_table_id = f"{self.main_table_id}_delete" self.bq_dataset_description = bq_dataset_description self.snapshot_expiry_days = snapshot_expiry_days - # Metadata on how to pull out and transform the Pubmed data. - self.entity_list = { - "baseline": PubmedEntity( - type="baseline", - sub_key="PubmedArticle", - set_key="PubmedArticleSet", - pmid_location="MedlineCitation", - table_name="upserts", - table_description="""Pubmed's PubmedArticle table - Includes all the metadata associated with a journal article citation, - both the metadata to describe the published article, i.e. , and additional metadata often pertaining to - the publication's history or processing at NLM, i.e. .""", - ), - "upsert": PubmedEntity( - type="upsert", - sub_key="PubmedArticle", - set_key="PubmedArticleSet", - pmid_location="MedlineCitation", - table_name="upserts", - table_description="""PubmedArticle - Includes all the metadata associated with a journal article citation, both the - metadata to describe the published article, i.e. , and additional metadata often - pertaining to the publication's history or processing at NLM, i.e. 
.""", ), "delete": PubmedEntity( type="delete", sub_key="PMID", set_key="DeleteCitation", pmid_location=None, table_name="deletes", table_description="""DeleteCitation - Indicates one or more or that have been deleted. PMIDs in DeleteCitation will typically have been found to be duplicate citations, or citations to content that was determined to be out-of-scope for PubMed. It is possible that a PMID would appear in DeleteCitation without having been distributed in a previous file. This would happen if the creation and deletion of the record take place on the same day.""", ), - } + # Descriptions of the Pubmed tables. + self.baseline_table_description = """Pubmed's main table of PubmedArticle records - Includes all the metadata associated with a journal article citation, both the metadata to describe the published article, i.e. , and additional metadata often pertaining to the publication's history or processing at NLM, i.e. .""" + self.upsert_table_description = """PubmedArticle upserts - Includes all the metadata associated with a journal article citation, both the metadata to describe the published article, i.e. , and additional metadata often pertaining to the publication's history or processing at NLM, i.e. .""" + self.delete_table_description = """PubmedArticle deletes - Indicates one or more or that have been deleted. PMIDs in DeleteCitation will typically have been found to be duplicate citations, or citations to content that was determined to be out-of-scope for PubMed. It is possible that a PMID would appear in DeleteCitation without having been distributed in a previous file. This would happen if the creation and deletion of the record take place on the same day.""" # Workflow specific parameters. self.ftp_server_url = ftp_server_url @@ -460,14 +358,16 @@ def __init__( self.add_task(self.upload_downloaded_updatefiles) self.add_task(self.transform_updatefiles) - # Merge and apply upsert records. - self.add_task(self.merge_upsert_records) + # Determine which upserts and deletes are necessary to keep for this release. + self.add_task(self.merge_upserts_and_deletes) + self.add_task(self.save_merged_upserts_and_deletes) + + # Upload and apply upsert records. self.add_task(self.upload_merged_upsert_records) self.add_task(self.bq_load_upsert_table) self.add_task(self.bq_upsert_records) - # Merge and apply delete records. - self.add_task(self.merge_delete_records) + # Upload and apply delete records. self.add_task(self.upload_merged_delete_records) self.add_task(self.bq_load_delete_table) self.add_task(self.bq_delete_records) @@ -510,9 +410,9 @@ def list_datafiles_for_release(self, **kwargs) -> bool: baseline_first_file = [file for file in baseline_list_ftp if file.endswith("0001.xml.gz")][0] baseline_upload = ftp_conn.sendcmd("MDTM {}".format(baseline_first_file))[4:] baseline_upload_date = pendulum.from_format(baseline_upload, "YYYYMMDDHHmmss") - logging.info(f"baselinev upload time: {baseline_first_file}, {baseline_upload_date}") + logging.info(f"baseline upload time: {baseline_first_file}, {baseline_upload_date}") - # Make workflow re-download the baseline yearly data if the upload date does not match from the last release. + # Make workflow re-download the baseline yearly data if the upload date does not match the date from the last release.
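# Editor's aside: a minimal, standalone sketch (not part of the workflow code) of why the MDTM
# reply above is sliced with [4:]. Per RFC 3659 the FTP server answers "213 YYYYMMDDHHmmss",
# so the first four characters ("213 ") are protocol framing and the remainder is the timestamp.
# The reply string below is a hypothetical example value, not real server output.
import pendulum

raw_reply = "213 20221208101500"  # assumed example MDTM reply for the first baseline file
upload_date = pendulum.from_format(raw_reply[4:], "YYYYMMDDHHmmss")
print(upload_date.to_date_string())  # 2022-12-08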
if is_first_run: year_first_run = True else: @@ -716,10 +616,10 @@ def download_baseline(self, release: PubMedRelease, **kwargs): logging.info("download_baseline: skipping as this is only run when there is a new baseline dump available.") return - baseline_datafiles = [datafile for datafile in release.datafile_list if datafile.baseline] + datafiles = [datafile for datafile in release.datafile_list if datafile.baseline] - success = download_datafiles_from_ftp_server( - datafile_list=baseline_datafiles, + success = download_datafiles( + datafile_list=datafiles, ftp_server_url=self.ftp_server_url, ftp_port=self.ftp_port, reset_ftp_counter=self.reset_ftp_counter, @@ -738,11 +638,11 @@ def upload_downloaded_baseline(self, release: PubMedRelease, **kwargs): return # Grab list of files to upload. - datafiles_to_upload = [datafile.download_file_path for datafile in release.datafile_list if datafile.baseline] + file_paths = [datafile.download_file_path for datafile in release.datafile_list if datafile.baseline] success = gcs_upload_files( bucket_name=self.cloud_workspace.download_bucket, - file_paths=datafiles_to_upload, + file_paths=file_paths, ) set_task_state(success, kwargs["ti"].task_id, release=release) @@ -750,8 +650,6 @@ def upload_downloaded_baseline(self, release: PubMedRelease, **kwargs): def transform_baseline(self, release: PubMedRelease, **kwargs): """ Transform the *.xml.gz files downloaded from PubMed into usable json files for BigQuery import. - - This is a multithreaded task that pulls the information from the files using the keys defined in self.entity_list. """ if not release.year_first_run: @@ -761,23 +659,25 @@ def transform_baseline(self, release: PubMedRelease, **kwargs): return # Get list of datafiles for the baseline to trasform. - datafiles_to_transform = [datafile for datafile in release.datafile_list if datafile.baseline] + datafiles = [datafile for datafile in release.datafile_list if datafile.baseline] # Process files in batches so that ProcessPoolExecutor doesn't deplete the system of memory - for i, chunk in enumerate(get_chunks(input_list=datafiles_to_transform, chunk_size=self.max_processes)): + for i, chunk in enumerate(get_chunks(input_list=datafiles, chunk_size=self.max_processes)): with ProcessPoolExecutor(max_workers=self.max_processes) as executor: logging.info(f"In chunk {i} and processing files: {chunk}") - futures = [ - executor.submit(transform_pubmed_xml_file_to_jsonl, datafile, [self.entity_list["baseline"]]) - for datafile in chunk - ] + futures = [] + datafile: Datafile + for datafile in chunk: + input_path = datafile.download_file_path + upsert_path = datafile.transform_baseline_file_path + futures.append(executor.submit(transform_pubmed, input_path, upsert_path)) # Make sure that all datafiles have been properly transformed. 
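# Editor's aside: a minimal, self-contained sketch of the chunked ProcessPoolExecutor pattern used
# by transform_baseline and transform_updatefiles: files are submitted max_processes at a time so
# the pool never accumulates futures for the whole file list. The worker and file names below are
# illustrative only; the real workflow submits transform_pubmed with datafile paths and uses
# observatory's get_chunks helper instead of the inline chunking shown here.
from concurrent.futures import ProcessPoolExecutor, as_completed

def transform_one(path: str) -> str:
    # Placeholder for the real per-file transform work.
    return path

def main():
    files = [f"pubmed22n{i:04d}.xml.gz" for i in range(1, 9)]  # hypothetical file names
    max_processes = 4
    chunks = [files[i : i + max_processes] for i in range(0, len(files), max_processes)]
    for i, chunk in enumerate(chunks):
        with ProcessPoolExecutor(max_workers=max_processes) as executor:
            futures = [executor.submit(transform_one, path) for path in chunk]
            for future in as_completed(futures):
                print(f"chunk {i}: finished {future.result()}")

if __name__ == "__main__":
    main()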
for future in as_completed(futures): - transformed_datafile = future.result() + filename = future.result() - logging.info(f"Successfully transformed file - {transformed_datafile.filename}") + assert filename, f"Unable to transform baseline file: {filename}" def upload_transformed_baseline(self, release: PubMedRelease, **kwargs): """Upload transformed baseline files to GCS.""" @@ -788,13 +688,11 @@ def upload_transformed_baseline(self, release: PubMedRelease, **kwargs): ) return - files_to_upload = [ - datafile.transform_file_path("baseline") for datafile in release.datafile_list if datafile.baseline - ] + file_paths = [datafile.transform_baseline_file_path for datafile in release.datafile_list if datafile.baseline] success = gcs_upload_files( bucket_name=self.cloud_workspace.transform_bucket, - file_paths=files_to_upload, + file_paths=file_paths, ) set_task_state(success, kwargs["ti"].task_id, release=release) @@ -826,26 +724,29 @@ def bq_load_main_table(self, release: PubMedRelease, **kwargs): uri=baseline_transform_blob_pattern, table_id=self.main_table_id, source_format=SourceFormat.NEWLINE_DELIMITED_JSON, - schema_file_path=self.entity_list["baseline"].schema_file_path, + schema_file_path=release.schema_file_path(record_type="pubmed"), write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, - table_description=self.entity_list["baseline"].table_description, - partition=False, - ignore_unknown_values=False, + table_description=self.baseline_table_description, + ignore_unknown_values=True, ) set_task_state(success, kwargs["ti"].task_id, release=release) def create_snapshot(self, release: PubMedRelease, **kwargs): - """Create a snapshot of main table as a backup just in case something happens with the below upserts and - deletes.""" + """Create a snapshot of main table as a backup just in case something happens when applying the upserts and deletes.""" - snapshot_table_id = bq_sharded_table_id( - self.cloud_workspace.project_id, self.bq_dataset_id, f"{self.bq_table_id}_backup", release.start_date - ) - expiry_date = pendulum.now().add(days=self.snapshot_expiry_days) - success = bq_snapshot(src_table_id=self.main_table_id, dst_table_id=snapshot_table_id, expiry_date=expiry_date) - - set_task_state(success, kwargs["ti"].task_id, release=release) + if not release.year_first_run: + snapshot_table_id = bq_sharded_table_id( + self.cloud_workspace.project_id, self.bq_dataset_id, f"{self.bq_table_id}_snapshot", release.start_date + ) + expiry_date = pendulum.now().add(days=self.snapshot_expiry_days) + success = bq_snapshot( + src_table_id=self.main_table_id, dst_table_id=snapshot_table_id, expiry_date=expiry_date + ) + logging.info(f"Created snapshot table: {snapshot_table_id}") + set_task_state(success, kwargs["ti"].task_id, release=release) + else: + logging.info("Not required to create a snapshot of the table for this run.") def download_updatefiles(self, release: PubMedRelease, **kwargs): """ @@ -856,7 +757,7 @@ def download_updatefiles(self, release: PubMedRelease, **kwargs): updatefiles_datafiles = [datafile for datafile in release.datafile_list if not datafile.baseline] - success = download_datafiles_from_ftp_server( + success = download_datafiles( datafile_list=updatefiles_datafiles, ftp_server_url=self.ftp_server_url, ftp_port=self.ftp_port, @@ -869,7 +770,6 @@ def download_updatefiles(self, release: PubMedRelease, **kwargs): def upload_downloaded_updatefiles(self, release: PubMedRelease, **kwargs): """Upload downloaded updatefiles files to GCS.""" - # Grab list of files to upload. 
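# Editor's aside: the create_snapshot task above delegates to the observatory bq_snapshot helper.
# Purely as a hedged illustration (this is NOT that helper's implementation), a table snapshot with
# an expiry can be produced with the raw BigQuery client roughly as below; the table ids and the
# expiry window are assumed example values.
import datetime

from google.cloud import bigquery

def snapshot_with_expiry(src_table_id: str, dst_table_id: str, days: int = 31) -> None:
    client = bigquery.Client()
    # A copy job with operation_type "SNAPSHOT" creates a table snapshot of the source.
    job_config = bigquery.CopyJobConfig(operation_type="SNAPSHOT")
    client.copy_table(src_table_id, dst_table_id, job_config=job_config).result()
    # Give the snapshot a finite lifetime so backups clean themselves up.
    table = client.get_table(dst_table_id)
    table.expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=days)
    client.update_table(table, ["expires"])

# Example call (hypothetical ids):
# snapshot_with_expiry("project.pubmed.pubmed", "project.pubmed.pubmed_snapshot20221211", days=31)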
datafiles_to_upload = [ datafile.download_file_path for datafile in release.datafile_list if not datafile.baseline ] @@ -885,115 +785,155 @@ def transform_updatefiles(self, release: PubMedRelease, **kwargs): """ Transform the *.xml.gz files downloaded from PubMed's FTP server into usable json-like files for BigQuery import. - This is a multithreaded task that pulls the information from the files using the keys defined in self.entity_list. + This is a multithreaded and pulls the PubmedArticle records from the downloaded XML files. """ # Get list of datafiles for the baseline to trasform. - datafiles_to_transform = [datafile for datafile in release.datafile_list if not datafile.baseline] + datafiles = [datafile for datafile in release.datafile_list if not datafile.baseline] + + # Object to store all of the upserts and delete keys that are present in each file. + record_keys = {} # Process files in batches so that ProcessPoolExecutor doesn't deplete the system of memory - for i, chunk in enumerate(get_chunks(input_list=datafiles_to_transform, chunk_size=self.max_processes)): + for i, chunk in enumerate(get_chunks(input_list=datafiles, chunk_size=self.max_processes)): with ProcessPoolExecutor(max_workers=self.max_processes) as executor: logging.info(f"In chunk {i} and processing files: {chunk}") - futures = [ - executor.submit( - transform_pubmed_xml_file_to_jsonl, - datafile, - [self.entity_list["upsert"], self.entity_list["delete"]], - ) - for datafile in chunk - ] + futures = [] + datafile: Datafile + for datafile in chunk: + input_path = datafile.download_file_path + upsert_path = datafile.transform_upsert_file_path + delete_path = datafile.transform_delete_file_path + + futures.append(executor.submit(transform_pubmed, input_path, upsert_path, delete_path)) - # Make sure that all datafiles have been properly transformed. for future in as_completed(futures): - transformed_datafile = future.result() + filename, record_keys_per_file = future.result() + record_keys[filename] = record_keys_per_file - logging.info(f"Successfully transformed file - {transformed_datafile.filename}") + assert len(datafiles) == len( + record_keys.keys() + ), f"Number of updatefiles does not match the number of keys in record_keys: {len(datafiles)} vs {len(record_keys.keys())}" - ########## UPSERT RECORDS ############# + ti: TaskInstance = kwargs["ti"] + ti.xcom_push(key="record_keys", value=record_keys) - def merge_upsert_records(self, release: PubMedRelease, **kwargs): + def merge_upserts_and_deletes(self, release: PubMedRelease, **kwargs): """ - Merge the upsert records that will be applied to the main table. + Using the keys found from the transform step, loop through each of the updatefiles and determine + which records needed to be kept for this release period. - Only keep the newest upsert record from the updatefiles. + Upserts are replaced with newer updated versions and duplicated deletes are removed throughout. + + If an upsert appears in a newer file, it is removed from the previous sets of deletes and upserts. + Similarly, if a delete appears in a newer file, old upserts and deletes with the same keys are removed + from previous updatefiles. 
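For example (illustrative PMID keys only): if updatefile A has upserts {1, 2} and deletes {9}, and a later updatefile B has upsert {2} and delete {1}, then after merging, A retains no upserts (2 is superseded by B's newer upsert and 1 is dropped because B deletes it) and keeps delete {9}, while B keeps upsert {2} and delete {1}.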
""" - upsert_files_to_merge = [datafile for datafile in release.datafile_list if not datafile.baseline] + ti: TaskInstance = kwargs["ti"] + record_keys: dict = ti.xcom_pull(key="record_keys") - upsert_files = {} - for datafile in upsert_files_to_merge: - transform_file = datafile.transform_file_path(entity_type=self.entity_list["upsert"].type) + logging.info(f"Dictionary of keys per updatefile: {record_keys}") - with open(transform_file, "r") as f_in: - incoming_data = [json.loads(line) for line in f_in] + # Ensure that the list of keys is sorted on the filename/file_index. + # Order matters here because we are using the order of the updatefiles to base + # which upserts and delete records we keep. + record_keys_list = list(record_keys.keys()) + record_keys_list.sort() - # Get list of 'primary keys' for each record. - records = [str(record[self.entity_list["upsert"].pmid_location]["PMID"]) for record in incoming_data] + prev_updatefile_list = [] + for updatefile in record_keys_list: + logging.info(f"Looking at keys in file: {updatefile}") - set_new_records_to_check = set(records) - for key_file_path, records_in_file in upsert_files.items(): - # Find the primary keys that are the same in the old updatefile. - set_old = set(records_in_file) - same_records = set_old.intersection(set_new_records_to_check) + current_upserts = set(record_keys[updatefile]["upserts"]) + current_deletes = set(record_keys[updatefile]["deletes"]) - # Remove primary keys in the old upsert file set if it is present in a newer upsert file. - set_old.difference_update(same_records) + # Loop through previous updatefiles to check for the below cases. + for prev_updatefile in prev_updatefile_list: + # Convert to sets for easy checking + prev_updatefile_upserts = set(record_keys[prev_updatefile]["upserts"]) + prev_updatefile_deletes = set(record_keys[prev_updatefile]["deletes"]) - if same_records: - logging.info( - ( - f"Found {len(same_records)} newer records in file: {transform_file.split('/')[-1]} vs {key_file_path.split('/')[-1]}." - f" {len(set_old)} records left in {key_file_path.split('/')[-1]}" - ) - ) + logging.info(f"Removing old upserts and deletes from: {prev_updatefile}") - # Relace with list of primary keys with old keys removed. - upsert_files[key_file_path] = list(set_old) + prev_updatefile_upserts.difference_update(current_upserts) - # Add newest updatefile to list of updatefiles and it's records. - upsert_files[transform_file] = records + # Remove any deletes from previous updatefiles if there is an updated upsert of that key. + prev_updatefile_deletes.difference_update(current_upserts) - merged_upsert_files = [] - first_file_index = upsert_files_to_merge[0].file_index - last_file_index = upsert_files_to_merge[-1].file_index - for file_count, (transform_file, records_to_write) in enumerate(upsert_files.items()): - logging.info(f"Reading in file to pull out records - {transform_file}") - with open(transform_file) as f_in: - incoming_data = [json.loads(line) for line in f_in] + # Remove any duplicate deletes across previous updatefiles. + prev_updatefile_deletes.difference_update(current_deletes) - # Turn list of records to a dictionary for faster lookup. - records = { - str(record[self.entity_list["upsert"].pmid_location]["PMID"]): record for record in incoming_data - } + # Remove any upserts across previous all updatefiles if theres a delete in current updatefile. 
+ prev_updatefile_upserts.difference_update(current_deletes) - merged_output_file = datafile.merged_transform_file_path( - self.entity_list["upsert"].type, first_file_index, last_file_index, part_num=file_count + 1 - ) + # Convert back to lists + record_keys[prev_updatefile]["upserts"] = list(prev_updatefile_upserts) + record_keys[prev_updatefile]["deletes"] = list(prev_updatefile_deletes) + + # Add updatefile to list of previously checked files. + prev_updatefile_list.append(updatefile) + + # Remove possible duplicates from updatefile by converting back to list.. + current_upserts_list = list(current_upserts) + current_deletes_list = list(current_deletes) + + current_upserts_list.sort() + current_deletes_list.sort() + + record_keys[updatefile]["upserts"] = current_upserts_list + record_keys[updatefile]["deletes"] = current_deletes_list - logging.info(f"Writing {len(records_to_write)} records to file - {merged_output_file}") + # Push list of valid records to xcom for the next section. + ti.xcom_push(key="valid_record_keys", value=record_keys) - # Write to compressed *.jsonl.gz as this will be uploaded to GCS and imported into BQ. - with gzip.open(merged_output_file, "w") as f_out: - for record_key in records_to_write: - f_out.write(str.encode(json.dumps(records[record_key], cls=PubMedCustomEncoder) + "\n")) + def save_merged_upserts_and_deletes(self, release: PubMedRelease, **kwargs): + """Write the valid upserts and deletes for this release to file.""" - merged_upsert_files.append(merged_output_file) + datafiles = [datafile for datafile in release.datafile_list if not datafile.baseline] - # Send list of merged upsert record files to the Xcom. ti: TaskInstance = kwargs["ti"] - ti.xcom_push(key="merged_upsert_files", value=merged_upsert_files) + valid_record_keys = ti.xcom_pull(key="valid_record_keys") + + # Process datafiles in parallel. + for i, chunk in enumerate(get_chunks(input_list=datafiles, chunk_size=self.max_processes)): + with ProcessPoolExecutor(max_workers=self.max_processes) as executor: + logging.info(f"In chunk {i} and processing files: {chunk}") + + futures = [] + datafile: Datafile + for datafile in chunk: + filename = os.path.basename(datafile.download_file_path) + assert ( + filename in valid_record_keys + ), f"records for file: {filename} are not present in valid_record_keys. 
" + + futures.append( + executor.submit( + save_pubmed_merged_records, + filename, + valid_record_keys[filename], + datafile.transform_upsert_file_path, + datafile.merged_upsert_file_path, + datafile.merged_delete_file_path, + ) + ) + + for future in as_completed(futures): + filename = future.result() + logging.info(f"Finished writing out upserts and deletes for updatefile: {filename}") + + ########## UPSERT RECORDS ########## def upload_merged_upsert_records(self, release: PubMedRelease, **kwargs): """Upload the merged upsert records to GCS.""" - ti: TaskInstance = kwargs["ti"] - merged_upsert_files = ti.xcom_pull(key="merged_upsert_files") + file_paths = [datafile.merged_upsert_file_path for datafile in release.datafile_list if not datafile.baseline] success = gcs_upload_files( bucket_name=self.cloud_workspace.transform_bucket, - file_paths=merged_upsert_files, + file_paths=file_paths, ) set_task_state(success, kwargs["ti"].task_id, release=release) @@ -1001,36 +941,26 @@ def upload_merged_upsert_records(self, release: PubMedRelease, **kwargs): def bq_load_upsert_table(self, release: PubMedRelease, **kwargs): """Ingest the upsert records from GCS to BQ using a file pattern.""" - upsert_table_id = bq_sharded_table_id( - self.cloud_workspace.project_id, - self.bq_dataset_id, - table_name=self.entity_list["upsert"].type, - date=release.end_date, - ) - merged_transform_blob_pattern = release.merged_transfer_blob_pattern( - entity_type=self.entity_list["upsert"].type - ) + merged_transform_blob_pattern = release.merged_transfer_blob_pattern(table_type="upsert") - logging.info(f"Uploading to table - {upsert_table_id}") + logging.info(f"Uploading to table - {self.upsert_table_id}") success = bq_load_table( uri=merged_transform_blob_pattern, - table_id=upsert_table_id, + table_id=self.upsert_table_id, source_format=SourceFormat.NEWLINE_DELIMITED_JSON, - schema_file_path=self.entity_list["upsert"].schema_file_path, + schema_file_path=release.schema_file_path(record_type="pubmed"), write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, - table_description=self.entity_list["upsert"].table_description, - partition=False, - ignore_unknown_values=False, + table_description=self.upsert_table_description, + ignore_unknown_values=True, ) - assert success, f"Unable to tranfer to table - {upsert_table_id}" + assert success, f"Unable to tranfer to table - {self.upsert_table_id}" expiry_date = pendulum.now().add(days=7) - bq_update_table_expiration(upsert_table_id, expiration_date=expiry_date) + bq_update_table_expiration(self.upsert_table_id, expiration_date=expiry_date) - ti: TaskInstance = kwargs["ti"] - ti.xcom_push(key="upsert_table_id", value=upsert_table_id) + set_task_state(success, kwargs["ti"].task_id, release=release) def bq_upsert_records(self, release: PubMedRelease, **kwargs): """ @@ -1040,74 +970,24 @@ def bq_upsert_records(self, release: PubMedRelease, **kwargs): the main table. """ - ti: TaskInstance = kwargs["ti"] - upsert_table_id = ti.xcom_pull(key="upsert_table_id") - keys_to_match_on = ["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"] bq_upsert_records( main_table_id=self.main_table_id, - upsert_table_id=upsert_table_id, + upsert_table_id=self.upsert_table_id, primary_key=keys_to_match_on, ) ########## DELETE RECORDS ########## - def merge_delete_records(self, release: PubMedRelease, **kwargs): - """ - Merge the delete records that will be applied to the main table. - - Remove duplicates and write file. 
- """ - - delete_files_to_merge = [datafile for datafile in release.datafile_list if not datafile.baseline] - - merged = [] - first_file_index = delete_files_to_merge[0].file_index - for datafile in delete_files_to_merge: - transform_file = datafile.transform_file_path(entity_type=self.entity_list["delete"].type) - - logging.info(f"Reading in file - {transform_file}") - - # Read in delete file - with open(transform_file, "r") as f_in: - incoming_data = [json.loads(line) for line in f_in] - - merged.extend(incoming_data) - - # Remove duplicate records - merged_unique = [dict(tup) for tup in {tuple(delete.items()) for delete in merged}] - logging.info(f"{len(merged)} in merged but number of unique entries are: {len(merged_unique)}") - - # Write out to file with num_merged_records in each file. - merged_delete_files = [] - for i, chunk in enumerate(get_chunks(input_list=merged_unique, chunk_size=self.num_merged_records)): - merged_output_file = datafile.merged_transform_file_path( - self.entity_list["delete"].type, first_file_index, datafile.file_index, part_num=i + 1 - ) - - logging.info(f"Writing to file - {merged_output_file}") - - # Write to compressed *.jsonl.gz as this will be uploaded to GCS and imported into BQ. - with gzip.open(merged_output_file, "w") as f_out: - for line in chunk: - f_out.write(str.encode(json.dumps(line, cls=PubMedCustomEncoder) + "\n")) - - merged_delete_files.append(merged_output_file) - - # Send list of merged delete record files to the Xcom. - ti: TaskInstance = kwargs["ti"] - ti.xcom_push(key="merged_delete_files", value=merged_delete_files) - def upload_merged_delete_records(self, release: PubMedRelease, **kwargs): """Upload the merged delete records to GCS.""" - ti: TaskInstance = kwargs["ti"] - merged_delete_files = ti.xcom_pull(key="merged_delete_files") + file_paths = [datafile.merged_delete_file_path for datafile in release.datafile_list if not datafile.baseline] success = gcs_upload_files( bucket_name=self.cloud_workspace.transform_bucket, - file_paths=merged_delete_files, + file_paths=file_paths, ) set_task_state(success, kwargs["ti"].task_id, release=release) @@ -1115,37 +995,26 @@ def upload_merged_delete_records(self, release: PubMedRelease, **kwargs): def bq_load_delete_table(self, release: PubMedRelease, **kwargs): """Ingest delete records from GCS to BQ.""" - delete_table_id = bq_sharded_table_id( - self.cloud_workspace.project_id, - self.bq_dataset_id, - table_name=self.entity_list["delete"].type, - date=release.end_date, - ) - merged_transform_blob_pattern = release.merged_transfer_blob_pattern( - entity_type=self.entity_list["delete"].type - ) + merged_transform_blob_pattern = release.merged_transfer_blob_pattern("delete") - logging.info(f"Uploading to table - {delete_table_id}") + logging.info(f"Uploading to table - {self.delete_table_id}") success = bq_load_table( uri=merged_transform_blob_pattern, - table_id=delete_table_id, + table_id=self.delete_table_id, source_format=SourceFormat.NEWLINE_DELIMITED_JSON, - schema_file_path=self.entity_list["delete"].schema_file_path, + schema_file_path=release.schema_file_path(record_type="delete"), write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE, - table_description=self.entity_list["delete"].table_description, - partition=False, - ignore_unknown_values=False, + table_description=self.delete_table_description, + ignore_unknown_values=True, ) - assert success, f"Unable to tranfer to table - {delete_table_id}" + assert success, f"Unable to tranfer to table - {self.delete_table_id}" expiry_date 
= pendulum.now().add(days=7) - bq_update_table_expiration(delete_table_id, expiration_date=expiry_date) + bq_update_table_expiration(self.delete_table_id, expiration_date=expiry_date) - # Send list of merged delete record files to the Xcom. - ti: TaskInstance = kwargs["ti"] - ti.xcom_push(key="delete_table_id", value=delete_table_id) + set_task_state(success, kwargs["ti"].task_id, release=release) def bq_delete_records(self, release: PubMedRelease, **kwargs): """ @@ -1155,15 +1024,12 @@ def bq_delete_records(self, release: PubMedRelease, **kwargs): the main table. """ - # Pull list of update tables from xcom - ti: TaskInstance = kwargs["ti"] - delete_table_id = ti.xcom_pull(key="delete_table_id") main_table_keys_to_match_on = ["MedlineCitation.PMID.value", "MedlineCitation.PMID.Version"] delete_table_keys_to_match_on = ["value", "Version"] bq_delete_records( main_table_id=self.main_table_id, - delete_table_id=delete_table_id, + delete_table_id=self.delete_table_id, main_table_primary_key=main_table_keys_to_match_on, delete_table_primary_key=delete_table_keys_to_match_on, ) @@ -1204,7 +1070,7 @@ def cleanup(self, release: PubMedRelease, **kwargs): cleanup(dag_id=self.dag_id, execution_date=kwargs["logical_date"], workflow_folder=release.workflow_folder) -def download_datafiles_from_ftp_server( +def download_datafiles( datafile_list: List[Datafile], ftp_server_url: str, ftp_port: int, @@ -1304,70 +1170,138 @@ def download_datafiles_from_ftp_server( return download_success -def transform_pubmed_xml_file_to_jsonl(datafile: Datafile, entity_list: List[PubmedEntity]) -> Union[Datafile, bool]: +def load_datafile(input_path: str) -> List[Dict]: + """Read in a Pubmed XML file and return it in a well-defined diction/json object. + + :param input_path: Path to the Pubmed xml.gz file. + :return data: A list of Pubmed records. + """ + + logging.info(f"Reading in file - {input_path}") + + with gzip.open(input_path, "rb") as f_in: + # Use the BioPython package for reading in the Pubmed XML files. + # This package also checks against its own DTD schema defined in the XML header. + data = Entrez.read(f_in, validate=True) + + # Need pull out XML attributes from the Biopython data classes. + data = add_attributes(data) + + # Remove unwanted nested list structure from the Pubmed dictionary. + data = change_pubmed_list_structure(data) + + return data + + +def save_pubmed_jsonl(output_path: str, data: List[Dict]): + """Save a Pubmed jsonl to file using the custom encoder. + + :param output_path: Path of the output file. + :param data: The data to write to file. + :return: None. 
+ """ + + if output_path.endswith(".gz"): + with gzip.open(output_path, "w") as f_out: + for line in data: + f_out.write(str.encode(json.dumps(line, cls=PubMedCustomEncoder) + "\n")) + else: + with open(output_path, "w") as f_out: + for line in data: + f_out.write(json.dumps(line, cls=PubMedCustomEncoder) + "\n") + + +def parse_articles(data: Dict) -> Union[List, List[Dict]]: + try: + return data["PubmedArticle"] + except KeyError: + logging.info(f"No PubmedArticle records in file") + return [] + + +def parse_deletes(data: Dict) -> Union[List, List[Dict]]: + try: + return data["DeleteCitation"]["PMID"] + except KeyError: + logging.info(f"No DeleteCitation.PMID records in file") + return [] + + +def transform_pubmed( + input_path: str, upsert_path: str, delete_path: Optional[str] = None +) -> Union[str, Tuple[str, dict]]: """ Convert a single Pubmed XML file to JSONL, pulling out any of the Pubmed entities and their upserts and or deletes. Used in parallelised transform section. - :param datafile: Incoming datafile to transform. - :param entity_list: List of entities to pull out from the datafile. - :return: Datafile that's been transformed. + :param input_path: Path to the donwloaded xml.gz file. + :param upsert_path: Output file path for the upserts. + :param delete_path: Output file path for the deletes. + :return bool: True if transform was successful, False if not. """ - logging.info(f"Reading in file - {datafile.filename}") + try: + data = load_datafile(input_path) + except ValidationError: + logging.info(f"Fields in XML are not valid against it's own DTD file - {input_path}") + return False - with gzip.open(datafile.download_file_path, "rb") as f_in: - # Use the BioPython package for reading in the Pubmed XML files. - # This package also checks against it's own DTD schema defined in the XML header. - try: - data_dirty1 = Entrez.read(f_in, validate=True) - except ValidationError: - logging.info(f"Fields in XML are not valid against it's own DTD file - {datafile.filename}") - return False + upserts = parse_articles(data) + logging.info(f"Pulled out {len(upserts)} upserts from file, writing to file: {upsert_path}") - # Need pull out XML attributes from the Biopython data classes. - data_dirty2 = add_attributes_to_data_from_biopython_classes(data_dirty1) - del data_dirty1 + # Save upserts to file. + save_pubmed_jsonl(upsert_path, upserts) - # Remove unwanted nested list structure from the Pubmed dictionary. - data = change_pubmed_list_structure(data_dirty2) - del data_dirty2 + if delete_path is not None: + deletes = parse_deletes(data) + logging.info(f"Pulled out {len(deletes)} deletes from file, writing to file: {delete_path}") - for entity in entity_list: - try: - try: - data_part = [retrieve for retrieve in data[entity.set_key][entity.sub_key]] - except KeyError: - data_part = [retrieve for retrieve in data[entity.sub_key]] + # Save deletes to file. + save_pubmed_jsonl(delete_path, deletes) - logging.info( - f"Pulled out {len(data_part)} {f'{entity.sub_key} {entity.type}'} from file - {datafile.filename}" - ) + # Pull out keys of upserts and deletes from an updatefile. This is not required for baseline files. 
+ upsert_keys = [str(record["MedlineCitation"]["PMID"]) for record in upserts] + delete_keys = [str(record) for record in deletes] - except KeyError: - logging.info(f"No {f'{entity.sub_key} {entity.type}'} records in file - {datafile.filename}") - data_part = [] + return os.path.basename(input_path), dict(upserts=upsert_keys, deletes=delete_keys) - output_file = datafile.transform_file_path(entity.type) + return os.path.basename(input_path) - logging.info(f"Writing {entity.type} {entity.type} to file - {output_file}") - # Baseline files are not read in later, can be dumped straight to compressed file for import to GCS and BQ. - if datafile.baseline: - with gzip.open(output_file, "w") as f_out: - for line in data_part: - f_out.write(str.encode(json.dumps(line, cls=PubMedCustomEncoder) + "\n")) - else: - with open(output_file, "w") as f_out: - for line in data_part: - f_out.write(json.dumps(line, cls=PubMedCustomEncoder) + "\n") +def save_pubmed_merged_records( + filename: str, + valid_record_keys_per_file: dict, + upsert_input_path: str, + upsert_output_path: str, + delete_output_path: str, +) -> str: + """Used in parallel by save_merged_upserts_and_deletes to write out the merged upsert and delete records + using the custom Pubmed encoder. + + :param valid_record_keys_per_file: A dictionary of what upserts and deletes to pull out from file and write. + :param upsert_input_path: Where the original upsert records are stored. + :param upsert_output_path: Destination of where to write the merged upsert records. + :param delete_output_path: Destination of where to write the merged delete records. + :return filename: Name of the original updatefile, for logging purposes. + """ - return datafile + ### Upserts ### + # Write out the upserts required from each updatefile. + # Need to read in all the upsert records again and pull out the required files. + upsert_records = {str(record["MedlineCitation"]["PMID"]): record for record in load_jsonl(upsert_input_path)} + save_pubmed_jsonl( + upsert_output_path, + [upsert_records[record_key] for record_key in valid_record_keys_per_file["upserts"]], + ) + ### Deletes ### + # Write out deletes from valid_record_keys - deletions are just the PMID value and Version. + save_jsonl_gz(delete_output_path, [ast.literal_eval(record) for record in valid_record_keys_per_file["deletes"]]) -def add_attributes_to_data_from_biopython_classes( - obj: Union[StringElement, DictionaryElement, ListElement, OrderedListElement, list] -): + return filename + + +def add_attributes(obj: Union[StringElement, DictionaryElement, ListElement, OrderedListElement, list, str]): """ Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs. @@ -1381,45 +1315,44 @@ def add_attributes_to_data_from_biopython_classes( if len(list(obj.attributes.keys())) > 0: # New object to hold the string data. new = {} - new["value"] = obj - + new["value"] = str(obj) # Loop through attributes and add as needed. for key in list(obj.attributes.keys()): - new[key] = add_attributes_to_data_from_biopython_classes(obj.attributes[key]) + new[key] = add_attributes(obj.attributes[key]) else: - new = obj + new = str(obj) return new - if isinstance(obj, DictionaryElement): + elif isinstance(obj, DictionaryElement): # New object to hold the string data. new = {} # Loop through attributes and add as needed. 
for key in list(obj.attributes.keys()): - new[key] = add_attributes_to_data_from_biopython_classes(obj.attributes[key]) + new[key] = add_attributes(obj.attributes[key]) # loop through values as needed for k, v in list(obj.items()): - new[k] = add_attributes_to_data_from_biopython_classes(v) + new[k] = add_attributes(v) return new - if isinstance(obj, (ListElement, OrderedListElement)): + elif isinstance(obj, (ListElement, OrderedListElement)): # New object to hold the string data. new = {} if len(obj) > 0: - new[obj[0].tag] = [add_attributes_to_data_from_biopython_classes(v) for v in obj] + new[obj[0].tag] = [add_attributes(v) for v in obj] try: # Loop through attributes and add as needed. for key in list(obj.attributes.keys()): - new[key] = add_attributes_to_data_from_biopython_classes(obj.attributes[key]) + new[key] = add_attributes(obj.attributes[key]) except: pass return new - if isinstance(obj, list): - new = [add_attributes_to_data_from_biopython_classes(v) for v in obj] + elif isinstance(obj, list): + new = [add_attributes(v) for v in obj] return new else: @@ -1550,7 +1483,7 @@ def change_pubmed_list_structure( class PubMedCustomEncoder(json.JSONEncoder): """ - Custom encoder for JSON dump for it to write a dictionary field as a string of text for a + Custom encoder for json dump for it to write a dictionary field as a string of text for a select number of key values in the Pubmed data. For example, the AbstractText field can be a string or an array containing background, methods, etc, diff --git a/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py b/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py index a88e1d83f..b07cf66e7 100644 --- a/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py @@ -14,6 +14,7 @@ # Author: Alex Massen-Hane +import ast import os import json import gzip @@ -29,11 +30,11 @@ from observatory.platform.api import get_dataset_releases from observatory.platform.observatory_config import Workflow -from observatory.platform.workflows.workflow import ChangefileRelease, set_task_state +from observatory.platform.workflows.workflow import ChangefileRelease from Bio.Entrez.Parser import StringElement, ListElement, DictionaryElement -from observatory.platform.gcs import gcs_blob_name_from_path, gcs_download_blob +from observatory.platform.gcs import gcs_blob_name_from_path from observatory.platform.observatory_environment import ObservatoryEnvironment, ObservatoryTestCase -from observatory.platform.bigquery import bq_run_query, bq_sharded_table_id, bq_create_table_from_query, bq_export_table +from observatory.platform.bigquery import bq_run_query, bq_sharded_table_id from observatory.platform.observatory_environment import ( ObservatoryEnvironment, ObservatoryTestCase, @@ -47,11 +48,15 @@ PubMedCustomEncoder, PubMedRelease, PubMedTelescope, - PubmedEntity, - add_attributes_to_data_from_biopython_classes, + add_attributes, change_pubmed_list_structure, - download_datafiles_from_ftp_server, - transform_pubmed_xml_file_to_jsonl, + download_datafiles, + load_datafile, + parse_articles, + parse_deletes, + save_pubmed_jsonl, + save_pubmed_merged_records, + transform_pubmed, ) @@ -136,8 +141,20 @@ def __init__(self, *args, **kwargs): "pubmed22n0003.xml.gz": "d6da2c87390489d22cdeb6e046b77da1", "pubmed22n0004.xml.gz": "83764fc19cd98d247dc5603ca65569e6", }, - "merged_upsert_files": ["merged_upsert_3-4_part_1.jsonl.gz", 
"merged_upsert_3-4_part_2.jsonl.gz"], - "merged_delete_files": ["merged_delete_3-4_part_1.jsonl.gz"], + "valid_record_keys": { + "pubmed22n0003.xml.gz": { + "upserts": ["{'value': '2', 'Version': '2'}"], + "deletes": ["{'value': '2', 'Version': '1'}"], + }, + "pubmed22n0004.xml.gz": { + "upserts": [ + "{'value': '1', 'Version': '1'}", + "{'value': '36519887', 'Version': '1'}", + "{'value': '36519888', 'Version': '1'}", + ], + "deletes": ["{'value': '30971', 'Version': '1'}"], + }, + }, "PMID_list": [ {"f0_": {"_field_1": "1", "_field_2": "1"}}, {"f0_": {"_field_1": "2", "_field_2": "2"}}, @@ -174,8 +191,12 @@ def __init__(self, *args, **kwargs): "md5hash_download": { "pubmed22n0005.xml.gz": "9c61c5b19f021cadfc57845d0d1dcbc9", }, - "merged_upsert_files": ["merged_upsert_5-5_part_1.jsonl.gz"], - "merged_delete_files": ["merged_delete_5-5_part_1.jsonl.gz"], + "valid_record_keys": { + "pubmed22n0005.xml.gz": { + "upserts": ["{'value': '2994179', 'Version': '1'}", "{'value': '2994180', 'Version': '1'}"], + "deletes": ["{'value': '2', 'Version': '2'}"], + }, + }, "update_tables": { "additions": 2, "deletions": 1, @@ -266,12 +287,12 @@ def test_dag_structure(self): "create_snapshot": ["download_updatefiles"], "download_updatefiles": ["upload_downloaded_updatefiles"], "upload_downloaded_updatefiles": ["transform_updatefiles"], - "transform_updatefiles": ["merge_upsert_records"], - "merge_upsert_records": ["upload_merged_upsert_records"], + "transform_updatefiles": ["merge_upserts_and_deletes"], + "merge_upserts_and_deletes": ["save_merged_upserts_and_deletes"], + "save_merged_upserts_and_deletes": ["upload_merged_upsert_records"], "upload_merged_upsert_records": ["bq_load_upsert_table"], "bq_load_upsert_table": ["bq_upsert_records"], - "bq_upsert_records": ["merge_delete_records"], - "merge_delete_records": ["upload_merged_delete_records"], + "bq_upsert_records": ["upload_merged_delete_records"], "upload_merged_delete_records": ["bq_load_delete_table"], "bq_load_delete_table": ["bq_delete_records"], "bq_delete_records": ["add_new_dataset_release"], @@ -326,6 +347,8 @@ def test_telescope(self): ##################### ##### FIRST RUN ##### + # Initial intake of the Pubmed dataset. + run = self.first_run with env.create_dag_run(dag, run["execution_date"]) as dag_run: # Before the tests start, we need to manually change the modified dates of the datafiles @@ -400,7 +423,7 @@ def test_telescope(self): ##### BASELINE ##### - baseline_datafiles = [datafile for datafile in release.datafile_list if datafile.baseline] + baseline = [datafile for datafile in release.datafile_list if datafile.baseline] ### Download baseline ### task_id = workflow.download_baseline.__name__ @@ -408,7 +431,7 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # Loop through downloaded baseline files, check that they exist and that hashes match. 
- for datafile in baseline_datafiles: + for datafile in baseline: self.assertTrue(os.path.exists(datafile.download_file_path)) with open(datafile.download_file_path, "rb") as f_hash: data = f_hash.read() @@ -421,7 +444,7 @@ def test_telescope(self): ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - for datafile in baseline_datafiles: + for datafile in baseline: self.assert_blob_integrity( env.download_bucket, gcs_blob_name_from_path(datafile.download_file_path), @@ -433,10 +456,8 @@ def test_telescope(self): ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - for datafile in baseline_datafiles: - for entity in [workflow.entity_list["baseline"]]: - transform_file = datafile.transform_file_path(entity.type) - self.assertTrue(os.path.exists(transform_file)) + for datafile in baseline: + self.assertTrue(os.path.exists(datafile.transform_baseline_file_path)) ### Upload transformed baseline ### task_id = workflow.upload_transformed_baseline.__name__ @@ -444,11 +465,9 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # Get list of transformed files for upload. - files_to_upload = [ - datafile.transform_file_path("baseline") for datafile in baseline_datafiles if datafile.baseline - ] + file_paths = [datafile.transform_baseline_file_path for datafile in baseline if datafile.baseline] - for file in files_to_upload: + for file in file_paths: logging.info(f"Transform_file_path - {file}") self.assert_blob_integrity( env.transform_bucket, @@ -466,23 +485,14 @@ def test_telescope(self): ) self.assert_table_integrity(full_table_id, 4) - ### Create Snapshot ### + ### Create Snapshot ### - This is skipped on the first yearly run of the workflow. task_id = workflow.create_snapshot.__name__ ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - # Check that that snapshot table exists. - snapshot_table_id = bq_sharded_table_id( - workflow.cloud_workspace.project_id, - workflow.bq_dataset_id, - f"{workflow.bq_table_id}_backup", - release.start_date, - ) - self.assert_table_integrity(snapshot_table_id, 4) - ##### UPDATEFILES ##### - updatefile_datafile = [datafile for datafile in release.datafile_list if not datafile.baseline] + updatefiles = [datafile for datafile in release.datafile_list if not datafile.baseline] ### Download updatefiles ### task_id = workflow.download_updatefiles.__name__ @@ -490,7 +500,7 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # Loop through downloaded baseline files, check that they exist and that hashes match. 
- for datafile in updatefile_datafile: + for datafile in updatefiles: self.assertTrue(os.path.exists(datafile.download_file_path)) with open(datafile.download_file_path, "rb") as f_hash: data = f_hash.read() @@ -503,7 +513,7 @@ def test_telescope(self): ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - for datafile in updatefile_datafile: + for datafile in updatefiles: self.assert_blob_integrity( env.download_bucket, gcs_blob_name_from_path(datafile.download_file_path), @@ -516,28 +526,44 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # This step pulls out the upserts and delete records from the files - for datafile in updatefile_datafile: - for entity in [workflow.entity_list["upsert"], workflow.entity_list["delete"]]: - transform_file = datafile.transform_file_path(entity.type) - self.assertTrue(os.path.exists(transform_file)) - - ##### UPSERTS ##### + for datafile in updatefiles: + self.assertTrue(os.path.exists(datafile.transform_upsert_file_path)) + self.assertTrue(os.path.exists(datafile.transform_delete_file_path)) - ### Merge upsert records ### - task_id = workflow.merge_upsert_records.__name__ + ### Merge upserts and deletes ### + task_id = workflow.merge_upserts_and_deletes.__name__ ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - # Check that the files exists for this run. - merged_upsert_files = ti.xcom_pull( - key="merged_upsert_files", task_ids=task_id, include_prior_dates=False + # Check that keys for upserts and deletes have been merged correctly. + valid_record_keys = ti.xcom_pull( + key="valid_record_keys", task_ids=task_id, include_prior_dates=False ) + self.assertEqual(len(valid_record_keys.keys()), len(run["valid_record_keys"].keys())) + + for updatefile, records in run["valid_record_keys"].items(): + self.assertTrue(updatefile in valid_record_keys) - self.assertEqual(len(merged_upsert_files), len(run["merged_upsert_files"])) - for expected, result in zip(run["merged_upsert_files"], merged_upsert_files): - self.assertEqual(os.path.join(release.transform_folder, expected), result) - self.assertTrue(os.path.exists(result)) - logging.info(f"Merged upsert file exists: {result}") + self.assertListEqual( + valid_record_keys[updatefile]["upserts"], run["valid_record_keys"][updatefile]["upserts"] + ) + self.assertListEqual( + valid_record_keys[updatefile]["deletes"], run["valid_record_keys"][updatefile]["deletes"] + ) + + ### Save merged upserts and deletes ### + task_id = workflow.save_merged_upserts_and_deletes.__name__ + ti = env.run_task(task_id) + self.assertEqual(State.SUCCESS, ti.state) + + # Check that files have been created for each datafile. + for datafile in updatefiles: + self.assertTrue(os.path.exists(datafile.merged_upsert_file_path)) + self.assertTrue(os.path.exists(datafile.merged_delete_file_path)) + + ##### UPSERTS ##### + + file_paths = [datafile.merged_upsert_file_path for datafile in updatefiles if not datafile.baseline] ### Upload merged upsert records ### task_id = workflow.upload_merged_upsert_records.__name__ @@ -545,7 +571,7 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # Check that they exist in the cloud. 
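# --- Illustrative sketch ------------------------------------------------------
# Conceptually, bq_upsert_records and bq_delete_records (observatory-platform)
# apply the loaded upsert/delete tables to the main table keyed on the PMID
# value/Version pair. The SQL below only illustrates that idea: the table and
# column names are placeholders, and the platform helpers build their own
# queries.
UPSERT_SQL_SKETCH = """
MERGE `project.dataset.pubmed` M
USING `project.dataset.pubmed_upsert` U
ON M.MedlineCitation.PMID.value = U.MedlineCitation.PMID.value
   AND M.MedlineCitation.PMID.Version = U.MedlineCitation.PMID.Version
WHEN MATCHED THEN UPDATE SET MedlineCitation = U.MedlineCitation
WHEN NOT MATCHED THEN INSERT ROW
"""

DELETE_SQL_SKETCH = """
DELETE FROM `project.dataset.pubmed` M
WHERE EXISTS (
    SELECT 1 FROM `project.dataset.pubmed_delete` D
    WHERE M.MedlineCitation.PMID.value = D.value
      AND M.MedlineCitation.PMID.Version = D.Version
)
"""
# ------------------------------------------------------------------------------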
- for file in merged_upsert_files: + for file in file_paths: logging.info(f"Transform_file_path - {file}") self.assert_blob_integrity( env.transform_bucket, @@ -558,8 +584,7 @@ def test_telescope(self): ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - upsert_table_id = ti.xcom_pull(key="upsert_table_id", task_ids=task_id, include_prior_dates=False) - self.assert_table_integrity(upsert_table_id, 4) + self.assert_table_integrity(workflow.upsert_table_id, 4) ### BQ upsert records ### task_id = workflow.bq_upsert_records.__name__ @@ -568,21 +593,7 @@ def test_telescope(self): ##### DELETES ##### - ### Merge delete records ### - task_id = workflow.merge_delete_records.__name__ - ti = env.run_task(task_id) - self.assertEqual(State.SUCCESS, ti.state) - - # Check that the files exists for this run. - merged_delete_files = ti.xcom_pull( - key="merged_delete_files", task_ids=task_id, include_prior_dates=False - ) - - self.assertEqual(len(merged_delete_files), len(run["merged_delete_files"])) - for expected, result in zip(run["merged_delete_files"], merged_delete_files): - self.assertEqual(os.path.join(release.transform_folder, expected), result) - self.assertTrue(os.path.exists(result)) - logging.info(f"Merged delete file exists: {result}") + file_paths = [datafile.merged_delete_file_path for datafile in updatefiles if not datafile.baseline] ### Upload merged delete records ### task_id = workflow.upload_merged_delete_records.__name__ @@ -590,7 +601,7 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # Check that they exist in the cloud. - for file in merged_delete_files: + for file in file_paths: logging.info(f"Transform_file_path - {file}") self.assert_blob_integrity( env.transform_bucket, @@ -598,13 +609,12 @@ def test_telescope(self): file, ) - ### BQ load delete table ### + ### BQ load delete table task_id = workflow.bq_load_delete_table.__name__ ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - delete_table_id = ti.xcom_pull(key="delete_table_id", task_ids=task_id, include_prior_dates=False) - self.assert_table_integrity(delete_table_id, 2) + self.assert_table_integrity(workflow.delete_table_id, 2) ### BQ delete records ### task_id = workflow.bq_delete_records.__name__ @@ -646,6 +656,8 @@ def test_telescope(self): ###################### ##### SECOND RUN ##### + # This run is to make sure that it can apply a sequential update. + run = self.second_run with env.create_dag_run(dag, run["execution_date"]) as dag_run: # Before the tests start, we need to manually change the modified dates of the datafiles @@ -721,8 +733,8 @@ def test_telescope(self): ##### BASELINE ##### # No new baseline files should be downloaded as it's already been done for this year. 
- baseline_datafiles = [datafile for datafile in release.datafile_list if datafile.baseline] - self.assertEqual(len(baseline_datafiles), 0) + baseline = [datafile for datafile in release.datafile_list if datafile.baseline] + self.assertEqual(len(baseline), 0) task_ids = [ "download_baseline", @@ -750,14 +762,14 @@ def test_telescope(self): snapshot_table_id = bq_sharded_table_id( workflow.cloud_workspace.project_id, workflow.bq_dataset_id, - f"{workflow.bq_table_id}_backup", + f"{workflow.bq_table_id}_snapshot", release.start_date, ) self.assert_table_integrity(snapshot_table_id, 5) ##### UPDATEFILES ##### - updatefile_datafile = [datafile for datafile in release.datafile_list if not datafile.baseline] + updatefiles = [datafile for datafile in release.datafile_list if not datafile.baseline] ### Download updatefiles ### task_id = workflow.download_updatefiles.__name__ @@ -765,7 +777,7 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # Loop through downloaded baseline files, check that they exist and that hashes match. - for datafile in updatefile_datafile: + for datafile in updatefiles: self.assertTrue(os.path.exists(datafile.download_file_path)) with open(datafile.download_file_path, "rb") as f_hash: data = f_hash.read() @@ -778,7 +790,7 @@ def test_telescope(self): ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - for datafile in updatefile_datafile: + for datafile in updatefiles: self.assert_blob_integrity( env.download_bucket, gcs_blob_name_from_path(datafile.download_file_path), @@ -791,36 +803,56 @@ def test_telescope(self): self.assertEqual(State.SUCCESS, ti.state) # This step pulls out the upserts and delete records from the files - for datafile in updatefile_datafile: - for entity in [workflow.entity_list["upsert"], workflow.entity_list["delete"]]: - transform_file = datafile.transform_file_path(entity.type) - self.assertTrue(os.path.exists(transform_file)) - - ##### UPSERTS ##### + for datafile in updatefiles: + self.assertTrue(os.path.exists(datafile.transform_upsert_file_path)) + self.assertTrue(os.path.exists(datafile.transform_delete_file_path)) - ### Merge upsert records ### - task_id = workflow.merge_upsert_records.__name__ + ### Merge upserts and deletes ### + task_id = workflow.merge_upserts_and_deletes.__name__ ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - # Check that the files exists for this run. - merged_upsert_files = ti.xcom_pull( - key="merged_upsert_files", task_ids=task_id, include_prior_dates=False + valid_record_keys = ti.xcom_pull( + key="valid_record_keys", task_ids=task_id, include_prior_dates=False ) - self.assertEqual(len(merged_upsert_files), len(run["merged_upsert_files"])) - for expected, result in zip(run["merged_upsert_files"], merged_upsert_files): - self.assertEqual(os.path.join(release.transform_folder, expected), result) - self.assertTrue(os.path.exists(result)) - logging.info(f"Merged upsert file exists: {result}") + # Check that keys for upserts and deletes have been merged correctly. 
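# --- Illustrative sketch ------------------------------------------------------
# The record keys pulled from XCom above are strings such as
# "{'value': '2', 'Version': '1'}" rather than dicts, which keeps them hashable
# and JSON-serialisable for XCom. Presumably this is also why the workflow now
# imports ast: a key can be round-tripped back into a dict with ast.literal_eval.
import ast

pmid = {"value": "2", "Version": "1"}
key = str(pmid)                       # "{'value': '2', 'Version': '1'}"
assert ast.literal_eval(key) == pmid  # back to the original dict
# ------------------------------------------------------------------------------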
+            self.assertEqual(len(valid_record_keys.keys()), len(run["valid_record_keys"].keys()))
+
+            for updatefile, records in run["valid_record_keys"].items():
+                self.assertTrue(updatefile in valid_record_keys)
+
+                self.assertListEqual(
+                    valid_record_keys[updatefile]["upserts"], run["valid_record_keys"][updatefile]["upserts"]
+                )
+                self.assertListEqual(
+                    valid_record_keys[updatefile]["deletes"], run["valid_record_keys"][updatefile]["deletes"]
+                )
+
+            ### Save merged upserts and deletes ###
+            task_id = workflow.save_merged_upserts_and_deletes.__name__
+            ti = env.run_task(task_id)
+            self.assertEqual(State.SUCCESS, ti.state)
+
+            # Check that files have been created for each datafile.
+            for datafile in updatefiles:
+                self.assertTrue(os.path.exists(datafile.merged_upsert_file_path))
+                self.assertTrue(os.path.exists(datafile.merged_delete_file_path))
+
+            ##### UPSERTS #####

             ### Upload merged upsert records ###
             task_id = workflow.upload_merged_upsert_records.__name__
             ti = env.run_task(task_id)
             self.assertEqual(State.SUCCESS, ti.state)

+            file_paths = [datafile.merged_upsert_file_path for datafile in updatefiles if not datafile.baseline]
+
             # Check that they exist in the cloud.
-            for file in merged_upsert_files:
+            for file in file_paths:
                 logging.info(f"Transform_file_path - {file}")
                 self.assert_blob_integrity(
                     env.transform_bucket,
@@ -833,8 +865,7 @@ def test_telescope(self):
             ti = env.run_task(task_id)
             self.assertEqual(State.SUCCESS, ti.state)

-            upsert_table_id = ti.xcom_pull(key="upsert_table_id", task_ids=task_id, include_prior_dates=False)
-            self.assert_table_integrity(upsert_table_id, 2)
+            self.assert_table_integrity(workflow.upsert_table_id, 2)

             ### BQ upsert records ###
             task_id = workflow.bq_upsert_records.__name__
@@ -843,21 +874,7 @@ def test_telescope(self):

             ##### DELETES #####

-            ### Merge delete records ###
-            task_id = workflow.merge_delete_records.__name__
-            ti = env.run_task(task_id)
-            self.assertEqual(State.SUCCESS, ti.state)
-
-            # Check that the files exists for this run.
-            merged_delete_files = ti.xcom_pull(
-                key="merged_delete_files", task_ids=task_id, include_prior_dates=False
-            )
-
-            self.assertEqual(len(merged_delete_files), len(run["merged_delete_files"]))
-            for expected, result in zip(run["merged_delete_files"], merged_delete_files):
-                self.assertEqual(os.path.join(release.transform_folder, expected), result)
-                self.assertTrue(os.path.exists(result))
-                logging.info(f"Merged delete file exists: {result}")
+            file_paths = [datafile.merged_delete_file_path for datafile in updatefiles if not datafile.baseline]

             ### Upload merged delete records ###
             task_id = workflow.upload_merged_delete_records.__name__
@@ -865,7 +882,7 @@ def test_telescope(self):
             self.assertEqual(State.SUCCESS, ti.state)

             # Check that they exist in the cloud.
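# --- Illustrative sketch ------------------------------------------------------
# save_merged_upserts_and_deletes is expected to write, for each updatefile, a
# gzipped upsert file holding only the records whose PMID key survived the
# merge, plus a delete file holding the delete keys themselves (this is what
# test_save_pubmed_merged_records below pins down). A rough sketch with
# placeholder names and no error handling:
import ast
import gzip
import json


def save_merged_records_sketch(record_keys, input_path, upsert_path, delete_path):
    with open(input_path, "r") as f_in:
        records = [json.loads(line) for line in f_in]

    # A record is kept when the string form of its PMID matches a surviving upsert key.
    upserts = [r for r in records if str(r["MedlineCitation"]["PMID"]) in set(record_keys["upserts"])]
    deletes = [ast.literal_eval(key) for key in record_keys["deletes"]]

    with gzip.open(upsert_path, "wt") as f_out:
        f_out.writelines(json.dumps(record) + "\n" for record in upserts)
    with gzip.open(delete_path, "wt") as f_out:
        f_out.writelines(json.dumps(record) + "\n" for record in deletes)
# ------------------------------------------------------------------------------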
- for file in merged_delete_files: + for file in file_paths: logging.info(f"Transform_file_path - {file}") self.assert_blob_integrity( env.transform_bucket, @@ -878,8 +895,7 @@ def test_telescope(self): ti = env.run_task(task_id) self.assertEqual(State.SUCCESS, ti.state) - delete_table_id = ti.xcom_pull(key="delete_table_id", task_ids=task_id, include_prior_dates=False) - self.assert_table_integrity(delete_table_id, 1) + self.assert_table_integrity(workflow.delete_table_id, 1) ### BQ delete records ### task_id = workflow.bq_delete_records.__name__ @@ -921,6 +937,8 @@ def test_telescope(self): ###################### ##### THIRD RUN ##### + # This run only needs to confirm that the first year run bool works and it downloads the new baseline dataset. + run = self.third_run with env.create_dag_run(dag, run["execution_date"]) as dag_run: # Before the tests start, we need to manually change the modified dates of the datafiles @@ -991,8 +1009,6 @@ def test_telescope(self): datafile_list=run["datafiles"], ) - # This run only needs to confirm that the first year run bool works and download the new baseline dataset. - ##### BASELINE ##### baseline_datafiles = [datafile for datafile in release.datafile_list if datafile.baseline] @@ -1008,6 +1024,8 @@ def test_telescope(self): "download_updatefiles", "upload_downloaded_updatefiles", "transform_updatefiles", + "merge_upserts_and_deletes", + "save_merged_upserts_and_deletes", ] for task_id in task_ids: @@ -1023,7 +1041,6 @@ def test_telescope(self): ##### UPSERTS ##### task_ids = [ - "merge_upsert_records", "upload_merged_upsert_records", "bq_load_upsert_table", "bq_upsert_records", @@ -1037,7 +1054,6 @@ def test_telescope(self): ##### DELETES ##### task_ids = [ - "merge_delete_records", "upload_merged_delete_records", "bq_load_delete_table", "bq_delete_records", @@ -1095,10 +1111,10 @@ def __init__(self, *args, **kwargs): self.baseline_path = "/pubmed/baseline/" self.updatefiles_path = "/pubmed/updatefiles/" - def test_download_datafiles_from_ftp_server(self): + def test_download_datafiles(self): """Test that an exmaple PubMed XMLs can be transformed successfully.""" - # Create mock FTP server that holds the testing Pubmed Files. + # Create mock FTP server to host the test Pubmed Files. 
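# --- Illustrative sketch ------------------------------------------------------
# At its core, downloading one datafile and verifying it against the published
# MD5 sum only needs ftplib and hashlib; download_datafiles() in
# pubmed_telescope.py is the real helper (it handles many files, retries, etc.)
# and the host/paths below are placeholders.
import hashlib
from ftplib import FTP


def download_and_verify_sketch(host: str, port: int, path_on_ftp: str, output_path: str, expected_md5: str) -> bool:
    ftp = FTP()
    ftp.connect(host=host, port=port)
    ftp.login()  # anonymous login
    with open(output_path, "wb") as f_out:
        ftp.retrbinary(f"RETR {path_on_ftp}", f_out.write)
    ftp.quit()

    with open(output_path, "rb") as f_in:
        return hashlib.md5(f_in.read()).hexdigest() == expected_md5
# ------------------------------------------------------------------------------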
         ftp_server = FtpServer(
             host=self.ftp_server_url, port=self.ftp_port, directory=os.path.join(test_fixtures_folder())
         )
@@ -1136,7 +1152,7 @@ def test_download_datafiles_from_ftp_server(self):
             ),
         ]

-        success = download_datafiles_from_ftp_server(
+        success = download_datafiles(
             datafile_list=datafiles_to_download,
             ftp_server_url=self.ftp_server_url,
             ftp_port=self.ftp_port,
@@ -1149,8 +1165,115 @@ def test_download_datafiles_from_ftp_server(self):

         for datafile in datafiles_to_download:
             self.assertTrue(os.path.exists(datafile.download_file_path))

-    def test_transform_pubmed_xml_file_to_jsonl(self):
-        """Test that an exmaple PubMed XMLs can be transformed successfully."""
+    def test_load_datafile(self):
+        """Test that a Pubmed datafile can be read in and parsed."""
+
+        xml_file_path = os.path.join(test_fixtures_folder(), "pubmed", "baseline", "pubmed22n0001.xml.gz")
+        data = load_datafile(input_path=xml_file_path)
+
+        self.assertTrue(data)
+
+    def test_save_pubmed_jsonl(self):
+        """Test that data can be saved to a .jsonl or .jsonl.gz file correctly."""
+
+        data_to_write = [{"value": 12345, "Version": 1}]
+
+        with CliRunner().isolated_filesystem() as tmp_dir:
+            ### Uncompressed ###
+            output_path = os.path.join(tmp_dir, "test_output_file.jsonl")
+            save_pubmed_jsonl(output_path=output_path, data=data_to_write)
+            self.assertTrue(os.path.exists(output_path))
+
+            with open(output_path, "r") as f_in:
+                data_read_in = [json.loads(line) for line in f_in]
+
+            self.assertEqual(data_to_write, data_read_in)
+
+            ### Compressed ###
+            output_path = os.path.join(tmp_dir, "test_output_file.jsonl.gz")
+            save_pubmed_jsonl(output_path=output_path, data=data_to_write)
+            self.assertTrue(os.path.exists(output_path))
+
+            with gzip.open(output_path, "rb") as f_in:
+                data_read_in = [json.loads(line) for line in f_in]
+
+            self.assertEqual(data_to_write, data_read_in)
+
+    def test_save_pubmed_merged_records(self):
+        """Test if records can be reliably pulled from transformed files and written to merged record files."""
+
+        record_keys_per_file = {
+            "upserts": ["{'value': 12345, 'Version': 1}"],
+            "deletes": ["{'value': 67891, 'Version': 2}"],
+        }
+
+        record = [
+            {
+                "MedlineCitation": {
+                    "PMID": {"value": 12345, "Version": 1},
+                    "AuthorList": [{"FirstName": "Foo", "Lastname": "Bar"}, {"FirstName": "James", "Lastname": "Bond"}],
+                    "AbstractText": "Something",
+                }
+            }
+        ]
+
+        filename = "pubmed_temp.jsonl"
+
+        with CliRunner().isolated_filesystem() as tmp_dir:
+            input_path = os.path.join(tmp_dir, filename)
+            save_pubmed_jsonl(input_path, record)
+
+            upsert_output_path = os.path.join(tmp_dir, "upsert_output.jsonl.gz")
+            delete_output_path = os.path.join(tmp_dir, "delete_output.jsonl.gz")
+
+            save_pubmed_merged_records(
+                filename, record_keys_per_file, input_path, upsert_output_path, delete_output_path
+            )
+
+            with gzip.open(upsert_output_path, "rb") as f_in:
+                data = [json.loads(line) for line in f_in]
+                self.assertListEqual(record, data)
+
+            with gzip.open(delete_output_path, "rb") as f_in:
+                data = [json.loads(line) for line in f_in]
+                self.assertListEqual([ast.literal_eval(key) for key in record_keys_per_file["deletes"]], data)
+
+    def test_parse_articles(self):
+        """Test if PubmedArticle records can be pulled out from a data dictionary."""
+
+        data_good = {
+            "PubmedArticle": [{"value": 12345, "Version": 1}, {"value": 67891, "Version": 2}],
+            "NotPubmedArticle": [{"value": 999, "Version": 999}],
+            "DeleteCitation": {"PMID": [{"value": 1, "Version": 1}]},
+        }
+
+        data_bad = {
+            "NotPubmedArticle": [{"value": 999, "Version": 999}],
999, "Version": 999}], + "NotDeleteCitation": {"PMID": [{"value": 1, "Version": 1}]}, + } + + self.assertEqual([{"value": 12345, "Version": 1}, {"value": 67891, "Version": 2}], parse_articles(data_good)) + self.assertEqual([], parse_articles(data_bad)) + + def test_parse_deletes(self): + """Test if DeleteCiation records can be pulled out from a data dictionary.""" + + data_good = { + "PubmedArticle": [{"value": 12345, "Version": 1}, {"value": 67891, "Version": 2}], + "NotPubmedArticle": [{"value": 999, "Version": 999}], + "DeleteCitation": {"PMID": [{"value": 1, "Version": 1}]}, + } + + data_bad = { + "NotPubmedArticle": [{"value": 999, "Version": 999}], + "NotDeleteCitation": {"PMID": [{"value": 1, "Version": 1}]}, + } + + self.assertEqual([{"value": 1, "Version": 1}], parse_deletes(data_good)) + self.assertEqual([], parse_deletes(data_bad)) + + def test_transform_pubmed(self): + """Test that exmaple PubMed XMLs can be transformed successfully.""" # Setup environment env = ObservatoryEnvironment(self.project_id, self.data_location, api_port=find_free_port()) @@ -1165,25 +1288,6 @@ def test_transform_pubmed_xml_file_to_jsonl(self): sequence_end=1, ) - entity_list = [ - PubmedEntity( - type="upsert", - sub_key="PubmedArticle", - set_key="PubmedArticleSet", - pmid_location="MedlineCitation", - table_name="upserts", - table_description="""PubmedArticle""", - ), - PubmedEntity( - type="delete", - sub_key="PMID", - set_key="DeleteCitation", - pmid_location=None, - table_name="deletes", - table_description="""DeleteCitation""", - ), - ] - ### Bad XML ### datafile_bad = Datafile( filename="pubmed22n0001_bad_fields.xml.gz", @@ -1196,9 +1300,12 @@ def test_transform_pubmed_xml_file_to_jsonl(self): bad_xml_file_path = os.path.join(test_fixtures_folder(), "pubmed", "pubmed22n0001_bad_fields.xml.gz") shutil.copy2(bad_xml_file_path, datafile_bad.download_file_path) - # Attempt to transform bad xml. - datafile_returned = transform_pubmed_xml_file_to_jsonl(datafile=datafile_bad, entity_list=entity_list) - self.assertFalse(datafile_returned) + # Attempt to transform bad xml - just a baseline file, returns a baseline file if it is successful. + output = transform_pubmed( + input_path=datafile_bad.download_file_path, upsert_path=datafile_bad.transform_upsert_file_path + ) + self.assertFalse(os.path.exists(datafile_bad.transform_baseline_file_path)) + self.assertFalse(output) datafile_good = Datafile( filename="pubmed22n0001.xml.gz", @@ -1209,19 +1316,50 @@ def test_transform_pubmed_xml_file_to_jsonl(self): datafile_release=changefile_release, ) - ### VALID XML ### + ### VALID BASELINE XML ### valid_xml_file_path = os.path.join(test_fixtures_folder(), "pubmed", "baseline", "pubmed22n0001.xml.gz") shutil.copy2(valid_xml_file_path, datafile_good.download_file_path) - # Attempt to transform valid xml - returns Changefile instance only if it's successful. - datafile_returned = transform_pubmed_xml_file_to_jsonl(datafile=datafile_good, entity_list=entity_list) - self.assertTrue(isinstance(datafile_returned, Datafile)) + # Attempt to transform valid xml - should output a transformed file if it is successful. 
+ filename = transform_pubmed( + input_path=datafile_good.download_file_path, upsert_path=datafile_good.transform_baseline_file_path + ) + self.assertTrue(os.path.exists(datafile_good.transform_baseline_file_path)) + self.assertEqual(os.path.basename(datafile_good.download_file_path), filename) + + ### VALID UPDATEFILE XML ### + + expected_keys = { + "deletes": ["{'value': '2', 'Version': '1'}"], + "upserts": [ + "{'value': '1', 'Version': '1'}", + "{'value': '2', 'Version': '2'}", + ], + } - # Ensure that transform files were written to disk. - for entity in entity_list: - self.assertTrue(os.path.exists(datafile_returned.transform_file_path(entity.type))) + datafile_good = Datafile( + filename="pubmed22n0003.xml.gz", + file_index=1, + path_on_ftp="dummy_string", + baseline=False, + datafile_date=pendulum.now(), + datafile_release=changefile_release, + ) + + valid_xml_file_path = os.path.join(test_fixtures_folder(), "pubmed", "updatefiles", "pubmed22n0003.xml.gz") + shutil.copy2(valid_xml_file_path, datafile_good.download_file_path) + + filename, output_keys = transform_pubmed( + input_path=datafile_good.download_file_path, + upsert_path=datafile_good.transform_upsert_file_path, + delete_path=datafile_good.transform_delete_file_path, + ) + self.assertTrue(os.path.exists(datafile_good.transform_upsert_file_path)) + self.assertTrue(os.path.exists(datafile_good.transform_delete_file_path)) + self.assertEqual(os.path.basename(datafile_good.download_file_path), filename) + self.assertDictEqual(output_keys, expected_keys) - def test_add_attributes_to_data_from_biopython(self): + def test_add_attributes(self): """ Test that attributes from the Biopython data classes can be reliably pulled out and added to the dictionary. """ @@ -1245,7 +1383,7 @@ def test_add_attributes_to_data_from_biopython(self): output_file = os.path.join(tmp_dir, "test_output_file.jsonl") with open(output_file, "wb") as f_out: for line in objects: - output = add_attributes_to_data_from_biopython_classes(line) + output = add_attributes(line) f_out.write(str.encode(json.dumps(output, cls=PubMedCustomEncoder) + "\n")) with open(output_file, "r") as f_in:
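# --- Illustrative sketch ------------------------------------------------------
# add_attributes folds the XML attributes that Biopython's Entrez parser keeps
# on its StringElement/ListElement/DictionaryElement classes into plain Python
# structures (e.g. a PMID StringElement "12345" with Version="1" becomes
# {"value": "12345", "Version": "1"}). A recursive sketch of that idea; the real
# add_attributes() in pubmed_telescope.py is the authoritative version.
from Bio.Entrez.Parser import DictionaryElement, ListElement, StringElement


def add_attributes_sketch(obj):
    if isinstance(obj, StringElement):
        return {"value": str(obj), **obj.attributes} if obj.attributes else str(obj)
    if isinstance(obj, DictionaryElement):
        out = {key: add_attributes_sketch(value) for key, value in obj.items()}
        out.update(obj.attributes)
        return out
    if isinstance(obj, (ListElement, list)):
        return [add_attributes_sketch(value) for value in obj]
    return obj
# ------------------------------------------------------------------------------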