From 57926025df0d19f80a17b6b58e4bf7cc1d8dc682 Mon Sep 17 00:00:00 2001 From: Miles Saul Date: Tue, 21 Nov 2017 14:30:16 -0500 Subject: [PATCH] Added loose failure mode to allow individual VCF record reads to fail --- sdks/python/apache_beam/io/vcfio.py | 46 ++++++-- sdks/python/apache_beam/io/vcfio_test.py | 133 ++++++++++++++--------- 2 files changed, 119 insertions(+), 60 deletions(-) diff --git a/sdks/python/apache_beam/io/vcfio.py b/sdks/python/apache_beam/io/vcfio.py index b877a32d01bd..80f4631e4627 100644 --- a/sdks/python/apache_beam/io/vcfio.py +++ b/sdks/python/apache_beam/io/vcfio.py @@ -22,6 +22,8 @@ from __future__ import absolute_import +import logging +import traceback from collections import namedtuple import vcf @@ -33,8 +35,8 @@ from apache_beam.io.textio import _TextSource as TextSource from apache_beam.transforms import PTransform -__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo'] - +__all__ = ['ReadFromVcf', 'Variant', 'VariantCall', 'VariantInfo', + 'MalformedVcfRecord'] # Stores data about variant INFO fields. The type of 'data' is specified in the # VCF headers. 'field_count' is a string that specifies the number of fields @@ -45,6 +47,10 @@ # - 'G': one value for each possible genotype. # - 'R': one value for each possible allele (including the reference). VariantInfo = namedtuple('VariantInfo', ['data', 'field_count']) +# Stores data about failed VCF record reads. `line` is the text line that +# caused the failed read and `file_name` is the name of the file that the read +# failed in. +MalformedVcfRecord = namedtuple('MalformedVcfRecord', ['file_name', 'line']) MISSING_FIELD_VALUE = '.' # Indicates field is missing in VCF record. PASS_FILTER = 'PASS' # Indicates that all filters have been passed. END_INFO_KEY = 'END' # The info key that explicitly specifies end of a record. @@ -223,7 +229,8 @@ def __init__(self, file_pattern, compression_type=CompressionTypes.AUTO, buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE, - validate=True): + validate=True, + allow_malformed_records=False): super(_VcfSource, self).__init__(file_pattern, compression_type=compression_type, validate=validate) @@ -231,6 +238,7 @@ def __init__(self, self._header_lines_per_file = {} self._compression_type = compression_type self._buffer_size = buffer_size + self._allow_malformed_records = allow_malformed_records def read_records(self, file_name, range_tracker): record_iterator = _VcfSource._VcfRecordIterator( @@ -238,6 +246,7 @@ def read_records(self, file_name, range_tracker): range_tracker, self._pattern, self._compression_type, + self._allow_malformed_records, buffer_size=self._buffer_size, skip_header_lines=0) @@ -253,10 +262,12 @@ def __init__(self, range_tracker, file_pattern, compression_type, + allow_malformed_records, **kwargs): self._header_lines = [] self._last_record = None self._file_name = file_name + self._allow_malformed_records = allow_malformed_records text_source = TextSource( file_pattern, @@ -274,7 +285,9 @@ def __init__(self, try: self._vcf_reader = vcf.Reader(fsock=self._create_generator()) except SyntaxError as e: - raise ValueError('Invalid VCF header %s' % str(e)) + raise ValueError('An exception was raised when reading header from VCF ' + 'file %s: %s' % (self._file_name, + traceback.format_exc(e))) def _store_header_lines(self, header_lines): self._header_lines = header_lines @@ -301,7 +314,18 @@ def next(self): return self._convert_to_variant_record(record, self._vcf_reader.infos, self._vcf_reader.formats) except (LookupError, ValueError) as e: - raise ValueError('Invalid record in VCF file. Error: %s' % str(e)) + if self._allow_malformed_records: + logging.warning( + 'An exception was raised when reading record from VCF file ' + '%s. Invalid record was %s: %s', + self._file_name, self._last_record, traceback.format_exc(e)) + return MalformedVcfRecord(self._file_name, self._last_record) + + raise ValueError('An exception was raised when reading record from VCF ' + 'file %s. Invalid record was %s: %s' % ( + self._file_name, + self._last_record, + traceback.format_exc(e))) def _convert_to_variant_record(self, record, infos, formats): """Converts the PyVCF record to a :class:`Variant` object. @@ -407,7 +431,7 @@ class ReadFromVcf(PTransform): Parses VCF files (version 4) using PyVCF library. If file_pattern specifies multiple files, then the header from each file is used separately to parse the content. However, the output will be a PCollection of - :class:`Variant` objects. + :class:`Variant` (or :class:`MalformedVcfRecord` for failed reads) objects. """ def __init__( @@ -415,6 +439,7 @@ def __init__( file_pattern=None, compression_type=CompressionTypes.AUTO, validate=True, + allow_malformed_records=False, **kwargs): """Initialize the :class:`ReadFromVcf` transform. @@ -427,10 +452,17 @@ def __init__( underlying file_path's extension will be used to detect the compression. validate (bool): flag to verify that the files exist during the pipeline creation time. + allow_malformed_records (bool): determines if failed VCF + record reads will be tolerated. Failed record reads will result in a + :class:`MalformedVcfRecord` being returned from the read of the record + rather than a :class:`Variant`. """ super(ReadFromVcf, self).__init__(**kwargs) self._source = _VcfSource( - file_pattern, compression_type, validate=validate) + file_pattern, + compression_type, + validate=validate, + allow_malformed_records=allow_malformed_records) def expand(self, pvalue): return pvalue.pipeline | Read(self._source) diff --git a/sdks/python/apache_beam/io/vcfio_test.py b/sdks/python/apache_beam/io/vcfio_test.py index 871b6e9c8c09..7ff16d49b062 100644 --- a/sdks/python/apache_beam/io/vcfio_test.py +++ b/sdks/python/apache_beam/io/vcfio_test.py @@ -20,12 +20,14 @@ import logging import os import unittest +from itertools import chain from itertools import permutations import apache_beam.io.source_test_utils as source_test_utils from apache_beam.io.vcfio import _VcfSource as VcfSource from apache_beam.io.vcfio import DEFAULT_PHASESET_VALUE from apache_beam.io.vcfio import MISSING_GENOTYPE_VALUE +from apache_beam.io.vcfio import MalformedVcfRecord from apache_beam.io.vcfio import ReadFromVcf from apache_beam.io.vcfio import Variant from apache_beam.io.vcfio import VariantCall @@ -95,8 +97,9 @@ class VcfSourceTest(unittest.TestCase): def _create_temp_vcf_file(self, lines, tempdir): return tempdir.create_temp_file(suffix='.vcf', lines=lines) - def _read_records(self, file_or_pattern): - return source_test_utils.read_from_source(VcfSource(file_or_pattern)) + def _read_records(self, file_or_pattern, **kwargs): + return source_test_utils.read_from_source( + VcfSource(file_or_pattern, **kwargs)) def _create_temp_file_and_read_records(self, lines): with TempDir() as tempdir: @@ -177,6 +180,56 @@ def _get_sample_variant_3(self): info={'GQ': None})) return variant, vcf_line + def _get_invalid_file_contents(self): + """Gets sample invalid files contents. + + Returns: + A `tuple` where the first element is contents that are invalid because + of record errors and the second element is contents that are invalid + because of header errors. + """ + malformed_vcf_records = [ + # Malfromed record. + [ + '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT Sample\n', + '1 1 ' + ], + # Missing "GT:GQ" format, but GQ is provided. + [ + '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT Sample\n', + '19 123 rs12345 T C 50 q10 AF=0.2;NS=2 GT 1|0:48' + ], + # GT is not an integer. + [ + '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT Sample\n', + '19 123 rs12345 T C 50 q10 AF=0.2;NS=2 GT A|0' + ], + # POS should be an integer. + [ + '##FILTER=\n', + '##FILTER=\n', + '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT Sample\n', + '19 abc rs12345 T C 9 q10 AF=0.2;NS=2 GT:GQ 1|0:48\n', + ] + ] + malformed_header_lines = [ + # Malformed FILTER. + [ + '##FILTER=\n', + '##FILTER=\n', + '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT Sample\n', + '19 123 rs12345 T C 50 q10 AF=0.2;NS=2 GT:GQ 1|0:48\n', + ] + ] + + return (malformed_vcf_records, malformed_header_lines) + def test_sort_variants(self): sorted_variants = [ Variant(reference_name='a', start=20, end=22), @@ -286,59 +339,33 @@ def test_read_after_splitting(self): self.assertEqual(9882, len(split_records)) def test_invalid_file(self): - invalid_file_contents = [ - # Malfromed record. - [ - '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SampleName\n', - '1 1 ' - ], - # Missing "GT:GQ" format, but GQ is provided. - [ - '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SampleName\n', - '19 123 rs12345 T C 50 q10 AF=0.2;NS=2 GT 1|0:48' - ], - # GT is not an integer. - [ - '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SampleName\n', - '19 123 rs12345 T C 50 q10 AF=0.2;NS=2 GT A|0' - ], - # Malformed FILTER. - [ - '##FILTER=\n', - '##FILTER=\n', - '#CHROM POS ID REF ALT QUAL FILTER INFO FORMAT SampleName\n', - '19 123 rs12345 T C 50 q10 AF=0.2;NS=2 GT:GQ 1|0:48\n', - ], - # POS should be an integer. - [ - '##FILTER=\n', - '##FILTER=