Parallelised merge updatefiles function
alexmassen-hane committed Jun 27, 2023
1 parent d017754 commit 90b57b9
Showing 1 changed file with 88 additions and 40 deletions.
128 changes: 88 additions & 40 deletions academic_observatory_workflows/workflows/pubmed_telescope.py
@@ -18,6 +18,7 @@
import os
import gzip
import json
import math
import logging
import pendulum
from datetime import timedelta
@@ -418,7 +419,7 @@ def __init__(
self.batch_size = batch_size

# Required file size of the update files.
self.merged_file_size = 4 # Gb
self.merged_file_size = 3.8 # Gb

# After how many downloads to reset the connection to Pubmed's FTP server.
self.reset_ftp_counter = reset_ftp_counter
@@ -588,6 +589,9 @@ def make_release(self, **kwargs) -> PubMedRelease:
# Sort the incoming list.
changefile_list.sort(key=lambda c: c.file_index, reverse=False)

# Limit to the first 100 files for testing.
changefile_list = changefile_list[:100]

run_id = kwargs["run_id"]
dag_run = kwargs["dag_run"]
is_first_run = is_first_dag_run(dag_run)
@@ -752,7 +756,7 @@ def merge_updatefiles(self, release: PubMedRelease, **kwargs):
Check through the changefiles and only grab the newest records for a particular PMID and version number of the article.
This is to reduce the number of queries done on the main table in BQ.
If it is the first run of the workflow, it will merge the baseline changefiles into 4Gb chunks.
If it is the first run of the workflow, it will merge the baseline changefiles into more appropriately sized chunks.
"""

# Grab list of changefiles that were transformed in the previous task.
@@ -769,52 +773,61 @@ def merge_updatefiles(self, release: PubMedRelease, **kwargs):

# Loop through additions or deletions
for entity in self.entity_list:
merged_data = []
file_size_sum = 0.0
merge_part = 1
merged_updatefiles[entity.name] = []

first_file_index = files_to_merge[0].file_index
# Get the size of all the updatefiles
file_size_sum = 0
for changefile in files_to_merge:
transform_file = changefile.transform_file_path(entity.type)

# Get file size in Gigabytes
transform_file_stats = os.stat(transform_file)
transform_file_size = transform_file_stats.st_size / 1024.0**3
file_size_sum += transform_file_size

logging.info(f"Transform file size - {transform_file_size}")
logging.info(f"Total size of updatefiles for {entity.type} for this release: {file_size_sum} ")

# Read in file
with gzip.open(transform_file, "rb") as f_in:
data = [json.loads(line) for line in f_in]
merged_data.extend(data)
num_chunks = math.ceil(file_size_sum / self.merged_file_size)

# If the file is the last in the list or if the running total of the file_size exceeds the requirement.
if (
changefile == files_to_merge[-1]
or (file_size_sum + transform_file_size) >= self.merged_file_size
):
merged_output_file = changefile.merged_transform_file_path(
entity.type, first_file_index, changefile.file_index, merge_part
)
logging.info(f"Aproximate size of each merged: {file_size_sum/num_chunks} Gb")

logging.info(f"Size of merged output file - {file_size_sum + transform_file_size}")
logging.info(f"Writing out to file - {merged_output_file}")
if num_chunks == 1:
logging.info(f"There were will be 1 part for the merged updatefiles.")

# Write file out to file.
with gzip.open(merged_output_file, "w") as f_out:
for line in merged_data:
f_out.write(str.encode(json.dumps(line, cls=PubMedCustomEncoder) + "\n"))
# Only one chunk needed for the required file size; merge it serially.
merged_updatefile_path = merge_changefiles_together(files_to_merge, 1, entity.type)
merged_updatefiles[entity.name].append(merged_updatefile_path)
logging.info(f"Successfully merged updatefiles to - {merged_updatefile_path}")

merged_updatefiles[entity.name] = merged_output_file
else:
chunk_size = math.floor(len(files_to_merge) / num_chunks)
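# Note: get_chunks is assumed to yield successive slices of chunk_size files,
# so the floor division can leave a smaller trailing chunk of leftover changefiles.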

chunks = [
chunk for i, chunk in enumerate(get_chunks(input_list=files_to_merge, chunk_size=chunk_size))
]

# Reset variables for next part.
merge_part += 1
file_size_sum = 0
merged_data = []
first_file_index = changefile.file_index + 1
if num_chunks > self.max_processes:
processes_to_use = self.max_processes

else:
file_size_sum += transform_file_size
processes_to_use = num_chunks

logging.info(f"There were will be {len(chunks)} parts for the merged updatefiles.")

# Multiple output files for the merge; process them in parallel.
for j, sub_chunks in enumerate(get_chunks(input_list=chunks, chunk_size=processes_to_use)):
# Pass off each chunk to a process for them to merge files in parallel.
with ProcessPoolExecutor(max_workers=processes_to_use) as executor:
futures = []

for i, chunk in enumerate(sub_chunks):
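# The part number (i + 1 + j * processes_to_use) is 1-based and made unique
# across batches by offsetting with the parts submitted in earlier batches.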
futures.append(

executor.submit(
merge_changefiles_together, chunk, i + 1 + j * processes_to_use, entity.type
)
)

for future in as_completed(futures):
merged_updatefile_path = future.result()
merged_updatefiles[entity.name].append(merged_updatefile_path)

logging.info(f"Successfully merged updatefiles to - {merged_updatefile_path}")

# For each updatefile, only keep the newest record for a PMID and version.
else:
@@ -827,7 +840,7 @@ def merge_updatefiles(self, release: PubMedRelease, **kwargs):
# Find newest records from the updatefiles
# Clear last entity
merged_data = []

merged_updatefiles[entity.name] = []
update_counter = 0

first_file_index = files_to_merge[0].file_index
@@ -869,7 +882,7 @@ def merge_updatefiles(self, release: PubMedRelease, **kwargs):
entity.type, first_file_index, changefile.file_index
)

merged_updatefiles[entity.name] = merged_output_file
merged_updatefiles[entity.name].append(merged_output_file)

logging.info(f"Writing to file - {merged_output_file}")

@@ -889,7 +902,9 @@ def upload_transformed(self, release: PubMedRelease, **kwargs):

ti: TaskInstance = kwargs["ti"]
merged_updatefiles = ti.xcom_pull(key="merged_updatefiles")
files_to_upload = list(merged_updatefiles.values())
files_to_upload = []
for entity in self.entity_list:
files_to_upload.extend(merged_updatefiles[entity.name])

logging.info(f"files_to_upload - {files_to_upload}")

@@ -914,7 +929,6 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs):
# If it's the first run of the workflow, only upload the baseline table to the main table.
if release.is_first_run:
# Create the dataset if it doesn't exist already.

bq_create_dataset(
project_id=self.cloud_workspace.project_id,
dataset_id=self.bq_dataset_id,
@@ -962,7 +976,7 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs):
table_id = bq_sharded_table_id(
self.cloud_workspace.project_id, self.bq_dataset_id, entity.name, date=release.end_date
)
merged_transform_file_uri = f"gs://{self.cloud_workspace.transform_bucket}/{gcs_blob_name_from_path(merged_updatefiles[entity.name])}"
merged_transform_blob_pattern = release.merged_transfer_blob_pattern(entity_type=entity.type)
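# Note: the blob pattern above is assumed to match every merged updatefile part
# for this entity in the transform bucket, so one load job picks up all chunks.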

# Delete old table just in case there was a bad previous run.
# We don't want to append or add any duplicates to the tables.
@@ -976,7 +990,7 @@ def bq_ingest_update_tables(self, release: PubMedRelease, **kwargs):
logging.info(f"Uploading to table - {table_id}")

success = bq_load_table(
uri=merged_transform_file_uri,
uri=merged_transform_blob_pattern,
table_id=table_id,
source_format=SourceFormat.NEWLINE_DELIMITED_JSON,
schema_file_path=entity.schema_file_path,
@@ -1135,6 +1149,7 @@ def transform_pubmed_xml_file_to_jsonl(

# Need to have the XML attributes pulled out from the Biopython data classes.
data_dict = add_attributes_to_data_from_biopython_classes(data_dict_dirty)
data_dict_dirty = []
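# Drop the reference to the raw Biopython structures so they can be garbage
# collected and memory use stays bounded for large changefiles.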

# Loop through the Pubmed record metadata, pulling out additions or deletions.
for entity in entity_list:
@@ -1170,6 +1185,39 @@ def transform_pubmed_xml_file_to_jsonl(
return changefile


def merge_changefiles_together(changefile_list: List[Changefile], part_num: int, entity_type: str) -> str:
"""Merge a given list of changefiles together.
:param changefile_list: List of changefiles to merge together.
:param part_num: Part number of the merged output file within the greater list of changefiles.
:param entity_type: Type of changefiles given, for helping with name of the output file.
:return merged_output_file: Path to the resultant output file.
"""

merged_output_file = changefile_list[0].merged_transform_file_path(
entity_type, changefile_list[0].file_index, changefile_list[-1].file_index, part_num
)

logging.info(f"Writing data to file - {merged_output_file}")

# Read in data and directly write it out to file to avoid holding too much
# data in memory and crashing the workflow.
with gzip.open(merged_output_file, "w") as f_out:
# Loop through the set of given changefiles
for changefile_sub in changefile_list:
transform_file_sub = changefile_sub.transform_file_path(entity_type)

# Read in file
with gzip.open(transform_file_sub, "rb") as f_in:
data = [json.loads(line) for line in f_in]

# Write directly to output file.
for line in data:
f_out.write(str.encode(json.dumps(line, cls=PubMedCustomEncoder) + "\n"))

return merged_output_file


def add_attributes_to_data_from_biopython_classes(
obj: Union[StringElement, DictionaryElement, ListElement, OrderedListElement, list]
):