diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py index 8e9d8c071..0b0c912a4 100644 --- a/academic_observatory_workflows/workflows/pubmed_telescope.py +++ b/academic_observatory_workflows/workflows/pubmed_telescope.py @@ -20,12 +20,12 @@ import subprocess from typing import Dict, List import pendulum +from datetime import timedelta import logging import re import gzip import json - # To download the files from the FTP server import ftplib @@ -132,7 +132,7 @@ def __init__( *, dag_id: str, workflow_id: int, - start_date: str = pendulum.datetime(year=2022, month=12, day=1), # when to start catchup / collecting data. + start_date: str = pendulum.datetime(year=2022, month=12, day=1), # When the baseline data became available. schedule_interval: str = "0 0 * * 0", # weekly catchup: bool = True, ftp_server_url: str, @@ -232,7 +232,7 @@ def __init__( # TODO: Make condition for when the new baseline is out - after the release date for it each year. # if present day after the new baseline date then download baseline or # this is the very first release for the telescope - self.download_baseline = is_first_release(self.workflow_id) + # self.download_baseline = is_first_release(self.workflow_id) self.add_setup_task(self.check_dependencies) self.add_task(self.check_releases) @@ -274,6 +274,11 @@ def make_release(self, **kwargs) -> PubMedRelease: self.download_folder = make_workflow_folder(self.dag_id, release_date, SubFolder.downloaded.value) self.transform_folder = make_workflow_folder(self.dag_id, release_date, SubFolder.transformed.value) + if self.data_interval_start < self.start_date + timedelta(days=7): + self.download_baseline = True + else: + self.download_baseline = False + return release def check_releases(self, release: PubMedRelease, **kwargs): @@ -515,11 +520,9 @@ def transform(self, release: PubMedRelease, **kwargs): # List of objects to hold metadata and file lists about how to pull the data from Pubmed. ## This needs to be renamed to something like "pubmed_workflow_metadata" as it is used throughout. - pubmed_transform_list = [ - { - "name": "pubmed_article", + entity_list = { + "pubmed_article_additions": { "data_type": "additions", - "data": [], "sub_key": "PubmedArticle", "set_key": "PubmedArticleSet", "pmid_key_loc": "MedlineCitation", @@ -527,10 +530,8 @@ def transform(self, release: PubMedRelease, **kwargs): "transform_files": [], "merged_transform_files": [], }, - { - "name": "pubmed_book_article", + "pubmed_book_article_additions": { "data_type": "additions", - "data": [], "sub_key": "PubmedBookArticle", "set_key": "PubmedBookArticleSet", "pmid_key_loc": "MedlineCitation", @@ -538,8 +539,7 @@ def transform(self, release: PubMedRelease, **kwargs): "transform_files": [], "merged_transform_files": [], }, - { - "name": "book_document", + "book_document_additions": { "data_type": "additions", "data": [], "sub_key": "BookDocument", @@ -549,31 +549,29 @@ def transform(self, release: PubMedRelease, **kwargs): "transform_files": [], "merged_transform_files": [], }, - { - "name": "pubmed_article", + "pubmed_article_deletions": { "data_type": "deletions", "data": [], - "sub_key": "DeleteCitation", - "set_key": None, + "sub_key": "PMID", + "set_key": "DeleteCitation", "output_file_base": f"pubmed_article_deletions_{self.release_id}", "transform_files": [], "merged_transform_files": [], }, - { - "name": "book_document", + "book_document_deletions": { "data_type": "deletions", "data": [], - "sub_key": "DeleteDocument", - "set_key": None, + "sub_key": "PMID", + "set_key": "DeleteDocument", "output_file_base": f"book_document_deletions_{self.release_id}", "transform_files": [], "merged_transform_files": [], }, - ] + } # Process files in batches so that ProcessPoolExecutor doesn't deplete the system of memory - for i, chunk in enumerate(get_chunks(input_list=downloaded_files_for_release, chunk_size=self.batch_size)): - with ProcessPoolExecutor(max_workers=self.max_processes) as executor: + for i, chunk in enumerate(get_chunks(input_list=downloaded_files_for_release, chunk_size=1)): + with ProcessPoolExecutor(max_workers=1) as executor: futures = [] logging.info(f"In chunk {i} and processing files: {chunk}") @@ -581,31 +579,23 @@ def transform(self, release: PubMedRelease, **kwargs): # Create tasks for each file for input_file in chunk: future = executor.submit( - transform_pubmed_xml_file_to_jsonl, input_file, pubmed_transform_list, self.transform_folder + transform_pubmed_xml_file_to_jsonl, input_file, entity_list, self.transform_folder ) futures.append(future) # Gather list of files transformed by the above. for future in as_completed(futures): - # There's probably a much better way of doing this joining - for i in range(len(pubmed_transform_list)): - tranformed = future.result() - for j in range(len(tranformed)): - if ( - pubmed_transform_list[i]["name"] == tranformed[j]["name"] - and pubmed_transform_list[i]["data_type"] == tranformed[j]["data_type"] - ): - pubmed_transform_list[i]["transform_files"] = tranformed[j]["transform_files"] - - # Gather list of transformed files for GCS upload - transformed_files = [] - for pubmed_data in pubmed_transform_list: - transformed_files.extend(pubmed_data["transform_files"]) + returned_entity_list = future.result() + + # Merge the list of transform files from mulitprocessing step + for name, entity in returned_entity_list.items(): + entity_list[name]["transform_files"].extend(entity["transform_files"]) - logging.info(f"Tranfformed files: {transformed_files}") + # for name, entity in entity_list.items(): + # logging.info(f'List of transformed files - {name} {entity["data_type"]} - {entity["transform_files"]}') # Push transform list metadata into the Xcom - ti.xcom_push(key="pubmed_transform_list", value=pubmed_transform_list) + ti.xcom_push(key="entity_list", value=entity_list) def merge_transform_files(self, release: PubMedRelease, **kwargs): """Merge the transformed Pubmed files into appropriately sized chunks for upload GCS and import to BQ. @@ -615,32 +605,25 @@ def merge_transform_files(self, release: PubMedRelease, **kwargs): # Pull list of transform files from xcom ti: TaskInstance = kwargs["ti"] - pubmed_transform_list = ti.xcom_pull(key="pubmed_transform_list") + entity_list = ti.xcom_pull(key="entity_list") logging.info(f"Merging Pubmed transform files into {self.transform_file_size} Megabyte sized files.") # Loop through each record type of Pubmed (Article, BookArticle, BookDocument, DeleteCitation, DeleteDocument) # for pubmed_data in pubmed_transform_list: - for i in range(len(pubmed_transform_list)): - pubmed_data = pubmed_transform_list[i] - # Divide up list of files across the threads????? - - # Each thread gets a list of files that will be turned merged into chunk files. - # with ProcessPoolExecutor(max_workers=self.max_processes) as executor: - + for name, entity in entity_list.items(): # Initialise list merged_files = [] # Part number of the larger file chunks. part_num = 1 - # Get first pubmed file id to be read in. - # last_pubmed_file_id = pull_pubmed_file_id(pubmed_data["transform_files"][0]) + entity["transform_files"].sort() - pubmed_data["transform_files"].sort() + logging.info(f"List of transformed files from {name} {entity['transform_files']}") - data = [] # private to the thread that it's dealing with. - for file in pubmed_data["transform_files"]: + data = [] + for file in entity["transform_files"]: logging.info(f"Reading file into memory - {file}") # Open file in memory @@ -650,28 +633,26 @@ def merge_transform_files(self, release: PubMedRelease, **kwargs): # Append it onto existing data object. data.extend(new_data) - logging.info(f"Current data size: {asizeof.asizeof(data)/ (8.0*1024.0**2)} Mb") + current_size = asizeof.asizeof(data) / (8.0 * 1024.0**2) - # asizeof seems to give the size in bits, not bytes. + logging.info(f"Current data size: {current_size} Mb") # If the size of the data is greater than this, write out to file and clear variable from memeory - # OR of the file is the last one, write it out to file regardless. - if ( - asizeof.asizeof(data) / (8 * 1024**2) >= self.transform_file_size - or file == pubmed_data["transform_files"][-1] - ): + # OR if the file is the last one, write it out to file regardless. + + if current_size >= self.transform_file_size or file == entity["transform_files"][-1]: # To make a aptly named file name for the output. # current_pubmed_file_id = pull_pubmed_file_id(file) # Make proper name for larger file of chunk. merged_file = os.path.join( self.transform_folder, - f"{pubmed_data['name']}_{pubmed_data['data_type']}_{self.release_id}_part_{part_num}.jsonl", + f"{name}_{self.release_id}_part_{part_num}.jsonl", ) logging.info(f"Writing out to file: {merged_file}") - # Write out to file + # Write out to file as compressed *.jsonl.gz with open(merged_file, "wb") as f_out: for line in data: f_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n")) @@ -679,16 +660,17 @@ def merge_transform_files(self, release: PubMedRelease, **kwargs): # Clear object in memeory (necessary??) data = [] - # Save file name into memory + # Save file name to be put into entity list. merged_files.append(merged_file) + # Increase part number for next chunk part_num += 1 # Export list of merged files to transform list - pubmed_transform_list[i]["merged_files"] = pubmed_data + entity_list[name]["merged_files"] = merged_files # Update transform metadata list to xcom - ti.xcom_push(key="pubmed_transform_list", value=pubmed_transform_list) + ti.xcom_push(key="entity_list", value=entity_list) def upload_transformed(self, release: PubMedRelease, **kwargs): """Upload the transformed files to GCS for BQ import. @@ -698,12 +680,12 @@ def upload_transformed(self, release: PubMedRelease, **kwargs): # Pull list of transform files from xcom ti: TaskInstance = kwargs["ti"] - pubmed_transform_list = ti.xcom_pull(key="pubmed_transform_list") + entity_list = ti.xcom_pull(key="entity_list") # Collect list of all merged files for upload transformed_files_to_updload_to_gcs = [] - for pubmed_data in pubmed_transform_list: - transformed_files_to_updload_to_gcs.extend(pubmed_data["merged_files"]) + for name, entity in entity_list.items(): + transformed_files_to_updload_to_gcs.extend(entity["merged_files"]) uploaded_transform_files = [] for file_to_upload in transformed_files_to_updload_to_gcs: @@ -800,75 +782,6 @@ def cleanup(self, release: PubMedRelease, **kwargs): # Remove all task instance data from this workflow run. -def download_pubmed_xml_file( - file_on_ftp: str, check_md5_hash: bool, download_folder: str, max_download_retry: int, ftp_server_url: str, ftp_conn -) -> str: - """To download a single file from Pubmed's FTP server. - Option to check the md5 hash as well. - :param file: Path of the file on the FTP server to download. - :param check_md5_hash: - :param download_folder: - :return downloaded_file: Absolute path to the downloaded file. - """ - - # This split is OK as it is pulling it from the FTP server path. - file = file_on_ftp.split("/")[-1] - - # Save file to correct download path for the workflow - file_download_location = os.path.join(download_folder, file) - - # Check if the hash is the same for the downloaded vs one on the web. - if check_md5_hash: - download_attemp_count = 1 - download_success = False - while download_attemp_count <= max_download_retry and not download_success: - logging.info(f"Downloading: {file} Attempt: {download_attemp_count}") - try: - # Download file - with open(file_download_location, "wb") as f: - ftp_conn.retrbinary(f"RETR {file_on_ftp}", f.write) - except: - logging.info(f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url}.") - - # Create the hash from the above downloaded file. - with open(file_download_location, "rb") as f_in: - data = f_in.read() - md5hash_from_download = hashlib.md5(data).hexdigest() - - # Download corresponding md5 hash. - with open(f"{file_download_location}.md5", "wb") as f: - ftp_conn.retrbinary(f"RETR {file_on_ftp}.md5", f.write) - - # Peep into md5 file. - with open(f"{file_download_location}.md5", "r") as f_md5: - md5_from_pubmed_ftp = f_md5.read() - - # If md5 does not match, raise an Airflow exception. - if md5hash_from_download in md5_from_pubmed_ftp: - download_success = True - return file_download_location - else: - logging.info(f"MD5 hash does not match the given MD5 checksum from server: {file}") - - download_attemp_count += 1 - - if not download_success: - raise AirflowException( - f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url} after {max_download_retry} retries" - ) - - else: - logging.info(f"Downloading: {file}") - try: - # Download file - with open(file_download_location, "wb") as f: - ftp_conn.retrbinary(f"RETR {file_on_ftp}", f.write) - except: - raise AirflowException(f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url}") - - return file_download_location - - def pull_pubmed_file_id(absolute_file_path: str) -> str: """Simple function to remove the Pubmed file number/ID from the path of the file.""" @@ -941,63 +854,7 @@ def change_keys(obj, convert): return new -def check_for_deletions(sub_set: str, pmid_key_loc: str, additions: List[Dict], deletions: List[Dict]): - """Run through the list of additions and deletions in memory first before uploading to BQ to save time/resources.""" - - # TODO: Need to parallelise this section. For initial baseline it will take many days to do. - - try: - logging.info(f"Running though {sub_set} record for deletions.") - - additions_with_dels_checked = additions.copy() - deletions_with_dels_checked = deletions.copy() - - for record_to_delete in deletions: - for article_addition_to_check in additions: - to_check = article_addition_to_check[f"{pmid_key_loc}"]["PMID"] - if record_to_delete == to_check: - additions_with_dels_checked.remove(record_to_delete) - deletions_with_dels_checked.remove(article_addition_to_check) - - logging.info(f"Removed the following record from additions list - {record_to_delete}") - - logging.info( - f"There are {len(deletions_with_dels_checked)} article deletions left to do on BQ and {len(additions_with_dels_checked)} article additions to add to the snapshot for this release." - ) - - except: - logging.info(f"No {sub_set} records to check against for deletions.") - - return additions_with_dels_checked, deletions_with_dels_checked - - -def pull_data_from_dict( - filename: str, data_dict: Dict, data_name: str, sub_set: str, set_key: str = None -) -> List[Dict]: - """Attempt to pull a subset of data from the dictionary injested from the Pubmed XML files.""" - - try: - logging.info(f"Extracting {sub_set} records from file - {filename}") - - # Try and catch for the cases where there are multiple e.g. Articles and BookArticles in the same dictionary. - - try: - retrieved = [retrieve for retrieve in data_dict[set_key][sub_set]] - except: - retrieved = [retrieve for retrieve in data_dict[sub_set]] - - logging.info(f"Pulled out {len(retrieved)} {sub_set} {data_name} from file - {filename}") - - except: - logging.info(f"No {sub_set} {data_name} in file - {filename}") - retrieved = [] - - return retrieved - - -def transform_pubmed_xml_file_to_jsonl( - input_file: str, pubmed_transform_list: List[Dict], transform_folder: str -) -> List[Dict]: +def transform_pubmed_xml_file_to_jsonl(input_file: str, entity_list: Dict, transform_folder: str) -> Dict: """Convert a single Pubmed XML file to JSONL, pulling out any of the PubmedArticle, PubmedBookArticle, BookDocument, DeleteCitation, DeleteDocument. @@ -1012,7 +869,7 @@ def transform_pubmed_xml_file_to_jsonl( pubmed_file_id = Path(input_file).name.split(".")[0] - logging.info(f"Reading in file to memory - {input_file}") + logging.info(f"Reading in file - {input_file}") with gzip.open(input_file, "rb") as f_in: # Use the BioPython library for reading in the Pubmed XML files. @@ -1021,23 +878,39 @@ def transform_pubmed_xml_file_to_jsonl( # Need to have the XML attributes pulled out from the Biopython data classes. data_dict = add_attributes_to_data_from_biopython_classes(data_dict_dirty) - logging.info(f"Pulling out data from file - {input_file}") + # now a key, value - loop through entities + for name, entity in entity_list.items(): + # Have to set this to empty here because python multi processing is dumb. + entity_list[name]["transform_files"] = [] - for i in range(len(pubmed_transform_list)): - pubmed_data = pubmed_transform_list[i] + # Pull out each record type from the data. + data_type, set_key, sub_key = ( + entity["data_type"], + entity["set_key"], + entity["sub_key"], + ) - # Pull out each of the citation additions and deletions from the data. - data_part = pull_data_from_dict( - filename=input_file, - data_dict=data_dict, - data_name=pubmed_data["data_type"], - sub_set=pubmed_data["sub_key"], - set_key=pubmed_data["set_key"], + logging.info( + f"Extracting {set_key if data_type == 'deletions' else sub_key} records from file - {pubmed_file_id}" ) - output_file = os.path.join(transform_folder, pubmed_data["output_file_base"] + "_" + pubmed_file_id + ".jsonl") - name = pubmed_data["name"] - data_type = pubmed_data["data_type"] + try: + try: + data_part = [retrieve for retrieve in data_dict[set_key][sub_key]] + except: + data_part = [retrieve for retrieve in data_dict[sub_key]] + + logging.info( + f"Pulled out {len(data_part)} {f'{set_key} records' if data_type == 'deletions' else f'{sub_key} {data_type}'} from file - {pubmed_file_id}" + ) + + except: + logging.info( + f"No {f'{set_key} records' if data_type == 'deletions' else f'{sub_key} {data_type}'} records in file - {pubmed_file_id}" + ) + data_part = [] + + output_file = os.path.join(transform_folder, entity["output_file_base"] + "_" + pubmed_file_id + ".jsonl") logging.info(f"Writing {name} {data_type} to file - {output_file}") @@ -1045,11 +918,9 @@ def transform_pubmed_xml_file_to_jsonl( for line in data_part: f_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n")) - pubmed_data["transform_files"].append(output_file) - - pubmed_transform_list[i] = pubmed_data + entity_list[name]["transform_files"].append(output_file) - return pubmed_transform_list + return entity_list # TODO: Clean up this function if possible. diff --git a/academic_observatory_workflows/workflows/tests/test_pubmed.py b/academic_observatory_workflows/workflows/tests/test_pubmed.py new file mode 100644 index 000000000..61b31e960 --- /dev/null +++ b/academic_observatory_workflows/workflows/tests/test_pubmed.py @@ -0,0 +1,532 @@ +# Copyright 2023 Curtin University +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Author: Alex Massen-Hane + +import datetime +import gzip +import io +import json +import os +from subprocess import Popen +from types import SimpleNamespace +from unittest.mock import Mock, call, patch + +import pendulum +from airflow.exceptions import AirflowException, AirflowSkipException +from airflow.models import Connection +from airflow.utils.state import State +from botocore.response import StreamingBody +from click.testing import CliRunner +from dateutil import tz + +from academic_observatory_workflows.api_type_ids import DatasetTypeId +from academic_observatory_workflows.config import test_fixtures_folder +from academic_observatory_workflows.workflows.pubmed_telescope import ( + PubMedRelease, + PubMedTelescope, + run_subprocess_cmd, + pull_data_from_dict, + transform_pubmed_xml_file_to_jsonl, + add_attributes_to_data_from_biopython_classes, + CustomEncoder, +) +from observatory.api.client import ApiClient, Configuration +from observatory.api.client.api.observatory_api import ObservatoryApi # noqa: E501 +from observatory.api.client.model.dataset import Dataset +from observatory.api.client.model.dataset_type import DatasetType +from observatory.api.client.model.organisation import Organisation +from observatory.api.client.model.table_type import TableType +from observatory.api.client.model.workflow import Workflow +from observatory.api.client.model.workflow_type import WorkflowType + +from observatory.api.testing import ObservatoryApiEnvironment +from observatory.platform.utils.airflow_utils import AirflowConns +from observatory.platform.utils.gc_utils import ( + upload_file_to_cloud_storage, +) +from observatory.platform.utils.release_utils import get_dataset_releases +from observatory.platform.utils.test_utils import ( + ObservatoryEnvironment, + ObservatoryTestCase, + SftpServer, + module_file_path, + find_free_port, +) + +# Biopython Parser and data structures. +from Bio import Entrez +from Bio.Entrez.Parser import ( + StringElement, + ListElement, + DictionaryElement, + OrderedListElement, +) + + +class TestPubMedTelescope(ObservatoryTestCase): + """Tests for the OpenAlex telescope""" + + def __init__(self, *args, **kwargs): + """Constructor which sets up variables used by tests. + :param args: arguments. + :param kwargs: keyword arguments. + """ + + + + + super(TestPubMedTelescope, self).__init__(*args, **kwargs) + self.project_id = os.getenv("TEST_GCP_PROJECT_ID") + self.data_location = os.getenv("TEST_GCP_DATA_LOCATION") + + self.ftp_port = find_free_port() + + # self.pubmed_transform_list = [ + # { + # "name": "pubmed_article", + # "data_type": "additions", + # "data": [], + # "sub_key": "PubmedArticle", + # "set_key": "PubmedArticleSet", + # "pmid_key_loc": "MedlineCitation", + # "output_file_base": f"pubmed_article_additions_{self.release_id}", + # "transform_files": [], + # "merged_transform_files": [], + # }, + # { + # "name": "pubmed_book_article", + # "data_type": "additions", + # "data": [], + # "sub_key": "PubmedBookArticle", + # "set_key": "PubmedBookArticleSet", + # "pmid_key_loc": "MedlineCitation", + # "output_file_base": f"pubmed_book_article_additions_{self.release_id}", + # "transform_files": [], + # "merged_transform_files": [], + # }, + # { + # "name": "book_document", + # "data_type": "additions", + # "data": [], + # "sub_key": "BookDocument", + # "set_key": "BookDocumentSet", + # "pmid_key_loc": "BookDocument", + # "output_file_base": f"book_document_additions_{self.release_id}", + # "transform_files": [], + # "merged_transform_files": [], + # }, + # { + # "name": "pubmed_article", + # "data_type": "deletions", + # "data": [], + # "sub_key": "DeleteCitation", + # "set_key": None, + # "output_file_base": f"pubmed_article_deletions_{self.release_id}", + # "transform_files": [], + # "merged_transform_files": [], + # }, + # { + # "name": "book_document", + # "data_type": "deletions", + # "data": [], + # "sub_key": "DeleteDocument", + # "set_key": None, + # "output_file_base": f"book_document_deletions_{self.release_id}", + # "transform_files": [], + # "merged_transform_files": [], + # }, + # ] + + # First ever run of the telescope for 2022 data. + self.run_list = [ + { + "start": pendulum.datetime(year=2022, month=12, day=4), # Absolute start date of the telescope. + "data_interval_start": pendulum.datetime(year=2022, month=12, day=4), + "data_interval_end": pendulum.datetime(year=2022, month=12, day=11), + "baseline_initial_file_date": pendulum.datetime(year=2022, month=12, day=8), + "updatefiles_initial_file_date": pendulum.datetime(year=2022, month=12, day=9) + "is_first_release": True, + }, + { + "start": pendulum.datetime(year=2022, month=12, day=4), # Absolute start date of the telescope. + "data_interval_start": pendulum.datetime(year=2022, month=12, day=11), + "data_interval_end": pendulum.datetime(year=2022, month=12, day=18), + "baseline_initial_file_date": pendulum.datetime(year=2022, month=12, day=8), + "updatefiles_initial_file_date": pendulum.datetime(year=2022, month=12, day=9), + "is_first_release": False, + "release_table_hash": "adfalsdfjhalsdjfalsdf" + } + ] + + # API environment + self.host = "localhost" + configuration = Configuration(host=f"http://{self.host}:{self.port}") + api_client = ApiClient(configuration) + self.api = ObservatoryApi(api_client=api_client) # noqa: E501 + self.env = ObservatoryApiEnvironment(host=self.host, port=self.port) + self.org_name = "Curtin University" + + def test_dag_structure(self): + """Test that the PubMed DAG has the correct structure. + :return: None + """ + + dag = PubMedTelescope(workflow_id=0).make_dag() + self.assert_dag_structure( + { + "check_dependencies": ["check_releases"], + "check_releases": ["download"], + "download": ["upload_downloaded"], + "upload_downloaded": ["transform"], + "transform": ["merged_transformed"], + "merged_transformed": ["upload_transformed"], + "upload_transformed": ["bq_append_new"], + "bq_append_new": ["bq_delete_old"], + "bq_delete_old": ["bq_create_snapshot"], + "bq_create_snapshot": ["add_new_dataset_releases"], + "add_new_dataset_releases": ["cleanup"], + "cleanup": ["cleanup"] + }, + dag, + ) + + @patch("observatory.platform.utils.release_utils.make_observatory_api") + def test_dag_load(self, m_makeapi): + """Test that the OpenAlex DAG can be loaded from a DAG bag. + :return: None + """ + + m_makeapi.return_value = self.api + env = ObservatoryEnvironment(self.project_id, self.data_location, api_host=self.host, api_port=self.port) + + with env.create(): + self.setup_connections(env) + self.setup_api() + dag_file = os.path.join(module_file_path("academic_observatory_workflows.dags"), "pubmed_telescope.py") + self.assert_dag_load("pubmed", dag_file) + + # Patch whatever is needed for this test + @patch('ftplib.FTP', autospec=True) + def test_telescope(self, m_makeapi, mock_ftp_constructor): + """Test the PubMed Telescope end to end. + :return: None. + """ + m_makeapi.return_value = self.api + + # Setup Observatory environment + env = ObservatoryEnvironment(self.project_id, self.data_location, api_host=self.host, api_port=self.port) + bq_dataset_id = env.add_dataset() + + + # First run should download the baseline files and create the large initial table. + # figure out how to make this change each year so that tests dont keep breaking, or just add a note? + + # For each run, make an environment with the telescope instance + + for run in self.run_list: + + with env.create(): + self.setup_api() + + # Setup Telescope + # TODO: Get the workflow ID of the telescope for this section. + workflow = PubMedTelescope(dag_id=self.dag_id, + ftp_server_url="ftp.server.local") + + dag = workflow.make_dag() + + # TODO: execution date is wrong here. need to use the data_interval_end date. + # First snapshot instance + run = self.first_run + with env.create_dag_run(dag, run["data_interval_start"]) as dag_run: + + # Test that all dependencies are specified: no error should be thrown + env.run_task(workflow.check_dependencies.__name__) + + # Pull out ti from kwargs for data interval start and end + + self.assertEqual(dag.default_args["start_date"], start_date) # The absolute start of the telescope + self.assertEqual(pendulum.datetime(year=2021, month=12, day=4), run["data_interval_start"]) + + # use telescope task itself to determine if it's the first release or not. + is_first_release = workflow.check_releases.__name__ + + self.assertTrue(first_release) + + # Use release info for other tasks + # TODO: Fix the properties on the release. + release = PubMedRelease( + workflow.dag_id, + workflow.workflow_id, + workflow.dataset_type_id, + + ) + + + ### CHECK RELEASES ### + + # Check that telescope checks for the release files + ti = env.run_task(workflow.check_releases.__name__) + + mock_ftp = mock_ftp_constructor.return_value + + # Check that server address is correct + mock_ftp_constructor.assert_called_with("ftp.server.local") + + # Check that ftp.login() was called + self.assertTrue(mock_ftp.login.called) + + # Check folders where changed into + mock_ftp.cwd.assert_called_with('change to folders') + + ### DOWNLOAD ### + + # Test that file was downloaded + env.run_task(workflow.download.__name__) + # TODO: Make a list of baseline release files to download for test + #self.assertEqual(len(files_downloaded), expected_files_downloaded) + + ### UPLOAD DOWNLAODED ### + + # Upload the mock downloaded files to GCS and check their hash as an assert. + for entity, info in self.entities.items(): + gzip_path = f"{entity}.jsonl.gz" + with open(info["download_path"], "rb") as f_in, gzip.open(gzip_path, "wb") as f_out: + f_out.writelines(f_in) + + download_blob = ( + f"telescopes/{release.dag_id}/{release.release_id}/transform/" + f"data/{entity}/updated_date={run['manifest_date']}/0000_part_00.gz" + ) + upload_file_to_cloud_storage(release.download_bucket, download_blob, gzip_path) + + + ### TRANSFORM ### + + # Check if files exist and if the hash of the files are correct. + + # Test that files transformed + env.run_task(workflow.transform.__name__) + self.assertEqual(3, len(release.transform_files)) + # Sort lines so that gzip crc is always the same + for file in release.transform_files: + entity = file.split("/")[-3] + with gzip.open(file, "rb") as f_in: + lines = sorted(f_in.readlines()) + with gzip.open(file, "wb") as f_out: + f_out.writelines(lines) + self.assert_file_integrity(file, self.entities[entity]["transform_hash"], "gzip_crc") + + ### UPLOAD TRANSFORMED ### + + # Test that transformed files uploaded + ti = env.run_task(workflow.upload_transformed.__name__) + #ti[] + # pull updated list from kwargs of the uploaded files. + + for pubmed_data in pubmed_transform_list: + + ### CREATE RELEASE SNAPSHOT ### + + # Get bq load info for BQ tasks + bq_load_info = workflow.get_bq_load_info(release) + + # Test append new creates table + env.run_task(workflow.bq_append_new.__name__) + for _, table, _ in bq_load_info: + table_id = f"{self.project_id}.{worflow.dataset_id}.{table}" + expected_bytes = run["table_bytes"][table] + self.assert_table_bytes(table_id, expected_bytes) + + # Test delete old task is skipped for the first release + with patch("observatory.platform.utils.gc_utils.bq_query_bytes_daily_limit_check"): + ti = env.run_task(workflow.bq_delete_old.__name__) + self.assertEqual(ti.state, State.SUCCESS) + + # Test create bigquery snapshot + ti = env.run_task(workflow.bq_create_snapshot.__name__) + self.assertEqual(ti.state, State.SUCCESS) + + # Test adding of dataset releases as well as cleanup + download_folder, extract_folder, transform_folder = ( + release.download_folder, + release.extract_folder, + release.transform_folder, + ) + + ### CLEANUP ### + + ti = env.run_task(workflow.cleanup.__name__) + + # check that download and transform files were deleted from local storage + # and that local ti were cleared + + openalex_dataset_releases = get_dataset_releases(dataset_id=1) + + self.assert_cleanup(download_folder, extract_folder, transform_folder) + + @patch("academic_observatory_workflows.workflows.pubmed_telescope.wait_for_process") + @patch("academic_observatory_workflows.workflows.pubmed_telescope.logging.info") + def test_run_subprocess_cmd(self, mock_logging, mock_wait_for_proc): + """Test the run_subprocess_cmd function. + + :return: None. + """ + # Mock logging + mock_wait_for_proc.return_value = ("out", "err") + + # Set up parameters + args = ["run", "unittest"] + proc = Mock(spec=Popen) + + # Test when return code is 0 + proc.returncode = 0 + run_subprocess_cmd(proc, args) + expected_logs = ["Executing bash command: run unittest", "out", "err", "Finished cmd successfully"] + self.assertListEqual([call(log) for log in expected_logs], mock_logging.call_args_list) + + # Test when return code is 1 + proc.returncode = 1 + with self.assertRaises(AirflowException): + run_subprocess_cmd(proc, args) + + def test_pull_out_attributes(self): + """ + Test that attributes from the Biopython data classes can be reliably pulled out from the data and added to the dictionary. + + :return: None. + """ + + # Give it a fake or random Biopython data class + + biopython_dict = DictionaryElement.__init__({"data": "something"}, attrs={"type": "dict"}) + biopython_list = ListElement.__init__(["data"], attrs={"type": "list"}) + biopython_str = StringElement.__init__("data", attrs={"type": "str"}) + + objects = [biopython_dict, biopython_list, biopython_str] + + dict_expected = {"data": "something", "type": "dict"} + list_expected = {"value": ["data"], "type": "list"} + str_expected = {"value": "data", "type": "str"} + + expected_objects = [dict_expected, list_expected, str_expected] + + for obj, expected in zip(objects, expected_objects): + # Pull out the attributes and ensure that data returned is data expected. + pulled_data = add_attributes_to_data_from_biopython_classes(obj) + + self.assertEqual(pulled_data, expected) + + def test_biopython_read_xml(self): + """ + Test that files can be reliably transformed. + + :return: None. + """ + + + example_xml = """ + + + + + + 10753808 + + 2000 + 06 + 12 + + + + + + 2000 + 4 + 8 + 9 + 0 + + + + + + + """ + + expected_biopython_output = ListElement( + DictionaryElement(), + DictionaryElement() + ) + + # read it in using the biopython library for xml to data dict. + + #Check that they are the same. + + # Confirm that the data has been cross referenced against the schema file. + + def test_pull_data_from_dict(self): + + """Test that the incoming dictionary has the correct keys for the PubMed data. + + :return: None. + """ + + example_transform_list = [ + { + "sub_key": "something_deletion1", + "set_key": "something_deletion1" + }, + { + "sub_key": "something_addition1", + "set_key": "something_addition2" + } + ] + + expected_data_parts = { + + } + + mock_input_file = "something_example.xml.gz" + mock_data_dict = {} + + for i in range(len(example_transform_list)): + pubmed_data = example_transform_list[i] + + data_part = pull_data_from_dict( + filename=mock_input_file, + data_dict=mock_data_dict, + data_name=pubmed_data["data_type"], + sub_set=pubmed_data["sub_key"], + set_key=pubmed_data["set_key"], + ) + + # Confirm that data part is what we want. + # self.assertEquals(data_part, expected_data_part[i]) + + + def test_customEncoder(self): + """Test that files are written out as expected using the CustomEncoder for PubMed files. + + :return: None. + """ + + # Read in json file + + # use customEncoder to write out the file + + # Read in the output file from the customEncoder to ensure that it's written as required. + +