From e90f1435bbd961e55346142662bd187f35f9982b Mon Sep 17 00:00:00 2001 From: Nima <4221028+nmousavi@users.noreply.github.com> Date: Fri, 16 Feb 2018 13:13:54 -0500 Subject: [PATCH] Extract header fields from variants. Add a PTransform to extract header fields (FORMAT, INFO) form variants. This feature is activated by a flag, and is useful when there are header fields in variant with no definition in headers. Without this PTransform we currently fail the pipeline in such cases. --- .../options/variant_transform_options.py | 8 +- .../transforms/infer_undefined_headers.py | 158 ++++++++++++++++++ .../infer_undefined_headers_test.py | 157 +++++++++++++++++ .../transforms/merge_headers.py | 2 +- gcp_variant_transforms/vcf_to_bq.py | 32 +++- 5 files changed, 349 insertions(+), 8 deletions(-) create mode 100644 gcp_variant_transforms/transforms/infer_undefined_headers.py create mode 100644 gcp_variant_transforms/transforms/infer_undefined_headers_test.py diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 86a220e2b..7063d7d6b 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -77,7 +77,13 @@ def add_arguments(self, parser): 'large number of files are specified by input_pattern. ' 'Note that each VCF file must still contain valid header files ' 'even if this is provided.')) - + parser.add_argument( + '--infer_undefined_headers', + type='bool', default=False, nargs='?', const=True, + help=('If true, header fields (e.g. FORMAT, INFO) are not only ' + 'extracted from header section of VCF files, but also from ' + 'variants. This is useful when there are header fields in ' + 'variants not defined in the header sections.')) class BigQueryWriteOptions(VariantTransformsOptions): """Options for writing Variant records to BigQuery.""" diff --git a/gcp_variant_transforms/transforms/infer_undefined_headers.py b/gcp_variant_transforms/transforms/infer_undefined_headers.py new file mode 100644 index 000000000..86b059cc2 --- /dev/null +++ b/gcp_variant_transforms/transforms/infer_undefined_headers.py @@ -0,0 +1,158 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""A PTransform to infer undefined header fields.""" + +from __future__ import absolute_import + +import apache_beam as beam + +from vcf.parser import _Format as Format +from vcf.parser import _Info as Info +from vcf.parser import field_counts + +from gcp_variant_transforms.beam_io import vcf_header_io +from gcp_variant_transforms.transforms import merge_headers + + +__all__ = ['InferUndefinedHeaderFields'] + + +class VcfFieldType(object): + """Constants for VCF field types.""" + FLAG = 'Flag' + STRING = 'String' + + +class _InferUndefinedHeaderFields(beam.DoFn): + """Extracts undefined header fields from ``Variant`` record.""" + + def _get_field_count(self, field_value): + """ + Args: + field_value (list, bool, integer, or string): value for the field + returned by PyVCF. E.g. [0.33, 0.66] is a field value for Allele + frequency (AF) field. + """ + if isinstance(field_value, list): + return field_counts['.'] + elif isinstance(field_value, bool): + return 0 + else: + return 1 + + def _get_field_type(self, field_value): + """ + Args: + field_value (list, bool, integer, or string): value for the field + returned by PyVCF. E.g. [0.33, 0.66] is a field value for Allele + frequency (AF) field. + """ + if isinstance(field_value, bool): + return VcfFieldType.FLAG + else: + return VcfFieldType.STRING + + def _infer_undefined_info_fields(self, variant, defined_headers): + """Returns info fields not defined in the headers. + + Args: + variant (:class:`vcfio.Variant`): variant obj. + defined_headers (:class:`vcf_header_io.VcfHeader`): header fields defined + in header section of VCF files. + Returns: + A dict of (info_key(str), :class:`Info`) for any info field in `variant` + that is not defined in the header. + """ + infos = {} + for info_field_key, variant_info in variant.info.iteritems(): + info_field_value = variant_info.data + if not defined_headers or info_field_key not in defined_headers.infos: + if info_field_key in infos: + raise ValueError( + 'Invalid VCF file. Duplicate INFO field in variant {}'.format( + variant)) + infos[info_field_key] = Info(info_field_key, + self._get_field_count(info_field_value), + self._get_field_type(info_field_value), + '', # NO_DESCRIPTION + '', # UNKNOWN_SOURCE + '') # UNKNOWN_VERSION + return infos + + def _infer_undefined_format_fields(self, variant, defined_headers): + """Returns format fields not defined in the headers. + + Args: + variant (:class:`vcfio.Variant`): variant obj. + defined_headers (:class:`vcf_header_io.VcfHeader`): header fields defined + in header section of VCF files. + Returns: + A dict of (format_key(str), :class:`Format`) for any format key in + `variant` that is not defined in the header. + """ + formats = {} + for call in variant.calls: + for format_key, format_value in call.info.iteritems(): + if not defined_headers or format_key not in defined_headers.formats: + if format_key in formats: + raise ValueError( + 'Invalid VCF file. Duplicate FORMAT field in variant {}'.format( + variant)) + formats[format_key] = Format(format_key, + self._get_field_count(format_value), + self._get_field_type(format_value), + '') # NO_DESCRIPTION + # No point in proceeding. All other calls have the same FORMAT. + break + return formats + + def process(self, variant, defined_headers): + """ + Args: + defined_headers (:class:`vcf_header_io.VcfHeader`): header fields defined + in header section of VCF files. + """ + infos = self._infer_undefined_info_fields(variant, defined_headers) + formats = self._infer_undefined_format_fields(variant, defined_headers) + if infos or formats: + yield vcf_header_io.VcfHeader(infos=infos, formats=formats) + + +class InferUndefinedHeaderFields(beam.PTransform): + """Extracts undefined header fields from ``Variant`` records.""" + + def __init__(self, defined_headers): + """Initializes the transform. + Args: + defined_headers (:class:`vcf_header_io.VCFHeader`): Side input + containing all the header fields (e.g., INFO, FORMAT) defined + in the header section of the VCF files. This is used to skip already + defined header fields. + """ + self._defined_headers = defined_headers + + def expand(self, pcoll): + return (pcoll + | 'InferUndefinedHeaderFields' >> beam.ParDo( + _InferUndefinedHeaderFields(), self._defined_headers) + # TODO(nmousavi): Modify the MergeHeaders to resolve 1 vs '.' + # mistmatch for headers extracted from variants. + # + # Note: argument `split_alternate_allele_info_fileds` is not + # relevant here since no fields with `Number=A` will be extracted + # from variants, therefore we let the default value (True) for it + # be used. Should this changes, we should modify the default value. + | 'MergeHeaders' >> merge_headers.MergeHeaders( + split_alternate_allele_info_fields=True)) diff --git a/gcp_variant_transforms/transforms/infer_undefined_headers_test.py b/gcp_variant_transforms/transforms/infer_undefined_headers_test.py new file mode 100644 index 000000000..752470d47 --- /dev/null +++ b/gcp_variant_transforms/transforms/infer_undefined_headers_test.py @@ -0,0 +1,157 @@ +# Copyright 2018 Google Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""Tests for infer_variant_header module.""" + +from __future__ import absolute_import + +from collections import OrderedDict +import unittest + +from apache_beam import pvalue +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.transforms import Create +from vcf.parser import _Format as Format +from vcf.parser import _Info as Info +from vcf.parser import field_counts + +from gcp_variant_transforms.beam_io import vcf_header_io +from gcp_variant_transforms.beam_io import vcfio +from gcp_variant_transforms.transforms import infer_undefined_headers + +class InferUndefinedHeaderFieldsTest(unittest.TestCase): + """ Test case for ``InferUndefinedHeaderFields`` DoFn.""" + + def _get_sample_header_fields(self): + infos = OrderedDict([ + ('IS', Info('I1', 1, 'String', 'desc', 'src', 'v')), + ('IF', Info('I1', 1, 'Flag', 'desc', 'src', 'v')), + ('IA', Info('IA', field_counts['A'], 'Integer', 'desc', 'src', 'v'))]) + formats = OrderedDict([ + ('FS', Format('FS', 1, 'String', 'desc')), + ('FI', Format('FI', 2, 'Integer', 'desc')), + ('FU', Format('FU', field_counts['.'], 'Float', 'desc')), + ('GT', Format('GT', 2, 'Integer', 'Special GT key')), + ('PS', Format('PS', 1, 'Integer', 'Special PS key'))]) + return vcf_header_io.VcfHeader(infos=infos, formats=formats) + + def _get_sample_variant_1(self): + variant = vcfio.Variant( + reference_name='chr19', start=11, end=12, reference_bases='C', + alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2, + filters=['PASS'], + info={'IS': vcfio.VariantInfo('some data', '1'), + 'IF': vcfio.VariantInfo(True, '0'), + 'IA': vcfio.VariantInfo([0.1, 0.2], '2')}, + calls=[vcfio.VariantCall( + name='Sample1', genotype=[0, 1], phaseset='*', + info={'FI': 20, 'FU': [10.0, 20.0]})] + ) + return variant + + def _get_sample_variant_2(self): + variant = vcfio.Variant( + reference_name='20', start=123, end=125, reference_bases='CT', + alternate_bases=[], filters=['q10', 's10'], + info={'IS_2': vcfio.VariantInfo('some data', '1')}, + calls=[vcfio.VariantCall( + name='Sample1', genotype=[0, 1], phaseset='*', info={'FI_2': 20})] + ) + return variant + + def test_header_fields_inferred_one_variant(self): + with TestPipeline() as p: + variant = self._get_sample_variant_1() + inferred_headers = ( + p + | Create([variant]) + | 'InferUndefinedHeaderFields' >> + infer_undefined_headers.InferUndefinedHeaderFields( + defined_headers=None)) + + expected_infos = {'IS': Info('IS', 1, 'String', '', '', ''), + 'IF': Info('IF', 0, 'Flag', '', '', ''), + 'IA': Info('IA', None, 'String', '', '', '')} + expected_formats = {'FI': Format('FI', 1, 'String', ''), + 'FU': Format('FU', None, 'String', '')} + + expected = vcf_header_io.VcfHeader( + infos=expected_infos, formats=expected_formats) + assert_that(inferred_headers, equal_to([expected])) + p.run() + + def test_defined_fields_filtered_one_variant(self): + # All FORMATs and INFOs are already defined in the header section of VCF + # files. + with TestPipeline() as p: + vcf_headers = self._get_sample_header_fields() + vcf_headers_side_input = p | 'vcf_headers' >> Create([vcf_headers]) + variant = self._get_sample_variant_1() + inferred_headers = ( + p + | Create([variant]) + | 'InferUndefinedHeaderFields' >> + infer_undefined_headers.InferUndefinedHeaderFields( + pvalue.AsSingleton(vcf_headers_side_input))) + + assert_that(inferred_headers, equal_to([])) + p.run() + + def test_header_fields_inferred_from_two_variants(self): + with TestPipeline() as p: + variant_1 = self._get_sample_variant_1() + variant_2 = self._get_sample_variant_2() + inferred_headers = ( + p + | Create([variant_1, variant_2]) + | 'InferUndefinedHeaderFields' >> + infer_undefined_headers.InferUndefinedHeaderFields( + defined_headers=None)) + + expected_infos = {'IS': Info('IS', 1, 'String', '', '', ''), + 'IF': Info('IF', 0, 'Flag', '', '', ''), + 'IA': Info('IA', None, 'String', '', '', ''), + 'IS_2': Info('IS_2', 1, 'String', '', '', '')} + expected_formats = {'FI': Format('FI', 1, 'String', ''), + 'FU': Format('FU', None, 'String', ''), + 'FI_2': Format('FI_2', 1, 'String', '')} + + expected = vcf_header_io.VcfHeader( + infos=expected_infos, formats=expected_formats) + assert_that(inferred_headers, equal_to([expected])) + p.run() + + def test_defined_fields_filtered_two_variants(self): + # Only INFO and FORMAT in the first variants are already defined in the + # header section of the VCF files. + with TestPipeline() as p: + vcf_headers = self._get_sample_header_fields() + vcf_headers_side_input = p | 'vcf_header' >> Create([vcf_headers]) + variant_1 = self._get_sample_variant_1() + variant_2 = self._get_sample_variant_2() + inferred_headers = ( + p + | Create([variant_1, variant_2]) + | 'InferUndefinedHeaderFields' >> + infer_undefined_headers.InferUndefinedHeaderFields( + pvalue.AsSingleton(vcf_headers_side_input))) + + expected_infos = {'IS_2': Info('IS_2', 1, 'String', '', '', '')} + expected_formats = {'FI_2': Format('FI_2', 1, 'String', '')} + expected = vcf_header_io.VcfHeader( + infos=expected_infos, formats=expected_formats) + assert_that(inferred_headers, equal_to([expected])) + p.run() diff --git a/gcp_variant_transforms/transforms/merge_headers.py b/gcp_variant_transforms/transforms/merge_headers.py index 95f82c828..223ef3e9b 100644 --- a/gcp_variant_transforms/transforms/merge_headers.py +++ b/gcp_variant_transforms/transforms/merge_headers.py @@ -201,4 +201,4 @@ def __init__(self, split_alternate_allele_info_fields=True): def expand(self, pcoll): return pcoll | 'MergeHeaders' >> beam.CombineGlobally(_MergeHeadersFn( - self._header_merger)) + self._header_merger)).without_defaults() diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 50ad00844..a3790b211 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -55,6 +55,7 @@ from gcp_variant_transforms.transforms import merge_headers from gcp_variant_transforms.transforms import merge_variants from gcp_variant_transforms.transforms import variant_to_bigquery +from gcp_variant_transforms.transforms import infer_undefined_headers _COMMAND_LINE_OPTIONS = [ variant_transform_options.VcfReadOptions, @@ -132,6 +133,21 @@ def _get_pipeline_mode(known_args): return PipelineModes.SMALL +def _add_inferred_headers(pipeline, known_args, merged_header): + inferred_headers = ( + _read_variants(pipeline, known_args) + | 'FilterVariants' >> filter_variants.FilterVariants( + reference_names=known_args.reference_names) + | ' InferUndefinedHeaderFields' >> + infer_undefined_headers.InferUndefinedHeaderFields( + beam.pvalue.AsSingleton(merged_header))) + merged_header = ( + (inferred_headers, merged_header) + | beam.Flatten() + | 'MergeHeadersFromVcfAndVariants' >> merge_headers.MergeHeaders( + known_args.split_alternate_allele_info_fields)) + return merged_header + def _merge_headers(known_args, pipeline_args, pipeline_mode): """Merges VCF headers using beam based on pipeline_mode.""" @@ -141,7 +157,8 @@ def _merge_headers(known_args, pipeline_args, pipeline_mode): options = PipelineOptions(pipeline_args) # Always run pipeline locally if data is small. - if pipeline_mode == PipelineModes.SMALL: + if (pipeline_mode == PipelineModes.SMALL and + not known_args.infer_undefined_headers): options.view_as(StandardOptions).runner = 'DirectRunner' @@ -169,12 +186,15 @@ def _merge_headers(known_args, pipeline_args, pipeline_mode): else: headers |= vcf_header_io.ReadVcfHeaders(known_args.input_pattern) - _ = (headers - | 'MergeHeaders' >> merge_headers.MergeHeaders( - known_args.split_alternate_allele_info_fields) - | 'WriteHeaders' >> vcf_header_io.WriteVcfHeaders( - known_args.representative_header_file)) + merged_header = (headers + | 'MergeHeaders' >> merge_headers.MergeHeaders( + known_args.split_alternate_allele_info_fields)) + + if known_args.infer_undefined_headers: + merged_header = _add_inferred_headers(p, known_args, merged_header) + _ = (merged_header | 'WriteHeaders' >> vcf_header_io.WriteVcfHeaders( + known_args.representative_header_file)) def _add_parser_arguments(options, parser): for transform_options in options: