Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
175 changes: 141 additions & 34 deletions gcp_variant_transforms/libs/processed_variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
import re

from collections import defaultdict
from typing import Dict, List, Any # pylint: disable=unused-import
from typing import Dict, List, Any, Tuple, Optional # pylint: disable=unused-import

import vcf

Expand All @@ -41,10 +41,23 @@

_FIELD_COUNT_ALTERNATE_ALLELE = 'A'

# The representation of a deletion variant in VEP.
_COMPLETELY_DELETED_ALT = '-'

# The field name in the BigQuery table that holds annotation ALT.
_ANNOTATION_ALT = 'allele'

# The field name in the BigQuery table that indicates whether the annotation ALT
# matching was ambiguous or not.
_ANNOTATION_ALT_AMBIGUOUS = 'ambiguous_allele'


# Counter names
class _CounterEnum(enum.Enum):
VARIANT = 'variant_counter'
ANNOTATION = 'annotation_counter'
ANNOTATION_ALT_MATCH = 'annotation_alt_match_counter'
ANNOTATION_ALT_MINIMAL_MATCH = 'annotation_alt_minimal_match_counter'
ANNOTATION_ALT_MINIMAL_AMBIGUOUS = 'annotation_alt_minimal_ambiguous_counter'
ANNOTATION_ALT_MISMATCH = 'annotation_alt_mismatch_counter'


Expand Down Expand Up @@ -177,6 +190,7 @@ def __init__(
header_fields, # type: vcf_header_parser.HeaderFields
split_alternate_allele_info_fields=True, # type: bool
annotation_fields=None, # type: List[str]
minimal_match=False, # type: bool
counter_factory=None # type: metrics_util.CounterFactoryInterface
):
"""Sets the internal state of the factory class.
Expand All @@ -189,6 +203,8 @@ def __init__(
annotation_fields: If provided, this is the list of INFO field names that
store variant annotations. The format of how annotations are stored and
their names are extracted from header_fields.
minimal_match: If set, then the --minimal mode of VEP is simulated for
annotation ALT matching.
"""
self._header_fields = header_fields
self._split_alternate_allele_info_fields = (
Expand All @@ -197,13 +213,9 @@ def __init__(
cfactory = counter_factory or metrics_util.NoOpCounterFactory()
self._variant_counter = cfactory.create_counter(
_CounterEnum.VARIANT.value)
self._annotation_counter = cfactory.create_counter(
_CounterEnum.ANNOTATION.value)
self._annotation_alt_mismatch_counter = cfactory.create_counter(
_CounterEnum.ANNOTATION_ALT_MISMATCH.value)
self._annotation_processor = _AnnotationProcessor(
annotation_fields, self._header_fields, self._annotation_counter,
self._annotation_alt_mismatch_counter)
annotation_fields, self._header_fields, cfactory, minimal_match)
self._minimal_match = minimal_match

def create_processed_variant(self, variant):
# type: (vcfio.Variant) -> ProcessedVariant
Expand Down Expand Up @@ -277,6 +289,17 @@ def create_alt_bases_field_schema(self):
mode=bigquery_util.TableFieldConstants.MODE_REPEATED,
description='List of {} annotations for this alternate.'.format(
annot_field))
annotation_record.fields.append(bigquery.TableFieldSchema(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think of making these optional only if --minimal is used?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good idea for the next field (i.e., ambiguous_allele_string) and I did that (thanks for suggesting). For the actual ALT string, I see value in keeping it even for non --minimal cases. The reason is that the matching is not always an exact matching and I think it is good to keep this information in an easy to access way in the BQ table (this is an attempt to make sure no information is lost in the import process).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good!

name=_ANNOTATION_ALT,
type=bigquery_util.TableFieldConstants.TYPE_STRING,
mode=bigquery_util.TableFieldConstants.MODE_NULLABLE,
description='The ALT part of the annotation field.'))
if self._minimal_match:
annotation_record.fields.append(bigquery.TableFieldSchema(
name=_ANNOTATION_ALT_AMBIGUOUS,
type=bigquery_util.TableFieldConstants.TYPE_BOOLEAN,
mode=bigquery_util.TableFieldConstants.MODE_NULLABLE,
description='Whether the annotation ALT matching was ambiguous.'))
for annotation_name in annotation_names:
annotation_record.fields.append(bigquery.TableFieldSchema(
name=bigquery_util.get_bigquery_sanitized_field(annotation_name),
Expand Down Expand Up @@ -313,8 +336,8 @@ class _AnnotationProcessor(object):
def __init__(self,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think annotation logic deserves its own file and even folder now (AnnotationProcessor, Annotation class, util libs, etc). Please just add a TODO in this PR though.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, and the TODO is in the next PR at a place which definitely needs refactoring. _AnnotationProcessor itself becoming a separate class needs more thinking because it is mutating ProcessedVariant and I prefer ProcessedVariantFactory.create_processed_variant be the only public way of ProcessedVariant mutation. I think it is possible to get most of _AnnotationProcessor out into its own library module/class without exposing ProcessedVariant to it, but I need to think more about the design of it.

annotation_fields, # type: List[str]
header_fields, # type: vcf_header_parser.HeaderFields
annotation_alt_match_counter, # type: metrics_util.BaseCounter
annotation_alt_mismatch_counter # type: metrics_util.BaseCounter
counter_factory, # type: metrics_util.CounterFactoryInterface
minimal_match, # type: bool
):
"""Creates an instance for adding annotations to `ProcessedVariant` objects.

Expand All @@ -335,8 +358,15 @@ def __init__(self,
header_desc = header_fields.infos[field].desc
self._annotation_names_map[field] = _extract_annotation_names(
header_desc)
self._annotation_alt_match_counter = annotation_alt_match_counter
self._annotation_alt_mismatch_counter = annotation_alt_mismatch_counter
self._alt_match_counter = counter_factory.create_counter(
_CounterEnum.ANNOTATION_ALT_MATCH.value)
self._alt_minimal_match_counter = counter_factory.create_counter(
_CounterEnum.ANNOTATION_ALT_MINIMAL_MATCH.value)
self._alt_minimal_ambiguous_counter = counter_factory.create_counter(
_CounterEnum.ANNOTATION_ALT_MINIMAL_AMBIGUOUS.value)
self._alt_mismatch_counter = counter_factory.create_counter(
_CounterEnum.ANNOTATION_ALT_MISMATCH.value)
self._minimal_match = minimal_match

def add_annotation_data(self, proc_var, annotation_field_name, data):
# type: (ProcessedVariant, str, List[str]) -> None
Expand Down Expand Up @@ -368,9 +398,12 @@ def add_annotation_data(self, proc_var, annotation_field_name, data):
alt_annotation_map = self._convert_annotation_strs_to_alt_map(
annotation_field_name, data)
for alt_bases, annotations_list in alt_annotation_map.iteritems():
self._add_annotation_list(
proc_var, common_prefix, alt_bases, annotations_list,
annotation_field_name)
alt, ambiguous = self._find_matching_alt(
proc_var, common_prefix, alt_bases, annotation_field_name)
if alt:
if self._minimal_match:
self._add_ambiguous_fields(annotations_list, ambiguous)
alt._info[annotation_field_name] = annotations_list

def _find_common_alt_ref_prefix(self, proc_var):
# type: (ProcessedVariant) -> str
Expand Down Expand Up @@ -420,15 +453,24 @@ def _create_map(self, annotations, annotation_names):
raise ValueError('Expected {} annotations, got {}'.format(
len(annotation_names), len(annotations) - 1))
annotation_dict = {}
annotation_dict[_ANNOTATION_ALT] = annotations[0]
for index, name in enumerate(annotation_names):
annotation_dict[name] = annotations[index + 1]
return annotation_dict

def _add_annotation_list(
self, proc_var, common_prefix, alt_bases, annotations_list,
annotation_field_name):
# type: (ProcessedVariant, str, str, List[Dict[str, str]], str) -> None
"""Adds all annotations to the given `proc_var`.
def _add_ambiguous_fields(self, annotations_list, ambiguous):
# type: (List[Dict[str, str]], bool) -> None
for annotation_map in annotations_list:
annotation_map[_ANNOTATION_ALT_AMBIGUOUS] = ambiguous

def _find_matching_alt(self,
proc_var, # type: ProcessedVariant
common_prefix, # type: str
alt_bases, # type: str
annotation_field_name # type: str
):
# type: (...) -> Tuple[Optional[AlternateBaseData], bool]
"""Searches among ALTs of `proc_var` to find one that matches `alt_bases`.

Args:
proc_var: The object to which the annotations are being added.
Expand All @@ -440,21 +482,47 @@ def _add_annotation_list(
this list is a map of annotation names to values, see the example in
`_convert_annotation_strs_to_alt_map` which creates these maps.
annotation_field_name: The name of the annotation field, e.g., ANN, CSQ.

Returns:
The `AlternateBaseData` object from proc_var that matches or None. It also
returns whether the matching was ambiguous or not.
"""
found_alt = None
is_ambiguous = False
# This assumes that number of alternate bases and annotation segments
# are not too big. If this assumption is not true, we should replace the
# following loop with a hash table search and avoid the quadratic time.
for alt in proc_var._alternate_datas:
if self._alt_matches_annotation_alt(
common_prefix, alt.alternate_bases, alt_bases):
alt._info[annotation_field_name] = annotations_list
self._annotation_alt_match_counter.inc()
self._alt_match_counter.inc()
found_alt = alt
break
else:
self._annotation_alt_mismatch_counter.inc()
if not found_alt and self._minimal_match:
for alt in proc_var._alternate_datas:
if self._alt_matches_annotation_alt_minimal_mode(
proc_var.reference_bases or '', alt.alternate_bases, alt_bases):
if found_alt:
is_ambiguous = True
self._alt_minimal_ambiguous_counter.inc()
logging.warning(
'Annotation ALT %s of field %s matches both ALTs %s and %s '
'with reference bases %s at reference %s start %s', alt_bases,
annotation_field_name, found_alt.alternate_bases,
alt.alternate_bases, proc_var.reference_bases,
proc_var.reference_name, proc_var.start)
else:
self._alt_minimal_match_counter.inc()
found_alt = alt
# Note we do not `break` in this case because we want to know if this
# match was an ambiguous match or an exact one.
if not found_alt:
self._alt_mismatch_counter.inc()
logging.warning(
'Could not find matching alternate bases for %s in '
'annotation filed %s', alt_bases, annotation_field_name)
'annotation filed %s for variant at reference %s start %s', alt_bases,
annotation_field_name, proc_var.reference_name, proc_var.start)
return found_alt, is_ambiguous

def _alt_matches_annotation_alt(
self, common_prefix, alt_bases, annotation_alt):
Expand All @@ -470,15 +538,17 @@ def _alt_matches_annotation_alt(
A <ID> ID
A .[13:123[ .[13
"""
# Check equality without the common prefix. Note according to VCF spec
# the length of this common prefix should be at most one but we have
# not checked/enforced that here.
if alt_bases[len(common_prefix):] == annotation_alt:
return True
# Handling deletion.
if (len(common_prefix) == len(alt_bases)
and annotation_alt == '-'):
return True
if not self._minimal_match:
# Check equality without the common prefix.
# Note according to VCF spec the length of this common prefix should be
# at most one but we have not checked/enforced that here.
# Also, this string matching should be skipped if in minimal_match mode.
if alt_bases[len(common_prefix):] == annotation_alt:
return True
# Handling deletion.
if (len(common_prefix) == len(alt_bases)
and annotation_alt == _COMPLETELY_DELETED_ALT):
return True
# Handling symbolic ALTs.
id_match = self._SYMBOLIC_ALT_RE.match(alt_bases)
if id_match and id_match.group('ID') == annotation_alt:
Expand All @@ -492,6 +562,43 @@ def _alt_matches_annotation_alt(
return True
return False

def _alt_matches_annotation_alt_minimal_mode(
self, referece_bases, alt_bases, annotation_alt):
# type: (str, str, str) -> bool
"""Returns true if ALTs match in the --minimal mode of VEP.

Note in the minimal mode, the matching can be non-deterministic, so this
should only be done if _alt_matches_annotation_alt which is deterministic
has not succeeded. For details of ALT matching in the --minimal mode of VEP,
see the "Complex VCF entries" sections of
https://useast.ensembl.org/info/docs/tools/vep/vep_formats.html
Basically, each ALT is independently checked with REF and the common prefix
and suffix is removed from ALT. The remaining part is the annotation ALT:
REF ALT annotation-ALT
A T T
AT TT,A T,-
C CT,T T -> Note this is ambiguous.
"""
if not alt_bases or not annotation_alt:
return False
# Finding common leading and trailing sub-strings of ALT and REF.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks like a library function (I wonder if there is one already?) If not, we should make a generic substring matcher lib/function and use it here. Please fell free leave it as is for now though (we can do this in the more involved refactoring later).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed but the only library function I know is os.path.commonprefix (which acts on a list of strings and is used elsewhere in this file when there is a list of strings). I can use that here (for a list of two strings) and apply it on reversed strings for the common suffix, but I thought it is easier to understand if I directly implement it here. I can certainly move this to a separate library function when we do refactoring of this class.

leading = 0
trailing = 0
min_len = min(len(alt_bases), len(referece_bases))
while (leading < min_len and
alt_bases[leading] == referece_bases[leading]):
leading += 1
while (trailing + leading < min_len and # TODO check this condition
alt_bases[len(alt_bases) - trailing - 1] ==
referece_bases[len(referece_bases) - trailing - 1]):
trailing += 1
if alt_bases[leading:len(alt_bases) - trailing] == annotation_alt:
return True
if (leading + trailing == len(alt_bases) and
annotation_alt == _COMPLETELY_DELETED_ALT):
return True
return False


def _extract_annotation_list_with_alt(annotation_str):
# type: (str) -> List[str]
Expand Down
Loading