From 696bcb487b98dd80509a0d71dd5cccb16d0ca027 Mon Sep 17 00:00:00 2001 From: yifangchen Date: Tue, 16 Oct 2018 10:35:08 -0400 Subject: [PATCH 1/3] retry on failure --- .../libs/vcf_file_composer.py | 42 +++++++++++++++---- 1 file changed, 35 insertions(+), 7 deletions(-) diff --git a/gcp_variant_transforms/libs/vcf_file_composer.py b/gcp_variant_transforms/libs/vcf_file_composer.py index a6eeee776..3f230814d 100644 --- a/gcp_variant_transforms/libs/vcf_file_composer.py +++ b/gcp_variant_transforms/libs/vcf_file_composer.py @@ -14,6 +14,7 @@ """Composes multiple files in GCS to one VCF file.""" +import logging import multiprocessing from typing import Iterable, List # pylint: disable=unused-import @@ -24,6 +25,8 @@ # Cloud Storage allows to compose up to 32 objects. _MAX_NUM_OF_BLOBS_PER_COMPOSE = 32 +_NUMBER_OF_API_CALL_RETRIES = 5 +_TIMEOUT = 30 def compose_gcs_vcf_shards(project, # type: str @@ -175,20 +178,45 @@ def _compose_blobs_to_one(self, blob_prefix): The final blob that all blobs with `blob_prefix` composed to. """ blobs_to_be_composed = list(self._bucket.list_blobs(prefix=blob_prefix)) + logging.info('Total number of blobs are %d.', len(blobs_to_be_composed)) if len(blobs_to_be_composed) == 1: return blobs_to_be_composed[0] new_blob_prefix = filesystems.FileSystems.join(blob_prefix, 'composed_') - - proc_pool = multiprocessing.Pool() + arguments = [] for blob_names in self._break_list_in_chunks(blobs_to_be_composed, _MAX_NUM_OF_BLOBS_PER_COMPOSE): _, file_name = filesystems.FileSystems.split(blob_names[0]) new_blob_name = ''.join([new_blob_prefix, file_name]) - proc_pool.apply_async( - func=_compose_files, - args=(self._project, self._bucket_name, blob_names, new_blob_name)) - proc_pool.close() - proc_pool.join() + arguments.append( + (self._project, self._bucket_name, blob_names, new_blob_name)) + + retry = 0 + while arguments: + proc_pool = multiprocessing.Pool(processes=8) + results = [] + failed_composing_arguments = [] + for argument in arguments: + results.append(proc_pool.apply_async(func=_compose_files, + args=argument)) + proc_pool.close() + for result, argument in zip(results, arguments): + try: + result.get(_TIMEOUT) + except multiprocessing.TimeoutError: + logging.warning('Aborting the composing of blobs (%s to %s) due to ' + 'timeout.', argument[2][0], argument[2][-1]) + failed_composing_arguments.append(argument) + + arguments = failed_composing_arguments + retry += 1 + if arguments: + if retry > _NUMBER_OF_API_CALL_RETRIES: + raise RuntimeError('Composing of blobs fails after {} ' + 'retries.'.format(_NUMBER_OF_API_CALL_RETRIES)) + else: + logging.warning( + '%d jobs of composing of blobs failed due to timeout. Retry for ' + 'the %d time.', len(arguments), retry) return self._compose_blobs_to_one(new_blob_prefix) def _break_list_in_chunks(self, blob_list, chunk_size): From 1dba8627d1a9cdbe59f81a9e669774c702624b48 Mon Sep 17 00:00:00 2001 From: Allie Chen Date: Wed, 17 Oct 2018 13:15:47 -0400 Subject: [PATCH 2/3] address the comments --- .../libs/vcf_file_composer.py | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/gcp_variant_transforms/libs/vcf_file_composer.py b/gcp_variant_transforms/libs/vcf_file_composer.py index 3f230814d..e73b22cfe 100644 --- a/gcp_variant_transforms/libs/vcf_file_composer.py +++ b/gcp_variant_transforms/libs/vcf_file_composer.py @@ -25,8 +25,8 @@ # Cloud Storage allows to compose up to 32 objects. _MAX_NUM_OF_BLOBS_PER_COMPOSE = 32 -_NUMBER_OF_API_CALL_RETRIES = 5 -_TIMEOUT = 30 +_MAX_NUM_OF_COMPOSE_RETRIES = 5 +_COMPOSE_TIMEOUT_SECONDS = 30 def compose_gcs_vcf_shards(project, # type: str @@ -182,41 +182,43 @@ def _compose_blobs_to_one(self, blob_prefix): if len(blobs_to_be_composed) == 1: return blobs_to_be_composed[0] new_blob_prefix = filesystems.FileSystems.join(blob_prefix, 'composed_') - arguments = [] + blobs_to_compose_args = [] for blob_names in self._break_list_in_chunks(blobs_to_be_composed, _MAX_NUM_OF_BLOBS_PER_COMPOSE): _, file_name = filesystems.FileSystems.split(blob_names[0]) new_blob_name = ''.join([new_blob_prefix, file_name]) - arguments.append( + blobs_to_compose_args.append( (self._project, self._bucket_name, blob_names, new_blob_name)) - retry = 0 - while arguments: + num_retries = 0 + while num_retries <= _MAX_NUM_OF_COMPOSE_RETRIES: proc_pool = multiprocessing.Pool(processes=8) results = [] - failed_composing_arguments = [] - for argument in arguments: - results.append(proc_pool.apply_async(func=_compose_files, - args=argument)) + for arg in blobs_to_compose_args: + results.append(proc_pool.apply_async(func=_compose_files, args=arg)) proc_pool.close() - for result, argument in zip(results, arguments): + + failed_blobs_to_compose_args = [] + for result, argument in zip(results, blobs_to_compose_args): try: - result.get(_TIMEOUT) + result.get(_COMPOSE_TIMEOUT_SECONDS) except multiprocessing.TimeoutError: logging.warning('Aborting the composing of blobs (%s to %s) due to ' 'timeout.', argument[2][0], argument[2][-1]) - failed_composing_arguments.append(argument) - - arguments = failed_composing_arguments - retry += 1 - if arguments: - if retry > _NUMBER_OF_API_CALL_RETRIES: - raise RuntimeError('Composing of blobs fails after {} ' - 'retries.'.format(_NUMBER_OF_API_CALL_RETRIES)) - else: - logging.warning( - '%d jobs of composing of blobs failed due to timeout. Retry for ' - 'the %d time.', len(arguments), retry) + failed_blobs_to_compose_args.append(argument) + + if failed_blobs_to_compose_args: + num_retries += 1 + blobs_to_compose_args = failed_blobs_to_compose_args + logging.warning( + '%d jobs of composing of blobs failed due to timeout. Retrying for ' + '%d of %d time.', len(blobs_to_compose_args), num_retries, + _MAX_NUM_OF_COMPOSE_RETRIES) + else: + break + else: + raise RuntimeError('Composing of blobs fails after {} ' + 'retries.'.format(_MAX_NUM_OF_COMPOSE_RETRIES)) return self._compose_blobs_to_one(new_blob_prefix) def _break_list_in_chunks(self, blob_list, chunk_size): From 1b9f15caef2ac01973d363f67d90c5b74fc45e5b Mon Sep 17 00:00:00 2001 From: Allie Chen Date: Wed, 17 Oct 2018 13:49:12 -0400 Subject: [PATCH 3/3] address the comments --- gcp_variant_transforms/libs/vcf_file_composer.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/gcp_variant_transforms/libs/vcf_file_composer.py b/gcp_variant_transforms/libs/vcf_file_composer.py index e73b22cfe..45246b090 100644 --- a/gcp_variant_transforms/libs/vcf_file_composer.py +++ b/gcp_variant_transforms/libs/vcf_file_composer.py @@ -211,13 +211,13 @@ def _compose_blobs_to_one(self, blob_prefix): num_retries += 1 blobs_to_compose_args = failed_blobs_to_compose_args logging.warning( - '%d jobs of composing of blobs failed due to timeout. Retrying for ' - '%d of %d time.', len(blobs_to_compose_args), num_retries, + '%d jobs of composing of blobs failed due to timeout. Retrying ' + '%d of %d.', len(blobs_to_compose_args), num_retries, _MAX_NUM_OF_COMPOSE_RETRIES) else: break else: - raise RuntimeError('Composing of blobs fails after {} ' + raise RuntimeError('Composing of blobs failed after {} ' 'retries.'.format(_MAX_NUM_OF_COMPOSE_RETRIES)) return self._compose_blobs_to_one(new_blob_prefix)