Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 24 additions & 21 deletions gcp_variant_transforms/libs/annotation/vep/vep_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@
_NUMBER_OF_API_CALL_RETRIES = 5


def create_runner(known_args, pipeline_args, input_pattern, watchdog_file):
# type: (argparse.Namespace, List[str], str, Optional[str]) -> VepRunner
def create_runner(known_args, pipeline_args, input_pattern, watchdog_file,
watchdog_file_update_interval_seconds):
# type: (argparse.Namespace, List[str], str, Optional[str], int) -> VepRunner
"""Returns an instance of VepRunner using the provided args.

Args:
Expand All @@ -77,8 +78,10 @@ def create_runner(known_args, pipeline_args, input_pattern, watchdog_file):
determine resources like number of workers, machine type, etc.
input_pattern: The VCF files to be annotated.
watchdog_file: The file that will be updated by the Dataflow worker every
`_POLLING_INTERVAL_SECONDS`. Once the file is found to be stale, the VEP
process running in the VM will be killed.
`watchdog_file_update_interval_seconds`. Once the file is found to be
stale, the VEP process running in the VM will be killed.
watchdog_file_update_interval_seconds: The `watchdog_file` will be updated
by the Dataflow worker every `watchdog_file_update_interval_seconds`.
"""
credentials = client.GoogleCredentials.get_application_default()
pipeline_service = discovery.build(
Expand All @@ -88,7 +91,7 @@ def create_runner(known_args, pipeline_args, input_pattern, watchdog_file):
input_pattern, known_args.annotation_output_dir,
known_args.vep_info_field, known_args.vep_image_uri,
known_args.vep_cache_path, known_args.vep_num_fork, pipeline_args,
watchdog_file)
watchdog_file, watchdog_file_update_interval_seconds)
return runner


Expand All @@ -110,7 +113,8 @@ def __init__(
vep_cache_path, # type: str
vep_num_fork, # type: int
pipeline_args, # type: List[str]
watchdog_file=None, # type: str
watchdog_file, # type: Optional[str]
watchdog_file_update_interval_seconds, # type: int
):
# type: (...) -> None
"""Constructs an instance for running VEP.
Expand All @@ -129,8 +133,10 @@ def __init__(
running Beam; for simplicity we use the same arguments to decide how
many and what type of workers to use, where to run, etc.
watchdog_file: The file that will be updated by the Dataflow worker every
`_POLLING_INTERVAL_SECONDS`. Once the file is found to be stale, the VEP
process running in the VM will be killed.
`watchdog_file_update_interval_seconds`. Once the file is found to be
stale, the VEP process running in the VM will be killed.
watchdog_file_update_interval_seconds: The `watchdog_file` will be updated
by the Dataflow worker every `watchdog_file_update_interval_seconds`.
"""
self._pipeline_service = pipeline_service
self._species = species
Expand All @@ -143,6 +149,8 @@ def __init__(
self._vep_info_field = vep_info_field
self._process_pipeline_args(pipeline_args)
self._watchdog_file = watchdog_file
self._watchdog_file_update_interval_seconds = (
watchdog_file_update_interval_seconds)
self._running_operation_ids = [] # type: List[str]
self._operation_name_to_io_infos = {}
self._operation_name_to_logs = {}
Expand Down Expand Up @@ -300,7 +308,6 @@ def _wait_and_retry_operations(self):
retry_operation_ids = []
for operation in self._running_operation_ids:
while not self._is_done(operation):
self._update_watchdog_file()
time.sleep(_POLLING_INTERVAL_SECONDS)
error_message = self._get_error_message(operation)
if error_message:
Expand All @@ -324,11 +331,6 @@ def _retry_operation(self, operation, error_message):
error_message, logs, retry_operation_id)
return retry_operation_id

def _update_watchdog_file(self):
if self._watchdog_file:
with filesystems.FileSystems.create(self._watchdog_file) as file_to_write:
file_to_write.write('Watchdog file.')

def _is_done(self, operation):
# type: (str) -> bool
# TODO(bashir2): Silence the log messages of googleapiclient.discovery
Expand Down Expand Up @@ -426,13 +428,14 @@ def _create_actions(self, input_file, output_file):
# type: (str, str) -> List
local_input_file = '/mnt/vep/{}'.format(_get_base_name(input_file))
if self._watchdog_file:
action = self._make_action(self._vep_image_uri,
_WATCHDOG_RUNNER_SCRIPT,
_VEP_RUN_SCRIPT,
str(_POLLING_INTERVAL_SECONDS),
self._watchdog_file,
local_input_file,
_LOCAL_OUTPUT_FILE)
action = self._make_action(
self._vep_image_uri,
_WATCHDOG_RUNNER_SCRIPT,
_VEP_RUN_SCRIPT,
str(self._watchdog_file_update_interval_seconds),
self._watchdog_file,
local_input_file,
_LOCAL_OUTPUT_FILE)
else:
action = self._make_action(self._vep_image_uri,
_VEP_RUN_SCRIPT,
Expand Down
8 changes: 5 additions & 3 deletions gcp_variant_transforms/libs/annotation/vep/vep_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def _create_test_instance(self, pipeline_args=None):
self._mock_service, _ASSEMBLY, _SPECIES,
_INPUT_PATTERN, _OUTPUT_DIR,
_VEP_INFO_FIELD, _IMAGE, _CACHE, _NUM_FORK,
pipeline_args or self._get_pipeline_args())
pipeline_args or self._get_pipeline_args(), None, 30)
return test_object

def _get_pipeline_args(self, num_workers=1):
Expand All @@ -105,13 +105,15 @@ def test_make_vep_cache_path(self):
self.assertEqual(test_instance._vep_cache_path, _CACHE)
test_instance = vep_runner.VepRunner(
self._mock_service, _SPECIES, _ASSEMBLY, _INPUT_PATTERN, _OUTPUT_DIR,
_VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args())
_VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args(),
None, 30)
self.assertEqual(test_instance._vep_cache_path,
('gs://gcp-variant-annotation-vep-cache/'
'vep_cache_homo_sapiens_GRCh38_91.tar.gz'))
test_instance = vep_runner.VepRunner(
self._mock_service, 'mouse', 'mm9', _INPUT_PATTERN, _OUTPUT_DIR,
_VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args())
_VEP_INFO_FIELD, _IMAGE, '', _NUM_FORK, self._get_pipeline_args(),
None, 30)
self.assertEqual(test_instance._vep_cache_path,
('gs://gcp-variant-annotation-vep-cache/'
'vep_cache_mouse_mm9_91.tar.gz'))
Expand Down
32 changes: 24 additions & 8 deletions gcp_variant_transforms/transforms/annotate_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,19 @@

from __future__ import absolute_import

import threading
import time
import uuid
from datetime import datetime
from typing import List # pylint: disable=unused-import
from typing import List, Optional # pylint: disable=unused-import

import apache_beam as beam
from apache_beam.io import filesystems

from gcp_variant_transforms.libs.annotation.vep import vep_runner

_WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS = 30


class AnnotateFile(beam.DoFn):
"""A PTransform to annotate VCF files."""
Expand All @@ -36,18 +40,30 @@ def __init__(self, known_args, pipeline_args):
def process(self, input_pattern):
  # type: (str) -> None
  """Annotates all VCF files matching `input_pattern`.

  If `run_with_garbage_collection` is disabled, annotation runs inline on
  this worker thread with no watchdog file. Otherwise annotation runs on a
  background thread while this (Dataflow worker) thread repeatedly
  re-creates a uniquely named watchdog file; once the file goes stale the
  VEP process on the VM is killed (see `vep_runner`).

  Args:
    input_pattern: The VCF files to be annotated.
  """
  watchdog_file = None
  if not self._known_args.run_with_garbage_collection:
    self._annotate_files(input_pattern, watchdog_file)
    return

  # Unique per invocation (uuid + timestamp) so concurrent workers never
  # share a watchdog file.
  unique_id = '-'.join(['watchdog_file',
                        str(uuid.uuid4()),
                        datetime.now().strftime('%Y%m%d-%H%M%S')])
  watchdog_file = filesystems.FileSystems.join(
      self._known_args.annotation_output_dir, unique_id)

  annotate_thread = threading.Thread(target=self._annotate_files,
                                     args=(input_pattern, watchdog_file))
  annotate_thread.start()
  # Keep the watchdog file fresh while annotation is in flight.
  # `is_alive()` replaces the deprecated `isAlive()` alias (removed in
  # Python 3.9; `is_alive()` is available since Python 2.6).
  # `join(timeout)` instead of `time.sleep` returns as soon as the
  # annotation thread finishes rather than always waiting a full interval.
  while annotate_thread.is_alive():
    with filesystems.FileSystems.create(watchdog_file) as file_to_write:
      file_to_write.write('Watchdog file.')
    annotate_thread.join(_WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS)

def _annotate_files(self, input_pattern, watchdog_file):
  # type: (str, Optional[str]) -> None
  """Runs VEP on all files matching `input_pattern` and blocks until done.

  Args:
    input_pattern: The VCF files to be annotated.
    watchdog_file: If not None, the file this worker keeps fresh; it is
      forwarded, together with the update interval, to the runner so the
      VMs can detect a dead worker and kill their VEP processes.
  """
  runner = vep_runner.create_runner(self._known_args,
                                    self._pipeline_args,
                                    input_pattern,
                                    watchdog_file,
                                    _WATCHDOG_FILE_UPDATE_INTERVAL_SECONDS)
  runner.run_on_all_files()
  runner.wait_until_done()