diff --git a/academic_observatory_workflows/dags/pubmed_telescope.py b/academic_observatory_workflows/dags/pubmed_telescope.py
index fd4ecf10..6c4d5cd6 100644
--- a/academic_observatory_workflows/dags/pubmed_telescope.py
+++ b/academic_observatory_workflows/dags/pubmed_telescope.py
@@ -39,7 +39,7 @@
     schema_folder="none",
     source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
     workflow_id=workflow.id,
-    start_date=pendulum.datetime(2022, 12, 4),
+    start_date=pendulum.datetime(2022, 12, 1),
     schedule_interval="@weekly",
     catchup=True,
     ftp_server_url="ftp.ncbi.nlm.nih.gov",
diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py
index 4df90b58..eb0a50c1 100644
--- a/academic_observatory_workflows/workflows/pubmed_telescope.py
+++ b/academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -16,12 +16,16 @@
 
 # Common libraries
 import os
+from pathlib import Path
 import subprocess
 from typing import Dict, List, OrderedDict, Tuple
 import pendulum
 import datetime
 import logging
 import re
+import gzip
+import json
+
 
 # To download the files from the FTP server
 import ftplib
@@ -39,14 +43,11 @@
     OrderedListElement,
 )
 
-# Alternative method for ingest
-import xmlschema
+# Multithreading libraries
+from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
 
-import gzip
-import json
 
-# Parallelising libraries
-import concurrent.futures
 
 # Airflow modules
 from airflow import AirflowException
@@ -57,6 +58,7 @@
 from academic_observatory_workflows.config import schema_folder as default_schema_folder
 from observatory.platform.utils.airflow_utils import AirflowVars, make_workflow_folder
 from observatory.platform.workflows.workflow import Workflow, Release
+from observatory.platform.utils.workflow_utils import get_chunks
 
 from observatory.platform.utils.gc_utils import (
     bigquery_table_exists,
@@ -112,6 +114,9 @@ def __init__(
             transform_files_regex=transform_files_regex,
         )
 
 
 class PubMedTelescope(Workflow):
     DAG_ID_PREFIX = "pubmed"
@@ -128,15 +133,13 @@ def __init__(
         *,
         dag_id: str,
         workflow_id: int,
-        start_date: str = pendulum.datetime(year=2022, month=12, day=4),
-        # data_interval_start: pendulum.DateTime = pendulum.datetime(2022, 12, 8),
+        start_date: pendulum.DateTime = pendulum.datetime(year=2022, month=12, day=1),  # when to start catchup / collecting data
         schedule_interval: str = "0 0 * * 0",  # weekly
         catchup: bool = True,
         dataset_id: str = None,
         merge_partition_field: str = None,
         schema_folder: str = default_schema_folder(),
         dataset_type_id: str = None,
-        # queue: str = "default",
         table_id: str,
         ftp_server_url: str,
         check_md5_hash: bool,
@@ -144,8 +147,9 @@
         dataset_description: str,
         source_format: str,
         airflow_vars: List[str] = None,
-        max_processes: int,
+        max_processes: int = 4,
         queue: str = "default",
+        batch_size: int = 4,
         **kwargs,
     ):
         """Construct a PubMed Telescope instance.
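Worth noting on the start date change: 2022-12-01 is a Thursday, and with a Sunday-midnight weekly schedule the first cron-aligned tick on or after it is still 2022-12-04, the old start date, so the same weekly intervals run either way. A quick sanity check with plain pendulum, assuming Airflow's usual cron alignment of data intervals:

```python
import pendulum

# 2022-12-01 is a Thursday; the first "@weekly" (Sunday midnight) tick on or
# after it is 2022-12-04, the previous start date.
start = pendulum.datetime(2022, 12, 1)
first_weekly_tick = start.next(pendulum.SUNDAY)
print(start.to_date_string(), "->", first_weekly_tick.to_date_string())
# 2022-12-01 -> 2022-12-04
```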
@@ -197,13 +201,13 @@ def __init__(
         self.table_id = table_id
         self.full_table_id = f"{self.project_id}.{self.dataset_id}.{self.table_id}"
         self.source_format = source_format
-        self.schema_folder = default_schema_folder()  # schema_folder
+        self.schema_folder = schema_folder
 
         # Workflow parameters
         self.workflow_id = workflow_id
         self.schedule_interval = schedule_interval
 
-        # PubMed settings
+        # PubMed processing parameters
         self.ftp_server_url = ftp_server_url  # FTP server URL
         self.baseline_path = "/pubmed/baseline/"
         self.updatefiles_path = "/pubmed/updatefiles/"
@@ -211,9 +215,14 @@
         self.check_md5_hash = check_md5_hash
         self.max_download_retry = max_download_retry
         self.max_processes = max_processes
+        self.batch_size = batch_size
 
         # If this is the first ever run of the telescope, download the "baseline" database and build on this.
         ## Use if first dag run later instead of this.
+
+        # TODO: Add a condition for when the new annual baseline is released: download the baseline
+        # if the present day is after the new baseline's release date, or if this is the very first
+        # release of the telescope.
         self.download_baseline = is_first_release(self.workflow_id)
 
         self.add_setup_task(self.check_dependencies)
@@ -242,6 +251,7 @@ def make_release(self, **kwargs) -> PubMedRelease:
         self.data_interval_start = kwargs["data_interval_start"]
         self.data_interval_end = kwargs["data_interval_end"]
         self.first_release = is_first_release(self.workflow_id)
+        # TODO: Check whether the initial baseline table exists and which Pubmed IDs are recorded in its description.
 
         logging.info(
             f"Start data date: {self.data_interval_start} End date: {self.data_interval_end} First release: {self.first_release}"
@@ -254,6 +264,8 @@
 
         release = PubMedRelease(dag_id=self.dag_id, release_id=self.release_id)
 
+        # TODO: Make these properties on the release.
+
         self.workflow_folder = make_workflow_folder(self.dag_id, release_date)
         self.download_folder = make_workflow_folder(self.dag_id, release_date, SubFolder.downloaded.value)
         self.transform_folder = make_workflow_folder(self.dag_id, release_date, SubFolder.transformed.value)
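The `download_baseline` flag set above drives the choice between the two FTP directories defined in the constructor. A minimal sketch of that branch, assuming `is_first_release` simply reports whether the workflow has no prior releases; `choose_ftp_path` is a hypothetical helper, not part of the workflow:

```python
def choose_ftp_path(download_baseline: bool) -> str:
    """Pick the FTP directory for a run: the full annual baseline on the very
    first run, the weekly incremental update files afterwards."""
    baseline_path = "/pubmed/baseline/"
    updatefiles_path = "/pubmed/updatefiles/"
    return baseline_path if download_baseline else updatefiles_path

print(choose_ftp_path(True))   # /pubmed/baseline/
print(choose_ftp_path(False))  # /pubmed/updatefiles/
```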
@@ -337,9 +349,8 @@ def download(self, release: PubMedRelease, **kwargs):
         """Download files from PubMed's FTP server for this release.
 
         Unable to do this in parallel because their FTP server is not able to handle too many requests.
-        """
 
-        # TODO: Consider parallelising this whole section if files to download > 100
+        """
 
         # Grab list of files to download from xcom
         ti: TaskInstance = kwargs["ti"]
@@ -348,17 +359,18 @@
         logging.info(f"Files to download from PubMed for this release ({len(files_to_download)}): {files_to_download}")
 
         # Open FTP connection
-        ftp_conn = ftplib.FTP(self.ftp_server_url, timeout=1000000.0)
+        ftp_conn = ftplib.FTP(self.ftp_server_url, timeout=1000000000.0)
         ftp_conn.login()  # anonymous login (publicly available data)
 
-        # Having to do this in serial because their FTP server chucks errors when downloading in parallel.
         downloaded_files_for_release = []
 
         # For testing
-        if len(files_to_download) < 5:
-            num_to_download = len(files_to_download)
-        else:
-            num_to_download = 5
+        num_to_download = len(files_to_download)
 
-        # for file_on_ftp in files_to_download:
         for i in range(0, num_to_download, 1):
@@ -430,14 +442,14 @@
             ti.xcom_push(key="downloaded_files_for_release", value=downloaded_files_for_release)
 
     def upload_downloaded(self, release: PubMedRelease, **kwargs):
-        """Put all files into a tar ball and upload to GCS."""
+        """Put all downloaded files onto GCS."""
 
         # Pull list of transform files from xcom
         ti: TaskInstance = kwargs["ti"]
-        downloaded_files_for_release = ti.xcom_pull(key="downloaded_files_for_release")
+        downloaded_file_paths_for_release = ti.xcom_pull(key="downloaded_files_for_release")
 
         uploaded_download_files = []
-        for file_to_upload in downloaded_files_for_release:  # upload tarball of release files
+        for file_to_upload in downloaded_file_paths_for_release:  # upload each downloaded file
             file = file_to_upload.split("/")[-1]
 
             try:
@@ -451,10 +463,8 @@
                     check_blob_hash=False,
                 )
 
-                # ??? gcs uri ???
                 uploaded_download_files.append(file_to_upload)
 
-                # get gcs uri from the upload of the blob
             except:
                 raise AirflowException(f"Unable to upload file: {file} to GCS bucket {self.download_bucket}")
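`upload_file_to_cloud_storage` here is the observatory platform's helper. For readers without that dependency, the equivalent operation with the plain `google-cloud-storage` client looks roughly like the sketch below; `upload_to_gcs` is a hypothetical stand-in, and the bucket, blob and file paths are placeholders:

```python
from google.cloud import storage

def upload_to_gcs(bucket_name: str, blob_name: str, file_path: str) -> None:
    """Upload a single local file to a GCS bucket."""
    client = storage.Client()
    blob = client.bucket(bucket_name).blob(blob_name)
    blob.upload_from_filename(file_path)

# upload_to_gcs("my-download-bucket", "pubmed/pubmed22n0001.xml.gz", "/tmp/pubmed22n0001.xml.gz")
```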
@@ -462,165 +472,106 @@
         ti.xcom_push(key="uploaded_download_files", value=uploaded_download_files)
 
     def transform(self, release: PubMedRelease, **kwargs):
-        """Transform the *.xml.gz files from the FTP server into usable jsonl like files.
+        """Transform the *.xml.gz files from the FTP server into usable jsonl files.
 
         This task pulls all of the PubmedArticle, PubmedBookArticle and BookDocument entries from the Pubmed files.
-        compiles all the additions and deletions from the update files into separate tables.
-        Loops through and checks if there are any deletions to apply before uploading them to Bigquery, to save cost.
-
-        Matches on both the version of the publication and the PMID number.
         """
 
         # Grab list of files downloaded from previous step.
         ti: TaskInstance = kwargs["ti"]
         downloaded_files_for_release = ti.xcom_pull(key="downloaded_files_for_release")
 
-        # List of objects to hold the data pulled from XML for additions, deletions, articles, bookarticles and bookdocuments.
+        # List of objects to hold metadata and file lists about how to pull the data from Pubmed.
         pubmed_transform_list = [
             {
                 "name": "pubmed_article",
-                "data": [],
                 "data_type": "additions",
                 "sub_key": "PubmedArticle",
                 "set_key": "PubmedArticleSet",
                 "pmid_key_loc": "MedlineCitation",
-                "output_file": os.path.join(self.transform_folder, f"pubmed_article_additions_{self.release_id}.jsonl"),
+                "output_file_base": f"pubmed_article_additions_{self.release_id}",
+                "transform_files": [],
             },
             {
                 "name": "pubmed_book_article",
-                "data": [],
                 "data_type": "additions",
                 "sub_key": "PubmedBookArticle",
                 "set_key": "PubmedBookArticleSet",
                 "pmid_key_loc": "MedlineCitation",
-                "output_file": os.path.join(
-                    self.transform_folder, f"pubmed_book_article_additions_{self.release_id}.jsonl"
-                ),
+                "output_file_base": f"pubmed_book_article_additions_{self.release_id}",
+                "transform_files": [],
             },
             {
                 "name": "book_document",
-                "data": [],
                 "data_type": "additions",
                 "sub_key": "BookDocument",
                 "set_key": "BookDocumentSet",
                 "pmid_key_loc": "BookDocument",
-                "output_file": os.path.join(self.transform_folder, f"book_document_additions_{self.release_id}.jsonl"),
+                "output_file_base": f"book_document_additions_{self.release_id}",
+                "transform_files": [],
             },
             {
                 "name": "pubmed_article",
-                "data": [],
                 "data_type": "deletions",
                 "sub_key": "DeleteCitation",
                 "set_key": None,
-                "output_file": os.path.join(self.transform_folder, f"pubmed_article_deletions_{self.release_id}.jsonl"),
+                "output_file_base": f"pubmed_article_deletions_{self.release_id}",
+                "transform_files": [],
             },
             {
                 "name": "book_document",
-                "data": [],
                 "data_type": "deletions",
                 "sub_key": "DeleteDocument",
                 "set_key": None,
-                "output_file": os.path.join(self.transform_folder, f"book_document_deletions_{self.release_id}.jsonl"),
+                "output_file_base": f"book_document_deletions_{self.release_id}",
+                "transform_files": [],
             },
         ]
 
-        # process_files["chunk_size"] = 2  # integer value for the number of files per chunk
-        # process_files["chunk_parts"] = "a"  # list of pubmed files for this chunk
-
-        # dictionary for a list of chunks to keep the number of files limited to < 4gb, to make parallelising easier and uploading to BQ faster.
-        file_chucks = {}
-        file_chucks["number_of_chunks"] = 4  # To be automatically calculated.
-
-        # Loop through each of the PubMed database files and gather additions and deletions.
-
-        # For testing
-        if len(downloaded_files_for_release) < 5:
-            num_to_transform = len(downloaded_files_for_release)
-        else:
-            num_to_transform = 5
-
-        # TODO: Parallelise this read in and pull out data step.
+        # TODO: For the baseline load, consider writing outputs in chunks of roughly 4 GB each:
+        # accumulate records while the in-memory size stays under ~4 GB, then write them out
+        # to file and start a new chunk.
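The batching loop that follows relies on `get_chunks` from `observatory.platform.utils.workflow_utils`. Its assumed behaviour, sketched here rather than copied from the platform, is plain fixed-size slicing:

```python
from typing import Iterator, List

def get_chunks(*, input_list: List, chunk_size: int) -> Iterator[List]:
    """Assumed behaviour of the platform helper: yield successive
    chunk_size-sized slices of input_list."""
    for i in range(0, len(input_list), chunk_size):
        yield input_list[i : i + chunk_size]

files = [f"pubmed22n{n:04d}.xml.gz" for n in range(1, 10)]  # made-up file names
print([len(chunk) for chunk in get_chunks(input_list=files, chunk_size=4)])  # [4, 4, 1]
```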
-        # for file in downloaded_files_for_release:
-        for i in range(0, num_to_transform, 1):  # - for testing
-            input_file = downloaded_files_for_release[i]  # - for testing
-
-            logging.info(f"Running through file - {input_file}")
-
-            with gzip.open(input_file, "rb") as f_in:
-                # Use the BioPython library for reading in the Pubmed XML files.
-                data_dict_dirty = Entrez.read(f_in, validate=True)
-
-                # Need to have the XML attributes pulled out from the Biopython data classes.
-                data_dict = add_attributes_to_data_from_biopython_classes(data_dict_dirty)
-
-            for i in range(len(pubmed_transform_list)):
-                pubmed_data = pubmed_transform_list[i]
-
-                # Retrieve additions from file.
-                pubmed_data["data"] = pull_data_from_dict(
-                    filename=input_file,
-                    data_dict=data_dict,
-                    prev_retrieved=pubmed_data["data"],
-                    data_name=pubmed_data["data_type"],
-                    sub_set=pubmed_data["sub_key"],
-                    set_key=pubmed_data["set_key"],
-                )
-
-                pubmed_transform_list[i] = pubmed_data
-
-        logging.info(
-            "Checking through the additions for this release to see if any deletions can be done before going onto BigQuery."
-        )
-
-        # TODO: Parallelse the deletion check step
-        # TODO: Make into a do loop??
-
-        # Check through for deletions for PubmedArticles and DeleteCitations
-        pubmed_transform_list[0]["data"], pubmed_transform_list[3]["data"] = check_for_deletions(
-            pubmed_transform_list[0]["sub_key"],
-            pubmed_transform_list[0]["pmid_key_loc"],
-            pubmed_transform_list[0]["data"],
-            pubmed_transform_list[3]["data"],
-        )
-
-        # Check through for deletions for PubmedBookArticles and DeleteCitations.
-        # TODO: PubmedBookArticles use the same DeleteCiation as the PubmedArticle - TO CHECK
-        pubmed_transform_list[1]["data"], pubmed_transform_list[3]["data"] = check_for_deletions(
-            pubmed_transform_list[1]["sub_key"],
-            pubmed_transform_list[1]["pmid_key_loc"],
-            pubmed_transform_list[1]["data"],
-            pubmed_transform_list[3]["data"],
-        )
-
-        # Check through for deletions for BookDocument and DeleteDocument.
-        pubmed_transform_list[2]["data"], pubmed_transform_list[4]["data"] = check_for_deletions(
-            pubmed_transform_list[2]["sub_key"],
-            pubmed_transform_list[2]["pmid_key_loc"],
-            pubmed_transform_list[2]["data"],
-            pubmed_transform_list[4]["data"],
-        )
-
-        # TODO: Write out files in only 4gb chunks
+        # Process files in batches so that the ProcessPoolExecutor doesn't exhaust system memory.
+        for i, chunk in enumerate(get_chunks(input_list=downloaded_files_for_release, chunk_size=self.batch_size)):
+            with ProcessPoolExecutor(max_workers=self.max_processes) as executor:
+                futures = []
+
+                logging.info(f"In chunk {i} and processing files: {chunk}")
+
+                # Create tasks for each file
+                for input_file in chunk:
+                    future = executor.submit(
+                        transform_pubmed_xml_file_to_jsonl, input_file, pubmed_transform_list, self.transform_folder
+                    )
+                    futures.append(future)
+
+                # Gather the lists of files written by each worker. Each worker receives its own
+                # pickled copy of pubmed_transform_list, so match results on both "name" and
+                # "data_type" (names repeat across additions and deletions) and extend with the
+                # new output files rather than overwriting previous batches.
+                for future in as_completed(futures):
+                    transformed = future.result()
+                    for i in range(len(pubmed_transform_list)):
+                        for j in range(len(transformed)):
+                            if (
+                                pubmed_transform_list[i]["name"] == transformed[j]["name"]
+                                and pubmed_transform_list[i]["data_type"] == transformed[j]["data_type"]
+                            ):
+                                for output_file in transformed[j]["transform_files"]:
+                                    if output_file not in pubmed_transform_list[i]["transform_files"]:
+                                        pubmed_transform_list[i]["transform_files"].append(output_file)
+
+        # Gather list of transformed files for GCS upload
+        transformed_files = []
         for pubmed_data in pubmed_transform_list:
-            logging.info(f"Writing article additions to file - {pubmed_data['output_file']}")
-
-            # Number of rows per chunk calc
-            # Remainder of rows goes into last file.
-
-            with open(pubmed_data["output_file"], "wb") as f_out:
-                for line in pubmed_data["data"]:
-                    f_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
+            transformed_files.extend(pubmed_data["transform_files"])
 
-        # List of all transformed files - to redo because of writing in chucks of 4gb
-        transformed_files_to_updload_to_gcs = [pubmed_data["output_file"] for pubmed_data in pubmed_transform_list]
+        logging.info(f"Transformed files: {transformed_files}")
 
         # Push list of transformed files into the Xcom for upload to GCS step.
-        ti.xcom_push(key="transformed_files_to_updload_to_gcs", value=transformed_files_to_updload_to_gcs)
+        ti.xcom_push(key="transformed_files_to_updload_to_gcs", value=transformed_files)
+
+        # Push transform list metadata into the Xcom
+        ti.xcom_push(key="pubmed_transform_list", value=pubmed_transform_list)
 
     def upload_transformed(self, release: PubMedRelease, **kwargs):
-        """Upload the transformed and combined release files to GCS."""
+        """Upload the transformed files to GCS for BQ import."""
 
         # Pull list of transform files from xcom
         ti: TaskInstance = kwargs["ti"]
@@ -669,6 +620,10 @@ def bq_create_snapshot(self, release: PubMedRelease, **kwargs):
 
         # Apply additions with gcloud command from the additions file on GCS as this is the initial table
 
+        # TODO: Use the pubmed metadata object to loop through uploading the tables for the release.
+
+        # for pubmed_data in pubmed_workflow_data:
+
         with open(self.deletions_file, "r", encoding="utf8") as f_del_out:
             logging.info("Opening del file.")
             # read both add and del file
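For the TODO above: loading one of the transformed JSONL files from GCS into BigQuery with the standard client would look roughly like this sketch. The table ID, bucket and file names are placeholders, and the real workflow goes through the observatory platform's gc_utils helpers with an explicit schema rather than autodetect:

```python
from google.cloud import bigquery

client = bigquery.Client()
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    autodetect=True,  # placeholder; the telescope supplies an explicit schema
)
uri = "gs://my-transform-bucket/pubmed_article_additions_pubmed22n0001.jsonl"  # placeholder
load_job = client.load_table_from_uri(uri, "my-project.pubmed.pubmed_article", job_config=job_config)
load_job.result()  # block until the load job completes
```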
@@ -751,6 +706,75 @@ def cleanup(self, release: PubMedRelease, **kwargs):
 
         ## remove task instances and xcoms used
 
 
+def download_pubmed_xml_file(
+    file_on_ftp: str, check_md5_hash: bool, download_folder: str, max_download_retry: int, ftp_server_url: str, ftp_conn
+) -> str:
+    """Download a single file from Pubmed's FTP server, optionally verifying it against its md5 checksum.
+
+    :param file_on_ftp: path of the file on the FTP server to download.
+    :param check_md5_hash: whether to verify the download against the server's .md5 checksum file.
+    :param download_folder: absolute path to the download folder set for the release.
+    :param max_download_retry: maximum number of download attempts before giving up.
+    :param ftp_server_url: URL of the FTP server, used for log and error messages.
+    :param ftp_conn: an open, logged-in ftplib.FTP connection.
+    :return: absolute path to the downloaded file.
+    """
+
+    # This split is OK as it is pulling it from the FTP server path.
+    file = file_on_ftp.split("/")[-1]
+
+    # Save file to the correct download path for the workflow.
+    file_download_location = os.path.join(download_folder, file)
+
+    # Check that the hash of the downloaded file matches the one from the server.
+    if check_md5_hash:
+        download_attempt_count = 1
+        download_success = False
+        while download_attempt_count <= max_download_retry and not download_success:
+            logging.info(f"Downloading: {file} Attempt: {download_attempt_count}")
+            try:
+                # Download file
+                with open(file_download_location, "wb") as f:
+                    ftp_conn.retrbinary(f"RETR {file_on_ftp}", f.write)
+            except:
+                logging.info(f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url}.")
+
+            # Create the hash from the above downloaded file.
+            with open(file_download_location, "rb") as f_in:
+                data = f_in.read()
+                md5hash_from_download = hashlib.md5(data).hexdigest()
+
+            # Download corresponding md5 hash.
+            with open(f"{file_download_location}.md5", "wb") as f:
+                ftp_conn.retrbinary(f"RETR {file_on_ftp}.md5", f.write)
+
+            # Peep into md5 file.
+            with open(f"{file_download_location}.md5", "r") as f_md5:
+                md5_from_pubmed_ftp = f_md5.read()
+
+            # If the md5 hash matches, the download succeeded; otherwise log it and retry.
+            if md5hash_from_download in md5_from_pubmed_ftp:
+                download_success = True
+                return file_download_location
+            else:
+                logging.info(f"MD5 hash does not match the given MD5 checksum from server: {file}")
+
+            download_attempt_count += 1
+
+        if not download_success:
+            raise AirflowException(
+                f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url} after {max_download_retry} retries"
+            )
+
+    else:
+        logging.info(f"Downloading: {file}")
+        try:
+            # Download file
+            with open(file_download_location, "wb") as f:
+                ftp_conn.retrbinary(f"RETR {file_on_ftp}", f.write)
+        except:
+            raise AirflowException(f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url}")
+
+    return file_download_location
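The substring comparison above (`md5hash_from_download in md5_from_pubmed_ftp`) works because the server's `.md5` files are typically a single line of the form `MD5(pubmed22n0001.xml.gz)= <digest>`. A self-contained sketch of the same verification, with `md5_matches` as a hypothetical helper name:

```python
import hashlib

def md5_matches(downloaded_path: str, md5_path: str) -> bool:
    """Return True if the local file's MD5 digest appears in the server's
    .md5 file, e.g. a line like: MD5(pubmed22n0001.xml.gz)= 0a1b2c..."""
    with open(downloaded_path, "rb") as f:
        digest = hashlib.md5(f.read()).hexdigest()
    with open(md5_path, "r") as f:
        return digest in f.read()
```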
 
 
 # Move this to a utils file later
 def run_subprocess_cmd(proc: Popen, args: list):
     """Execute and wait for subprocess to finish, also handle stdout & stderr from process.
@@ -785,7 +809,7 @@ def convert(k: str) -> str:
 
     # Replace other special characters (except '_') in remaining string:
     k = re.sub(r"\W+", "_", k)
 
-    # Incase the key are only special characters. Need to be something for BQ to injest.
+    # In case the key was only special characters; it needs to be something for BQ to ingest.
     if len(k) == 0:
         k = "value"
 
@@ -842,25 +866,80 @@ def check_for_deletions(sub_set: str, pmid_key_loc: str, additions: List[Dict],
 
 
 def pull_data_from_dict(
-    filename: str, data_dict: Dict, prev_retrieved: List[Dict], data_name: str, sub_set: str, set_key: str = None
+    filename: str, data_dict: Dict, data_name: str, sub_set: str, set_key: str = None
 ) -> List[Dict]:
     """Attempt to pull a subset of data from the dictionary ingested from the Pubmed XML files."""
 
     try:
         logging.info(f"Extracting {sub_set} records from file - {filename}")
 
+        # Try/except for the cases where there are multiple sets, e.g. Articles and BookArticles, in the same dictionary.
+        try:
             retrieved = [retrieve for retrieve in data_dict[set_key][sub_set]]
         except:
             retrieved = [retrieve for retrieve in data_dict[sub_set]]
-        prev_retrieved.extend(retrieved)
 
         logging.info(f"Pulled out {len(retrieved)} {sub_set} {data_name} from file - {filename}")
 
     except:
         logging.info(f"No {sub_set} {data_name} in file - {filename}")
+        retrieved = []
+
+    return retrieved
-
-    return prev_retrieved
+
+
+def transform_pubmed_xml_file_to_jsonl(
+    input_file: str, pubmed_transform_list: List[Dict], transform_folder: str
+) -> List[Dict]:
+    """Convert a single Pubmed XML file to JSONL, pulling out any of the PubmedArticle,
+    PubmedBookArticle, BookDocument, DeleteCitation and DeleteDocument entries.
+
+    :param input_file: absolute path to the Pubmed xml.gz file.
+    :param pubmed_transform_list: list of dictionary objects with details of how to pull out the Pubmed data.
+    :param transform_folder: absolute path to the transform folder set for the release.
+    :return: the updated pubmed_transform_list, with this file's outputs appended to each "transform_files" list.
+    """
+
+    pubmed_file_id = Path(input_file).name.split(".")[0]
+
+    logging.info(f"Reading in file to memory - {input_file}")
+
+    with gzip.open(input_file, "rb") as f_in:
+        # Use the BioPython library for reading in the Pubmed XML files.
+        data_dict_dirty = Entrez.read(f_in, validate=True)
+
+        # Need to have the XML attributes pulled out from the Biopython data classes.
+        data_dict = add_attributes_to_data_from_biopython_classes(data_dict_dirty)
+
+    logging.info(f"Pulling out data from file - {input_file}")
+
+    for i in range(len(pubmed_transform_list)):
+        pubmed_data = pubmed_transform_list[i]
+
+        # Pull out each of the citation additions and deletions from the data.
+        data_part = pull_data_from_dict(
+            filename=input_file,
+            data_dict=data_dict,
+            data_name=pubmed_data["data_type"],
+            sub_set=pubmed_data["sub_key"],
+            set_key=pubmed_data["set_key"],
+        )
+
+        output_file = os.path.join(transform_folder, pubmed_data["output_file_base"] + "_" + pubmed_file_id + ".jsonl")
+        name = pubmed_data["name"]
+        data_type = pubmed_data["data_type"]
+
+        logging.info(f"Writing {name} {data_type} to file - {output_file}")
+
+        with open(output_file, "wb") as f_out:
+            for line in data_part:
+                f_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
+
+        pubmed_data["transform_files"].append(output_file)
+
+        pubmed_transform_list[i] = pubmed_data
+
+    return pubmed_transform_list
 
 
 # TODO: Clean up this function if possible.
@@ -934,7 +1013,7 @@ def _transform_obj_data(self, obj):
         for k, v in list(obj.items()):
             # TODO: Change this to an input of a List or Set for multiple fields if needed - hard coding bad.
             if k in [
-                "AbstractText",
+                "AbstractText",  # ? also has attributes
                 "Affiliation",
                 "ArticleTitle",  # ? also has attributes
                 "b",
@@ -947,13 +1026,13 @@
                 # "Keyword",
                 "Param",  # ? also has attributes
                 "PublisherName",
-                "SectionTitle",
+                "SectionTitle",  # ? also has attributes
                 "sub",
                 "Suffix",
                 "sup",
                 "u",
-                "VernacularTitle",
-                "VolumeTitle",
+                "VernacularTitle",  # ? also has attributes
+                "VolumeTitle",  # ? also has attributes
             ]:
                 new[k] = str(v)
             else:
@@ -980,194 +1059,3 @@ def encode(self, obj):
     def encode(self, obj):
         transformed_obj = self._transform_obj_data(obj)
         return super(CustomEncoder, self).encode(transformed_obj)
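The growing set of `# ? also has attributes` markers is the motivation for CustomEncoder: these fields arrive sometimes as plain strings and sometimes as attribute-carrying objects, which BigQuery cannot reconcile into a single column type, so they are coerced to strings. A minimal sketch of the idea, with made-up data and a hypothetical `coerce_mixed_fields` helper:

```python
import json

FORCE_TO_STRING = {"AbstractText", "ArticleTitle", "VernacularTitle", "VolumeTitle"}

def coerce_mixed_fields(record: dict) -> dict:
    """Stringify fields that may be either plain text or attribute-carrying
    objects, so every row presents the same type to BigQuery."""
    return {k: (str(v) if k in FORCE_TO_STRING else v) for k, v in record.items()}

row = {"ArticleTitle": {"value": "A title", "book": "some-attribute"}, "PMID": "123"}
print(json.dumps(coerce_mixed_fields(row)))
```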
-
-
-################################################################################################
-
-# # Working on additions file
-# logging.info(f"Removing bad characters from keys from additions file data.")
-
-# # Give each thread a chunk of the work to do.
-# chunk_size = len(additions_with_dels_checked) // self.max_processes
-# chunks = [additions_with_dels_checked[i : i + chunk_size] for i in range(0, len(additions_with_dels_checked), chunk_size)]
-
-# additions_cleaned_fieldnames = []
-# with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_processes) as executor:
-#     # send off chunks to each of the threads
-#     futures = [executor.submit(rename_bad_keys_in_dict_list, chunk) for chunk in chunks]
-
-#     for future in concurrent.futures.as_completed(futures):
-#         fields_renamed = future.result()
-#         additions_cleaned_fieldnames.extend(fields_renamed)
-
-# # Working on deletions file
-# logging.info(f"Removing bad characters from keys from deletions file data.")
-
-# # Give each thread a chunk of the work to do.
-# chunk_size = len(deletions_with_dels_checked) // self.max_processes
-# chunks = [deletions_with_dels_checked[i : i + chunk_size] for i in range(0, len(deletions_with_dels_checked), chunk_size)]
-
-# deletions_cleaned_fieldnames = []
-# with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_processes) as executor:
-#     # send off chunks to each of the threads
-#     futures = [executor.submit(rename_bad_keys_in_dict_list, chunk) for chunk in chunks]
-
-#     for future in concurrent.futures.as_completed(futures):
-#         fields_renamed = future.result()
-#         deletions_cleaned_fieldnames.extend(fields_renamed)
-
-################################################################################################
-
-
-# Not used because FTP server can't download multiple files using multiple threads.
-# def download_file_from_ftp(files_to_download: List[str], ftp_server_url: str, check_md5_hash: bool) -> List[str]:
-#     """Function for allowing parallisation of downloading the PubMed files"""
-
-#     # Open FTP connection
-#     ftp_conn = ftplib.FTP(ftp_server_url, timeout=1000000.0)
-#     ftp_conn.login()  # anonymous login (publicly available data)
-
-#     downloaded_files = []
-#     for file_on_ftp in files_to_download:
-#         file = file_on_ftp.split("/")[-1]
-
-#         logging.info(f"Downloading: {file}")
-
-#         try:
-#             # Download file
-#             with open(file, "wb") as f:
-#                 ftp_conn.retrbinary(f"RETR {file_on_ftp}", f.write)
-#         except:
-#             raise AirflowException(f"Unable to download {file_on_ftp} from PubMed's FTP server {ftp_server_url}")
-
-#         if check_md5_hash:
-#             # Create the hash from the above downloaded file.
-#             with open(file, "rb") as f_in:
-#                 data = f_in.read()
-#                 md5hash_from_download = hashlib.md5(data).hexdigest()
-
-#             # Download corresponding md5 hash.
-#             with open(f"{file}.md5", "wb") as f:
-#                 ftp_conn.retrbinary(f"RETR {file_on_ftp}.md5", f.write)
-
-#             # Peep into md5 file.
-#             with open(f"{file}.md5", "r") as f_md5:
-#                 md5_from_pubmed_ftp = f_md5.read()
-
-#             # If md5 does not match, raise an Airflow exception.
-#             if md5hash_from_download in md5_from_pubmed_ftp:
-#                 downloaded_files.append(file)
-#             else:
-#                 raise AirflowException(f"MD5 hash does not match the given MD5 checksum from server: {file}")
-#         else:
-#             downloaded_files.append(file)
-
-#     # Close the FTP connection to prevent errors.
-#     ftp_conn.close()
-
-#     return downloaded_files
-
-# # Download in parallel
-# downloaded_files_for_release = []
-# # Give each thread a chunck of the download list
-# chunk_size = len(files_to_download) // self.max_processes
-# chunks = [files_to_download[i : i + chunk_size] for i in range(0, len(files_to_download), chunk_size)]
-
-# with concurrent.futures.ThreadPoolExecutor(max_workers=12) as executor:
-#     # send off each chunk of the download list to each of the threads
-#     futures = [
-#         executor.submit(download_file_from_ftp, chunk, self.ftp_server_url, self.check_md5_hash)
-#         for chunk in chunks
-#     ]
-
-#     # Wait for all futures to complete
-#     concurrent.futures.wait(futures)
-
-#     # Put the successfully downloaded list of files together after everything is complete.
-#     for future in concurrent.futures.as_completed(futures):
-#         chunk_downloaded_files = future.result()
-#         downloaded_files_for_release.extend(chunk_downloaded_files)
-
-#     for file_on_ftp in files_to_download:
-#         futures.append(
-#             executor.submit(
-#                 download_file_from_ftp, ftp_conn, file_on_ftp, self.ftp_server_url, self.check_md5_hash
-#             )
-#         )
-#     for future in concurrent.futures.as_completed(futures):
-#         file = future.result()
-#         downloaded_files_for_release.append(file)
-
-# Download files in parallel - from dumb chatGPT
-# logging.info(f"Downloading files in parallel with {self.max_processes}")
-# downloaded_files_for_release = []
-# with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_processes) as executor:
-#     futures = []
-#     for file_on_ftp in files_to_download:
-#         futures.append(
-#             executor.submit(
-#                 download_file_from_ftp, ftp_conn, file_on_ftp, self.ftp_server_url, self.check_md5_hash
-#             )
-#         )
-#     for future in concurrent.futures.as_completed(futures):
-#         file = future.result()
-#         downloaded_files_for_release.append(file)
-
-
-### OLD BAD - DOESN'T WORK ###
-
-# def rename_bad_keys_in_dict_list(obj_list_input: List[Dict]) -> List[Dict]:
-#     """Remove bad characters from a keys in a dictionary for Bigquery import."""
-
-#     obj_list_output = []
-#     for obj in obj_list_input:
-#         obj_list_output.append(change_keys(obj, convert))
-
-#     return obj_list_output
-
-# def update_dict_with_missing_keys(input_dict, schema_dict):
-#     if isinstance(schema_dict, dict):
-#         if not input_dict:
-#             input_dict = schema_dict
-#         else:
-#             for key in schema_dict.keys():
-#                 # print(f"key: {key} in schema_dict")
-
-#                 if key not in input_dict:
-#                     # Push schema branch onto data branch so that format is the same
-#                     input_dict[key] = schema_dict[key]
-#                 else:
-#                     update_dict_with_missing_keys(input_dict[key], schema_dict[key])
-
-#     elif isinstance(schema_dict, list):
-#         if len(input_dict):
-#             print("Help: ", schema_dict, input_dict)
-#             for i in range(len(input_dict)):
-#                 update_dict_with_missing_keys(input_dict[i], schema_dict[0])
-#         else:
-#             input_dict = schema_dict
-#             # logging.info(f"Empty list", schema_dict)
-
-# def fix_xml_structure(path, key, value):
-#     # print(f"In postprocessing function with key: {key}")
-
-#     # if isinstance(value, str) or isinstance(value, OrderedDict):
-#     if isinstance(value, str):
-#         value = [value]
-
-#     # All possible lists in pubmed
-#     list_to_change = ['QualifierName', 'DescriptorName', "Grant", "PublicationType", "Author", "PubMedPubDate", "ArticleIdList", "Reference"]
-#     if isinstance(value, dict) and key in list_to_change:
-#         value = [value]
-
-#     return key, value
-
-# The optional argument `postprocessor` is a function that takes `path`,
-# `key` and `value` as positional arguments and returns a new `(key, value)`
-# pair where both `key` and `value` may have changed. Usage example::
-
-#     >>> def postprocessor(path, key, value):
-#     ...     try:
-#     ...         return key + ':int', int(value)
-#     ...     except (ValueError, TypeError):
-#     ...         return key, value