diff --git a/academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json b/academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json new file mode 100644 index 000000000..082ec51ae --- /dev/null +++ b/academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json @@ -0,0 +1,1220 @@ +[ + { + "name": "PubmedBookData", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "History", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "PubMedPubDate", + "mode": "REPEATED", + "type": "RECORD", + + "fields": [ + { + "name": "Year", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Month", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Day", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Hour", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Minute", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "PubStatus", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "PublicationStatus", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ObjectList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Object", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Type", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Param", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Name", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + } + ] + } + ] + }, + { + "name": "BookDocument", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "PMID", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Version", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "INTEGER" + } + ] + }, + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Book", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Publisher", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "PublisherName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "PublisherLocation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "BookTitle", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Publisher", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "PublisherName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "PublisherLocation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "PubDate", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Season", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "MedlineDate", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Day", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Month", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Year", + "mode": "NULLABLE", + "type": "INTEGER" + } + ] + }, + { + "name": "BeginningDate", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Year", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Month", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Day", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Season", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "EndingDate", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Year", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Month", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Day", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Season", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "AuthorList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "CompleteYN", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Type", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Author", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "CollectiveName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Suffix", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Initials", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "ForeName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "LastName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "EqualContrib", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "AffiliationInfo", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Affiliation", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ValidYN", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "InvestigatorList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Investigator", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Suffix", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Initials", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "ForeName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "LastName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "AffiliationInfo", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Affiliation", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ValidYN", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Volume", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "VolumeTitle", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Edition", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "CollectionTitle", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Isbn", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "ELocationID", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ValidYN", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "EIdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "Medium", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "ReportNumber", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "LocationLabel", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Type", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "INTEGER" + } + ] + }, + { + "name": "ArticleTitle", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "VernacularTitle", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Pagination", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "StartPage", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "EndPage", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "MedlinePgn", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "Language", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "AuthorList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "CompleteYN", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Type", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Author", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "CollectiveName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Suffix", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Initials", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "ForeName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "LastName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "EqualContrib", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "AffiliationInfo", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Affiliation", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ValidYN", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "InvestigatorList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Investigator", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Suffix", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Initials", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "ForeName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "LastName", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "AffiliationInfo", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Affiliation", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Identifier", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Source", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ValidYN", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "PublicationType", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "UI", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "Abstract", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "CopyrightInformation", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "AbstractText", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "Sections", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Section", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "LocationLabel", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Type", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "INTEGER" + } + ] + }, + { + "name": "SectionTitle", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Section", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "LocationLabel", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "Type", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "INTEGER" + } + ] + }, + { + "name": "SectionTitle", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + } + ] + }, + { + "name": "KeywordList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Owner", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Keyword", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "MajorTopicYN", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ContributionDate", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Year", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Month", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Day", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Season", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "DateRevised", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Year", + "mode": "NULLABLE", + "type": "INTEGER" + }, + { + "name": "Month", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Day", + "mode": "NULLABLE", + "type": "INTEGER" + } + ] + }, + { + "name": "GrantList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "CompleteYN", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Grant", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "GrantID", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Acronym", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Agency", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Country", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "ItemList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ListType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Item", + "mode": "REPEATED", + "type": "STRING" + } + ] + }, + { + "name": "ReferenceList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Title", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Reference", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Citation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "ReferenceList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Title", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Reference", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Citation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "ReferenceList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Title", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Reference", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Citation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "ReferenceList", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Title", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Reference", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Citation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + }, + { + "name": "ReferenceList", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + } + ] + } + ] + } + ] + } +] diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py index eb0a50c1a..f4b49ec5b 100644 --- a/academic_observatory_workflows/workflows/pubmed_telescope.py +++ b/academic_observatory_workflows/workflows/pubmed_telescope.py @@ -48,6 +48,8 @@ # import concurrent.futures +# To determine the size of an object in memory +from sys import getsizeof # Airflow modules from airflow import AirflowException @@ -150,6 +152,8 @@ def __init__( max_processes: int = 4, queue: str = "default", batch_size: int = 4, + reset_ftp_counter: int = 20, + transform_file_size: int = 1, # in Gigabytes **kwargs, ): """Construct an PubMed Telescope instance. @@ -217,6 +221,13 @@ def __init__( self.max_processes = max_processes self.batch_size = batch_size + # Rough size of what the output file size should be for the merged files. + # 1 Gb = 1024 Megabytes = 1024^2 Kilobytes = 1024^3 Bytes = 1073741824 Bytes + self.transform_file_size = transform_file_size + + # How often to reset the ftp connection to Pubmed's server. + self.reset_ftp_counter = reset_ftp_counter + # If this is the first ever run of the telescope, download the "baseline" database and build on this. ## Use if first dag run later instead of this. @@ -228,7 +239,7 @@ def __init__( self.add_setup_task(self.check_dependencies) self.add_task(self.check_releases) # check releases and get list of files to download self.add_task(self.download) # download the xml files from the FTP server, shove into gzip - # self.add_task(self.upload_downloaded) # upload raw files from pubmed servers for specific releases + self.add_task(self.upload_downloaded) # upload raw files from pubmed servers for specific releases self.add_task(self.transform) self.add_task(self.upload_transformed) # upload additions and deletions files from transform/cleaning step @@ -349,7 +360,6 @@ def download(self, release: PubMedRelease, **kwargs): """Download files from PubMed's FTP server for this release. Unable to do this in parallel because their FTP server is not able to handle too many requests. - """ # Grab list of files to download from xcom @@ -365,17 +375,33 @@ def download(self, release: PubMedRelease, **kwargs): downloaded_files_for_release = [] # For testing - # if len(files_to_download) < 6: - # num_to_download = len(files_to_download) - # else: - # num_to_download = 6 + if len(files_to_download) < 300: + num_to_download = len(files_to_download) + else: + num_to_download = 300 - num_to_download = len(files_to_download) + # num_to_download = len(files_to_download) # for file_on_ftp in files_to_download: for i in range(0, num_to_download, 1): # - for testing file_on_ftp = files_to_download[i] + # The FTP server disconnects the connection for the download after some time. + # Need to have it refreshed every so often so we can reliably download it. + + # Every self.reset_ftp_counter number of files that are downloaded, reinitialise the FTP connection. + if i % self.reset_ftp_counter == 0: + # Close the FTP connection + ftp_conn.close() + + # Open a new FTP connection + ftp_conn = ftplib.FTP(self.ftp_server_url, timeout=1000000000.0) + ftp_conn.login() + + logging.info( + f"FTP connection to Pubmed's servers has been reset after {self.reset_ftp_counter} to avoid issues." + ) + file = file_on_ftp.split("/")[-1] # Save file to correct download path for the workflow @@ -400,9 +426,15 @@ def download(self, release: PubMedRelease, **kwargs): data = f_in.read() md5hash_from_download = hashlib.md5(data).hexdigest() - # Download corresponding md5 hash. - with open(f"{file_download_location}.md5", "wb") as f: - ftp_conn.retrbinary(f"RETR {file_on_ftp}.md5", f.write) + # Need to have a download catch for the hash file as well, otherwise it can break the download loop. + try: + # Download corresponding md5 hash. + with open(f"{file_download_location}.md5", "wb") as f: + ftp_conn.retrbinary(f"RETR {file_on_ftp}.md5", f.write) + except: + logging.info( + f"Unable to download {file_on_ftp}.md5 from PubMed's FTP server {self.ftp_server_url}." + ) # Peep into md5 file. with open(f"{file_download_location}.md5", "r") as f_md5: @@ -446,10 +478,10 @@ def upload_downloaded(self, release: PubMedRelease, **kwargs): # Pull list of transform files from xcom ti: TaskInstance = kwargs["ti"] - downloaded_file_paths_for_release = ti.xcom_pull(key="downloaded_file_paths_for_release") + downloaded_files_for_release = ti.xcom_pull(key="downloaded_files_for_release") uploaded_download_files = [] - for file_to_upload in downloaded_file_paths_for_release: # upload tarball of release files + for file_to_upload in downloaded_files_for_release: file = file_to_upload.split("/")[-1] try: @@ -487,53 +519,58 @@ def transform(self, release: PubMedRelease, **kwargs): { "name": "pubmed_article", "data_type": "additions", + "data": [], "sub_key": "PubmedArticle", "set_key": "PubmedArticleSet", "pmid_key_loc": "MedlineCitation", "output_file_base": f"pubmed_article_additions_{self.release_id}", "transform_files": [], + "merged_transform_files": [], }, { "name": "pubmed_book_article", "data_type": "additions", + "data": [], "sub_key": "PubmedBookArticle", "set_key": "PubmedBookArticleSet", "pmid_key_loc": "MedlineCitation", "output_file_base": f"pubmed_book_article_additions_{self.release_id}", "transform_files": [], + "merged_transform_files": [], }, { "name": "book_document", "data_type": "additions", + "data": [], "sub_key": "BookDocument", "set_key": "BookDocumentSet", "pmid_key_loc": "BookDocument", "output_file_base": f"book_document_additions_{self.release_id}", "transform_files": [], + "merged_transform_files": [], }, { "name": "pubmed_article", "data_type": "deletions", + "data": [], "sub_key": "DeleteCitation", "set_key": None, "output_file_base": f"pubmed_article_deletions_{self.release_id}", "transform_files": [], + "merged_transform_files": [], }, { "name": "book_document", "data_type": "deletions", + "data": [], "sub_key": "DeleteDocument", "set_key": None, "output_file_base": f"book_document_deletions_{self.release_id}", "transform_files": [], + "merged_transform_files": [], }, ] - # TODO: Rewrite for chunks that are around ~4b each for baseline? - # do while size of pubmed_transformed_list less than 4gb ? - # else - # break loop and write to file? - # Process files in batches so that ProcessPoolExecutor doesn't deplete the system of memory for i, chunk in enumerate(get_chunks(input_list=downloaded_files_for_release, chunk_size=self.batch_size)): with ProcessPoolExecutor(max_workers=self.max_processes) as executor: @@ -564,12 +601,59 @@ def transform(self, release: PubMedRelease, **kwargs): logging.info(f"Tranfformed files: {transformed_files}") - # Push list of transformed files into the Xcom for upload to GCS step. - ti.xcom_push(key="transformed_files_to_updload_to_gcs", value=transformed_files) - # Push transform list metadata into the Xcom ti.xcom_push(key="pubmed_transform_list", value=pubmed_transform_list) + def merge_transform_files(self, release: PubMedRelease, **kwargs): + """Merge the transformed Pubmed files into appropriately sized chunks for upload GCS and import to BQ.""" + + # Pull list of transform files from xcom + ti: TaskInstance = kwargs["ti"] + pubmed_transform_list = ti.xcom_pull(key="pubmed_transform_list") + + logging.info("Merging Pubmed transform files into {self.transform_file_size} Mb sized files.") + + # Loop through each record type of Pubmed (Article, BookArticle, BookDocument, DeleteCitation, DeleteDocument) + for pubmed_data in pubmed_transform_list: + # TODO: Parallelise this section. + + # Initialise list + merged_files = [] + + # Get first pubmed file id to be read in. + last_pubmed_file_id = pull_pubmed_file_id(pubmed_data["transform_files"][0]) + + data = [] # private to the thread that it's dealing with. + for file in pubmed_data["transform_files"]: + # Open file in memory + with open(file, "r") as f_in: + # TODO: Use the proper coding for this. + new_data = json.read(f_in) + + # Append it onto existing data object. + data.extend(new_data) + + # If the size of the data is greater than this, write out to file and clear variable from memeory. + if getsizeof(data) >= self.transform_file_size: + # To make a aptly named file name for the output. + current_pubmed_file_id = pull_pubmed_file_id(file) + + # Make proper name for + merged_file = os.path.join(self.transform_folder, f"") + + # Write out to file + with open(merged_file, "wb") as f_out: + for line in data: + f_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n")) + + # Clear object in memeory + data = [] + + # Save file name into memory + merged_files.append(merged_file) + + # Export to TI for uploading the data for each of the record types for pubmed. + def upload_transformed(self, release: PubMedRelease, **kwargs): """Upload the transformed files to GCS for BQ import.""" @@ -775,6 +859,18 @@ def download_pubmed_xml_file( return file_download_location +def pull_pubmed_file_id(absolute_file_path: str) -> str: + """Simple function to remove the Pubmed file number/ID from the path of the file.""" + + try: + # TODO: Add a regex to this to just pull out what is needed. + pubmed_file_id = Path(absolute_file_path).name.split(".")[0] + except: + raise AirflowException(f"Unable to get Pubmed file ID from the following: {absolute_file_path}") + + return pubmed_file_id + + # Move this to a utils file later def run_subprocess_cmd(proc: Popen, args: list): """Execute and wait for subprocess to finish, also handle stdout & stderr from process. diff --git a/docs/telescopes/pubmed.md b/docs/telescopes/pubmed.md new file mode 100644 index 000000000..ec139db0d --- /dev/null +++ b/docs/telescopes/pubmed.md @@ -0,0 +1,42 @@ +# Pubmed + +(((BIG DRAFT))) + +## Download +### Downloads from FTP server. + +If baseline is required (first release or other), it will combine it with the updatefiles necessary for the snapshot period (data_interval_start - data_interval_end). + +Resets the connection every 20-50 files downloaded, otherwise the connection times out or the FTP server rejects the request to get the file. + +## Tranform +### Processing XML files with Biopython + +Reads in the xml.gz files and does a validation step against the *.dtd file mentioned in the XML file itself. + +Schema + +As of 2023, Pubmed's schema is presented here as a *.dtd file: + +(link for the schema file) + +Which is an older format meant for importing into other database SQL systems. + +For importing the data into bigquery, the schema was transformed into a *.xsd file using InteliJ (other weird program) and included the required math library files for it. + +The XSD schema is more human readable than the orginal, and was used to form the schemas for bigquery. + +Pubmed holds 5 main types of data: + +Pubmed Articles +Pubmed Book Articles +Book Documents +DeleteCitation +DeleteDocument + +Each of these data types are pulled out of the XML and writting to a .jsonl file for easy importing to bigquery. + +### Problematic text fields + +The AbstractText field cna hold either the entire abstract for a citattion or a separated version of it, having Background, Method, etc. +To simplify the schema and to ensure that the data for the field is readin reliably, the \ No newline at end of file