diff --git a/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json b/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json
new file mode 100644
index 00000000..f4cded8d
--- /dev/null
+++ b/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json
@@ -0,0 +1,1120 @@
+[
+  {
+    "name": "PubmedData",
+    "mode": "NULLABLE",
+    "type": "RECORD",
+    "fields": [
+      {
+        "name": "ArticleIdList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "ArticleId",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "IdType",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "value",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "History",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "PubMedPubDate",
+            "mode": "REPEATED",
+            "type": "RECORD",
+
+            "fields": [
+              {
+                "name": "Minute",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "Hour",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "Day",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "Month",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "Year",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "PubStatus",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "PublicationStatus",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      },
+      {
+        "name": "ReferenceList",
+        "mode": "REPEATED",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Title",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "Reference",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "ArticleIdList",
+                "mode": "NULLABLE",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "ArticleId",
+                    "mode": "REPEATED",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "IdType",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      },
+                      {
+                        "name": "value",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      }
+                    ]
+                  }
+                ]
+              },
+              {
+                "name": "Citation",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          },
+          {
+            "name": "ReferenceList",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "Title",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Reference",
+                "mode": "REPEATED",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "ArticleIdList",
+                    "mode": "NULLABLE",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "ArticleId",
+                        "mode": "REPEATED",
+                        "type": "RECORD",
+                        "fields": [
+                          {
+                            "name": "IdType",
+                            "mode": "NULLABLE",
+                            "type": "STRING"
+                          },
+                          {
+                            "name": "value",
+                            "mode": "NULLABLE",
+                            "type": "STRING"
+                          }
+                        ]
+                      }
+                    ]
+                  },
+                  {
+                    "name": "Citation",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              },
+              {
+                "name": "ReferenceList",
+                "mode": "REPEATED",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "Title",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Reference",
+                    "mode": "REPEATED",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "ArticleIdList",
+                        "mode": "NULLABLE",
+                        "type": "RECORD",
+                        "fields": [
+                          {
+                            "name": "ArticleId",
+                            "mode": "REPEATED",
+                            "type": "RECORD",
+                            "fields": [
+                              {
+                                "name": "IdType",
+                                "mode": "NULLABLE",
+                                "type": "STRING"
+                              },
+                              {
+                                "name": "value",
+                                "mode": "NULLABLE",
+                                "type": "STRING"
+                              }
+                            ]
+                          }
+                        ]
+                      },
+                      {
+                        "name": "Citation",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      }
+                    ]
+                  },
+                  {
+                    "name": "ReferenceList",
+                    "mode": "REPEATED",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          }
+        ]
+      }
+    ]
+  },
+  {
+    "name": "MedlineCitation",
+    "mode": "NULLABLE",
+    "type": "RECORD",
+    "fields": [
+      {
+        "name": "NumberOfReferences",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      },
+      {
+        "name": "PersonalNameSubjectList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "PersonalNameSubject",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "Initials",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "ForeName",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "LastName",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "SupplMeshList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "SupplMeshName",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "UI",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Type",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "value",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "IndexingMethod",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      },
+      {
+        "name": "ChemicalList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Chemical",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "NameOfSubstance",
+                "mode": "NULLABLE",
+                "type": "RECORD",
+
+                "fields": [
+                  {
+                    "name": "UI",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "value",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              },
+              {
+                "name": "RegistryNumber",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "VersionID",
+        "mode": "NULLABLE",
+        "type": "INTEGER"
+      },
+      {
+        "name": "CommentsCorrectionsList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "CommentsCorrections",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "PMID",
+                "mode": "NULLABLE",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "Version",
+                    "mode": "NULLABLE",
+                    "type": "INTEGER"
+                  },
+                  {
+                    "name": "value",
+                    "mode": "NULLABLE",
+                    "type": "INTEGER"
+                  }
+                ]
+              },
+              {
+                "name": "RefSource",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "RefType",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Note",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "MedlineJournalInfo",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+
+        "fields": [
+          {
+            "name": "NlmUniqueID",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "ISSNLinking",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "MedlineTA",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "Country",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          }
+        ]
+      },
+      {
+        "name": "Article",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "VernacularTitle",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "DataBankList",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "CompleteYN",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "DataBank",
+                "mode": "REPEATED",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "AccessionNumberList",
+                    "mode": "NULLABLE",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "AccessionNumber",
+                        "mode": "REPEATED",
+                        "type": "STRING"
+                      }
+                    ]
+                  },
+                  {
+                    "name": "DataBankName",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          },
+          {
+            "name": "PublicationTypeList",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "PublicationType",
+                "mode": "REPEATED",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "UI",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "value",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          },
+          {
+            "name": "AuthorList",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "CompleteYN",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Author",
+                "mode": "REPEATED",
+                "type": "RECORD",
+
+                "fields": [
+                  {
+                    "name": "CollectiveName",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Suffix",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Initials",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Identifier",
+                    "mode": "REPEATED",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "Source",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      },
+                      {
+                        "name": "value",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      }
+                    ]
+                  },
+                  {
+                    "name": "ForeName",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "LastName",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "EqualContrib",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "AffiliationInfo",
+                    "mode": "REPEATED",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "Affiliation",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      },
+                      {
+                        "name": "Identifier",
+                        "mode": "REPEATED",
+                        "type": "RECORD",
+                        "fields": [
+                          {
+                            "name": "Source",
+                            "mode": "NULLABLE",
+                            "type": "STRING"
+                          },
+                          {
+                            "name": "value",
+                            "mode": "NULLABLE",
+                            "type": "STRING"
+                          }
+                        ]
+                      }
+                    ]
+                  },
+                  {
+                    "name": "ValidYN",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          },
+          {
+            "name": "ArticleDate",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "Day",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "Month",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "Year",
+                "mode": "NULLABLE",
+                "type": "INTEGER"
+              },
+              {
+                "name": "DateType",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          },
+          {
+            "name": "Pagination",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "MedlinePgn",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          },
+          {
+            "name": "PubModel",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "ArticleTitle",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "GrantList",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+
+            "fields": [
+              {
+                "name": "CompleteYN",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Grant",
+                "mode": "REPEATED",
+                "type": "RECORD",
+
+                "fields": [
+                  {
+                    "name": "Agency",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Acronym",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Country",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "GrantID",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          },
+          {
+            "name": "Journal",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+
+            "fields": [
+              {
+                "name": "ISOAbbreviation",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Title",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "JournalIssue",
+                "mode": "NULLABLE",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "PubDate",
+                    "mode": "NULLABLE",
+                    "type": "RECORD",
+                    "fields": [
+                      {
+                        "name": "Season",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      },
+                      {
+                        "name": "MedlineDate",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      },
+                      {
+                        "name": "Day",
+                        "mode": "NULLABLE",
+                        "type": "INTEGER"
+                      },
+                      {
+                        "name": "Month",
+                        "mode": "NULLABLE",
+                        "type": "STRING"
+                      },
+                      {
+                        "name": "Year",
+                        "mode": "NULLABLE",
+                        "type": "INTEGER"
+                      }
+                    ]
+                  },
+                  {
+                    "name": "Volume",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Issue",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "CitedMedium",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              },
+              {
+                "name": "ISSN",
+                "mode": "NULLABLE",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "IssnType",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "value",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          },
+          {
+            "name": "Abstract",
+            "mode": "NULLABLE",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "CopyrightInformation",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "AbstractText",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          },
+          {
+            "name": "Language",
+            "mode": "REPEATED",
+            "type": "STRING"
+          },
+          {
+            "name": "ELocationID",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "ValidYN",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "EIdType",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "value",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "KeywordList",
+        "mode": "REPEATED",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Owner",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "Keyword",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "MajorTopicYN",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "value",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "SpaceFlightMission",
+        "mode": "REPEATED",
+        "type": "STRING"
+      },
+      {
+        "name": "DateCompleted",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Day",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          },
+          {
+            "name": "Month",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          },
+          {
+            "name": "Year",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          }
+        ]
+      },
+      {
+        "name": "CoiStatement",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      },
+      {
+        "name": "DateRevised",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Day",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          },
+          {
+            "name": "Month",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          },
+          {
+            "name": "Year",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          }
+        ]
+      },
+      {
+        "name": "PMID",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Version",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          },
+          {
+            "name": "value",
+            "mode": "NULLABLE",
+            "type": "INTEGER"
+          }
+        ]
+      },
+      {
+        "name": "VersionDate",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      },
+      {
+        "name": "GeneralNote",
+        "mode": "REPEATED",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "value",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "Owner",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          }
+        ]
+      },
+      {
+        "name": "Owner",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      },
+      {
+        "name": "CitationSubset",
+        "mode": "REPEATED",
+        "type": "STRING"
+      },
+      {
+        "name": "InvestigatorList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Investigator",
+            "mode": "REPEATED",
+            "type": "RECORD",
+
+            "fields": [
+              {
+                "name": "Suffix",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Initials",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "Identifier",
+                "mode": "REPEATED",
+                "type": "STRING"
+              },
+              {
+                "name": "ForeName",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "LastName",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              },
+              {
+                "name": "AffiliationInfo",
+                "mode": "REPEATED",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "Affiliation",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "Identifier",
+                    "mode": "REPEATED",
+                    "type": "STRING"
+                  }
+                ]
+              },
+              {
+                "name": "ValidYN",
+                "mode": "NULLABLE",
+                "type": "STRING"
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "MeshHeadingList",
+        "mode": "NULLABLE",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "MeshHeading",
+            "mode": "REPEATED",
+            "type": "RECORD",
+            "fields": [
+              {
+                "name": "DescriptorName",
+                "mode": "NULLABLE",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "Type",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "UI",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "MajorTopicYN",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "value",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              },
+              {
+                "name": "QualifierName",
+                "mode": "REPEATED",
+                "type": "RECORD",
+                "fields": [
+                  {
+                    "name": "UI",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "MajorTopicYN",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  },
+                  {
+                    "name": "value",
+                    "mode": "NULLABLE",
+                    "type": "STRING"
+                  }
+                ]
+              }
+            ]
+          }
+        ]
+      },
+      {
+        "name": "OtherAbstract",
+        "mode": "REPEATED",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "AbstractText",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "Language",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "Type",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          }
+        ]
+      },
+      {
+        "name": "OtherID",
+        "mode": "REPEATED",
+        "type": "RECORD",
+        "fields": [
+          {
+            "name": "Source",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          },
+          {
+            "name": "value",
+            "mode": "NULLABLE",
+            "type": "STRING"
+          }
+        ]
+      },
+      {
+        "name": "Status",
+        "mode": "NULLABLE",
+        "type": "STRING"
+      }
+    ]
+  }
+]
diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py
index f45bfc55..10278787 100644
--- a/academic_observatory_workflows/workflows/pubmed_telescope.py
+++ b/academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -32,7 +32,14 @@
 # For Injesting the XML using Biopython library.
 from Bio import Entrez
-from Bio.Entrez.Parser import StringElement, ListElement, DictionaryElement, OrderedListElement, IntegerElement, NoneElement
+from Bio.Entrez.Parser import (
+    StringElement,
+    ListElement,
+    DictionaryElement,
+    OrderedListElement,
+    IntegerElement,
+    NoneElement,
+)
 
 # Alternative method for injest
 import xmlschema
@@ -80,10 +87,6 @@
 from observatory.platform.utils.workflow_utils import (
     SubFolder,
     make_release_date,
-    change_keys,
-    convert,
-    table_ids_from_path,
-    delete_old_xcoms,
 )
@@ -195,13 +198,11 @@ def __init__(
         self.table_id = table_id
         self.full_table_id = f"{self.project_id}.{self.dataset_id}.{self.table_id}"
         self.source_format = source_format
-        self.schema_folder = schema_folder
+        self.schema_folder = default_schema_folder()  # schema_folder
 
         # Workflow parameters
         self.workflow_id = workflow_id
         self.schedule_interval = schedule_interval
-        self.schema_file_path = os.path.join(self.schema_folder)
-        self.schema_skeleton_path = os.path.join(self.schema_folder, "pubmed_structure_2023-03-22.json")
 
         # PubMed settings
         self.ftp_server_url = ftp_server_url  # FTP server URL
@@ -219,9 +220,9 @@ def __init__(
         self.add_setup_task(self.check_dependencies)
         self.add_task(self.check_releases)  # check releases and get list of files to download
         self.add_task(self.download)  # download the xml files from the FTP server, shove into gzip
-        self.add_task(self.upload_downloaded)  # upload raw files from pubmed servers for specific releases
+        # self.add_task(self.upload_downloaded)  # upload raw files from pubmed servers for specific releases
         self.add_task(self.transform)
-        # self.add_task(self.upload_transformed)  # upload additions and deletions files from transform/cleaning step
+        self.add_task(self.upload_transformed)  # upload additions and deletions files from transform/cleaning step
 
         # self.add_task(transform)  # convert xml files into *.jsonl.gz, validate if neccesary using their API
@@ -378,7 +379,7 @@ def download(self, release: PubMedRelease, **kwargs):
 
         # for file_on_ftp in files_to_download:
         # For testing
-        for i in range(0, 1, 1):  # - for testing
+        for i in range(0, 4, 1):  # - for testing
             file_on_ftp = files_to_download[i]
 
             file = file_on_ftp.split("/")[-1]
@@ -418,15 +419,15 @@
                         download_success = True
                         downloaded_files_for_release.append(file_download_location)
                     else:
-                        raise logging.info(f"MD5 hash does not match the given MD5 checksum from server: {file}")
+                        logging.info(f"MD5 hash does not match the given MD5 checksum from server: {file}")
+
+                    download_attemp_count += 1
 
                 if not download_success:
                     raise AirflowException(
                         f"Unable to download {file_on_ftp} from PubMed's FTP server {self.ftp_server_url} after {self.max_download_retry}"
                     )
 
-                download_attemp_count += 1
-
             else:
                 logging.info(f"Downloading: {file}")
                 try:
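The hunk above fixes two real bugs in the retry flow: `raise logging.info(...)` raised the `None` that `logging.info` returns (a `TypeError`, not a useful error), and the attempt counter was incremented outside the checksum branch, so a repeatedly failing MD5 check could never exhaust `max_download_retry`. A minimal sketch of the intended pattern, with hypothetical names (`ftp_conn`, `expected_md5`) since the full function body is not shown in this diff:

```python
import hashlib
import logging
from ftplib import FTP


def download_with_retry(ftp_conn: FTP, file_on_ftp: str, dest: str, expected_md5: str, max_retry: int = 5) -> bool:
    """Download a file over FTP, verify its MD5, and retry on mismatch."""
    attempt = 0
    while attempt < max_retry:
        with open(dest, "wb") as f:
            ftp_conn.retrbinary(f"RETR {file_on_ftp}", f.write)
        with open(dest, "rb") as f:
            actual_md5 = hashlib.md5(f.read()).hexdigest()
        if actual_md5 == expected_md5:
            return True
        logging.info(f"MD5 mismatch for {file_on_ftp}, retrying")
        attempt += 1  # incremented inside the loop, as the fix above does
    return False
```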
@@ -465,7 +466,7 @@
             try:
                 # TODO: how to get proper path of the GCS blobs?
                 blob_name = f"telescopes/{self.dag_id}/{self.release_id}/{file}"
-                logging.info(f"Uplaoding file {file} to GCS bucket {self.download_bucket} and {blob_name}")
+                logging.info(f"Uploading file {file} to GCS bucket {self.download_bucket} and {blob_name}")
                 success = upload_file_to_cloud_storage(
                     bucket_name=self.download_bucket,
                     blob_name=blob_name,
@@ -473,6 +474,9 @@
                     check_blob_hash=False,
                 )
 
+                # ??? gcs uri ???
+                uploaded_download_files.append(file_to_upload)
+                # get gcs uri from the upload of the blob
             except:
                 raise AirflowException(f"Unable to upload file: {file} to GCS bucket {self.download_bucket}")
@@ -483,12 +487,12 @@
     def transform(self, release: PubMedRelease, **kwargs):
         """Transform the *.xml.gz files from the FTP server into usable jsonl like files.
 
-        This step compiles all the additions and deletions from the update files into separate tables.
+        This task pulls all of the PubmedArticle, PubmedBookArticle and BookDocument entries from the Pubmed files.
+
+        Compiles all the additions and deletions from the update files into separate tables.
         Loops through and checks if there are any deletions to apply before uploading them to Bigquery, to save cost.
         Matches on both the version of the publication and the PMID number.
-
-        Outputs additions.jsonl.gz and deletions.jsonl.gz files for the release.
         """
@@ -497,60 +501,127 @@ def transform(self, release: PubMedRelease, **kwargs):
 
         # Read master dictionary from file.
 
         # If instance in dictionary exists, add to the matching master dict copy.
 
         # read in the schema and make sure that they're the same
 
+        # Push details of the files into one dictionary for ease of use later, datafiles: { filename: blah, path: blah, files_included: (list of pubmed indexes) }
+
         # Grab list of files downloaded.
         ti: TaskInstance = kwargs["ti"]
         downloaded_files_for_release = ti.xcom_pull(key="downloaded_files_for_release")
 
         # Make paths for the additions and deletions files.
-        additions_file_path = os.path.join(self.transform_folder, f"additions_{self.release_id}.jsonl.gz")
-        deletions_file_path = os.path.join(self.transform_folder, f"deletions_{self.release_id}.jsonl.gz")
+        pubmed_article_additions_file_path = os.path.join(
+            self.transform_folder, f"pubmed_article_additions_{self.release_id}.jsonl"
+        )
+        pubmed_article_deletions_file_path = os.path.join(
+            self.transform_folder, f"pubmed_article_deletions_{self.release_id}.jsonl"
+        )
+
+        pubmed_book_article_additions_file_path = os.path.join(
+            self.transform_folder, f"pubmed_book_article_additions_{self.release_id}.jsonl"
+        )
+
+        book_document_additions_file_path = os.path.join(
+            self.transform_folder, f"book_document_additions_{self.release_id}.jsonl"
+        )
+        book_document_deletions_file_path = os.path.join(
+            self.transform_folder, f"book_document_deletions_{self.release_id}.jsonl"
+        )
+
+        pubmed_article_additions_for_release = []
+        pubmed_article_deletions_for_release = []
+
+        pubmed_book_article_additions_for_release = []
+        # TODO: Check that pubmed_book_article_deletions are included in the pubmed_article_deletions or book_document_deletions ????????
+
+        book_document_additions_for_release = []
+        book_document_deletions_for_release = []
 
-        additions_for_release = []
-        deletions_for_release = []
-
-        # TODO: Consider parallelising this section.
+        # dictionary for a list of chunks to keep the number of files limited to < 4gb, to make parallelising easier and uploading to BQ faster.
 
+        # Loop through each of the PubMed database files and gather additions and deletions.
         # for file in downloaded_files_for_release:
-        for i in range(0, 1, 1):  # - for testing
-            file = downloaded_files_for_release[i]  # - for testing
+        for i in range(0, 4, 1):  # - for testing
+            file = downloaded_files_for_release[i]  # - for testing
 
             logging.info(f"Running through file - {file}")
 
             with gzip.open(file, "rb") as f_in:
-
-                # Use the BioPython library for reading in the xml files and putting into a usable format.
-                # TODO: Confirm that this library pulls down the necessary schemas from withing the xml header and does a self validation.
+                # Use the BioPython library for reading in the Pubmed xml files and putting into a usable format.
+                data_dict_dirty = Entrez.read(f_in, validate=True)
 
-                data_dict = Entrez.read(f_in, validate=True)
+                # Need to have the XML attributes pulled out from the special Biopython classes.
+                data_dict = add_attributes_to_data_from_biopython_classes(data_dict_dirty)
 
-                # Using the XMLSchema library - Slowest option of the two by 10x but more robust as it checks against the XSD schema during the import.
+                # Using the XMLSchema library - Slowest option of the two by 10x but more robust as it checks against the XSD schema during the import.
+                # Please note that this parsing method does not work currently and the CustomEncoder below would need to be modified for it.
 
                 # Path to the XSD schemas needed for this method.
-                # pubmed_schema = xmlschema.XMLSchema(os.path.join(self.schema_folder,"pubmed_xsd_schemas","pubmed_230101.xsd"))
+                # pubmed_schema = xmlschema.XMLSchema(
+                #     os.path.join(self.schema_folder, "pubmed_xsd_schemas", "pubmed_230101.xsd")
+                # )
                 # data_dict_dirty_keys = pubmed_schema.to_dict(f_in)
-                # data_dict = change_keys( pubmed_data_dirty_keys, convert)
+                # data_dict = change_keys(data_dict_dirty_keys, convert)
+
+                # Grab additions from file.
+                try:
+                    logging.info(f"Extracting PubmedArticle records from file - {file}")
+
+                    try:
+                        additions = [addition for addition in data_dict["PubmedArticleSet"]["PubmedArticle"]]
+                    except:
+                        additions = [addition for addition in data_dict["PubmedArticle"]]
+
+                    pubmed_article_additions_for_release.extend(additions)
+                    logging.info(f"Pulled out {len(additions)} PubmedArticle additions from file - {file}")
+
+                except:
+                    logging.info(f"No PubmedArticle additions in file - {file}")
 
                 try:
-                    logging.info(f"Extracting additions to the PubMed dataset - {file}")
-
-                    # Remove the bad characters from the keys on import
-                    additions = [
-                        addition
-                        for addition in data_dict["PubmedArticle"]
-                    ]
-                    additions_for_release.extend(additions)
+                    logging.info(f"Extracting PubmedBookArticle records from file - {file}")
+
+                    try:
+                        additions = [
+                            addition for addition in data_dict["PubmedBookArticleSet"]["PubmedBookArticle"]
+                        ]
+                    except:
+                        additions = [addition for addition in data_dict["PubmedBookArticle"]]
+                    pubmed_book_article_additions_for_release.extend(additions)
+
+                    logging.info(f"Pulled out {len(additions)} PubmedBookArticle additions from file - {file}")
 
                 except:
-                    logging.info(f"No additions in this file - {file}")
+                    logging.info(f"No PubmedBookArticle additions in file - {file}")
+
+                try:
+                    logging.info(f"Extracting BookDocument records from file - {file}")
+
+                    try:
+                        additions = [addition for addition in data_dict["BookDocumentSet"]["BookDocument"]]
+                    except:
+                        additions = [addition for addition in data_dict["BookDocument"]]
+                    book_document_additions_for_release.extend(additions)
+
+                    logging.info(f"Pulled out {len(additions)} BookDocument additions from file - {file}")
+                except:
+                    logging.info(f"No BookDocument additions in file - {file}")
+
+                # Grab deletions from file.
                 try:
-                    logging.info(f"Extracting deletions from the PubMed dataset - {file}")
-                    deletions = [
-                        deletion
-                        for deletion in data_dict["DeleteCitation"]
-                    ]
-                    deletions_for_release.extend(deletions)
+                    logging.info(f"Extracting DeleteCitation records from file - {file}")
+                    deletions = [deletion for deletion in data_dict["DeleteCitation"]]
+                    pubmed_article_deletions_for_release.extend(deletions)
+                    logging.info(f"Pulled out {len(deletions)} DeleteCitation records from file - {file}")
                 except:
-                    logging.info(f"No deletions in this file - {file}")
+                    logging.info(f"No DeleteCitation records in file - {file}")
+
+                try:
+                    logging.info(f"Extracting DeleteDocument records from file - {file}")
+                    deletions = [deletion for deletion in data_dict["DeleteDocument"]]
+                    book_document_deletions_for_release.extend(deletions)
+                    logging.info(f"Pulled out {len(deletions)} DeleteDocument records from file - {file}")
+                except:
+                    logging.info(f"No DeleteDocument records in file - {file}")
 
         logging.info(
            "Checking through the additions for this release to see if any deletions can be done before going onto BigQuery."
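The five near-identical try/except blocks in the hunk above could collapse into a single helper that tries the wrapped set element first and falls back to the bare key. A sketch under the assumption that `data_dict` is the plain-dict output of `add_attributes_to_data_from_biopython_classes`; `pull_records` is a hypothetical helper, not something in this PR:

```python
import logging
from typing import Dict, List


def pull_records(data_dict: Dict, set_key: str, record_key: str) -> List[Dict]:
    """Pull all records of one type from a parsed Pubmed file.

    Tries the wrapped set element first (e.g. PubmedArticleSet -> PubmedArticle),
    then the bare key. Returns an empty list when the file holds no records of
    this type, so the caller can .extend() unconditionally instead of nesting
    try/except blocks.
    """
    try:
        records = list(data_dict[set_key][record_key])
    except (KeyError, TypeError):
        records = list(data_dict.get(record_key, []))
    logging.info(f"Pulled out {len(records)} {record_key} records")
    return records


# Usage, mirroring the blocks above:
# pubmed_article_additions_for_release.extend(pull_records(data_dict, "PubmedArticleSet", "PubmedArticle"))
# book_document_additions_for_release.extend(pull_records(data_dict, "BookDocumentSet", "BookDocument"))
```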
@@ -558,25 +629,81 @@ def transform(self, release: PubMedRelease, **kwargs):
 
         # TODO: Do a check step to make sure there are no doubles of the deletes?? Each deletion *should* be unique in the DB.
 
-        # TODO: Consider parallelising this section - initial baseline additions will be ~ 35 million entries and needs to be quicker for checking deletions.
-        additions_with_dels_checked = additions_for_release.copy()
-        deletions_with_dels_checked = deletions_for_release.copy()
+        # TODO: Consider parallelising this section, and make it a function to clean up this section - initial baseline additions will be ~ 35 million entries and needs to be quicker for checking deletions.
 
-        print(additions_with_dels_checked[0])
-        for record_to_delete in deletions_for_release:
-            for addition_to_check in additions_for_release:
-                to_check = addition_to_check["MedlineCitation"]["PMID"]
-                if record_to_delete == to_check:
-                    deletions_with_dels_checked.remove(record_to_delete)
-                    additions_with_dels_checked.remove(addition_to_check)
+        ### Pubmed Article pre-check for deletions
+        try:
+            logging.info("Running through PubmedArticle records for deletions.")
 
-                    # logging.info(f"Removed the following record from additions list - {count} {record_to_delete}")
+            pubmed_article_additions_with_dels_checked = pubmed_article_additions_for_release.copy()
+            pubmed_article_deletions_with_dels_checked = pubmed_article_deletions_for_release.copy()
 
-        # Do some sort of assert check to make sure all of the deletions are done properly and none missing
+            # print(pubmed_article_additions_with_dels_checked[0])
+            for record_to_delete in pubmed_article_deletions_for_release:
+                for article_addition_to_check in pubmed_article_additions_for_release:
+                    to_check = article_addition_to_check["MedlineCitation"]["PMID"]
+                    if record_to_delete == to_check:
+                        pubmed_article_deletions_with_dels_checked.remove(record_to_delete)
+                        pubmed_article_additions_with_dels_checked.remove(article_addition_to_check)
 
-        logging.info(
-            f"There are {len(deletions_with_dels_checked)} deletions left to do on BQ and {len(additions_with_dels_checked)} additions to add to the snapshot for this release."
-        )
+            # logging.info(f"Removed the following record from additions list - {record_to_delete}")
+
+            logging.info(
+                f"There are {len(pubmed_article_deletions_with_dels_checked)} article deletions left to do on BQ and {len(pubmed_article_additions_with_dels_checked)} article additions to add to the snapshot for this release."
+            )
+
+        except:
+            logging.info("No PubmedArticle records to check against for deletion.")
+
+        ### Pubmed Book Article pre-check for deletions
+        try:
+            logging.info("Running through PubmedBookArticle records for deletions.")
+
+            # PubmedBookArticle uses the PMID from the DeleteCitation records.
+            pubmed_book_article_deletions_for_release = pubmed_article_deletions_with_dels_checked.copy()
+
+            pubmed_book_article_additions_with_dels_checked = pubmed_book_article_additions_for_release.copy()
+            pubmed_book_article_deletions_with_dels_checked = pubmed_book_article_deletions_for_release.copy()
+
+            # print(pubmed_book_article_additions_with_dels_checked[0])
+            for record_to_delete in pubmed_book_article_deletions_for_release:
+                for book_article_addition_to_check in pubmed_book_article_additions_for_release:
+                    to_check = book_article_addition_to_check["BookDocument"]["PMID"]
+                    if record_to_delete == to_check:
+                        pubmed_book_article_deletions_with_dels_checked.remove(record_to_delete)
+                        pubmed_book_article_additions_with_dels_checked.remove(book_article_addition_to_check)
+
+            # logging.info(f"Removed the following record from additions list - {record_to_delete}")
+
+            logging.info(
+                f"There are {len(pubmed_book_article_deletions_with_dels_checked)} book article deletions left to do on BQ and {len(pubmed_book_article_additions_with_dels_checked)} book article additions to add to the snapshot for this release."
+            )
+
+        except:
+            logging.info("No PubmedBookArticle records to check against for deletion.")
+
+        ### Book Document pre-check for deletions
+        try:
+            logging.info("Running through BookDocument records for deletions.")
+
+            book_document_additions_with_dels_checked = book_document_additions_for_release.copy()
+            book_document_deletions_with_dels_checked = book_document_deletions_for_release.copy()
+
+            # print(book_document_additions_with_dels_checked[0])
+            for record_to_delete in book_document_deletions_for_release:
+                for book_addition_to_check in book_document_additions_for_release:
+                    to_check = book_addition_to_check["BookDocument"]["PMID"]
+                    if record_to_delete == to_check:
+                        book_document_deletions_with_dels_checked.remove(record_to_delete)
+                        book_document_additions_with_dels_checked.remove(book_addition_to_check)
+
+            # logging.info(f"Removed the following record from additions list - {record_to_delete}")
+
+            logging.info(
+                f"There are {len(book_document_deletions_with_dels_checked)} BookDocument deletions left to do on BQ and {len(book_document_additions_with_dels_checked)} additions to add to the snapshot for this release."
+            )
+        except:
+            logging.info("No BookDocument records to check against for deletion.")
 
 ################################################################################################
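The nested `remove()` loops above are O(additions × deletions), with each `list.remove()` itself an O(n) scan — that will not hold up against the ~35 million baseline records the TODO mentions. A set-based pass does the same job in O(additions + deletions). Sketch only, assuming each deletion is the same PMID mapping (`{"value": ..., "Version": ...}`, as produced by `add_attributes_to_data_from_biopython_classes`) that the additions carry, which is what the equality check above implies:

```python
from typing import Dict, List, Tuple


def pmid_key(pmid: Dict) -> Tuple[str, str]:
    # Assumes the {"value": ..., "Version": ...} shape; adjust if the keys differ.
    return (str(pmid.get("value")), str(pmid.get("Version")))


def apply_deletions(additions: List[Dict], deletions: List[Dict]) -> Tuple[List[Dict], List[Dict]]:
    """O(additions + deletions) replacement for the nested remove() loops.

    Returns (additions with matched records dropped, deletions still to run on BQ).
    """
    to_delete = {pmid_key(d) for d in deletions}
    matched = set()
    kept = []
    for record in additions:
        key = pmid_key(record["MedlineCitation"]["PMID"])
        if key in to_delete:
            matched.add(key)  # deletion satisfied locally, skip the record
        else:
            kept.append(record)
    remaining = [d for d in deletions if pmid_key(d) not in matched]
    return kept, remaining
```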
@@ -615,37 +742,74 @@ def transform(self, release: PubMedRelease, **kwargs):
 ################################################################################################
 
         # Write out the processed additions and deletions to file.
-        with gzip.open(additions_file_path, "w") as f_add_out, gzip.open(deletions_file_path, "w") as f_del_out:
+        with open(pubmed_article_additions_file_path, "wb") as f_add_out, open(
+            pubmed_article_deletions_file_path, "wb"
+        ) as f_del_out:
+            logging.info(f"Writing article additions to file - {pubmed_article_additions_file_path}")
+            for line in pubmed_article_additions_with_dels_checked:
+                f_add_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
+                # Custom encoder used here so that the abstracts (AbstractText) are written as strings, not dictionary objects
+
+            logging.info(f"Writing article deletions to file - {pubmed_article_deletions_file_path}")
+            for line in pubmed_article_deletions_with_dels_checked:
+                f_del_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
+
+        with open(pubmed_book_article_additions_file_path, "wb") as f_add_out:
+            logging.info(f"Writing book article additions to file - {pubmed_book_article_additions_file_path}")
+            for line in pubmed_book_article_additions_with_dels_checked:
+                f_add_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
+                # Custom encoder used here so that the abstracts (AbstractText) are written as strings, not dictionary objects
 
-            logging.info(f"Writing additions to file - {additions_file_path}")
-            for line in additions_with_dels_checked:
-                f_add_out.write(str.encode(json.dumps(line, cls=CustomStrEncoder) + "\n"))
+        with open(book_document_additions_file_path, "wb") as f_add_out, open(
+            book_document_deletions_file_path, "wb"
+        ) as f_del_out:
+            logging.info(f"Writing book document additions to file - {book_document_additions_file_path}")
+            for line in book_document_additions_with_dels_checked:
+                f_add_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
                 # Custom encoder used here so that the abstracts (AbstractText) are written as strings, not dictionary objects
 
-            logging.info(f"Writing deletions to file - {deletions_file_path}")
-            for line in deletions_with_dels_checked:
-                f_del_out.write(str.encode(json.dumps(line) + "\n"))
+            logging.info(f"Writing book document deletions to file - {book_document_deletions_file_path}")
+            for line in book_document_deletions_with_dels_checked:
+                f_del_out.write(str.encode(json.dumps(line, cls=CustomEncoder) + "\n"))
 
         # Push list of transform files into the xcom
-        # ti.xcom_push(key="transform_additions_for_release", value=additions_file_path)
-        # ti.xcom_push(key="transform_deletions_for_release", value=deletions_file_path)
+
+        # TODO: Push these values into a list of strings for the file paths so the upload_transform function can just be a for loop.
+
+        transformed_files_to_upload_to_gcs = [
+            pubmed_article_additions_file_path,
+            pubmed_article_deletions_file_path,
+            pubmed_book_article_additions_file_path,
+            book_document_additions_file_path,
+            book_document_deletions_file_path,
+        ]
+
+        ti.xcom_push(key="transformed_files_to_upload_to_gcs", value=transformed_files_to_upload_to_gcs)
+
+        # ti.xcom_push(key="transform_pubmed_article_additions_for_release", value=pubmed_article_additions_file_path)
+        # ti.xcom_push(key="transform_pubmed_article_deletions_for_release", value=pubmed_article_deletions_file_path)
+
+        # ti.xcom_push(key="transform_pubmed_book_article_additions_for_release", value=pubmed_book_article_additions_file_path)
+
+        # ti.xcom_push(key="transform_book_document_additions_for_release", value=book_document_additions_file_path)
+        # ti.xcom_push(key="transform_book_document_deletions_for_release", value=book_document_deletions_file_path)
 
     def upload_transformed(self, release: PubMedRelease, **kwargs):
         """Upload the transformed and combined release files to GCS."""
 
         # Pull list of transform files from xcom
         ti: TaskInstance = kwargs["ti"]
-        transformed_additions = ti.xcom_pull(key="transform_additions_for_release")
-        transformed_deletions = ti.xcom_pull(key="transform_deletions_for_release")
+        transformed_files_to_upload_to_gcs = ti.xcom_pull(key="transformed_files_to_upload_to_gcs")
 
         uploaded_transform_files = []
-        for file_to_upload in [transformed_additions, transformed_deletions]:
+        for file_to_upload in transformed_files_to_upload_to_gcs:
             file = file_to_upload.split("/")[-1]
 
             try:
                 # TODO: how to get proper path of the GCS blobs?
+                # TODO: Add a retry loop ??
                 blob_name = f"telescopes/{self.dag_id}/{self.release_id}/{file}"
-                logging.info(f"Uplaoding file {file} to GCS bucket {self.transform_bucket} and {blob_name}")
+                logging.info(f"Uploading file {file} to GCS bucket {self.transform_bucket} and {blob_name}")
                 success = upload_file_to_cloud_storage(
                     bucket_name=self.transform_bucket,
                     blob_name=blob_name,
@@ -653,6 +817,8 @@ def upload_transformed(self, release: PubMedRelease, **kwargs):
                     check_blob_hash=False,
                 )
 
+                uploaded_transform_files.append(file_to_upload)
+                # get gcs uri from the upload of the blob??
             except:
                 raise AirflowException(f"Unable to upload file: {file} to GCS bucket {self.download_bucket}")
@@ -669,7 +835,7 @@ def bq_create_snapshot(self, release: PubMedRelease, **kwargs):
         uploaded_transform_files = ti.xcom_pull(key="uploaded_transform_files")
 
         ### MAJOR
-        ### TODO: Make BQ json schema based on the pubmed_230101.xsd schema that includes all possible fields.
+        ### TODO: Make BQ json schema based on the pubmed_230101.xsd schema that includes all possible fields.
         ###
 
         if self.download_baseline or not bigquery_table_exists(self.project_id, self.dataset_id, self.table_id):
@@ -690,7 +856,7 @@ def bq_create_snapshot(self, release: PubMedRelease, **kwargs):
             create_bigquery_snapshot(self.project_id, self.dataset_id, self.table_id, self.dataset_id, self.table_id)
 
         # create list of PMIDs to delete from the just new snapshot table.
-        publications_to_delete = [PMID_to_delete["PMID"]["#text"] for PMID_to_delete in deletions_for_release]
+        publications_to_delete = [PMID_to_delete["PMID"]["text"] for PMID_to_delete in deletions_for_release]
 
         # apply deletions with query
         # WHERE column_name IN ('value1', 'value2', 'value3');;"""  TODO: figure out the formatting.
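For the `WHERE column_name IN (...)` TODO above: rather than formatting millions of PMIDs into the SQL string, BigQuery accepts array query parameters with `IN UNNEST`, which sidesteps both quoting and query-length issues. A sketch; the column path follows the schema file in this PR, while the function name and client usage are assumptions:

```python
from typing import List

from google.cloud import bigquery


def delete_pmids(client: bigquery.Client, full_table_id: str, pmids: List[int]) -> None:
    """Delete rows whose PMID appears in the deletions list for this release."""
    query = f"""
        DELETE FROM `{full_table_id}`
        WHERE MedlineCitation.PMID.value IN UNNEST(@pmids)
    """
    # Pass the PMIDs as an array parameter instead of formatting them into the SQL.
    job_config = bigquery.QueryJobConfig(
        query_parameters=[bigquery.ArrayQueryParameter("pmids", "INT64", pmids)]
    )
    client.query(query, job_config=job_config).result()
```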
@@ -785,23 +951,64 @@ def run_subprocess_cmd(proc: Popen, args: list):
     logging.info("Finished cmd successfully")
 
 
+def convert(k: str) -> str:
+    """Convert a key name.
+
+    BigQuery specification for field names: Fields must contain only letters, numbers, and underscores, start with a
+    letter or underscore, and be at most 128 characters long.
+
+    :param k: Key.
+    :return: Converted key.
+    """
+
+    # Trim special characters at start:
+    k = re.sub("^[^A-Za-z0-9]+", "", k)
+    # Replace other special characters (except '_') in remaining string:
+    k = re.sub(r"\W+", "_", k)
+
+    # In case the key is only special characters. It needs to be something for BQ to ingest.
+    if len(k) == 0:
+        k = "value"
+
+    return k
+
+
+def change_keys(obj, convert):
+    """Recursively goes through the dictionary obj and replaces keys with the convert function.
+
+    :param obj: Dictionary object.
+    :param convert: Convert function.
+    :return: Updated dictionary object.
+    """
+
+    if isinstance(obj, (str, int, float)):
+        return obj
+    if isinstance(obj, dict):
+        new = obj.__class__()
+        for k, v in list(obj.items()):
+            new[convert(k)] = change_keys(v, convert)
+    elif isinstance(obj, (list, set, tuple)):
+        new = obj.__class__(change_keys(v, convert) for v in obj)
+    else:
+        return obj
+    return new
+
+
 # TODO: Clean up this function if possible.
 def add_attributes_to_data_from_biopython_classes(obj):
+    """Recursively travel down the data tree and add attributes from the biopython data classes as dictionary keys.
 
-    """Add attributes from the biopython data import and continue going down the data tree"""
-
-    if isinstance(obj, StringElement ):
+    :param obj: Input object, any type.
+    :return: Object with attributes added as keys.
+    """
+
+    if isinstance(obj, StringElement):
         if len(list(obj.attributes.keys())) > 0:
             # New object to hold the string data.
             new = {}
-            new['value'] = obj
+            new["value"] = obj
             # Loop through attributes and add as needed.
             for key in list(obj.attributes.keys()):
                 new[key] = add_attributes_to_data_from_biopython_classes(obj.attributes[key])
         else:
             new = obj
-
+
         return new
 
     if isinstance(obj, DictionaryElement):
@@ -817,54 +1024,67 @@ def add_attributes_to_data_from_biopython_classes(obj):
 
         return new
 
-    if isinstance(obj, (ListElement, OrderedListElement) ):
+    if isinstance(obj, (ListElement, OrderedListElement)):
         # New object to hold the string data.
         new = {}
         if len(obj) > 0:
-            new[obj[0].tag] = [ add_attributes_to_data_from_biopython_classes(v) for v in obj ]
+            new[obj[0].tag] = [add_attributes_to_data_from_biopython_classes(v) for v in obj]
         try:
             # Loop through attributes and add as needed.
             for key in list(obj.attributes.keys()):
-                new[key] = add_attributes_to_data_from_biopython_classes( obj.attributes[key] )
-        except:
+                new[key] = add_attributes_to_data_from_biopython_classes(obj.attributes[key])
+        except:
             pass
         return new
-
-    if isinstance(obj, list):
-        new = [ add_attributes_to_data_from_biopython_classes(v) for v in obj ]
+
+    if isinstance(obj, list):
+        new = [add_attributes_to_data_from_biopython_classes(v) for v in obj]
 
         return new
 
     else:
         return obj
 
 
-class CustomStrEncoder(json.JSONEncoder):
-    """Custom encoder for JSON dump for it to write a dictionary field as a string of text."""
+class CustomEncoder(json.JSONEncoder):
+    """Custom encoder for JSON dump for it to write a dictionary field as a string of text for a
+    number of select key values in the Pubmed data."""
 
-    def _transform_obj_to_str(self, obj):
-
+    def _transform_obj_data(self, obj):
         if isinstance(obj, str):
             return obj
         elif isinstance(obj, dict):
-            new = obj.__class__()
+            new = {}
             # Loop through field names for the match fields to change to text.
             for k, v in list(obj.items()):
-
                 # TODO: Change this to an input of a List or Set for multiple fields if needed - hard coding bad.
-                if k == "AbstractText":
+                if k == "AbstractText":  # or k == "GeneralNote":
                     new[k] = str(v)
                 else:
-                    new[k] = self._transform_obj_to_str(v)
+                    new[k] = self._transform_obj_data(v)
+
+            # # To remove a list of a list e.g. "KeywordList": [["a","b"]] to "KeywordList": ["a","b"]
+            # if k == "KeywordList" and len(v) > 0:
+            #     new[k] = v[0]
+            # else:
+            #     new[k] = self._transform_obj_data(v)
+
             return new
         elif isinstance(obj, list):
-            return [self._transform_obj_to_str(elem) for elem in obj]
+            # # If the data is something like "key": [] needs to be "key": [""] for BQ
+            # if len(obj) == 0:
+            #     return [""]
+            # else:
+            #     return [self._transform_obj_data(elem) for elem in obj]
+
+            return [self._transform_obj_data(elem) for elem in obj]
         else:
            return obj
 
     def encode(self, obj):
-        transformed_obj = self._transform_obj_to_str(obj)
-        return super(CustomStrEncoder, self).encode(transformed_obj)
+        transformed_obj = self._transform_obj_data(obj)
+        return super(CustomEncoder, self).encode(transformed_obj)
 
 
 # Not used because FTP server can't download multiple files using multiple threads.
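What `CustomEncoder` buys: `AbstractText` can arrive from the parser as a labelled dict (or list of sections), while the schema file in this PR declares it a plain STRING; the encoder stringifies just that field during `json.dumps` so the row loads instead of failing schema validation. A small illustration with made-up data, assuming `CustomEncoder` is importable from this module:

```python
import json

# Hypothetical record shaped like the parser output above.
record = {
    "MedlineCitation": {
        "Article": {
            "Abstract": {"AbstractText": {"value": "Example abstract.", "Label": "BACKGROUND"}}
        }
    }
}

print(json.dumps(record, cls=CustomEncoder))
# AbstractText is serialised as the *string* repr of the dict, not as nested JSON,
# so BigQuery sees a STRING value matching the schema's AbstractText field.
```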
@@ -998,24 +1218,24 @@
 
 # def fix_xml_structure(path, key, value):
 #     #print(f"In postprocessing function with key: {key}")
-
+
 #     #if isinstance(value, str) or isinstance(value, OrderedDict):.
-#     if isinstance(value, str):
+#     if isinstance(value, str):
 #         value = [value]
 
 #     # All possible lists in pubmed
 #     list_to_change = ['QualifierName', 'DescriptorName', "Grant", "PublicationType", "Author", "PubMedPubDate", "ArticleIdList", "Reference"]
-#     if isinstance(value, dict) and key in list_to_change:
+#     if isinstance(value, dict) and key in list_to_change:
 #         value = [value]
 
 #     return key, value
 
-    # The optional argument `postprocessor` is a function that takes `path`,
-    # `key` and `value` as positional arguments and returns a new `(key, value)`
-    # pair where both `key` and `value` may have changed. Usage example::
+# The optional argument `postprocessor` is a function that takes `path`,
+# `key` and `value` as positional arguments and returns a new `(key, value)`
+# pair where both `key` and `value` may have changed. Usage example::
 
-    # >>> def postprocessor(path, key, value):
-    # ...     try:
-    # ...         return key + ':int', int(value)
-    # ...     except (ValueError, TypeError):
-    # ...         return key, value
\ No newline at end of file
+# >>> def postprocessor(path, key, value):
+# ...     try:
+# ...         return key + ':int', int(value)
+# ...     except (ValueError, TypeError):
+# ...         return key, value
diff --git a/requirements.txt b/requirements.txt
index fe7cd68a..ea8adf9e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -8,4 +8,4 @@ boto3>=1.15.0,<2
 nltk==3.*
 Deprecated>1,<2
 xmlschema>=2.2.2
-bioopython>=1.81
\ No newline at end of file
+biopython>=1.81