From eaa2928bec528ead99ebb26bba405a942925a8fc Mon Sep 17 00:00:00 2001 From: Alex Massen-Hane Date: Wed, 31 May 2023 14:55:48 +0800 Subject: [PATCH] Update schemas and ingest functions --- .../schema/pubmed_articles_2023-01-01.json | 47 +- .../pubmed_book_articles_2023-01-01.json | 1220 ----------------- .../workflows/pubmed_telescope.py | 229 ++-- 3 files changed, 147 insertions(+), 1349 deletions(-) delete mode 100644 academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json diff --git a/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json b/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json index 3a7157dd..60025d01 100644 --- a/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json +++ b/academic_observatory_workflows/database/schema/pubmed_articles_2023-01-01.json @@ -300,8 +300,51 @@ }, { "name": "ReferenceList", - "mode": "NULLABLE", - "type": "STRING" + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "Title", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "Reference", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "ArticleIdList", + "mode": "NULLABLE", + "type": "RECORD", + "fields": [ + { + "name": "ArticleId", + "mode": "REPEATED", + "type": "RECORD", + "fields": [ + { + "name": "IdType", + "mode": "NULLABLE", + "type": "STRING" + }, + { + "name": "value", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] + }, + { + "name": "Citation", + "mode": "NULLABLE", + "type": "STRING" + } + ] + } + ] } ] } diff --git a/academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json b/academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json deleted file mode 100644 index 082ec51a..00000000 --- a/academic_observatory_workflows/database/schema/pubmed_book_articles_2023-01-01.json +++ /dev/null @@ -1,1220 +0,0 @@ -[ - { - "name": "PubmedBookData", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "History", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "PubMedPubDate", - "mode": "REPEATED", - "type": "RECORD", - - "fields": [ - { - "name": "Year", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Month", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Day", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Hour", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Minute", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "PubStatus", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "PublicationStatus", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "ArticleIdList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "ArticleId", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "IdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ObjectList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Object", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Type", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Param", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Name", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - } - ] - } - ] - }, - { - "name": "BookDocument", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "PMID", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Version", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "INTEGER" - } - ] - }, - { - "name": "ArticleIdList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "ArticleId", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "IdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "Book", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Publisher", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "PublisherName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "PublisherLocation", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "BookTitle", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Publisher", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "PublisherName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "PublisherLocation", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "PubDate", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Season", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "MedlineDate", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Day", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Month", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Year", - "mode": "NULLABLE", - "type": "INTEGER" - } - ] - }, - { - "name": "BeginningDate", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Year", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Month", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Day", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Season", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "EndingDate", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Year", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Month", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Day", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Season", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "AuthorList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "CompleteYN", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Type", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Author", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "CollectiveName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Suffix", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Initials", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "ForeName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "LastName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "EqualContrib", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "AffiliationInfo", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Affiliation", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ValidYN", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "InvestigatorList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Investigator", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Suffix", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Initials", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "ForeName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "LastName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "AffiliationInfo", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Affiliation", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ValidYN", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "Volume", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "VolumeTitle", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Edition", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "CollectionTitle", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Isbn", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "ELocationID", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "ValidYN", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "EIdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "Medium", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "ReportNumber", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "LocationLabel", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Type", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "INTEGER" - } - ] - }, - { - "name": "ArticleTitle", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "VernacularTitle", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Pagination", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "StartPage", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "EndPage", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "MedlinePgn", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "Language", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "AuthorList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "CompleteYN", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Type", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Author", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "CollectiveName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Suffix", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Initials", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "ForeName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "LastName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "EqualContrib", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "AffiliationInfo", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Affiliation", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ValidYN", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "InvestigatorList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Investigator", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Suffix", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Initials", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "ForeName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "LastName", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "AffiliationInfo", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Affiliation", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Identifier", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Source", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ValidYN", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "PublicationType", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "UI", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "Abstract", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "CopyrightInformation", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "AbstractText", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "Sections", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Section", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "LocationLabel", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Type", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "INTEGER" - } - ] - }, - { - "name": "SectionTitle", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Section", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "LocationLabel", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "Type", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "INTEGER" - } - ] - }, - { - "name": "SectionTitle", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - } - ] - }, - { - "name": "KeywordList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Owner", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Keyword", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "MajorTopicYN", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ContributionDate", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Year", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Month", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Day", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Season", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "DateRevised", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Year", - "mode": "NULLABLE", - "type": "INTEGER" - }, - { - "name": "Month", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Day", - "mode": "NULLABLE", - "type": "INTEGER" - } - ] - }, - { - "name": "GrantList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "CompleteYN", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Grant", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "GrantID", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Acronym", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Agency", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Country", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "ItemList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "ListType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Item", - "mode": "REPEATED", - "type": "STRING" - } - ] - }, - { - "name": "ReferenceList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Title", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Reference", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "ArticleIdList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "ArticleId", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "IdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "Citation", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "ReferenceList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Title", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Reference", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "ArticleIdList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "ArticleId", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "IdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "Citation", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "ReferenceList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Title", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Reference", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "ArticleIdList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "ArticleId", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "IdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "Citation", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "ReferenceList", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "Title", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "Reference", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "ArticleIdList", - "mode": "NULLABLE", - "type": "RECORD", - "fields": [ - { - "name": "ArticleId", - "mode": "REPEATED", - "type": "RECORD", - "fields": [ - { - "name": "IdType", - "mode": "NULLABLE", - "type": "STRING" - }, - { - "name": "value", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - }, - { - "name": "Citation", - "mode": "NULLABLE", - "type": "STRING" - } - ] - }, - { - "name": "ReferenceList", - "mode": "NULLABLE", - "type": "STRING" - } - ] - } - ] - } - ] - } - ] - } - ] - } -] diff --git a/academic_observatory_workflows/workflows/pubmed_telescope.py b/academic_observatory_workflows/workflows/pubmed_telescope.py index dd1f2d7e..9de5e7b9 100644 --- a/academic_observatory_workflows/workflows/pubmed_telescope.py +++ b/academic_observatory_workflows/workflows/pubmed_telescope.py @@ -50,25 +50,21 @@ from airflow.models.taskinstance import TaskInstance from airflow.models.variable import Variable -from google.cloud.bigquery import SourceFormat from academic_observatory_workflows.config import schema_folder as default_schema_folder from observatory.platform.utils.airflow_utils import AirflowVars, make_workflow_folder from observatory.platform.workflows.workflow import Workflow, Release from observatory.platform.utils.workflow_utils import get_chunks -from observatory.platform.utils.config_utils import find_schema - -from google.cloud import bigquery - from observatory.platform.utils.gc_utils import ( bigquery_table_exists, + upload_file_to_cloud_storage, + bigquery_sharded_table_id, + create_bigquery_dataset, load_bigquery_table, run_bigquery_query, + delete_old_datasets_with_prefix, ) - -from observatory.platform.utils.proc_utils import wait_for_process - from observatory.platform.utils.release_utils import ( get_dataset_releases, get_datasets, @@ -76,17 +72,13 @@ is_first_release, ) -from observatory.platform.utils.gc_utils import ( - bigquery_sharded_table_id, - select_table_shard_dates, - upload_file_to_cloud_storage, -) - from observatory.platform.utils.workflow_utils import ( SubFolder, make_release_date, ) +from google.cloud import bigquery + class PubMedRelease(Release): def __init__( @@ -197,6 +189,7 @@ def __init__( self.source_format = source_format self.schema_folder = default_schema_folder() self.main_table = "articles" + self.temp_dataset = "temporary_working_space" # Workflow parameters self.workflow_id = workflow_id @@ -263,8 +256,9 @@ def make_release(self, **kwargs) -> PubMedRelease: self.download_folder = make_workflow_folder(self.dag_id, release_date, SubFolder.downloaded.value) self.transform_folder = make_workflow_folder(self.dag_id, release_date, SubFolder.transformed.value) - # TODO: Make this baseline check more robust. - if self.data_interval_start < self.start_date + timedelta(days=7): + if self.data_interval_start < self.start_date + timedelta(days=7) and not bigquery_table_exists( + project_id=self.project_id, dataset_id=self.dataset_id, table_name=self.main_table + ): self.download_baseline = True else: self.download_baseline = False @@ -650,12 +644,24 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs): :return: None. """ - # Pull files to transfer from GCS to BQ + # Pull lsit of files to transfer from GCS to BQ ti: TaskInstance = kwargs["ti"] entity_list = ti.xcom_pull(key="entity_list") # If its the first run of the workflow, only upload the baseline table to the main table. if self.download_baseline: + # Create the dataset if it doesn't exist already. + try: + create_bigquery_dataset( + self.project_id, + self.dataset_id, + self.data_location, + description="Pubmed dataset for Medline Citation records.", + ) + logging.info(f"Created dataset: {self.project_id}.{self.dataset_id}") + except: + logging.info(f"Dataset {self.project_id}.{self.dataset_id} already exists.") + # Delete old baseline table just in case there was a bad previous run. bq_delete_table( project_id=self.project_id, dataset_id=self.dataset_id, table_id=self.main_table, not_found_okay=True @@ -663,37 +669,48 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs): logging.info(f"Uploading to table - {self.main_table}") - for transform_blob in entity_list["article_additions"]["uploaded_to_gcs"]: - success = load_bigquery_table( - transform_blob, - self.dataset_id, - self.data_location, - self.main_table, - entity_list["article_additions"]["schema_file_path"], - self.source_format, - write_disposition=bigquery.WriteDisposition.WRITE_APPEND, - project_id=self.project_id, - table_description="Baseline table of the Pubmed database.", - partition=False, - ) + # Create a transfer job into BQ from GCS for all of the transformed baseline files. + transform_blob_pattern = f"gs://{self.transform_bucket}/telescopes/{self.dag_id}/{self.release_id}/*" + + logging.info( + f"Creating a load job for all of the smaller baseline files with pattern: {transform_blob_pattern} " + ) + + success = load_bigquery_table( + transform_blob_pattern, + self.dataset_id, + self.data_location, + self.main_table, + entity_list["article_additions"]["schema_file_path"], + self.source_format, + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + project_id=self.project_id, + table_description="Baseline table of the Pubmed database.", + partition=False, + ) if success: - entity_list["article_additions"]["table_list"].append(self.main_table) + entity_list["article_additions"]["table_list"].append( + f"{self.project_id}.{self.dataset_id}.{self.main_table}" + ) else: raise AirflowException(f"Unable to upload table {self.main_table}") # Not the first workflow run, only ingest update tables into BQ. else: - ## There has to be a better way to do this. - ## Currently it just uploads to the table pubmed_(something)_(file_index) - ## but we cannot shard on dates are there are sometimes multiple update files per day - ## and we are unable to merge the updatefiles as they are dependant on order. - ## so there will be 7-10 ish files stuck in the dataset until the cleanup step. - - ## Figure out if - - ## Dates can still be used for the shards and if the updatefiles on same day interfere with each other - ## OR other shard methods - ## OR keep as it is. + # Create a temporary dataset for injesting the tables and not make main dataset messy with small update tables. + # Unable to partition on dates or indicies - could look into another method later. + try: + create_bigquery_dataset( + self.project_id, + self.temp_dataset, + self.data_location, + description="Temporary dataset for Pubmed workflow for the update tables for current release.", + ) + logging.info(f"Created working dataset: {self.project_id}.{self.temp_dataset}") + except: + raise AirflowException(f"Working dataset {self.project_id}.{self.temp_dataset} already exists.") + # Should be deleted at the end of each run. If it still exists it means that the cleanup step didn't run properly. # Upload release tables for this data interval. for name, entity in entity_list.items(): @@ -701,22 +718,21 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs): for transform_blob in entity["uploaded_to_gcs"]: # Create a table for each of the pubmed updates. - file_index = transform_blob.split("_")[-1].split(".")[0] table_id = f"{name}_{file_index}" table_description = f"Update table from file {file_index}" - # Delete old tables with exact table_id just in case there was a bad previous run. - # We don't want to append add dupliactes to the tables. + # Delete old tables with table_id just in case there was a bad previous run. + # We don't want to append or add any dupliactes to the tables. bq_delete_table( - project_id=self.project_id, dataset_id=self.dataset_id, table_id=table_id, not_found_okay=True + project_id=self.project_id, dataset_id=self.temp_dataset, table_id=table_id, not_found_okay=True ) logging.info(f"Uploading to table - {table_id}") success = load_bigquery_table( transform_blob, - self.dataset_id, + self.temp_dataset, self.data_location, table_id, entity["schema_file_path"], @@ -729,11 +745,11 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs): if success: if not table_id in entity["table_list"]: - entity["table_list"].append(table_id) + entity["table_list"].append(f"{self.project_id}.{self.temp_dataset}.{table_id}") else: - raise AirflowException(f"Unable to upload table {table_id}") - - # TODO: Get number of rows of table from BQ for an assert? + raise AirflowException( + f"Unable to upload table {self.project_id}.{self.temp_dataset}.{table_id}" + ) if success: logging.info( @@ -754,6 +770,8 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): :return: None. """ + # TODO: Add a dependant check on if previous release was done. Can only run if the last data_interval_period was done. + # Pull workflow entity data ti: TaskInstance = kwargs["ti"] entity_list = ti.xcom_pull(key="entity_list") @@ -763,7 +781,7 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): if self.download_baseline: # TODO: Asssert that the main table exists. - logging.info(f"Table {self.main_table} has already been created in the previous step.") + logging.info(f"Table {self.main_table} has already been created and injested in the previous step.") else: if len(entity_list["article_additions"]["table_list"]) != len( @@ -776,7 +794,7 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): # Create a snapshot of main table just in case something happens with the below updates and deletes # and we can revert if necessary. - # This will be replaced in the new workflow. + # This will be replaced with a function in the new version of the workflow. backup_table_id = bigquery_sharded_table_id(f"{self.main_table}_backup", self.data_interval_start) create_backup_table = f""" CREATE SNAPSHOT TABLE `{self.project_id}.{self.dataset_id}.{backup_table_id}` @@ -798,13 +816,24 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): for i in range(num_update_tables): logging.info(f"Currently doing - {entity_list['article_additions']['table_list'][i]}") + # Ensure that the file_index is the same for the addition and deletion tables. + add_file_index = entity_list["article_additions"]["table_list"][i].split(".").split("_")[-1] + del_file_index = entity_list["article_deletions"]["table_list"][i].split(".").split("_")[-1] + + if add_file_index != del_file_index: + raise AirflowException( + f"File indices do not match for these additions and deletions! Additions: {add_file_index} Deletions: {del_file_index}" + ) + + logging.info(f"Currently doing additions from - {entity_list['article_additions']['table_list'][i]}") + # Remove the records that have updates from the incoming additions table. # This will be done with an upsert merge with the next version of the workflows. delete_records_from_main_table_from_incoming_additions = f""" DELETE FROM `{self.project_id}.{self.dataset_id}.{self.main_table}` WHERE MedlineCitation.PMID.value IN ( SELECT MedlineCitation.PMID.value - FROM `{self.project_id}.{self.dataset_id}.{entity_list['article_additions']['table_list'][i]}`) + FROM `{entity_list['article_additions']['table_list'][i]}`) """ bq_output = run_bigquery_query(delete_records_from_main_table_from_incoming_additions) logging.info(f"Output from 'delete_records_from_main_table_from_incoming_additions' query: {bq_output}") @@ -813,27 +842,28 @@ def bq_add_updates_to_main_table(self, release: PubMedRelease, **kwargs): insert_records_into_main_table_from_incoming_additions = f""" INSERT INTO `{self.project_id}.{self.dataset_id}.{self.main_table}` SELECT * - FROM `{self.project_id}.{self.dataset_id}.{entity_list['article_additions']['table_list'][i]}` + FROM `{entity_list['article_additions']['table_list'][i]}` """ bq_output = run_bigquery_query(insert_records_into_main_table_from_incoming_additions) logging.info(f"Output from 'insert_records_into_main_table_from_incoming_additions' query: {bq_output}") - logging.info(f"Currently doing - {entity_list['article_deletions']['table_list'][i]}") + logging.info(f"Currently doing deletions from - {entity_list['article_deletions']['table_list'][i]}") # Delete records from the main table using the incoming deletions, matching on the PMID and Version. delete_records_from_main_table_from_incoming_deletions = f""" DELETE FROM `{self.project_id}.{self.dataset_id}.{self.main_table}` WHERE (MedlineCitation.PMID.value, MedlineCitation.PMID.Version) IN ( SELECT (value, Version) - FROM `{self.project_id}.{self.dataset_id}.{entity_list['article_deletions']['table_list'][i]}`) + FROM `{entity_list['article_deletions']['table_list'][i]}`) """ bq_output = run_bigquery_query(delete_records_from_main_table_from_incoming_deletions) logging.info(f"Output from 'delete_records_from_main_table_from_incoming_deletions' query: {bq_output}") - # TODO: Update the table description with the new update file index. - # description = f"""Pubmed Article table with baseline files pubmed23n0001-pubmed23n1167 and updatefiles from pubmed23n1168-pubmed23nxxxx""" + # TODO: Update the main table description with the new update file index. + # description = f"""Pubmed Article table with baseline files pubmed23n0001-pubmed23n1166 and updatefiles from pubmed23n1167-pubmed23nxxxx""" - # TODO: Do a check step on the main table to ensure that it's been done right? + # TODO: Do some sort of check step on the main table to ensure that it's been done right? + # Re result from the deletions and see if the length of the def add_release(self, release: PubMedRelease, **kwargs): """Add the release info to the observatory API. @@ -871,9 +901,8 @@ def cleanup(self, release: PubMedRelease, **kwargs): except: logging.info(f"File {file} does not exist.") + # For each entity, delete the local transform files for name, entity in entity_list.items(): - # For each entity, delete the transform files (local) and tables (on BQ) - logging.info(f"Deleting local transform files for - {name}") for transform_file in entity["transform_files"]: logging.info(f"Deleting file = {file}") @@ -882,25 +911,22 @@ def cleanup(self, release: PubMedRelease, **kwargs): except: logging.info(f"File {transform_file} does not exist.") - if not self.download_baseline: - logging.info(f"Deleting the update tables for - {name}") - # Delete the update tables from BQ, not the main table. - for table in entity["table_list"]: - # Delete the addition table from dataset. - bq_delete_table( - project_id=self.project_id, - dataset_id=self.dataset_id, - table_id=table, - not_found_okay=False, - ) + if not self.download_baseline: + logging.info( + f"Deleting the dataset {self.project_id}.{self.temp_dataset} and all update tables inside it for this release." + ) + try: + # Delete the update tables from BQ by deleting the whole temporary dataset. + delete_old_datasets_with_prefix(prefix=self.temp_dataset, age_to_delete=0) + except: + raise AirflowException(f"Unable to delete dataset: {self.project_id}.{self.temp_dataset}") # TODO: Remove all task instance data from this workflow run. def transform_pubmed_xml_file_to_jsonl(input_file: str, entity_list: Dict, transform_folder: str) -> Dict: """ - Convert a single Pubmed XML file to JSONL, pulling out any of the PubmedArticle, - PubmedBookArticle, BookDocument, DeleteCitation, DeleteDocument. + Convert a single Pubmed XML file to JSONL, pulling out any of the Pubmed additions and or deletions. Used in parallelised section "transform". Each file is given to an individual process rather than multple files read in and transformed at once. @@ -1084,43 +1110,13 @@ def encode(self, obj): return super(CustomEncoder, self).encode(transformed_obj) -def bq_update_table_expiration_date( - project_id: str, dataset_id: str, table_id: str, expiration_date: pendulum.datetime -): - # TODO: To add to gcs utils after the workflow restructure. - - """ - Update a BigQuery table expiration date. - - :project_id: - :dataset_id: - :table_id: - :expiration_date: Date when the table with expire. - - :return: None. - """ - - bq_client = bigquery.Client(project=project_id) - dataset_ref = bigquery.DatasetReference(project_id, dataset_id) - - table_ref = dataset_ref.table(table_id) - table = bq_client.get_table(table_ref) - - table.expires = expiration_date - - try: - table = bq_client.update_table(table, ["expires"]) - except: - raise AirflowException(f"Unable to set expiration date of table {table_id} to {expiration_date}") - - def bq_delete_table( project_id: str, dataset_id: str, table_id: str, partition_decorator: str = None, not_found_okay: bool = True ): - """Delete a single table in Bigquery.""" - # TODO: To add to gcs utils after the workflow restructure. + """Delete a single table in Bigquery.""" + bq_client = bigquery.Client(project=project_id) dataset = bigquery.DatasetReference(project_id, dataset_id) table = bigquery.Table(dataset.table(table_id)) @@ -1135,24 +1131,3 @@ def bq_delete_table( logging.info(f"Deleted exising bigquery table: {table_id} {partition_decorator}") except: raise AirflowException(f"An error occured deleting table {table_id}") - - -def bq_list_tables_in_dataset_with_prefix(project_id: str, dataset_id: str, prefix: str = "") -> List[str]: - # TODO: Add prefix or suffix for this function. - - """List tables under a dataset in Bigquery. - - :param project_id: - :param dataset_id: - :param prefix: - - """ - - # TODO: To add to gcs utils after the workflow restructure. - - bq_client = bigquery.Client(project=project_id) - tables = bq_client.list_tables(dataset_id) - - return [ - f"{table.project}.{table.dataset_id}.{table.table_id}" for table in tables if table.table_id.startswith(prefix) - ]