gcp_variant_transforms/libs/vcf_file_composer.py (44 changes: 37 additions & 7 deletions)
@@ -14,6 +14,7 @@
 
 """Composes multiple files in GCS to one VCF file."""
 
+import logging
 import multiprocessing
 from typing import Iterable, List  # pylint: disable=unused-import
 
@@ -24,6 +25,8 @@
 
 # Cloud Storage allows composing up to 32 objects.
 _MAX_NUM_OF_BLOBS_PER_COMPOSE = 32
+_MAX_NUM_OF_COMPOSE_RETRIES = 5
+_COMPOSE_TIMEOUT_SECONDS = 30
 
 
 def compose_gcs_vcf_shards(project,  # type: str
@@ -175,20 +178,47 @@ def _compose_blobs_to_one(self, blob_prefix):
       The final blob that all blobs with `blob_prefix` are composed into.
     """
     blobs_to_be_composed = list(self._bucket.list_blobs(prefix=blob_prefix))
+    logging.info('Total number of blobs is %d.', len(blobs_to_be_composed))
     if len(blobs_to_be_composed) == 1:
       return blobs_to_be_composed[0]
     new_blob_prefix = filesystems.FileSystems.join(blob_prefix, 'composed_')
-
-    proc_pool = multiprocessing.Pool()
+    blobs_to_compose_args = []
     for blob_names in self._break_list_in_chunks(blobs_to_be_composed,
                                                  _MAX_NUM_OF_BLOBS_PER_COMPOSE):
       _, file_name = filesystems.FileSystems.split(blob_names[0])
       new_blob_name = ''.join([new_blob_prefix, file_name])
-      proc_pool.apply_async(
-          func=_compose_files,
-          args=(self._project, self._bucket_name, blob_names, new_blob_name))
-    proc_pool.close()
-    proc_pool.join()
+      blobs_to_compose_args.append(
+          (self._project, self._bucket_name, blob_names, new_blob_name))
+
+    num_retries = 0
+    while num_retries <= _MAX_NUM_OF_COMPOSE_RETRIES:
+      proc_pool = multiprocessing.Pool(processes=8)
+      results = []
+      for arg in blobs_to_compose_args:
+        results.append(proc_pool.apply_async(func=_compose_files, args=arg))
+      proc_pool.close()
+
+      failed_blobs_to_compose_args = []
+      for result, argument in zip(results, blobs_to_compose_args):
+        try:
+          result.get(_COMPOSE_TIMEOUT_SECONDS)
+        except multiprocessing.TimeoutError:
+          logging.warning('Aborting composition of blobs (%s to %s) due to '
+                          'timeout.', argument[2][0], argument[2][-1])
+          failed_blobs_to_compose_args.append(argument)
+
+      if failed_blobs_to_compose_args:
+        num_retries += 1
+        blobs_to_compose_args = failed_blobs_to_compose_args
+        logging.warning(
+            '%d blob-composing jobs timed out. Retrying (attempt %d of %d).',
+            len(blobs_to_compose_args), num_retries,
+            _MAX_NUM_OF_COMPOSE_RETRIES)
+      else:
+        break
+    else:
+      raise RuntimeError('Composing of blobs failed after {} '
+                         'retries.'.format(_MAX_NUM_OF_COMPOSE_RETRIES))
     return self._compose_blobs_to_one(new_blob_prefix)
 
   def _break_list_in_chunks(self, blob_list, chunk_size):
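
The helper `_compose_files` that the pool workers run lies outside this hunk. For orientation, here is a minimal sketch of what such a helper could look like, assuming the standard `google-cloud-storage` client; the name and signature mirror the call site above, but the body is an illustration, not the repo's actual implementation.

```python
from typing import List

from google.cloud import storage


def _compose_files(project, bucket_name, blob_names, composite_name):
  # type: (str, str, List[str], str) -> None
  """Composes up to 32 blobs into one (sketch, not the repo's code)."""
  client = storage.Client(project=project)
  bucket = client.bucket(bucket_name)
  sources = [bucket.blob(name) for name in blob_names]
  # GCS rejects compose requests with more than 32 source objects,
  # which is why callers chunk with _MAX_NUM_OF_BLOBS_PER_COMPOSE.
  bucket.blob(composite_name).compose(sources)
```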
Expand Down
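
The body of `_break_list_in_chunks` is also truncated by the diff view. Judging from the call site (it receives `Blob` objects, and the first element of each yielded chunk is passed to `FileSystems.split`), one plausible implementation, assuming it yields blob names in fixed-size groups:

```python
def _break_list_in_chunks(self, blob_list, chunk_size):
  # type: (List[storage.Blob], int) -> Iterable[List[str]]
  """Yields the names in `blob_list` in chunks of at most `chunk_size`.

  A sketch inferred from the call site; the repo's actual body is not
  shown in this diff.
  """
  for i in range(0, len(blob_list), chunk_size):
    yield [blob.name for blob in blob_list[i:i + chunk_size]]
```

One subtlety in the retry loop above: the trailing `else` belongs to the `while`, so it runs only when the loop condition becomes false without a `break`, i.e., exactly when all `_MAX_NUM_OF_COMPOSE_RETRIES` retries have been consumed. A pass with no timed-out jobs exits through `break` and skips the `RuntimeError`.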