Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,13 @@ def add_arguments(self, parser):
'large number of files are specified by input_pattern. '
'Note that each VCF file must still contain valid header files '
'even if this is provided.'))

parser.add_argument(
'--infer_undefined_headers',
type='bool', default=False, nargs='?', const=True,
help=('If true, header fields (e.g. FORMAT, INFO) are not only '
'extracted from header section of VCF files, but also from '
'variants. This is useful when there are header fields in '
'variants not defined in the header sections.'))

class BigQueryWriteOptions(VariantTransformsOptions):
"""Options for writing Variant records to BigQuery."""
Expand Down
158 changes: 158 additions & 0 deletions gcp_variant_transforms/transforms/infer_undefined_headers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A PTransform to infer undefined header fields."""

from __future__ import absolute_import

import apache_beam as beam

from vcf.parser import _Format as Format
from vcf.parser import _Info as Info
from vcf.parser import field_counts

from gcp_variant_transforms.beam_io import vcf_header_io
from gcp_variant_transforms.transforms import merge_headers


__all__ = ['InferUndefinedHeaderFields']


class VcfFieldType(object):
"""Constants for VCF field types."""
FLAG = 'Flag'
STRING = 'String'


class _InferUndefinedHeaderFields(beam.DoFn):
"""Extracts undefined header fields from ``Variant`` record."""

def _get_field_count(self, field_value):
"""
Args:
field_value (list, bool, integer, or string): value for the field
returned by PyVCF. E.g. [0.33, 0.66] is a field value for Allele
frequency (AF) field.
"""
if isinstance(field_value, list):
return field_counts['.']
elif isinstance(field_value, bool):
return 0
else:
return 1

def _get_field_type(self, field_value):
"""
Args:
field_value (list, bool, integer, or string): value for the field
returned by PyVCF. E.g. [0.33, 0.66] is a field value for Allele
frequency (AF) field.
"""
if isinstance(field_value, bool):
return VcfFieldType.FLAG
else:
return VcfFieldType.STRING

def _infer_undefined_info_fields(self, variant, defined_headers):
"""Returns info fields not defined in the headers.

Args:
variant (:class:`vcfio.Variant`): variant obj.
defined_headers (:class:`vcf_header_io.VcfHeader`): header fields defined
in header section of VCF files.
Returns:
A dict of (info_key(str), :class:`Info`) for any info field in `variant`
that is not defined in the header.
"""
infos = {}
for info_field_key, variant_info in variant.info.iteritems():
info_field_value = variant_info.data
if not defined_headers or info_field_key not in defined_headers.infos:
if info_field_key in infos:
raise ValueError(
'Invalid VCF file. Duplicate INFO field in variant {}'.format(
variant))
infos[info_field_key] = Info(info_field_key,
self._get_field_count(info_field_value),
self._get_field_type(info_field_value),
'', # NO_DESCRIPTION
'', # UNKNOWN_SOURCE
'') # UNKNOWN_VERSION
return infos

def _infer_undefined_format_fields(self, variant, defined_headers):
"""Returns format fields not defined in the headers.

Args:
variant (:class:`vcfio.Variant`): variant obj.
defined_headers (:class:`vcf_header_io.VcfHeader`): header fields defined
in header section of VCF files.
Returns:
A dict of (format_key(str), :class:`Format`) for any format key in
`variant` that is not defined in the header.
"""
formats = {}
for call in variant.calls:
for format_key, format_value in call.info.iteritems():
if not defined_headers or format_key not in defined_headers.formats:
if format_key in formats:
raise ValueError(
'Invalid VCF file. Duplicate FORMAT field in variant {}'.format(
variant))
formats[format_key] = Format(format_key,
self._get_field_count(format_value),
self._get_field_type(format_value),
'') # NO_DESCRIPTION
# No point in proceeding. All other calls have the same FORMAT.
break
return formats

def process(self, variant, defined_headers):
"""
Args:
defined_headers (:class:`vcf_header_io.VcfHeader`): header fields defined
in header section of VCF files.
"""
infos = self._infer_undefined_info_fields(variant, defined_headers)
formats = self._infer_undefined_format_fields(variant, defined_headers)
if infos or formats:
yield vcf_header_io.VcfHeader(infos=infos, formats=formats)


class InferUndefinedHeaderFields(beam.PTransform):
"""Extracts undefined header fields from ``Variant`` records."""

def __init__(self, defined_headers):
"""Initializes the transform.
Args:
defined_headers (:class:`vcf_header_io.VCFHeader`): Side input
containing all the header fields (e.g., INFO, FORMAT) defined
in the header section of the VCF files. This is used to skip already
defined header fields.
"""
self._defined_headers = defined_headers

def expand(self, pcoll):
return (pcoll
| 'InferUndefinedHeaderFields' >> beam.ParDo(
_InferUndefinedHeaderFields(), self._defined_headers)
# TODO(nmousavi): Modify the MergeHeaders to resolve 1 vs '.'
# mistmatch for headers extracted from variants.
#
# Note: argument `split_alternate_allele_info_fileds` is not
# relevant here since no fields with `Number=A` will be extracted
# from variants, therefore we let the default value (True) for it
# be used. Should this changes, we should modify the default value.
| 'MergeHeaders' >> merge_headers.MergeHeaders(
split_alternate_allele_info_fields=True))
157 changes: 157 additions & 0 deletions gcp_variant_transforms/transforms/infer_undefined_headers_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
# Copyright 2018 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for infer_variant_header module."""

from __future__ import absolute_import

from collections import OrderedDict
import unittest

from apache_beam import pvalue
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms import Create
from vcf.parser import _Format as Format
from vcf.parser import _Info as Info
from vcf.parser import field_counts

from gcp_variant_transforms.beam_io import vcf_header_io
from gcp_variant_transforms.beam_io import vcfio
from gcp_variant_transforms.transforms import infer_undefined_headers

class InferUndefinedHeaderFieldsTest(unittest.TestCase):
""" Test case for ``InferUndefinedHeaderFields`` DoFn."""

def _get_sample_header_fields(self):
infos = OrderedDict([
('IS', Info('I1', 1, 'String', 'desc', 'src', 'v')),
('IF', Info('I1', 1, 'Flag', 'desc', 'src', 'v')),
('IA', Info('IA', field_counts['A'], 'Integer', 'desc', 'src', 'v'))])
formats = OrderedDict([
('FS', Format('FS', 1, 'String', 'desc')),
('FI', Format('FI', 2, 'Integer', 'desc')),
('FU', Format('FU', field_counts['.'], 'Float', 'desc')),
('GT', Format('GT', 2, 'Integer', 'Special GT key')),
('PS', Format('PS', 1, 'Integer', 'Special PS key'))])
return vcf_header_io.VcfHeader(infos=infos, formats=formats)

def _get_sample_variant_1(self):
variant = vcfio.Variant(
reference_name='chr19', start=11, end=12, reference_bases='C',
alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2,
filters=['PASS'],
info={'IS': vcfio.VariantInfo('some data', '1'),
'IF': vcfio.VariantInfo(True, '0'),
'IA': vcfio.VariantInfo([0.1, 0.2], '2')},
calls=[vcfio.VariantCall(
name='Sample1', genotype=[0, 1], phaseset='*',
info={'FI': 20, 'FU': [10.0, 20.0]})]
)
return variant

def _get_sample_variant_2(self):
variant = vcfio.Variant(
reference_name='20', start=123, end=125, reference_bases='CT',
alternate_bases=[], filters=['q10', 's10'],
info={'IS_2': vcfio.VariantInfo('some data', '1')},
calls=[vcfio.VariantCall(
name='Sample1', genotype=[0, 1], phaseset='*', info={'FI_2': 20})]
)
return variant

def test_header_fields_inferred_one_variant(self):
with TestPipeline() as p:
variant = self._get_sample_variant_1()
inferred_headers = (
p
| Create([variant])
| 'InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
defined_headers=None))

expected_infos = {'IS': Info('IS', 1, 'String', '', '', ''),
'IF': Info('IF', 0, 'Flag', '', '', ''),
'IA': Info('IA', None, 'String', '', '', '')}
expected_formats = {'FI': Format('FI', 1, 'String', ''),
'FU': Format('FU', None, 'String', '')}

expected = vcf_header_io.VcfHeader(
infos=expected_infos, formats=expected_formats)
assert_that(inferred_headers, equal_to([expected]))
p.run()

def test_defined_fields_filtered_one_variant(self):
# All FORMATs and INFOs are already defined in the header section of VCF
# files.
with TestPipeline() as p:
vcf_headers = self._get_sample_header_fields()
vcf_headers_side_input = p | 'vcf_headers' >> Create([vcf_headers])
variant = self._get_sample_variant_1()
inferred_headers = (
p
| Create([variant])
| 'InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
pvalue.AsSingleton(vcf_headers_side_input)))

assert_that(inferred_headers, equal_to([]))
p.run()

def test_header_fields_inferred_from_two_variants(self):
with TestPipeline() as p:
variant_1 = self._get_sample_variant_1()
variant_2 = self._get_sample_variant_2()
inferred_headers = (
p
| Create([variant_1, variant_2])
| 'InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
defined_headers=None))

expected_infos = {'IS': Info('IS', 1, 'String', '', '', ''),
'IF': Info('IF', 0, 'Flag', '', '', ''),
'IA': Info('IA', None, 'String', '', '', ''),
'IS_2': Info('IS_2', 1, 'String', '', '', '')}
expected_formats = {'FI': Format('FI', 1, 'String', ''),
'FU': Format('FU', None, 'String', ''),
'FI_2': Format('FI_2', 1, 'String', '')}

expected = vcf_header_io.VcfHeader(
infos=expected_infos, formats=expected_formats)
assert_that(inferred_headers, equal_to([expected]))
p.run()

def test_defined_fields_filtered_two_variants(self):
# Only INFO and FORMAT in the first variants are already defined in the
# header section of the VCF files.
with TestPipeline() as p:
vcf_headers = self._get_sample_header_fields()
vcf_headers_side_input = p | 'vcf_header' >> Create([vcf_headers])
variant_1 = self._get_sample_variant_1()
variant_2 = self._get_sample_variant_2()
inferred_headers = (
p
| Create([variant_1, variant_2])
| 'InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
pvalue.AsSingleton(vcf_headers_side_input)))

expected_infos = {'IS_2': Info('IS_2', 1, 'String', '', '', '')}
expected_formats = {'FI_2': Format('FI_2', 1, 'String', '')}
expected = vcf_header_io.VcfHeader(
infos=expected_infos, formats=expected_formats)
assert_that(inferred_headers, equal_to([expected]))
p.run()
2 changes: 1 addition & 1 deletion gcp_variant_transforms/transforms/merge_headers.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,4 +201,4 @@ def __init__(self, split_alternate_allele_info_fields=True):

def expand(self, pcoll):
return pcoll | 'MergeHeaders' >> beam.CombineGlobally(_MergeHeadersFn(
self._header_merger))
self._header_merger)).without_defaults()
32 changes: 26 additions & 6 deletions gcp_variant_transforms/vcf_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
from gcp_variant_transforms.transforms import merge_headers
from gcp_variant_transforms.transforms import merge_variants
from gcp_variant_transforms.transforms import variant_to_bigquery
from gcp_variant_transforms.transforms import infer_undefined_headers

_COMMAND_LINE_OPTIONS = [
variant_transform_options.VcfReadOptions,
Expand Down Expand Up @@ -133,6 +134,21 @@ def _get_pipeline_mode(known_args):

return PipelineModes.SMALL

def _add_inferred_headers(pipeline, known_args, merged_header):
inferred_headers = (
_read_variants(pipeline, known_args)
| 'FilterVariants' >> filter_variants.FilterVariants(
reference_names=known_args.reference_names)
| ' InferUndefinedHeaderFields' >>
infer_undefined_headers.InferUndefinedHeaderFields(
beam.pvalue.AsSingleton(merged_header)))
merged_header = (
(inferred_headers, merged_header)
| beam.Flatten()
| 'MergeHeadersFromVcfAndVariants' >> merge_headers.MergeHeaders(
known_args.split_alternate_allele_info_fields))
return merged_header


def _merge_headers(known_args, pipeline_args, pipeline_mode):
"""Merges VCF headers using beam based on pipeline_mode."""
Expand All @@ -142,7 +158,8 @@ def _merge_headers(known_args, pipeline_args, pipeline_mode):
options = PipelineOptions(pipeline_args)

# Always run pipeline locally if data is small.
if pipeline_mode == PipelineModes.SMALL:
if (pipeline_mode == PipelineModes.SMALL and
not known_args.infer_undefined_headers):
options.view_as(StandardOptions).runner = 'DirectRunner'


Expand Down Expand Up @@ -170,12 +187,15 @@ def _merge_headers(known_args, pipeline_args, pipeline_mode):
else:
headers |= vcf_header_io.ReadVcfHeaders(known_args.input_pattern)

_ = (headers
| 'MergeHeaders' >> merge_headers.MergeHeaders(
known_args.split_alternate_allele_info_fields)
| 'WriteHeaders' >> vcf_header_io.WriteVcfHeaders(
known_args.representative_header_file))
merged_header = (headers
| 'MergeHeaders' >> merge_headers.MergeHeaders(
known_args.split_alternate_allele_info_fields))

if known_args.infer_undefined_headers:
merged_header = _add_inferred_headers(p, known_args, merged_header)

_ = (merged_header | 'WriteHeaders' >> vcf_header_io.WriteVcfHeaders(
known_args.representative_header_file))

def _add_parser_arguments(options, parser):
for transform_options in options:
Expand Down