diff --git a/gcp_variant_transforms/libs/metrics_util.py b/gcp_variant_transforms/libs/metrics_util.py new file mode 100644 index 000000000..57d1b61de --- /dev/null +++ b/gcp_variant_transforms/libs/metrics_util.py @@ -0,0 +1,109 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Wrapper for Metrics API of Beam. + +This is used to avoid direct dependency on Beam in our library methods/classes +such that they are more modular and easier to test. Preferably instances +of classes in this module should be injected into library classes/methods +instead of direct instantiation. + +This is an example of how to use the factory and counter objects: + +``` +factory = CounterFactory() +my_counter = factory.create_counter('my_counter_name') +my_counter.inc(4) +``` + +""" + +from __future__ import absolute_import + +import logging + +from apache_beam.runners import runner # pylint: disable=unused-import +from apache_beam import metrics +from apache_beam.metrics import metric + +# The name space in which all metrics created by this class are. +_METRICS_NAMESPACE = 'VT_metrics_namespace' + +# The name of the entry for counters in the dictionary that metrics().query() of +# Beam returns. 
_COUNTERS = 'counters'


class CounterInterface(object):
  """The interface of counter objects."""

  def inc(self, n=1):
    # type: (int) -> None
    """Subclass implementations should increment the counter by `n`.

    Args:
      n: The amount to add to the counter; defaults to 1.

    Raises:
      NotImplementedError: Always, because this base method is abstract.
    """
    raise NotImplementedError


class _NoOpCounter(CounterInterface):
  """A counter that does nothing, good to be used when counter is optional."""

  def inc(self, n=1):
    # type: (int) -> None
    """Accepts the increment and deliberately discards it."""


class _CounterWrapper(CounterInterface):
  """A wrapper for Beam counters.

  The wrapped counter is created through `metrics.Metrics.counter` under
  `_METRICS_NAMESPACE`, so `log_all_counters` can find it again by namespace.
  """

  def __init__(self, counter_name):
    # type: (str) -> None
    """Creates the underlying Beam counter named `counter_name`."""
    self._counter_name = counter_name
    self._counter = metrics.Metrics.counter(_METRICS_NAMESPACE, counter_name)

  def inc(self, n=1):
    # type: (int) -> None
    """Increments the wrapped Beam counter by `n`."""
    self._counter.inc(n)


class CounterFactoryInterface(object):
  """The interface for counter factories."""

  def create_counter(self, counter_name):
    # type: (str) -> CounterInterface
    """Returns a counter with the given name.

    Raises:
      NotImplementedError: Always, because this base method is abstract.
    """
    raise NotImplementedError


class NoOpCounterFactory(CounterFactoryInterface):
  """A factory that creates counters that do nothing."""

  def create_counter(self, counter_name):
    # type: (str) -> CounterInterface
    """Returns a counter that ignores increments; `counter_name` is unused."""
    return _NoOpCounter()


class CounterFactory(CounterFactoryInterface):
  """A factory that creates counters backed by Beam metrics."""

  def create_counter(self, counter_name):
    # type: (str) -> CounterInterface
    """Returns a `_CounterWrapper` around a Beam counter named `counter_name`."""
    return _CounterWrapper(counter_name)


def log_all_counters(pipeline_result):
  # type: (runner.PipelineResult) -> None
  """Logs all counters that belong to `_METRICS_NAMESPACE`.

  Args:
    pipeline_result: The object whose `metrics().query(...)` is asked for the
      metrics of this module's namespace. The query result is indexed with
      `_COUNTERS` to get the counter entries.
  """
  counter_filter = metric.MetricsFilter().with_namespace(_METRICS_NAMESPACE)
  query_result = pipeline_result.metrics().query(counter_filter)
  # `or ()` keeps the original behavior of logging nothing when the entry is
  # empty (or otherwise falsy) without looking it up twice.
  for counter in query_result[_COUNTERS] or ():
    # NOTE(review): `%s` instead of `%d` so that a non-numeric committed value
    # (e.g. None, should a runner not report committed results -- confirm per
    # runner) is still logged instead of failing inside logging's formatter.
    # For integer values the emitted text is unchanged.
    logging.info('Counter %s = %s', counter, counter.committed)
b/gcp_variant_transforms/libs/metrics_util_test.py @@ -0,0 +1,31 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Unit tests for metrics_util module.""" + +from __future__ import absolute_import + +import unittest + +from gcp_variant_transforms.libs import metrics_util + + +_TEST_COUNTER = 'test_counter' + + +class CounterFactoryTest(unittest.TestCase): + + def test_create_counter(self): + counter = metrics_util.CounterFactory().create_counter(_TEST_COUNTER) + self.assertTrue(isinstance(counter, metrics_util.CounterInterface)) diff --git a/gcp_variant_transforms/libs/processed_variant.py b/gcp_variant_transforms/libs/processed_variant.py index 42e9dc78c..943044d0f 100644 --- a/gcp_variant_transforms/libs/processed_variant.py +++ b/gcp_variant_transforms/libs/processed_variant.py @@ -22,21 +22,29 @@ from __future__ import absolute_import +import enum import logging from collections import defaultdict -from typing import Dict, List, Any #pylint: disable=unused-import +from typing import Dict, List, Any # pylint: disable=unused-import import vcf from apache_beam.io.gcp.internal.clients import bigquery from gcp_variant_transforms.beam_io import vcfio +from gcp_variant_transforms.libs import metrics_util from gcp_variant_transforms.libs import bigquery_util -from gcp_variant_transforms.libs import vcf_header_parser #pylint: disable=unused-import +from gcp_variant_transforms.libs import vcf_header_parser # pylint: 
disable=unused-import _FIELD_COUNT_ALTERNATE_ALLELE = 'A' +# Counter names +class _CounterEnum(enum.Enum): + VARIANT = 'variant_counter' + ANNOTATION = 'annotation_counter' + ANNOTATION_ALT_MISMATCH = 'annotation_alt_mismatch_counter' + class ProcessedVariant(object): """A wrapper around the ``Variant`` class with extra functionality. @@ -164,10 +172,11 @@ class ProcessedVariantFactory(object): """ def __init__( self, - header_fields, - split_alternate_allele_info_fields=True, - annotation_fields=None): - # type: (vcf_header_parser.HeaderFields, bool, List[str]) -> None + header_fields, # type: vcf_header_parser.HeaderFields + split_alternate_allele_info_fields=True, # type: bool + annotation_fields=None, # type: List[str] + counter_factory=None # type: metrics_util.CounterFactoryInterface + ): """Sets the internal state of the factory class. Args: @@ -183,6 +192,13 @@ def __init__( self._split_alternate_allele_info_fields = ( split_alternate_allele_info_fields) self._annotation_field_set = set(annotation_fields or []) + cfactory = counter_factory or metrics_util.NoOpCounterFactory() + self._variant_counter = cfactory.create_counter( + _CounterEnum.VARIANT.value) + self._annotation_counter = cfactory.create_counter( + _CounterEnum.ANNOTATION.value) + self._annotation_alt_mismatch_counter = cfactory.create_counter( + _CounterEnum.ANNOTATION_ALT_MISMATCH.value) def create_processed_variant(self, variant): # type: (vcfio.Variant) -> ProcessedVariant @@ -192,6 +208,7 @@ def create_processed_variant(self, variant): variant (:class:`vcfio.Variant`): The raw variant information. """ proc_var = ProcessedVariant(variant) + self._variant_counter.inc() for key, variant_info in variant.info.iteritems(): # TODO(bashir2): field_count should be removed from VariantInfo and # instead looked up from header_fields. 
@@ -239,11 +256,13 @@ def _add_annotation(self, proc_var, field_name, data): for alt in proc_var._alternate_datas: if alt.alternate_bases == alt_bases: alt._info[field_name] = annotations_list + self._annotation_counter.inc() break # TODO(bashir2): Currently we only check exact matches of alternate bases # which is not enough. We should implement the whole standard for finding # alternate bases for an annotation list. else: + self._annotation_alt_mismatch_counter.inc() logging.warning('Could not find matching alternate bases for %s in ' 'annotation filed %s', alt_bases, field_name) diff --git a/gcp_variant_transforms/libs/processed_variant_test.py b/gcp_variant_transforms/libs/processed_variant_test.py index 83e6cd1ee..11be4ff80 100644 --- a/gcp_variant_transforms/libs/processed_variant_test.py +++ b/gcp_variant_transforms/libs/processed_variant_test.py @@ -13,15 +13,41 @@ # limitations under the License. from __future__ import absolute_import +from typing import Dict # pylint: disable=unused-import import unittest from vcf import parser from gcp_variant_transforms.beam_io import vcfio +from gcp_variant_transforms.libs import metrics_util from gcp_variant_transforms.libs import processed_variant from gcp_variant_transforms.libs import vcf_header_parser +class _CounterSpy(metrics_util.CounterInterface): + + def __init__(self): + self._count = 0 + + def inc(self, n=1): + self._count += n + + def get_value(self): + return self._count + + +class _CounterSpyFactory(metrics_util.CounterFactoryInterface): + + def __init__(self): + self.counter_map = {} # type: Dict[str, _CounterSpy] + + def create_counter(self, counter_name): + assert counter_name not in self.counter_map + counter = _CounterSpy() + self.counter_map[counter_name] = counter + return counter + + class ProcessedVariantFactoryTest(unittest.TestCase): def _get_sample_variant(self): @@ -40,9 +66,11 @@ def _get_sample_variant(self): def test_create_processed_variant_no_change(self): variant = 
self._get_sample_variant() header_fields = vcf_header_parser.HeaderFields({}, {}) + counter_factory = _CounterSpyFactory() factory = processed_variant.ProcessedVariantFactory( header_fields, - split_alternate_allele_info_fields=False) + split_alternate_allele_info_fields=False, + counter_factory=counter_factory) proc_var = factory.create_processed_variant(variant) # In this mode, the only difference between the original `variant` and # `proc_var` should be that INFO fields are copied to `_non_alt_info` map @@ -53,6 +81,14 @@ def test_create_processed_variant_no_change(self): proc_var_synthetic._alternate_datas = [ processed_variant.AlternateBaseData(a) for a in ['A', 'TT']] self.assertEqual([proc_var_synthetic], [proc_var]) + self.assertEqual(counter_factory.counter_map[ + processed_variant._CounterEnum.VARIANT.value].get_value(), 1) + self.assertEqual(counter_factory.counter_map[ + processed_variant._CounterEnum.ANNOTATION.value].get_value(), 0) + self.assertEqual( + counter_factory.counter_map[ + processed_variant._CounterEnum.ANNOTATION_ALT_MISMATCH.value + ].get_value(), 0) def test_create_processed_variant_move_alt_info(self): variant = self._get_sample_variant() @@ -68,7 +104,7 @@ def test_create_processed_variant_move_alt_info(self): self.assertEqual(proc_var.alternate_data_list, [alt1, alt2]) self.assertFalse(proc_var.non_alt_info.has_key('A2')) - def test_create_processed_variant_move_alt_info_and_annotation(self): + def _get_sample_variant_and_header_with_csq(self): variant = self._get_sample_variant() variant.info['CSQ'] = vcfio.VariantInfo( data=['A|C1|I1|S1|G1', 'TT|C2|I2|S2|G2', 'A|C3|I3|S3|G3'], @@ -83,10 +119,57 @@ def test_create_processed_variant_move_alt_info_and_annotation(self): header_fields = vcf_header_parser.HeaderFields( infos={'CSQ': csq_info}, formats={}) + return variant, header_fields + + def test_create_processed_variant_move_alt_info_and_annotation(self): + variant, header_fields = self._get_sample_variant_and_header_with_csq() + 
counter_factory = _CounterSpyFactory() + factory = processed_variant.ProcessedVariantFactory( + header_fields, + split_alternate_allele_info_fields=True, + annotation_fields=['CSQ'], + counter_factory=counter_factory) + proc_var = factory.create_processed_variant(variant) + alt1 = processed_variant.AlternateBaseData('A') + alt1._info = { + 'A2': 'data1', + 'CSQ': [ + {'Consequence': 'C1', 'IMPACT': 'I1', 'SYMBOL': 'S1', 'Gene': 'G1'}, + {'Consequence': 'C3', 'IMPACT': 'I3', 'SYMBOL': 'S3', 'Gene': 'G3'}] + } + alt2 = processed_variant.AlternateBaseData('TT') + alt2._info = { + 'A2': 'data2', + 'CSQ': [ + {'Consequence': 'C2', 'IMPACT': 'I2', 'SYMBOL': 'S2', 'Gene': 'G2'}] + } + self.assertEqual(proc_var.alternate_data_list, [alt1, alt2]) + self.assertFalse(proc_var.non_alt_info.has_key('A2')) + self.assertFalse(proc_var.non_alt_info.has_key('CSQ')) + self.assertEqual(counter_factory.counter_map[ + processed_variant._CounterEnum.VARIANT.value].get_value(), 1) + self.assertEqual(counter_factory.counter_map[ + processed_variant._CounterEnum.ANNOTATION.value].get_value(), 2) + self.assertEqual( + counter_factory.counter_map[ + processed_variant._CounterEnum.ANNOTATION_ALT_MISMATCH.value + ].get_value(), 0) + + def test_create_processed_variant_mismatched_annotation_alt(self): + # This is like `test_create_processed_variant_move_alt_info_and_annotation` + # with the difference that it has an extra alt annotation which does not + # match any alts. 
+ variant, header_fields = self._get_sample_variant_and_header_with_csq() + variant.info['CSQ'] = vcfio.VariantInfo( + data=['A|C1|I1|S1|G1', 'TT|C2|I2|S2|G2', 'A|C3|I3|S3|G3', + 'ATAT|C3|I3|S3|G3'], + field_count='.') + counter_factory = _CounterSpyFactory() factory = processed_variant.ProcessedVariantFactory( header_fields, split_alternate_allele_info_fields=True, - annotation_fields=['CSQ']) + annotation_fields=['CSQ'], + counter_factory=counter_factory) proc_var = factory.create_processed_variant(variant) alt1 = processed_variant.AlternateBaseData('A') alt1._info = { @@ -104,5 +187,13 @@ def test_create_processed_variant_move_alt_info_and_annotation(self): self.assertEqual(proc_var.alternate_data_list, [alt1, alt2]) self.assertFalse(proc_var.non_alt_info.has_key('A2')) self.assertFalse(proc_var.non_alt_info.has_key('CSQ')) + self.assertEqual(counter_factory.counter_map[ + processed_variant._CounterEnum.VARIANT.value].get_value(), 1) + self.assertEqual(counter_factory.counter_map[ + processed_variant._CounterEnum.ANNOTATION.value].get_value(), 2) + self.assertEqual( + counter_factory.counter_map[ + processed_variant._CounterEnum.ANNOTATION_ALT_MISMATCH.value + ].get_value(), 1) # TODO(bashir2): Add tests for create_alt_record_for_schema. diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index c8ce33abc..1476996ee 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -48,6 +48,7 @@ from gcp_variant_transforms.beam_io import vcf_header_io from gcp_variant_transforms.beam_io import vcfio +from gcp_variant_transforms.libs import metrics_util from gcp_variant_transforms.libs import vcf_header_parser from gcp_variant_transforms.libs.variant_merge import merge_with_non_variants_strategy from gcp_variant_transforms.libs.variant_merge import move_to_calls_strategy @@ -230,30 +231,36 @@ def run(argv=None): # See https://issues.apache.org/jira/browse/BEAM-2801. 
header_fields = vcf_header_parser.get_vcf_headers( known_args.representative_header_file) + counter_factory = metrics_util.CounterFactory() processed_variant_factory = processed_variant.ProcessedVariantFactory( header_fields, known_args.split_alternate_allele_info_fields, - known_args.annotation_fields) + known_args.annotation_fields, + counter_factory) pipeline_options = PipelineOptions(pipeline_args) - with beam.Pipeline(options=pipeline_options) as p: - variants = _read_variants(p, known_args) - variants |= 'FilterVariants' >> filter_variants.FilterVariants( - reference_names=known_args.reference_names) - if variant_merger: - variants |= ( - 'MergeVariants' >> merge_variants.MergeVariants(variant_merger)) - proc_variants = variants | 'ProcessVaraints' >> beam.Map( - processed_variant_factory.create_processed_variant).\ - with_output_types(processed_variant.ProcessedVariant) - _ = (proc_variants | - 'VariantToBigQuery' >> variant_to_bigquery.VariantToBigQuery( - known_args.output_table, - header_fields, - variant_merger, - processed_variant_factory, - append=known_args.append, - omit_empty_sample_calls=known_args.omit_empty_sample_calls)) + pipeline = beam.Pipeline(options=pipeline_options) + variants = _read_variants(pipeline, known_args) + variants |= 'FilterVariants' >> filter_variants.FilterVariants( + reference_names=known_args.reference_names) + if variant_merger: + variants |= ( + 'MergeVariants' >> merge_variants.MergeVariants(variant_merger)) + proc_variants = variants | 'ProcessVaraints' >> beam.Map( + processed_variant_factory.create_processed_variant).\ + with_output_types(processed_variant.ProcessedVariant) + _ = (proc_variants | + 'VariantToBigQuery' >> variant_to_bigquery.VariantToBigQuery( + known_args.output_table, + header_fields, + variant_merger, + processed_variant_factory, + append=known_args.append, + omit_empty_sample_calls=known_args.omit_empty_sample_calls)) + result = pipeline.run() + result.wait_until_finish() + + 
metrics_util.log_all_counters(result) if __name__ == '__main__': diff --git a/run_presubmit.sh b/run_presubmit.sh index 86623e2ec..0c4b17ae1 100755 --- a/run_presubmit.sh +++ b/run_presubmit.sh @@ -18,7 +18,15 @@ set -euo pipefail # A helper script for ensuring all checks pass before submitting any change. echo ========== Running unit tests. +if [[ -z `which coverage` ]];then + echo "coverage is not installed. Installing ..." + pip install coverage +fi coverage run --source=gcp_variant_transforms setup.py test echo ========== Running pylint. +if [[ -z `which pylint` ]];then + echo "pylint is not installed. Installing ..." + pip install pylint +fi pylint gcp_variant_transforms