Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 109 additions & 0 deletions gcp_variant_transforms/libs/metrics_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Wrapper for Metrics API of Beam.

This is used to avoid direct dependency on Beam in our library methods/classes
such that they are more modular and easier to test. Preferably instances
of classes in this module should be injected into library classes/methods
instead of direct instantiation.

This is an example of how to use the factory and counter objects:

```
factory = CounterFactory()
my_counter = factory.create_counter('my_counter_name')
my_counter.inc(4)
```

"""

from __future__ import absolute_import

import logging

from apache_beam.runners import runner # pylint: disable=unused-import
from apache_beam import metrics
from apache_beam.metrics import metric

# The name space in which all metrics created by this class are.
_METRICS_NAMESPACE = 'VT_metrics_namespace'

# The name of the entry for counters in the dictionary that metrics().query() of
# Beam returns.
_COUNTERS = 'counters'


class CounterInterface(object):
"""The interface of counter objects"""

def inc(self, n=1):
# type: (int) -> None
"""Subclass implementations should do increment by `n`."""
raise NotImplementedError


class _NoOpCounter(CounterInterface):
"""A counter that does nothing, good to be used when counter is optional."""

def inc(self, n=1):
# type: (int) -> None
pass


class _CounterWrapper(CounterInterface):
"""A wrapper for Beam counters."""

def __init__(self, counter_name):
# type: (str) -> None
self._counter_name = counter_name
self._counter = metrics.Metrics.counter(_METRICS_NAMESPACE, counter_name)

def inc(self, n=1):
# type: (int) -> None
"""Increments the counter by `n`"""
self._counter.inc(n)


class CounterFactoryInterface(object):
"""The interface for counter factories."""

def create_counter(self, counter_name):
# type: (str) -> CounterInterface
"""Returns a counter with the given name."""
raise NotImplementedError


class NoOpCounterFactory(CounterFactoryInterface):
"""A factory that creates counters that do nothing."""

def create_counter(self, counter_name):
# type: (str) -> CounterInterface
return _NoOpCounter()


class CounterFactory(CounterFactoryInterface):

def create_counter(self, counter_name):
# type: (str) -> CounterInterface
return _CounterWrapper(counter_name)


def log_all_counters(pipeline_result):
"""Logs all counters that belong to _METRICS_NAME_SPACE."""
counter_filter = metric.MetricsFilter().with_namespace(_METRICS_NAMESPACE)
query_result = pipeline_result.metrics().query(counter_filter)
if query_result[_COUNTERS]:
for counter in query_result[_COUNTERS]:
logging.info('Counter %s = %d', counter, counter.committed)
31 changes: 31 additions & 0 deletions gcp_variant_transforms/libs/metrics_util_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Unit tests for metrics_util module."""

from __future__ import absolute_import

import unittest

from gcp_variant_transforms.libs import metrics_util


_TEST_COUNTER = 'test_counter'


class CounterFactoryTest(unittest.TestCase):

def test_create_counter(self):
counter = metrics_util.CounterFactory().create_counter(_TEST_COUNTER)
self.assertTrue(isinstance(counter, metrics_util.CounterInterface))
31 changes: 25 additions & 6 deletions gcp_variant_transforms/libs/processed_variant.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,29 @@

from __future__ import absolute_import

import enum
import logging

from collections import defaultdict
from typing import Dict, List, Any #pylint: disable=unused-import
from typing import Dict, List, Any # pylint: disable=unused-import

import vcf

from apache_beam.io.gcp.internal.clients import bigquery
from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.libs import metrics_util
from gcp_variant_transforms.libs import bigquery_util
from gcp_variant_transforms.libs import vcf_header_parser #pylint: disable=unused-import
from gcp_variant_transforms.libs import vcf_header_parser # pylint: disable=unused-import


_FIELD_COUNT_ALTERNATE_ALLELE = 'A'

# Counter names
class _CounterEnum(enum.Enum):
VARIANT = 'variant_counter'
ANNOTATION = 'annotation_counter'
ANNOTATION_ALT_MISMATCH = 'annotation_alt_mismatch_counter'


class ProcessedVariant(object):
"""A wrapper around the ``Variant`` class with extra functionality.
Expand Down Expand Up @@ -164,10 +172,11 @@ class ProcessedVariantFactory(object):
"""
def __init__(
self,
header_fields,
split_alternate_allele_info_fields=True,
annotation_fields=None):
# type: (vcf_header_parser.HeaderFields, bool, List[str]) -> None
header_fields, # type: vcf_header_parser.HeaderFields
split_alternate_allele_info_fields=True, # type: bool
annotation_fields=None, # type: List[str]
counter_factory=None # type: metrics_util.CounterFactoryInterface
):
"""Sets the internal state of the factory class.

Args:
Expand All @@ -183,6 +192,13 @@ def __init__(
self._split_alternate_allele_info_fields = (
split_alternate_allele_info_fields)
self._annotation_field_set = set(annotation_fields or [])
cfactory = counter_factory or metrics_util.NoOpCounterFactory()
self._variant_counter = cfactory.create_counter(
_CounterEnum.VARIANT.value)
self._annotation_counter = cfactory.create_counter(
_CounterEnum.ANNOTATION.value)
self._annotation_alt_mismatch_counter = cfactory.create_counter(
_CounterEnum.ANNOTATION_ALT_MISMATCH.value)

def create_processed_variant(self, variant):
# type: (vcfio.Variant) -> ProcessedVariant
Expand All @@ -192,6 +208,7 @@ def create_processed_variant(self, variant):
variant (:class:`vcfio.Variant`): The raw variant information.
"""
proc_var = ProcessedVariant(variant)
self._variant_counter.inc()
for key, variant_info in variant.info.iteritems():
# TODO(bashir2): field_count should be removed from VariantInfo and
# instead looked up from header_fields.
Expand Down Expand Up @@ -239,11 +256,13 @@ def _add_annotation(self, proc_var, field_name, data):
for alt in proc_var._alternate_datas:
if alt.alternate_bases == alt_bases:
alt._info[field_name] = annotations_list
self._annotation_counter.inc()
break
# TODO(bashir2): Currently we only check exact matches of alternate bases
# which is not enough. We should implement the whole standard for finding
# alternate bases for an annotation list.
else:
self._annotation_alt_mismatch_counter.inc()
logging.warning('Could not find matching alternate bases for %s in '
'annotation filed %s', alt_bases, field_name)

Expand Down
97 changes: 94 additions & 3 deletions gcp_variant_transforms/libs/processed_variant_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,41 @@
# limitations under the License.

from __future__ import absolute_import
from typing import Dict # pylint: disable=unused-import

import unittest

from vcf import parser

from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.libs import metrics_util
from gcp_variant_transforms.libs import processed_variant
from gcp_variant_transforms.libs import vcf_header_parser

class _CounterSpy(metrics_util.CounterInterface):

def __init__(self):
self._count = 0

def inc(self, n=1):
self._count += n

def get_value(self):
return self._count


class _CounterSpyFactory(metrics_util.CounterFactoryInterface):

def __init__(self):
self.counter_map = {} # type: Dict[str, _CounterSpy]

def create_counter(self, counter_name):
assert counter_name not in self.counter_map
counter = _CounterSpy()
self.counter_map[counter_name] = counter
return counter


class ProcessedVariantFactoryTest(unittest.TestCase):

def _get_sample_variant(self):
Expand All @@ -40,9 +66,11 @@ def _get_sample_variant(self):
def test_create_processed_variant_no_change(self):
variant = self._get_sample_variant()
header_fields = vcf_header_parser.HeaderFields({}, {})
counter_factory = _CounterSpyFactory()
factory = processed_variant.ProcessedVariantFactory(
header_fields,
split_alternate_allele_info_fields=False)
split_alternate_allele_info_fields=False,
counter_factory=counter_factory)
proc_var = factory.create_processed_variant(variant)
# In this mode, the only difference between the original `variant` and
# `proc_var` should be that INFO fields are copied to `_non_alt_info` map
Expand All @@ -53,6 +81,14 @@ def test_create_processed_variant_no_change(self):
proc_var_synthetic._alternate_datas = [
processed_variant.AlternateBaseData(a) for a in ['A', 'TT']]
self.assertEqual([proc_var_synthetic], [proc_var])
self.assertEqual(counter_factory.counter_map[
processed_variant._CounterEnum.VARIANT.value].get_value(), 1)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider creating an alias for _CounterEnum so that it becomes shorter and you don't need as much line breaks. We have adopted the style of importing _CounterEnum as CounterEnum to make it look 'public' for tests (that's the only exception for directly importing classes in our code).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is already done in the third PR (#131), so I prefer to leave it for there to get less merge conflicts.

self.assertEqual(counter_factory.counter_map[
processed_variant._CounterEnum.ANNOTATION.value].get_value(), 0)
self.assertEqual(
counter_factory.counter_map[
processed_variant._CounterEnum.ANNOTATION_ALT_MISMATCH.value
].get_value(), 0)

def test_create_processed_variant_move_alt_info(self):
variant = self._get_sample_variant()
Expand All @@ -68,7 +104,7 @@ def test_create_processed_variant_move_alt_info(self):
self.assertEqual(proc_var.alternate_data_list, [alt1, alt2])
self.assertFalse(proc_var.non_alt_info.has_key('A2'))

def test_create_processed_variant_move_alt_info_and_annotation(self):
def _get_sample_variant_and_header_with_csq(self):
variant = self._get_sample_variant()
variant.info['CSQ'] = vcfio.VariantInfo(
data=['A|C1|I1|S1|G1', 'TT|C2|I2|S2|G2', 'A|C3|I3|S3|G3'],
Expand All @@ -83,10 +119,57 @@ def test_create_processed_variant_move_alt_info_and_annotation(self):
header_fields = vcf_header_parser.HeaderFields(
infos={'CSQ': csq_info},
formats={})
return variant, header_fields

def test_create_processed_variant_move_alt_info_and_annotation(self):
variant, header_fields = self._get_sample_variant_and_header_with_csq()
counter_factory = _CounterSpyFactory()
factory = processed_variant.ProcessedVariantFactory(
header_fields,
split_alternate_allele_info_fields=True,
annotation_fields=['CSQ'],
counter_factory=counter_factory)
proc_var = factory.create_processed_variant(variant)
alt1 = processed_variant.AlternateBaseData('A')
alt1._info = {
'A2': 'data1',
'CSQ': [
{'Consequence': 'C1', 'IMPACT': 'I1', 'SYMBOL': 'S1', 'Gene': 'G1'},
{'Consequence': 'C3', 'IMPACT': 'I3', 'SYMBOL': 'S3', 'Gene': 'G3'}]
}
alt2 = processed_variant.AlternateBaseData('TT')
alt2._info = {
'A2': 'data2',
'CSQ': [
{'Consequence': 'C2', 'IMPACT': 'I2', 'SYMBOL': 'S2', 'Gene': 'G2'}]
}
self.assertEqual(proc_var.alternate_data_list, [alt1, alt2])
self.assertFalse(proc_var.non_alt_info.has_key('A2'))
self.assertFalse(proc_var.non_alt_info.has_key('CSQ'))
self.assertEqual(counter_factory.counter_map[
processed_variant._CounterEnum.VARIANT.value].get_value(), 1)
self.assertEqual(counter_factory.counter_map[
processed_variant._CounterEnum.ANNOTATION.value].get_value(), 2)
self.assertEqual(
counter_factory.counter_map[
processed_variant._CounterEnum.ANNOTATION_ALT_MISMATCH.value
].get_value(), 0)

def test_create_processed_variant_mismatched_annotation_alt(self):
# This is like `test_create_processed_variant_move_alt_info_and_annotation`
# with the difference that it has an extra alt annotation which does not
# match any alts.
variant, header_fields = self._get_sample_variant_and_header_with_csq()
variant.info['CSQ'] = vcfio.VariantInfo(
data=['A|C1|I1|S1|G1', 'TT|C2|I2|S2|G2', 'A|C3|I3|S3|G3',
'ATAT|C3|I3|S3|G3'],
field_count='.')
counter_factory = _CounterSpyFactory()
factory = processed_variant.ProcessedVariantFactory(
header_fields,
split_alternate_allele_info_fields=True,
annotation_fields=['CSQ'])
annotation_fields=['CSQ'],
counter_factory=counter_factory)
proc_var = factory.create_processed_variant(variant)
alt1 = processed_variant.AlternateBaseData('A')
alt1._info = {
Expand All @@ -104,5 +187,13 @@ def test_create_processed_variant_move_alt_info_and_annotation(self):
self.assertEqual(proc_var.alternate_data_list, [alt1, alt2])
self.assertFalse(proc_var.non_alt_info.has_key('A2'))
self.assertFalse(proc_var.non_alt_info.has_key('CSQ'))
self.assertEqual(counter_factory.counter_map[
processed_variant._CounterEnum.VARIANT.value].get_value(), 1)
self.assertEqual(counter_factory.counter_map[
processed_variant._CounterEnum.ANNOTATION.value].get_value(), 2)
self.assertEqual(
counter_factory.counter_map[
processed_variant._CounterEnum.ANNOTATION_ALT_MISMATCH.value
].get_value(), 1)

# TODO(bashir2): Add tests for create_alt_record_for_schema.
Loading