diff --git a/gcp_variant_transforms/libs/annotation/vep/vep_runner.py b/gcp_variant_transforms/libs/annotation/vep/vep_runner.py index c2975a1f6..efd7c137a 100644 --- a/gcp_variant_transforms/libs/annotation/vep/vep_runner.py +++ b/gcp_variant_transforms/libs/annotation/vep/vep_runner.py @@ -67,8 +67,9 @@ _NUMBER_OF_API_CALL_RETRIES = 5 -def create_runner(known_args, pipeline_args, input_pattern, watchdog_file): - # type: (argparse.Namespace, List[str], str, Optional[str]) -> VepRunner +def create_runner(known_args, pipeline_args, input_pattern, watchdog_file, + watchdog_file_update_interval_seconds): + # type: (argparse.Namespace, List[str], str, Optional[str], int) -> VepRunner """Returns an instance of VepRunner using the provided args. Args: @@ -77,8 +78,10 @@ def create_runner(known_args, pipeline_args, input_pattern, watchdog_file): determine resources like number of workers, machine type, etc. input_pattern: The VCF files to be annotated. watchdog_file: The file that will be updated by the Dataflow worker every - `_POLLING_INTERVAL_SECONDS`. Once the file is found to be stale, the VEP - process running in the VM will be killed. + `watchdog_file_update_interval_seconds`. Once the file is found to be + stale, the VEP process running in the VM will be killed. + watchdog_file_update_interval_seconds: The `watchdog_file` will be updated + by the Dataflow worker every `watchdog_file_update_interval_seconds`. 
""" credentials = client.GoogleCredentials.get_application_default() pipeline_service = discovery.build( @@ -88,7 +91,7 @@ def create_runner(known_args, pipeline_args, input_pattern, watchdog_file): input_pattern, known_args.annotation_output_dir, known_args.vep_info_field, known_args.vep_image_uri, known_args.vep_cache_path, known_args.vep_num_fork, pipeline_args, - watchdog_file) + watchdog_file, watchdog_file_update_interval_seconds) return runner @@ -110,7 +113,8 @@ def __init__( vep_cache_path, # type: str vep_num_fork, # type: int pipeline_args, # type: List[str] - watchdog_file=None, # type: str + watchdog_file, # type: Optional[str] + watchdog_file_update_interval_seconds, # type: int ): # type: (...) -> None """Constructs an instance for running VEP. @@ -129,8 +133,10 @@ def __init__( running Beam; for simplicity we use the same arguments to decide how many and what type of workers to use, where to run, etc. watchdog_file: The file that will be updated by the Dataflow worker every - `_POLLING_INTERVAL_SECONDS`. Once the file is found to be stale, the VEP - process running in the VM will be killed. + `watchdog_file_update_interval_seconds`. Once the file is found to be + stale, the VEP process running in the VM will be killed. + watchdog_file_update_interval_seconds: The `watchdog_file` will be updated + by the Dataflow worker every `watchdog_file_update_interval_seconds`. 
""" self._pipeline_service = pipeline_service self._species = species @@ -143,6 +149,8 @@ def __init__( self._vep_info_field = vep_info_field self._process_pipeline_args(pipeline_args) self._watchdog_file = watchdog_file + self._watchdog_file_update_interval_seconds = ( + watchdog_file_update_interval_seconds) self._running_operation_ids = [] # type: List[str] self._operation_name_to_io_infos = {} self._operation_name_to_logs = {} @@ -300,7 +308,6 @@ def _wait_and_retry_operations(self): retry_operation_ids = [] for operation in self._running_operation_ids: while not self._is_done(operation): - self._update_watchdog_file() time.sleep(_POLLING_INTERVAL_SECONDS) error_message = self._get_error_message(operation) if error_message: @@ -324,11 +331,6 @@ def _retry_operation(self, operation, error_message): error_message, logs, retry_operation_id) return retry_operation_id - def _update_watchdog_file(self): - if self._watchdog_file: - with filesystems.FileSystems.create(self._watchdog_file) as file_to_write: - file_to_write.write('Watchdog file.') - def _is_done(self, operation): # type: (str) -> bool # TODO(bashir2): Silence the log messages of googleapiclient.discovery @@ -426,13 +428,14 @@ def _create_actions(self, input_file, output_file): # type: (str, str) -> List local_input_file = '/mnt/vep/{}'.format(_get_base_name(input_file)) if self._watchdog_file: - action = self._make_action(self._vep_image_uri, - _WATCHDOG_RUNNER_SCRIPT, - _VEP_RUN_SCRIPT, - str(_POLLING_INTERVAL_SECONDS), - self._watchdog_file, - local_input_file, - _LOCAL_OUTPUT_FILE) + action = self._make_action( + self._vep_image_uri, + _WATCHDOG_RUNNER_SCRIPT, + _VEP_RUN_SCRIPT, + str(self._watchdog_file_update_interval_seconds), + self._watchdog_file, + local_input_file, + _LOCAL_OUTPUT_FILE) else: action = self._make_action(self._vep_image_uri, _VEP_RUN_SCRIPT, diff --git a/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py b/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py 
index aa7534c4c..3efa09f78 100644 --- a/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py +++ b/gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py @@ -81,7 +81,7 @@ def _create_test_instance(self, pipeline_args=None): self._mock_service, _ASSEMBLY, _SPECIES, _INPUT_PATTERN, _OUTPUT_DIR, _VEP_INFO_FIELD, _IMAGE, _CACHE, _NUM_FORK, - pipeline_args or self._get_pipeline_args()) + pipeline_args or self._get_pipeline_args(), None, 30) return test_object def _get_pipeline_args(self, num_workers=1): @@ -105,13 +105,15 @@ def test_make_vep_cache_path(self): self.assertEqual(test_instance._vep_cache_path, _CACHE) test_instance = vep_runner.VepRunner( self._mock_service, _SPECIES, _ASSEMBLY, _INPUT_PATTERN, _OUTPUT_DIR, - _VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args()) + _VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args(), + None, 30) self.assertEqual(test_instance._vep_cache_path, ('gs://gcp-variant-annotation-vep-cache/' 'vep_cache_homo_sapiens_GRCh38_91.tar.gz')) test_instance = vep_runner.VepRunner( self._mock_service, 'mouse', 'mm9', _INPUT_PATTERN, _OUTPUT_DIR, - _VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args()) + _VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args(), + None, 30) self.assertEqual(test_instance._vep_cache_path, ('gs://gcp-variant-annotation-vep-cache/' 'vep_cache_mouse_mm9_91.tar.gz')) diff --git a/gcp_variant_transforms/transforms/annotate_files.py b/gcp_variant_transforms/transforms/annotate_files.py index 12289a325..b2bd9664e 100644 --- a/gcp_variant_transforms/transforms/annotate_files.py +++ b/gcp_variant_transforms/transforms/annotate_files.py @@ -14,15 +14,19 @@ from __future__ import absolute_import +import threading +import time import uuid from datetime import datetime -from typing import List # pylint: disable=unused-import +from typing import List, Optional # pylint: disable=unused-import import apache_beam as beam from apache_beam.io import filesystems from 
gcp_variant_transforms.libs.annotation.vep import vep_runner
 
+_WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS = 30
+
 
 class AnnotateFile(beam.DoFn):
   """A PTransform to annotate VCF files."""
 
@@ -36,18 +40,35 @@ def __init__(self, known_args, pipeline_args):
   def process(self, input_pattern):
     # type: (str) -> None
     watchdog_file = None
-    if self._known_args.run_with_garbage_collection:
-      unique_id = '-'.join(['watchdog_file',
-                            str(uuid.uuid4()),
-                            datetime.now().strftime('%Y%m%d-%H%M%S')])
-      watchdog_file = filesystems.FileSystems.join(
-          self._known_args.annotation_output_dir, unique_id)
+    if not self._known_args.run_with_garbage_collection:
+      self._annotate_files(input_pattern, watchdog_file)
+      return
+
+    unique_id = '-'.join(['watchdog_file',
+                          str(uuid.uuid4()),
+                          datetime.now().strftime('%Y%m%d-%H%M%S')])
+    watchdog_file = filesystems.FileSystems.join(
+        self._known_args.annotation_output_dir, unique_id)
+
+    # Annotate in a background thread so that this thread stays free to refresh
+    # the watchdog file while the annotation is running.
+    # NOTE(review): an exception raised by `_annotate_files` stays inside the
+    # thread and is not re-raised here, so `process` returns normally.
+    t = threading.Thread(target=self._annotate_files,
+                         args=(input_pattern, watchdog_file,))
+    t.start()
+    while t.is_alive():
       with filesystems.FileSystems.create(watchdog_file) as file_to_write:
         file_to_write.write('Watchdog file.')
+      time.sleep(_WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS)
 
+  def _annotate_files(self, input_pattern, watchdog_file):
+    # type: (str, Optional[str]) -> None
+    """Runs VEP on `input_pattern` and waits until it is done."""
     runner = vep_runner.create_runner(self._known_args,
                                       self._pipeline_args,
                                       input_pattern,
-                                      watchdog_file)
+                                      watchdog_file,
+                                      _WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS)
     runner.run_on_all_files()
     runner.wait_until_done()