
Commit

Update documentation and comments
alexmassen-hane committed Jul 24, 2023
1 parent 3503d95 commit e41a95b
Showing 2 changed files with 135 additions and 70 deletions.
61 changes: 38 additions & 23 deletions academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -1040,7 +1040,6 @@ def bq_upsert_records(self, release: PubMedRelease, **kwargs):
the main table.
"""

# Pull list of update tables from xcom
ti: TaskInstance = kwargs["ti"]
upsert_table_id = ti.xcom_pull(key="upsert_table_id")

@@ -1057,6 +1056,8 @@ def merge_delete_records(self, release: PubMedRelease, **kwargs):
def merge_delete_records(self, release: PubMedRelease, **kwargs):
"""
Merge the delete records that will be applied to the main table.
Remove duplicates and write file.
"""

delete_files_to_merge = [datafile for datafile in release.datafile_list if not datafile.baseline]
@@ -1210,14 +1211,14 @@ def download_datafiles_from_ftp_server(
reset_ftp_counter: int,
max_download_retry: int,
) -> bool:
"""Download a list of Pubmed Datafiles from their FTP server.
"""Download a list of Pubmed datafiles from their FTP server.
:param datafile_list: List of Datafiles to download from their FTP server.
:param datafile_list: List of datafiles to download from their FTP server.
:param ftp_server_url: FTP server URL.
:param ftp_port: Port for the FTP connection.
:param reset_ftp_counter: After this variable number of files, it will reset the FTP connection
to make sure that the files can be downloaded reliably.
:param max_download_retry: Maximum number of retries for downloading one Datafile before throwing an error.
:param reset_ftp_counter: After this number of files, reset the FTP connection
to make sure that the connection is not reset by the host.
:param max_download_retry: Maximum number of retries for downloading one datafile before throwing an error.
:return download_success: Whether downloading all of the datafiles was successful.
"""
Expand Down Expand Up @@ -1306,7 +1307,7 @@ def download_datafiles_from_ftp_server(
def transform_pubmed_xml_file_to_jsonl(datafile: Datafile, entity_list: List[PubmedEntity]) -> Union[Datafile, bool]:
"""
Convert a single Pubmed XML file to JSONL, pulling out any of the Pubmed entities and their upserts and/or deletes.
Used in parallelised transform sections.
Used in parallelised transform section.
:param datafile: Incoming datafile to transform.
:param entity_list: List of entities to pull out from the datafile.
@@ -1316,27 +1317,28 @@ def transform_pubmed_xml_file_to_jsonl(datafile: Datafile, entity_list: List[Pub
logging.info(f"Reading in file - {datafile.filename}")

with gzip.open(datafile.download_file_path, "rb") as f_in:
# Use the BioPython library for reading in the Pubmed XML files.
# Use the BioPython package for reading in the Pubmed XML files.
# This package also checks against its own DTD schema defined in the XML header.
try:
data_dict_dirty1 = Entrez.read(f_in, validate=True)
data_dirty1 = Entrez.read(f_in, validate=True)
except ValidationError:
logging.info(f"Fields in XML are not valid against it's own DTD file - {datafile.filename}")
return False

# Need to have the XML attributes pulled out from the Biopython data classes.
data_dict2 = add_attributes_to_data_from_biopython_classes(data_dict_dirty1)
del data_dict_dirty1
# Need to pull out XML attributes from the Biopython data classes.
data_dirty2 = add_attributes_to_data_from_biopython_classes(data_dirty1)
del data_dirty1

# Remove unwanted nested list structure from the Pubmed dictionary.
data_dict = change_pubmed_list_structure(data_dict2)
del data_dict2
data = change_pubmed_list_structure(data_dirty2)
del data_dirty2

for entity in entity_list:
try:
try:
data_part = [retrieve for retrieve in data_dict[entity.set_key][entity.sub_key]]
data_part = [retrieve for retrieve in data[entity.set_key][entity.sub_key]]
except KeyError:
data_part = [retrieve for retrieve in data_dict[entity.sub_key]]
data_part = [retrieve for retrieve in data[entity.sub_key]]

logging.info(
f"Pulled out {len(data_part)} {f'{entity.sub_key} {entity.type}'} from file - {datafile.filename}"
@@ -1367,7 +1369,9 @@ def add_attributes_to_data_from_biopython_classes(
obj: Union[StringElement, DictionaryElement, ListElement, OrderedListElement, list]
):
"""
Recursively travel down the Pubmed data tree and add attributes from the Biopython classes as dictionary keys.
Recursively travel down the Pubmed data tree to add attributes from Biopython classes as key-value pairs.
Only pulling data from StringElements, DictionaryElements, ListElements and OrderedListElements.
:param obj: Input object, being one of the Biopython data classes.
:return new: Object with attributes added as keys to the dictionary.
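
A hedged sketch of the traversal this docstring describes, assuming the Bio.Entrez element classes expose their XML attributes via an `attributes` dict; the workflow's real function may differ in detail.

```python
from Bio.Entrez.Parser import DictionaryElement, ListElement, OrderedListElement, StringElement


def add_attributes(obj):
    """Recursively promote Biopython XML attributes to plain dictionary keys (sketch only)."""
    if isinstance(obj, StringElement):
        if not obj.attributes:
            return str(obj)
        new = dict(obj.attributes)
        new["value"] = str(obj)  # keep the element text under "value"
        return new
    if isinstance(obj, DictionaryElement):
        new = dict(obj.attributes)
        new.update({key: add_attributes(value) for key, value in obj.items()})
        return new
    if isinstance(obj, (ListElement, OrderedListElement, list)):
        return [add_attributes(value) for value in obj]
    return obj
```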
@@ -1424,8 +1428,9 @@ def add_attributes_to_data_from_biopython_classes(

# List of problematic fields that have nested lists.
# Elements in the list are extra fields to append to the same level as the *List field.
# Only for the 2023 schema. May change with a new revision.
bad_list_fields = {
"AuthorIdList": [],
"AuthorList": [],
"ArticleIdList": [],
"AuthorList": ["CompleteYN", "Type"],
"GrantList": ["CompleteYN"],
@@ -1437,6 +1442,11 @@ def add_attributes_to_data_from_biopython_classes(
"InvestigatorList": [],
"PublicationTypeList": [],
"ObjectList": [],
# The following are taken care of with if statements as they are special cases:
# KeywordList
# SupplMeshList
# DataBankList
# AccessionNumberList
}


@@ -1449,16 +1459,21 @@ def change_pubmed_list_structure(
For example, the original data can look something like
{
ArticleIdList: {
"ArticleId": [{ "value": "12345", "Type": "pubmed" }]
"AuthorList": {
"CompleteYN": "Y",
"Author": [{ "First": "Foo", "Last": "Bar" },
{ "First": "James", "Last": "Bond" }]
}
}
The "ArticleId" field name will be removed and the data from it will be moved up
to the "List" level:
The "Author" field will be removed and the data from it will be moved up
to the "List" level, along with any data specified in the "bad_list_fields" dictionary:
{
"ArticleIdList": [{ "value": "12345", "Type": "pubmed" }]
"AuthorListCompleteYN": "Y",
"AuthorListType": None,
"AuthorList": [{ "First": "Foo", "Last": "Bar" },
{ "First": "James", "Last": "Bond" }]
}
:param obj: Incoming data object.
144 changes: 97 additions & 47 deletions docs/telescopes/pubmed.md
@@ -1,78 +1,128 @@
# Pubmed
# Pubmed Telescope

((( DRAFT, please edit as necessary )))

The Pubmed Medline database is a bibliographioc database of over 29 million medical related citations over the last 30 years.
The Pubmed Medline database is a bibliographic database of over 35 million medical-related citations from the last 30 years.

More information on the database and the fields present in the data can be found here:

https://www.nlm.nih.gov/medline/medline_overview.html

## Telescope workflow

This workflow for Pubmed Medline database downloads the baseline yearly snaphot and applies the addition and deletion updates weekly, storing the raw, transformed and final data in Googles Cloud Storage and Bigquery.
## Workflow

## Download
This workflow processes the Pubmed Medline database by downloading the yearly snapshot and applying the necessary changes at weekly intervals, storing the raw, transformed and final tables in Google Cloud Storage and BigQuery.

The Baseline records are release December of each year, the last being released on 2022-12-08. The URL to the FTP server for the 'baseline' files is

https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/
Pubmed's Medline database is split into two parts: baseline and updatefiles. The 'baseline' portion is the yearly snapshot, and the 'updatefiles' are the additions and/or edits to the database that are released daily throughout the year. Each year the 'baseline' is re-released with all of the previous 'updatefiles' and 'baseline' compiled together.

If it is the first run of the workflow, the telescope only processes the baseline portion of the Pubmed Medline database.

Subsequent updatefiles to modify the Pubmed database are released 7-days a week.

https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/
On the first run of the year, the workflow downloads and processes the baseline snapshot along with any updatefiles released between the baseline release date and the execution date of the workflow. Subsequent runs are scheduled at weekly intervals and merge the updatefile modifications, applying the changes to the main table (in BigQuery) all at once. When a new yearly run begins, the workflow downloads the new baseline release and then applies any changes from updatefiles released since that baseline release date.

The telescope runs weekly and finds all updatefiles uploaded onto the server within that time period and apply the changes to main table on Google Biguqery.
The baseline files from Pubmed essentially only hold records to upsert to the main table, whereas the updatefiles can hold both upserts and a list of records to delete.

All files that are downloaded and are also uploaded to Google Cloud Storage for archival.
### Download

## Tranform
The Baseline yearly snapshots are released in December of each year. The URL to the FTP server for the 'baseline' files is

All files downloaded from Pubmed have to be transformed from a compressed XML into a strict format such as \*.jsonl.gz to be import into Google Bigquery.

The Biopython package (TODO link to package) is used to read-in, parse and verify the Pubmed XMLs against it's own schema DTD file (TODO link to schema file).
All entities such as Pubmed Articles, Book Articles, Book Documents, Delete Citation and Delete Document are defined in the DTD schema file, however
only Pubmed Articles and Delete Citation fields are present in the baseline and updatefiles.

After cahngefiles are transformed, they are uploaded to Google Cloud Storage for archive and ingested into Biguqery using a glob pattern.
https://ftp.ncbi.nlm.nih.gov/pubmed/baseline/

The schema for the Pubmed Article table was derived from the DTD file. It was firstly converted from DTD to XSD using IntelliJ IDEA and was manually gone through
to make sure no fields were missed.
and similarly the updatefiles are released daily

Due to how XMLs can be stored, there can be multiple didferent types of data can present in a field. For example, AbstractText is most commonly a string, however there are times
where maths formula involved which create times where strings and arrays of strings can be mixed together, but Bigquery does not permit this. As a workaround, known text fields
with issues are written to file as a string ONLY, which allows the Pubmed records to be imported into Bigquery.
https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/

The Biguqery version of the Pubmed Article schema (including field descriptions) for the 2023 release of Pubmed can be found here:
Updatefiles can hold both records to upsert and records to delete. If there are more than 30,000 upserts, multiple updatefiles are released for that day. All downloaded files are also uploaded to Google Cloud Storage for archival purposes.

(link to github repo of schema file)
### Transform

## Applying changefiles
All files downloaded from Pubmed have to be transformed from compressed XML into a well-defined JSON format so that they can be imported into Google BigQuery.

As mentioned previously, daily updatefiles for Pubmed are collected over a week period at a time and applied to the table all at once.
This is to reduce computation and Bigquery cost, as the main table ends up being 100 Gb, which will cost a lot to run and query the table daily.
The package "Biopython" is used to parse and verify the Pubmed XMLs against it's own schema DTD file (defined in the header of each XML file). The resulting data dictionary from Biopython's `Entrez.read` holds the XML attributes on it's own data-classes. The attributes such as "CompleteYN" and others are pulled out and added as their own keys to the records.

Additions and deletions are applied using the PMID and Version values.
In this transform step, XML lists are also simplified, as the original XML structure can be difficult to query in BigQuery; multiple unnests and aggregations are needed to pull data from particular fields. To reduce the complexity of the SQL queries on the table, these lists are restructured. For example, the original XML-like formatted data could be in the form of,

If there is an updated record in in the same week period, only the newest record is kept and merged with the main Pubmed Article table on Bigquery.
All Delete Citation records are merged and removed from the main table all at once.
```json
{
"AuthorList": [
{
"CompleteYN": "Y",
"Author": [
{ "First": "Foo", "Last": "Bar" },
{ "First": "James", "Last": "Bond" }
]
}
]
}
```

Steps of how Pubmed's changefiles are applied on Biquery:
To simplify the above, the "Author" field will be removed and the data from it will be moved up
to the "List" level, along with any data specified under the List field,

```json
{
"AuthorListCompleteYN": "Y",
"AuthorList": [
{ "First": "Foo", "Last": "Bar" },
{ "First": "James", "Last": "Bond" }
]
}
```
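
A hedged sketch of how such a restructuring can be done, using the `AuthorList` example above; the hoisted attribute names are illustrative and the workflow's real function may differ.

```python
def flatten_list_field(record: dict, list_key: str, sub_key: str, extra_fields: list) -> dict:
    """Sketch only: hoist attributes and drop the nested sub-key wrapper of a *List field."""
    nested = record[list_key]
    new = {key: value for key, value in record.items() if key != list_key}
    # Promote attributes such as CompleteYN to the same level as the *List field.
    for field in extra_fields:
        new[f"{list_key}{field}"] = nested.get(field)
    # Replace the nested {"Author": [...]} wrapper with the list itself.
    new[list_key] = nested[sub_key]
    return new


record = {
    "AuthorList": {
        "CompleteYN": "Y",
        "Author": [{"First": "Foo", "Last": "Bar"}, {"First": "James", "Last": "Bond"}],
    }
}
flattened = flatten_list_field(record, "AuthorList", "Author", ["CompleteYN", "Type"])
# {'AuthorListCompleteYN': 'Y', 'AuthorListType': None, 'AuthorList': [{...}, {...}]}
```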

0. A backup copy of the main table is made before the additions and deletions are applied, just in case there is a problem part way through the update process.
1. The main table is queried to find the PMIDs that are to be upddated. These records are deleted from the table.
2. The list of record to be updated are then appended to the main table.
3. The main table is then queried again to find all records to be deleted, matching on both PMID and Version, and then deleting those records.
The following list-like fields have their structure modified in the workflow:

- ArticleIdList
- AuthorList
- GrantList
- ChemicalList
- CommentsCorrectionsList
- GeneSymbolList
- MeshHeadingList
- PersonalNameSubjectList
- InvestigatorList
- PublicationTypeList
- ObjectList
- KeywordList
- SupplMeshList
- DataBankList
- AccessionNumberList

Additionally, some fields that are commonly strings, such as "AbstractText", can also be a list of values that include formulas, or can be split up into fields such as Background, Methods, etc. Since the data has to be well defined for importing into BigQuery, text fields that have this problem are forced to be strings when written to file using a custom encoder (see the sketch after this list). The fields that are forced to be strings are:

- AbstractText
- Affiliation
- ArticleTitle
- b
- BookTitle
- Citation
- CoiStatement
- CollectionTitle
- CollectiveName
- i
- Param
- PublisherName
- SectionTitle
- sub
- Suffix
- sup
- u
- VernacularTitle
- VolumeTitle
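
A hedged sketch of forcing the above fields to plain strings before writing to file, shown here as a simple pre-processing step rather than the workflow's actual encoder; the field set is abbreviated.

```python
import json

FIELDS_FORCED_TO_STRING = {"AbstractText", "ArticleTitle", "Affiliation"}  # abbreviated


def force_fields_to_string(obj):
    """Recursively convert the values of known problem fields to plain strings (sketch only)."""
    if isinstance(obj, dict):
        return {
            key: str(value) if key in FIELDS_FORCED_TO_STRING else force_fields_to_string(value)
            for key, value in obj.items()
        }
    if isinstance(obj, list):
        return [force_fields_to_string(value) for value in obj]
    return obj


record = {"ArticleTitle": ["A title with ", {"sup": "2"}, " in it"], "Version": 1}
print(json.dumps(force_fields_to_string(record)))
# {"ArticleTitle": "['A title with ', {'sup': '2'}, ' in it']", "Version": 1}
```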

All transformed files are uploaded to Google Cloud Storage for ingesting the data into BigQuery.

### Applying upserts and deletes

Merging both upsert and delete records is done to reduce BigQuery cost, as the main table ends up being approximately 100 GB. Every time an upsert or delete is applied, the entire table has to be queried for the records to upsert and delete, and running that query for each daily updatefile would quickly add to the cost.

It is important to note that each upsert record contains the entire record again, including the newer updated information, not just the new information for that particular record. Thus, when merging the upsert records for a release, only the newest available upsert records are kept and applied to the main table. All the upsert records for a release period are merged and ingested into a date-sharded table called "upsert". Additionally, delete records can sometimes appear multiple times across different updatefiles; duplicates are removed and the remaining records are ingested into a date-sharded table called "delete". These upsert and delete tables are set to expire 7 days after being created.
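
A hedged sketch of the newest-upsert-wins merge described above; keying records on the (PMID, Version) pair and iterating files in release order are assumptions.

```python
def merge_upserts(updatefiles_in_release_order):
    """Sketch only: keep the newest upsert for each (PMID, Version) pair."""
    merged = {}
    for records in updatefiles_in_release_order:
        for record in records:
            key = (record["PMID"]["value"], record["PMID"]["Version"])
            merged[key] = record  # later updatefiles overwrite earlier ones
    return list(merged.values())
```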

Upsert and delete records are applied by matching on the PMID value and the Version number of a record. A backup of the main table is taken before any of the upserts and deletes are applied. The backup table is set to expire 31 days after it is created (to reduce table storage costs in BigQuery).
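
As an illustration of the matching on PMID and Version, the delete pass could be expressed as a BigQuery `MERGE`; the table names and field paths below are assumptions, not the workflow's actual queries.

```python
from google.cloud import bigquery

client = bigquery.Client()
sql = """
MERGE `my-project.pubmed.articles` AS main
USING `my-project.pubmed.delete20230723` AS del
ON  main.PMID.value = del.value
AND main.PMID.Version = del.Version
WHEN MATCHED THEN
  DELETE
"""
client.query(sql).result()  # wait for the merge job to finish
```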

# Workflow Summary

```eval_rst
+------------------------------+-----------------------------------------+
| Summary | |
+==============================+=========================================+
| Average runtime | 6 hrs baseline, 5-20 min weekly updates |
| Average runtime | 6-8 hrs baseline, 20 min weekly updates |
+------------------------------+-----------------------------------------+
| Average download size | 80-100gb baseline, ~500mb weekly |
| Average download size | ~100 GB baseline, ~500 MB weekly |
+------------------------------+-----------------------------------------+
| Harvest Type | FTP transfer |
+------------------------------+-----------------------------------------+
@@ -82,7 +132,7 @@ Steps of how Pubmed's changefiles are applied on Biquery:
+------------------------------+-----------------------------------------+
| Catchup missed runs | False |
+------------------------------+-----------------------------------------+
| Table Write Disposition | Append |
| Table Write Disposition | Write Truncate |
+------------------------------+-----------------------------------------+
| Provider Update Frequency | Daily |
+------------------------------+-----------------------------------------+
