diff --git a/gcp_variant_transforms/libs/vcf_file_composer.py b/gcp_variant_transforms/libs/vcf_file_composer.py
index a6eeee776..45246b090 100644
--- a/gcp_variant_transforms/libs/vcf_file_composer.py
+++ b/gcp_variant_transforms/libs/vcf_file_composer.py
@@ -14,6 +14,7 @@
 """Composes multiple files in GCS to one VCF file."""
 
+import logging
 import multiprocessing
 from typing import Iterable, List  # pylint: disable=unused-import
 
 
@@ -24,6 +25,8 @@
 
 # Cloud Storage allows to compose up to 32 objects.
 _MAX_NUM_OF_BLOBS_PER_COMPOSE = 32
+_MAX_NUM_OF_COMPOSE_RETRIES = 5
+_COMPOSE_TIMEOUT_SECONDS = 30
 
 
 def compose_gcs_vcf_shards(project,  # type: str
@@ -175,20 +178,52 @@ def _compose_blobs_to_one(self, blob_prefix):
       The final blob that all blobs with `blob_prefix` composed to.
     """
     blobs_to_be_composed = list(self._bucket.list_blobs(prefix=blob_prefix))
+    logging.info('Total number of blobs is %d.', len(blobs_to_be_composed))
     if len(blobs_to_be_composed) == 1:
       return blobs_to_be_composed[0]
     new_blob_prefix = filesystems.FileSystems.join(blob_prefix, 'composed_')
-
-    proc_pool = multiprocessing.Pool()
+    blobs_to_compose_args = []
     for blob_names in self._break_list_in_chunks(blobs_to_be_composed,
                                                  _MAX_NUM_OF_BLOBS_PER_COMPOSE):
       _, file_name = filesystems.FileSystems.split(blob_names[0])
       new_blob_name = ''.join([new_blob_prefix, file_name])
-      proc_pool.apply_async(
-          func=_compose_files,
-          args=(self._project, self._bucket_name, blob_names, new_blob_name))
-    proc_pool.close()
-    proc_pool.join()
+      blobs_to_compose_args.append(
+          (self._project, self._bucket_name, blob_names, new_blob_name))
+
+    num_retries = 0
+    while num_retries <= _MAX_NUM_OF_COMPOSE_RETRIES:
+      proc_pool = multiprocessing.Pool(processes=8)
+      results = []
+      for arg in blobs_to_compose_args:
+        results.append(proc_pool.apply_async(func=_compose_files, args=arg))
+      proc_pool.close()
+
+      failed_blobs_to_compose_args = []
+      for result, argument in zip(results, blobs_to_compose_args):
+        try:
+          result.get(timeout=_COMPOSE_TIMEOUT_SECONDS)
+        except multiprocessing.TimeoutError:
+          logging.warning('Composing blobs (%s to %s) timed out.',
+                          argument[2][0], argument[2][-1])
+          failed_blobs_to_compose_args.append(argument)
+      # Kill any workers still running timed-out composes so they cannot
+      # race with the retries below.
+      proc_pool.terminate()
+      proc_pool.join()
+
+      if failed_blobs_to_compose_args:
+        num_retries += 1
+        blobs_to_compose_args = failed_blobs_to_compose_args
+        logging.warning(
+            '%d compose jobs timed out. Retrying (attempt %d of %d).',
+            len(blobs_to_compose_args), num_retries,
+            _MAX_NUM_OF_COMPOSE_RETRIES)
+      else:
+        break
+    else:
+      # The while-else clause runs only when all retries are exhausted.
+      raise RuntimeError('Composing blobs failed after {} '
+                         'retries.'.format(_MAX_NUM_OF_COMPOSE_RETRIES))
     return self._compose_blobs_to_one(new_blob_prefix)
 
   def _break_list_in_chunks(self, blob_list, chunk_size):