From 2385ed64ba0538b8e41251b87bb59d5a0f15a9d2 Mon Sep 17 00:00:00 2001
From: Alex Massen-Hane
Date: Wed, 21 Jun 2023 13:45:15 +0800
Subject: [PATCH] Add test and make requested changes

---
 .../pubmed/pubmed23n0001_bad_fields.xml.gz    |   3 +
 .../workflows/pubmed_telescope.py             | 189 +++++++++---------
 .../workflows/tests/test_pubmed_telescope.py  | 136 +++++++++----
 3 files changed, 195 insertions(+), 133 deletions(-)
 create mode 100644 academic_observatory_workflows/fixtures/pubmed/pubmed23n0001_bad_fields.xml.gz

diff --git a/academic_observatory_workflows/fixtures/pubmed/pubmed23n0001_bad_fields.xml.gz b/academic_observatory_workflows/fixtures/pubmed/pubmed23n0001_bad_fields.xml.gz
new file mode 100644
index 000000000..bd8d83a27
--- /dev/null
+++ b/academic_observatory_workflows/fixtures/pubmed/pubmed23n0001_bad_fields.xml.gz
@@ -0,0 +1,3 @@
+version https://git-lfs.github.com/spec/v1
+oid sha256:05c596f65c385c334170a5ae308aeecf3976b14e91181888f320f03bc673fa92
+size 2274
diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py
index d082e5433..d823f5250 100644
--- a/academic_observatory_workflows/workflows/pubmed_telescope.py
+++ b/academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -20,9 +20,7 @@ import json
 import logging
 import pendulum
-from pathlib import Path
 from datetime import timedelta
-from bs4 import BeautifulSoup
 from typing import Union, Dict, List
 import glob
@@ -32,17 +30,12 @@ from google.cloud.bigquery import SourceFormat

 # To download the files from the Pubmed FTP server
-from ftplib import FTP
+from ftplib import FTP, error_reply
 import hashlib

 # For reading in the XML files using Biopython library.
 from Bio import Entrez
-from Bio.Entrez.Parser import (
-    StringElement,
-    ListElement,
-    DictionaryElement,
-    OrderedListElement,
-)
+from Bio.Entrez.Parser import StringElement, ListElement, DictionaryElement, OrderedListElement, ValidationError

 # Multithreading libraries
 from concurrent.futures import ProcessPoolExecutor, as_completed
@@ -63,6 +56,7 @@ from observatory.platform.workflows.workflow import (
     Workflow,
     ChangefileRelease,
+    set_task_state,
     cleanup,
 )
 from observatory.platform.bigquery import (
@@ -114,7 +108,7 @@ def __eq__(self, other):
             and self.file_index == other.file_index
             and self.path_on_ftp == other.path_on_ftp
             and self.is_first_run == other.is_first_run
-            and abs(self.changefile_date - other.changefile_date) <= timedelta(minutes=1)
+            and self.changefile_date == other.changefile_date
             and self.changefile_release == other.changefile_release
         )
         return False
@@ -159,7 +153,7 @@ def transform_path(self):
     def transform_file_path(self, entity_type: str) -> str:
         """
-        Give path to the transform file.
+        Give path to the transform file depending on entity type.

         :param entity_type: Type of the record, either "additions" or "deletions".
         :return: Returns the path to the transformed file.
@@ -223,7 +217,8 @@ def __init__(
         type: str,
         sub_key: str,
         set_key: str,
-        PMID_location: str,
+        pmid_location: str,
+        table_description: str,
     ):
         """
         Holds the metadata about the records stored in Pubmed.

         :param type: Either a set of 'addition' or 'deletion' records.
         :param sub_key: Sub key location of Pubmed data.
         :param set_key: Key location of Pubmed.
-        :param PMID_location: Location of PMID values
+        :param pmid_location: Location of PMID values.
+        :param table_description: Description for the table on BigQuery.
""" self.name = name self.type = type self.sub_key = sub_key self.set_key = set_key - self.PMID_location = PMID_location + self.pmid_location = pmid_location + self.table_description = table_description + self.schema_file_path = os.path.join(default_schema_folder(), "pubmed", f"{name}_2023-01-01.json") @@ -250,7 +248,6 @@ def __init__( dag_id: str, run_id: str, cloud_workspace: CloudWorkspace, - bq_dataset_id: str, start_date: pendulum.DateTime, end_date: pendulum.DateTime, is_first_run: bool, @@ -278,7 +275,6 @@ def __init__( sequence_end=changefile_list[-1].file_index, ) self.cloud_workspace = cloud_workspace - self.bq_dataset_id = bq_dataset_id self.is_first_run = is_first_run self.changefile_list = changefile_list @@ -326,6 +322,8 @@ def __init__( dag_id: str, cloud_workspace: CloudWorkspace, bq_dataset_id: str = "pubmed", + bq_table_id: str = "articles", + bq_dataset_description: str = "Pubmed Medline database. https://pubmed.ncbi.nlm.nih.gov/about/ ", start_date: str = pendulum.datetime(year=2022, month=11, day=27), # When the baseline data became available. schedule_interval: str = "0 0 * * 0", # weekly catchup: bool = True, @@ -339,8 +337,6 @@ def __init__( snapshot_expiry_days: int = 7, max_processes: int = 4, # Limited to 4 due to RAM usage when transforming files batch_size: int = 4, - main_table: str = "articles", - table_description: str = "Pubmed Medline database. https://pubmed.ncbi.nlm.nih.gov/about/ ", ): """Construct an PubMed Telescope instance. @@ -358,7 +354,7 @@ def __init__( :param snapshot_expiry_days: How long until the snapshot expires. :param max_processes: Max number of parallel processors. :param batch_size: Number of changefiles per batch. - :param main_table: Name of Pubmed table. + :param bq_table_id: Name of Pubmed table. :param table_description: Description of the main table. """ @@ -378,8 +374,8 @@ def __init__( # Databse settings self.cloud_workspace = cloud_workspace self.bq_dataset_id = bq_dataset_id - self.main_table = main_table - self.table_description = table_description + self.bq_table_id = bq_table_id + self.bq_dataset_description = bq_dataset_description self.snapshot_expiry_days = snapshot_expiry_days # Workflow parameters @@ -393,14 +389,22 @@ def __init__( type="additions", sub_key="PubmedArticle", set_key="PubmedArticleSet", - PMID_location="MedlineCitation", + pmid_location="MedlineCitation", + table_description="""PubmedArticle - Includes all the metadata associated with a journal article citation, both the + metadata to describe the published article, i.e. , and additional metadata often + pertaining to the publication's history or processing at NLM, i.e. .""", ), PubmedEntity( name="article_deletions", type="deletions", sub_key="PMID", set_key="DeleteCitation", - PMID_location=None, + pmid_location=None, + table_description="""DeleteCitation - Indicates one or more or that have been deleted. + PMIDs in DeleteCitation will typically have been found to be duplicate citations, or citations to content + that was determined to be out-of-scope for PubMed. It is possible that a PMID would appear in DeleteCitation + without having been distributed in a previous file. 
@@ -440,7 +444,7 @@ def __init__(
         self.add_task(self.merge_updatefiles)
         self.add_task(self.upload_transformed)
         self.add_task(self.bq_ingest_update_tables)
-        self.add_task(self.bq_add_updates_to_main_table)
+        self.add_task(self.bq_add_updates_to_table)
         self.add_task(self.add_new_dataset_release)
         self.add_task(self.cleanup)
@@ -451,7 +455,7 @@ def __init__(
             )
         )

-    def list_changefiles_for_release(self, **kwargs):
+    def list_changefiles_for_release(self, **kwargs) -> bool:
         """
         Get a list of all files to download for this data interval run / release period.
         """
@@ -478,7 +482,7 @@

         # Grab metadata and path of the file.
         for file in baseline_list_ftp:
-            if file.split(".")[-1] == "gz" in file:  # Find all the xml.gz files available from the server.
+            if file.endswith(".xml.gz"):  # Find all the xml.gz files available from the server.
                 filename = file
                 file_index = int(file[9:13])
                 path_on_ftp = self.baseline_path + file
@@ -523,7 +527,9 @@
                 )
                 files_to_download.append(changefile)

-        # Check that all updatefiles pulled from the FTP server are sequential.
+        # Check that the updatefiles pulled from the FTP server are not missing any between start_index and end_index,
+        # e.g. 10, 11, 13, 14 will raise an error.

         # Sort from oldest to newest using the file index
         files_to_download.sort(key=lambda c: c.file_index, reverse=False)
         file_index_last = files_to_download[0].file_index
@@ -594,7 +600,6 @@ def make_release(self, **kwargs) -> PubMedRelease:
             dag_id=self.dag_id,
             run_id=run_id,
             cloud_workspace=self.cloud_workspace,
-            bq_dataset_id=self.bq_dataset_id,
             start_date=start_date,
             end_date=end_date,
             is_first_run=is_first_run,
@@ -639,17 +644,17 @@ def download(self, release: PubMedRelease, **kwargs):
         )

         if self.check_md5_hash:
-            download_attemp_count = 1
+            download_attempt_count = 1
             download_success = False
-            while download_attemp_count <= self.max_download_retry and not download_success:
-                logging.info(f"Downloading: {changefile.filename} Attempt: {download_attemp_count}")
+            while download_attempt_count <= self.max_download_retry and not download_success:
+                logging.info(f"Downloading: {changefile.filename} Attempt: {download_attempt_count}")
                 try:
                     download_location = changefile.download_file_path
                     # Download file
                     with open(download_location, "wb") as f:
                         ftp_conn.retrbinary(f"RETR {changefile.path_on_ftp}", f.write)
                     logging.info(f"File downloaded to - {download_location}")
-                except:
+                except error_reply:
                     logging.info(
                         f"Unable to download {changefile.path_on_ftp} from PubMed's FTP server {self.ftp_server_url}."
                     )
@@ -666,24 +671,23 @@
                     # Download corresponding md5 hash.
                     with open(f"{download_location}.md5", "wb") as f:
                         ftp_conn.retrbinary(f"RETR {changefile.path_on_ftp}.md5", f.write)
-                except:
+                except error_reply:
                     logging.info(
                         f"Unable to download {changefile.path_on_ftp}.md5 from PubMed's FTP server {self.ftp_server_url}."
                     )

                 # Peep into md5 file.
                 with open(f"{download_location}.md5", "r") as f_md5:
                     md5_from_pubmed_ftp = f_md5.read()
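# A minimal sketch (not part of this change) of the checksum comparison the retry loop
# below relies on. `local_path` is illustrative; the .md5 sidecar downloaded above is a
# short text line containing the hex digest, so a substring test is sufficient.
#
#     import hashlib
#
#     def md5_matches(local_path: str) -> bool:
#         with open(local_path, "rb") as f:
#             digest = hashlib.md5(f.read()).hexdigest()
#         with open(f"{local_path}.md5", "r") as f_md5:
#             return digest in f_md5.read()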
-                # If md5 does not match, raise an Airflow exception.
+                # If md5 does not match, retry the download.
                 if md5hash_from_download in md5_from_pubmed_ftp:
                     download_success = True
                 else:
                     logging.info(
-                        f"MD5 hash does not match the given MD5 checksum from server: {changefile.download_file_path}"
+                        f"MD5 hash does not match the given MD5 checksum from server: {changefile.download_file_path} - Retrying download ..."
                     )
-                download_attemp_count += 1
+                download_attempt_count += 1

             if not download_success:
                 raise AirflowException(
@@ -696,7 +700,7 @@
             # Download file
             with open(changefile.download_file_path, "wb") as f:
                 ftp_conn.retrbinary(f"RETR {changefile.path_on_ftp}", f.write)
-            except:
+            except error_reply:
                 raise AirflowException(
                     f"Unable to download {changefile.path_on_ftp} from PubMed's FTP server {self.ftp_server_url}"
                 )
@@ -717,10 +721,7 @@ def upload_downloaded(self, release: PubMedRelease, **kwargs):
             file_paths=changefiles_to_upload,
         )

-        if not success:
-            raise AirflowException(
-                f"Unable to upload downloaded changefiles to GCS bucket {self.cloud_workspace.download_bucket}"
-            )
+        set_task_state(success, kwargs["ti"].task_id, release=release)

     def transform(self, release: PubMedRelease, **kwargs):
         """
@@ -750,9 +751,10 @@

     def merge_updatefiles(self, release: PubMedRelease, **kwargs):
         """
-        Check through the changefiles to only use the newest records for a particular PMID and version number of the article.
+        Check through the changefiles and only grab the newest records for a particular PMID and version number of the article.
+        This is to reduce the number of queries done on the main table in BQ.

-        This is to reduce the number of quiries done on the main table, ~100-120gb everytime it looks for
+        If it is the first run of the workflow, it will merge the baseline changefiles into 4 GB chunks.
         """
         # Grab list of changefiles that were transformed in previous task.
@@ -842,13 +844,13 @@
                 if entity.type == "additions":
                     # Make the merged data a dictionary for faster lookup.
-                    merged_dict = {str(record[entity.PMID_location]["PMID"]): record for record in merged_data}
+                    merged_dict = {str(record[entity.pmid_location]["PMID"]): record for record in merged_data}
                     for incoming_record in incoming_data:
-                        key = str(incoming_record[entity.PMID_location]["PMID"])
+                        key = str(incoming_record[entity.pmid_location]["PMID"])
                         if key in merged_dict:
                             logging.info(
-                                f"Found a duplicate of PMID and version number {key} - {incoming_record[entity.PMID_location]['PMID']}"
+                                f"Found a duplicate of PMID and version number {key} - {incoming_record[entity.pmid_location]['PMID']}"
                             )

                             # Remove the old record from merged_data
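# A minimal sketch (not part of this change) of the last-wins merge strategy above, using
# made-up records. Changefiles are processed oldest to newest, so a later record with the
# same (PMID, Version) key simply replaces the earlier one in the dictionary.
#
#     merged_dict = {}
#     for record in [
#         {"MedlineCitation": {"PMID": {"value": "123", "Version": "1"}}, "rev": "old"},
#         {"MedlineCitation": {"PMID": {"value": "123", "Version": "1"}}, "rev": "new"},
#     ]:
#         merged_dict[str(record["MedlineCitation"]["PMID"])] = record
#     assert next(iter(merged_dict.values()))["rev"] == "new"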
@@ -898,10 +900,7 @@ def upload_transformed(self, release: PubMedRelease, **kwargs):
             file_paths=files_to_upload,
         )

-        if not success:
-            raise AirflowException(
-                f"Unable to upload downloaded changefiles to GCS bucket {self.cloud_workspace.download_bucket}"
-            )
+        set_task_state(success, kwargs["ti"].task_id, release=release)

     def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs):
         """
@@ -917,26 +916,23 @@

         # If it's the first run of the workflow, only upload the baseline table to the main table.
         if release.is_first_run:
             # Create the dataset if it doesn't exist already.
-            try:
-                bq_create_dataset(
-                    self.cloud_workspace.project_id,
-                    self.bq_dataset_id,
-                    self.cloud_workspace.data_location,
-                    description="Pubmed dataset of citation records.",
-                )
-                logging.info(f"Created dataset: {self.cloud_workspace.project_id,}.{self.bq_dataset_id}")
-            except:
-                logging.info(f"Dataset {self.cloud_workspace.project_id,}.{self.bq_dataset_id} already exists.")
+
+            bq_create_dataset(
+                project_id=self.cloud_workspace.project_id,
+                dataset_id=self.bq_dataset_id,
+                location=self.cloud_workspace.data_location,
+                description=self.bq_dataset_description,
+            )

             # Delete old baseline table just in case there was a bad previous run.
             bq_delete_table(
                 project_id=self.cloud_workspace.project_id,
                 dataset_id=self.bq_dataset_id,
-                table_id=self.main_table,
+                table_id=self.bq_table_id,
                 not_found_okay=True,
             )

-            logging.info(f"Uploading to table - {self.main_table}")
+            logging.info(f"Uploading to table - {self.bq_table_id}")

             merged_transform_blob_pattern = release.merged_transfer_blob_pattern("additions")

             success = bq_load_table(
                 uri=merged_transform_blob_pattern,
-                table_id=f"{self.cloud_workspace.project_id}.{self.bq_dataset_id}.{self.main_table}",
+                table_id=f"{self.cloud_workspace.project_id}.{self.bq_dataset_id}.{self.bq_table_id}",
                 source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
                 schema_file_path=self.entity_list[0].schema_file_path,
                 write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
-                table_description=self.table_description,
+                table_description=self.entity_list[0].table_description,
                 partition=False,
                 ignore_unknown_values=False,
             )

             if not success:
-                raise AirflowException(f"Unable to upload table {self.main_table}")
+                raise AirflowException(f"Unable to upload table {self.bq_table_id}")

         # Not the first workflow run, only ingest update tables into BQ.
         else:
@@ -987,7 +983,7 @@
                     source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
                     schema_file_path=entity.schema_file_path,
                     write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
-                    table_description=f"Update table for {entity.type} for Pumbed update files {release.changefile_list[0].filename} to {release.changefile_list[-1].filename}.",
+                    table_description=f"Update table for {entity.type} for Pubmed update files {release.changefile_list[0].filename} to {release.changefile_list[-1].filename}. {entity.table_description}",
                     partition=False,
                     ignore_unknown_values=False,
                 )
@@ -999,7 +995,7 @@
                 else:
                     raise AirflowException(f"Unable to transfer to table - {table_id}")

-            # Update the table expiration so we don't have lots sharded tables hanging around in the dataset.
+            # Update the table expiration so we don't have lots of old sharded tables hanging around in the dataset.
             expiry_date = pendulum.now().add(days=7)
             bq_update_table_expiration(
                 self.cloud_workspace.project_id,
@@ -1011,7 +1007,7 @@
             )

         # Push list of update tables into the xcom
         ti.xcom_push(key="update_tables", value=update_tables)
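# A minimal sketch (not part of this change) of the XCom hand-off used here: the
# bq_ingest_update_tables task above pushes the sharded update-table ids, and the renamed
# bq_add_updates_to_table task below pulls them back out. The pull call is illustrative
# of Airflow's TaskInstance API rather than copied from this file.
#
#     ti.xcom_push(key="update_tables", value=update_tables)   # producer task
#     update_tables = ti.xcom_pull(key="update_tables")        # consumer task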
""" @@ -1019,17 +1015,17 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): # Use the table from this workflow run to make the first snapshot. # This should be just the baseline dataset, so there should be no deletions to the snapshot. if release.is_first_run: - main_table_exists = bq_table_exists( - table_id=f"{self.cloud_workspace.project_id}.{self.bq_dataset_id}.{self.main_table}" + table_exists = bq_table_exists( + table_id=f"{self.cloud_workspace.project_id}.{self.bq_dataset_id}.{self.bq_table_id}" ) - if not main_table_exists: + if not table_exists: raise AirflowException( - f"Main table {self.main_table} does not exist. Something went wrong in the previous transfer step." + f"Main table {self.bq_table_id} does not exist. Something went wrong in the previous transfer step." ) else: logging.info( f"""First run of the workflow does not require more updates to be added to the table. \n - The table {self.main_table} has already been created and injested in the previous step.""" + The table {self.bq_table_id} has already been created and injested in the previous step.""" ) else: @@ -1041,50 +1037,45 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): # and we can revert if necessary. prev_end_date = kwargs["data_interval_end"] - main_table_id = f"{self.cloud_workspace.project_id}.{self.bq_dataset_id}.{self.main_table}" + full_table_id = f"{self.cloud_workspace.project_id}.{self.bq_dataset_id}.{self.bq_table_id}" backup_table_id = bq_sharded_table_id( - self.cloud_workspace.project_id, self.bq_dataset_id, f"{self.main_table}_backup", prev_end_date + self.cloud_workspace.project_id, self.bq_dataset_id, f"{self.bq_table_id}_backup", prev_end_date ) expiry_date = pendulum.now().add(days=31) - success = bq_snapshot(src_table_id=main_table_id, dst_table_id=backup_table_id, expiry_date=expiry_date) + success = bq_snapshot(src_table_id=full_table_id, dst_table_id=backup_table_id, expiry_date=expiry_date) - if success: - logging.info( - f"Successfully created snapshot backup table of {main_table_id} - backup table: {backup_table_id}" - ) - else: - raise AirflowException(f"Error creating snapshot backup table.") + set_task_state(success, kwargs["ti"].task_id, release=release) # Need to use custom queries for the update and delete since they rely on both the PMID value and Version. # Delete old records in the main table that are to be updated. delete_records_to_be_updated = f""" - DELETE FROM `{main_table_id}` + DELETE FROM `{full_table_id}` WHERE (MedlineCitation.PMID.value, MedlineCitation.PMID.Version) IN ( SELECT (MedlineCitation.PMID.value, MedlineCitation.PMID.Version) FROM `{update_tables['article_additions']}`) """ result = bq_run_query(delete_records_to_be_updated) - logging.info(f"Result from deleting old records from {self.main_table} - {result}") + logging.info(f"Result from deleting old records from {self.bq_table_id} - {result}") # Insert the new updated records. insert_updated_records = f""" - INSERT INTO `{main_table_id}` + INSERT INTO `{full_table_id}` SELECT * FROM `{update_tables['article_additions']}` """ result = bq_run_query(insert_updated_records) - logging.info(f"Result from inserting the updated records to {self.main_table} - {result}") + logging.info(f"Result from inserting the updated records to {self.bq_table_id} - {result}") # Delete the records from the "deletion" table using both PMID and version number. 
             # Delete the records from the "deletion" table using both PMID and version number.
-            delete_records_from_main_table = f"""
-            DELETE FROM `{main_table_id}`
+            delete_records_from_table = f"""
+            DELETE FROM `{full_table_id}`
             WHERE (MedlineCitation.PMID.value, MedlineCitation.PMID.Version) IN (
             SELECT (value, Version)
             FROM `{update_tables['article_deletions']}`)
             """
-            result = bq_run_query(delete_records_from_main_table)
-            logging.info(f"Result from inserting the updated records to {self.main_table} - {result}")
+            result = bq_run_query(delete_records_from_table)
+            logging.info(f"Result from deleting records from {self.bq_table_id} - {result}")

     def add_new_dataset_release(self, release: PubMedRelease, **kwargs):
         """
@@ -1118,7 +1109,9 @@ def cleanup(self, release: PubMedRelease, **kwargs):
         cleanup(dag_id=self.dag_id, execution_date=kwargs["logical_date"], workflow_folder=release.workflow_folder)

-def transform_pubmed_xml_file_to_jsonl(changefile: Changefile, entity_list: List[PubmedEntity]) -> Changefile:
+def transform_pubmed_xml_file_to_jsonl(
+    changefile: Changefile, entity_list: List[PubmedEntity]
+) -> Union[Changefile, bool]:
     """
     Convert a single Pubmed XML file to JSONL, pulling out any of the Pubmed entities and their additions and/or deletions.
@@ -1136,7 +1129,11 @@
     with gzip.open(changefile.download_file_path, "rb") as f_in:
         # Use the BioPython library for reading in the Pubmed XML files.
-        data_dict_dirty = Entrez.read(f_in, validate=True)
+        try:
+            data_dict_dirty = Entrez.read(f_in, validate=True)
+        except ValidationError:
+            logging.info(f"Fields in XML are not valid against its own DTD file - {changefile.filename}")
+            return False

     # Need to have the XML attributes pulled out from the Biopython data classes.
     data_dict = add_attributes_to_data_from_biopython_classes(data_dict_dirty)
@@ -1150,14 +1147,14 @@
         try:
             try:
                 data_part = [retrieve for retrieve in data_dict[entity.set_key][entity.sub_key]]
-            except:
+            except KeyError:
                 data_part = [retrieve for retrieve in data_dict[entity.sub_key]]

             logging.info(
                 f"Pulled out {len(data_part)} {f'{entity.set_key} records' if entity.type == 'deletions' else f'{entity.sub_key} {entity.type}'} from file - {changefile.filename}"
             )
-        except:
+        except KeyError:
             logging.info(
                 f"No {f'{entity.set_key} records' if entity.type == 'deletions' else f'{entity.sub_key} {entity.type}'} records in file - {changefile.filename}"
             )
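# A minimal sketch (not part of this change) of how a caller can use the
# Union[Changefile, bool] return type above to keep only the files that transformed
# cleanly; the `changefiles` list is illustrative.
#
#     results = [transform_pubmed_xml_file_to_jsonl(cf, entity_list) for cf in changefiles]
#     transformed = [r for r in results if isinstance(r, Changefile)]  # drop the False returns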
""" bq_client = bigquery.Client(project=project_id) diff --git a/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py b/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py index ce45197fb..c08c8b7cd 100644 --- a/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py +++ b/academic_observatory_workflows/workflows/tests/test_pubmed_telescope.py @@ -17,6 +17,7 @@ import os import json import gzip +import shutil import hashlib import logging import pendulum @@ -25,14 +26,10 @@ from click.testing import CliRunner from airflow.utils.state import State -from Bio.Entrez.Parser import ( - StringElement, - ListElement, - DictionaryElement, -) - from observatory.platform.api import get_dataset_releases from observatory.platform.observatory_config import Workflow +from observatory.platform.workflows.workflow import ChangefileRelease +from Bio.Entrez.Parser import StringElement, ListElement, DictionaryElement from observatory.platform.gcs import gcs_blob_name_from_path, gcs_download_blob from observatory.platform.observatory_environment import ObservatoryEnvironment, ObservatoryTestCase from observatory.platform.bigquery import bq_sharded_table_id, bq_create_table_from_query, bq_export_table @@ -42,13 +39,16 @@ find_free_port, FtpServer, ) + from academic_observatory_workflows.config import test_fixtures_folder from academic_observatory_workflows.workflows.pubmed_telescope import ( Changefile, PubMedCustomEncoder, PubMedRelease, PubMedTelescope, + PubmedEntity, add_attributes_to_data_from_biopython_classes, + transform_pubmed_xml_file_to_jsonl, ) @@ -177,8 +177,8 @@ def test_dag_structure(self): "transform": ["merge_updatefiles"], "merge_updatefiles": ["upload_transformed"], "upload_transformed": ["bq_ingest_update_tables"], - "bq_ingest_update_tables": ["bq_add_updates_to_main_table"], - "bq_add_updates_to_main_table": ["add_new_dataset_release"], + "bq_ingest_update_tables": ["bq_add_updates_to_table"], + "bq_add_updates_to_table": ["add_new_dataset_release"], "add_new_dataset_release": ["cleanup"], "cleanup": ["dag_run_complete"], "dag_run_complete": [], @@ -225,7 +225,7 @@ def test_telescope(self): ftp_conn.login(user="root", passwd="pass") for file_path, upload_date in self.ftp_hosted_files.items(): ftp_command = f"MFMT {upload_date.format('YYYYMMDDHHmmss')} {file_path}" - print("ftp send command ", ftp_command) + logging.info("FTP send command - {ftp_command}") ftp_conn.sendcmd(ftp_command) ftp_conn.close() @@ -274,19 +274,13 @@ def test_telescope(self): changefiles_to_download = [Changefile.from_dict(changefile) for changefile in files_to_download] self.assertEqual(len(changefiles_to_download), len(run["changefiles"])) for i in range(len(run["changefiles"])): - try: - self.assertTrue(changefiles_to_download[i].__eq__(run["changefiles"][i])) - except: - print("From test", changefiles_to_download[i].to_dict()) - print("Expected changefile", run["changefiles"][i].to_dict()) - raise NameError("sdfsdfgdfg") + self.assertTrue(changefiles_to_download[i].__eq__(run["changefiles"][i])) # Create the release release = PubMedRelease( dag_id=self.dag_id, run_id=dag_run.run_id, cloud_workspace=workflow.cloud_workspace, - bq_dataset_id=workflow.bq_dataset_id, start_date=dag_run.data_interval_start, end_date=pendulum.instance(dag_run.data_interval_end), # bug with the observatory instance. 
                     is_first_run=run["is_first_run"],
@@ -327,7 +321,6 @@
                 for changefile in release.changefile_list:
                     for entity in workflow.entity_list:
                         transform_file = changefile.transform_file_path(entity.type)
-                        print(f"Transform file - {transform_file}")
                         self.assertTrue(os.path.exists(transform_file))

                 ### Merge transformed ###
@@ -338,7 +331,6 @@
                 # Check that the file exists for this run.
                 for entity in workflow.entity_list:
                     merged_file = os.path.join(release.transform_folder, run["merged_file_output"][entity.type])
-                    print("merged file - ", merged_file)
                     self.assertTrue(os.path.exists(merged_file))

                 ### Upload transformed ###
@@ -369,21 +361,21 @@
                 for entity in workflow.entity_list:
                     table_id = (
-                        f"{workflow.cloud_workspace.project_id}.{workflow.bq_dataset_id}.{workflow.main_table}"
+                        f"{workflow.cloud_workspace.project_id}.{workflow.bq_dataset_id}.{workflow.bq_table_id}"
                     )
                     self.assert_table_integrity(table_id, 4)

                 ### bq_add_updates_to_table
-                task_id = workflow.bq_add_updates_to_main_table.__name__
+                task_id = workflow.bq_add_updates_to_table.__name__
                 ti = env.run_task(task_id)
                 self.assertEqual(State.SUCCESS, ti.state)

                 # On first run of the workflow, the baseline main_table should be untouched.
-                main_table_id = f"{env.project_id}.{workflow.bq_dataset_id}.{workflow.main_table}"
+                main_table_id = f"{env.project_id}.{workflow.bq_dataset_id}.{workflow.bq_table_id}"
                 self.assert_table_integrity(main_table_id, 4)

                 # Run query to get list of PMIDs that are present in the table and compare against what it should be.
-                PMID_list = f"{env.project_id}.{workflow.bq_dataset_id}.{workflow.main_table}_PMID_list_first_run"
+                PMID_list = f"{env.project_id}.{workflow.bq_dataset_id}.{workflow.bq_table_id}_PMID_list_first_run"
                 bq_query_list_PMIDs = f"""
                 SELECT (MedlineCitation.PMID.Version, MedlineCitation.PMID.value)
                 FROM `{main_table_id}`
@@ -458,19 +450,13 @@
                 changefiles_to_download = [Changefile.from_dict(changefile) for changefile in files_to_download]
                 self.assertEqual(len(changefiles_to_download), len(run["changefiles"]))
                 for i in range(len(run["changefiles"])):
-                    try:
-                        self.assertTrue(changefiles_to_download[i].__eq__(run["changefiles"][i]))
-                    except:
-                        logging.info(f"From test - {changefiles_to_download[i].to_dict()}")
-                        logging.info(f"Expected changefile {run['changefiles'][i].to_dict()}")
-                        raise NameError("sdfsdfgdfg")
+                    self.assertTrue(changefiles_to_download[i].__eq__(run["changefiles"][i]))

                 # Create the release
                 release = PubMedRelease(
                     dag_id=self.dag_id,
                     run_id=dag_run.run_id,
                     cloud_workspace=workflow.cloud_workspace,
-                    bq_dataset_id=workflow.bq_dataset_id,
                     start_date=dag_run.data_interval_start,
                     end_date=pendulum.instance(dag_run.data_interval_end),
                     is_first_run=run["is_first_run"],
@@ -511,7 +497,6 @@
                 for changefile in release.changefile_list:
                     for entity in workflow.entity_list:
                         transform_file = changefile.transform_file_path(entity.type)
-                        print(f"Transform file - {transform_file}")
                         self.assertTrue(os.path.exists(transform_file))

                 ### Merge transformed ###
@@ -522,7 +507,6 @@
                 # Check that the file exists for this run.
                 for entity in workflow.entity_list:
                     merged_file = os.path.join(release.transform_folder, run["merged_file_output"][entity.type])
-                    print("merged file - ", merged_file)
                     self.assertTrue(os.path.exists(merged_file))

                 ### Upload transformed ###
@@ -562,7 +546,7 @@
                     self.assert_table_integrity(table_id, run["update_tables"][entity.type])

                 ### bq_add_updates_to_table
-                task_id = workflow.bq_add_updates_to_main_table.__name__
+                task_id = workflow.bq_add_updates_to_table.__name__
                 ti = env.run_task(task_id)
                 self.assertEqual(State.SUCCESS, ti.state)

                 snapshot_table_id = bq_sharded_table_id(
                     workflow.cloud_workspace.project_id,
                     workflow.bq_dataset_id,
-                    f"{workflow.main_table}_backup",
+                    f"{workflow.bq_table_id}_backup",
                     date=release.end_date,
                 )
                 self.assert_table_integrity(snapshot_table_id, 4)

                 # Check that additions, updates and deletions have been applied successfully.
                 main_table_id = (
-                    f"{workflow.cloud_workspace.project_id}.{workflow.bq_dataset_id}.{workflow.main_table}"
+                    f"{workflow.cloud_workspace.project_id}.{workflow.bq_dataset_id}.{workflow.bq_table_id}"
                 )
                 self.assert_table_integrity(main_table_id, 5)

                 # Run query to get list of PMIDs that are present in the table and compare against what it should be.
-                PMID_list = f"{env.project_id}.{workflow.bq_dataset_id}.{workflow.main_table}_PMID_list_second_run"
+                PMID_list = f"{env.project_id}.{workflow.bq_dataset_id}.{workflow.bq_table_id}_PMID_list_second_run"
                 bq_query_list_PMIDs = f"""
                 SELECT (MedlineCitation.PMID.Version, MedlineCitation.PMID.value)
                 FROM `{main_table_id}`
@@ -632,10 +616,87 @@ class TestPubMedUtils(ObservatoryTestCase):
     def __init__(self, *args, **kwargs):
         super(TestPubMedUtils, self).__init__(*args, **kwargs)

+        self.dag_id = "pubmed"
+        self.project_id = os.getenv("TEST_GCP_PROJECT_ID")
+        self.data_location = os.getenv("TEST_GCP_DATA_LOCATION")
+
+    def test_transform_pubmed_xml_file_to_jsonl(self):
+        """Test that example PubMed XML files can be transformed successfully."""
+
+        # Setup environment
+        env = ObservatoryEnvironment(self.project_id, self.data_location, api_port=find_free_port())
+
+        with env.create(task_logging=True):
+            changefile_release = ChangefileRelease(
+                dag_id="pubmed_telescope",
+                run_id="something",
+                start_date=pendulum.now(),
+                end_date=pendulum.now(),
+                sequence_start=1,
+                sequence_end=1,
+            )
+
+            entity_list = [
+                PubmedEntity(
+                    name="article_additions",
+                    type="additions",
+                    sub_key="PubmedArticle",
+                    set_key="PubmedArticleSet",
+                    pmid_location="MedlineCitation",
+                    table_description="""PubmedArticle""",
+                ),
+                PubmedEntity(
+                    name="article_deletions",
+                    type="deletions",
+                    sub_key="PMID",
+                    set_key="DeleteCitation",
+                    pmid_location=None,
+                    table_description="""DeleteCitation""",
+                ),
+            ]
+
+            ### Bad XML ###
+            changefile_bad = Changefile(
+                filename="pubmed23n0001_bad_fields.xml.gz",
+                file_index=1,
+                path_on_ftp="dummy_string",
+                is_first_run=True,
+                changefile_date=pendulum.now(),
+                changefile_release=changefile_release,
+            )
+            bad_xml_file_path = os.path.join(test_fixtures_folder(), "pubmed", "pubmed23n0001_bad_fields.xml.gz")
+            shutil.copy2(bad_xml_file_path, changefile_bad.download_file_path)
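# A hypothetical sketch (not part of this change) of how a fixture like
# pubmed23n0001_bad_fields.xml.gz can be produced: take a valid baseline file, rename a
# field so it no longer validates against the DTD, and re-gzip it. Tag names are
# illustrative only.
#
#     import gzip
#     with gzip.open("pubmed23n0001.xml.gz", "rt") as f:
#         xml = f.read()
#     xml = xml.replace("<Language>", "<BadLanguage>").replace("</Language>", "</BadLanguage>")
#     with gzip.open("pubmed23n0001_bad_fields.xml.gz", "wt") as f:
#         f.write(xml)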
+            # Attempt to transform the bad XML; the function returns False on a validation failure.
+            changefile_returned = transform_pubmed_xml_file_to_jsonl(changefile=changefile_bad, entity_list=entity_list)
+            self.assertFalse(changefile_returned)
+
+            changefile_good = Changefile(
+                filename="pubmed23n0001.xml.gz",
+                file_index=1,
+                path_on_ftp="dummy_string",
+                is_first_run=True,
+                changefile_date=pendulum.now(),
+                changefile_release=changefile_release,
+            )
+
+            ### Valid XML ###
+            valid_xml_file_path = os.path.join(test_fixtures_folder(), "pubmed", "baseline", "pubmed23n0001.xml.gz")
+            shutil.copy2(valid_xml_file_path, changefile_good.download_file_path)
+
+            # Attempt to transform valid XML - returns a Changefile instance only if it is successful.
+            changefile_returned = transform_pubmed_xml_file_to_jsonl(
+                changefile=changefile_good, entity_list=entity_list
+            )
+            self.assertTrue(isinstance(changefile_returned, Changefile))
+
+            # Ensure that transform files were written to disk.
+            for entity in entity_list:
+                self.assertTrue(os.path.exists(changefile_returned.transform_file_path(entity.type)))
+
     def test_add_attributes_to_data_from_biopython(self):
         """
-        Test that attributes from the Biopython data classes can be reliably pulled out
-        from the data and added to the dictionary.
+        Test that attributes from the Biopython data classes can be reliably pulled out and added to the dictionary.
         """

         biopython_str = StringElement("string", tag="data", attributes={"type": "str"}, key="data")
@@ -655,7 +716,6 @@ def test_add_attributes_to_data_from_biopython(self):
         with CliRunner().isolated_filesystem() as tmp_dir:
             # Write test files using custom encoder
             output_file = os.path.join(tmp_dir, "test_output_file.jsonl")
-            print(output_file)
             with open(output_file, "wb") as f_out:
                 for line in objects:
                     output = add_attributes_to_data_from_biopython_classes(line)
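# A small before/after illustration (not part of this change) of the conversion the last
# test exercises: a StringElement behaves like a str but carries XML attributes, and
# add_attributes_to_data_from_biopython_classes folds those attributes back into plain
# dicts, following the "value" key convention that the BigQuery queries above rely on
# (e.g. MedlineCitation.PMID.value and MedlineCitation.PMID.Version).
#
#     StringElement("string", tag="data", attributes={"type": "str"}, key="data")
#     # becomes
#     {"value": "string", "type": "str"}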