diff --git a/gcp_variant_transforms/beam_io/vcfio.py b/gcp_variant_transforms/beam_io/vcfio.py index 162430dd2..3e208b22c 100644 --- a/gcp_variant_transforms/beam_io/vcfio.py +++ b/gcp_variant_transforms/beam_io/vcfio.py @@ -34,6 +34,8 @@ from apache_beam.transforms import PTransform from apache_beam.transforms.display import DisplayDataItem +from gcp_variant_transforms.libs import vcf_header_parser + __all__ = ['ReadFromVcf', 'ReadAllFromVcf', 'Variant', 'VariantCall', 'VariantInfo', 'MalformedVcfRecord'] @@ -46,7 +48,11 @@ # - 'A': one value per alternate allele. # - 'G': one value for each possible genotype. # - 'R': one value for each possible allele (including the reference). -VariantInfo = namedtuple('VariantInfo', ['data', 'field_count']) +# `annotation_names` is only filled for the annotation field and it is the +# list of annotation names extracted from the description part of the annotation +# field metadata in the VCF header. +VariantInfo = namedtuple('VariantInfo', + ['data', 'field_count', 'annotation_names']) # Stores data about failed VCF record reads. `line` is the text line that # caused the failed read and `file_name` is the name of the file that the read # failed in. @@ -652,7 +658,8 @@ def __init__(self, compression_type=CompressionTypes.AUTO, buffer_size=DEFAULT_VCF_READ_BUFFER_SIZE, validate=True, - allow_malformed_records=False): + allow_malformed_records=False, + annotation_field=None): super(_VcfSource, self).__init__(file_pattern, compression_type=compression_type, validate=validate) @@ -660,6 +667,7 @@ def __init__(self, self._compression_type = compression_type self._buffer_size = buffer_size self._allow_malformed_records = allow_malformed_records + self._annotation_field = annotation_field def read_records(self, file_name, range_tracker): record_iterator = _VcfSource._VcfRecordIterator( @@ -668,6 +676,7 @@ def read_records(self, file_name, range_tracker): self._pattern, self._compression_type, self._allow_malformed_records, + annotation_field=self._annotation_field, buffer_size=self._buffer_size, skip_header_lines=0) @@ -684,11 +693,13 @@ def __init__(self, file_pattern, compression_type, allow_malformed_records, + annotation_field=None, **kwargs): self._header_lines = [] self._last_record = None self._file_name = file_name self._allow_malformed_records = allow_malformed_records + self._annotation_field = annotation_field text_source = _TextSource( file_pattern, @@ -795,10 +806,19 @@ def _get_variant_info(self, record, infos): for k, v in record.INFO.iteritems(): if k != END_INFO_KEY: field_count = None + annotation_names = None if k in infos: field_count = self._get_field_count_as_string(infos[k].num) - info[k] = VariantInfo(data=v, field_count=field_count) - + if k == self._annotation_field: + annotation_names = vcf_header_parser.extract_annotation_names( + infos[k].desc) + # TODO(bashir2): The reason we keep annotation_names with each variant + # is to do better merging, e.g., when some variants from two VCF files + # have different annotations. This merging logic needs to be + # implemented though. + info[k] = VariantInfo(data=v, + field_count=field_count, + annotation_names=annotation_names) return info def _get_field_count_as_string(self, field_count): @@ -880,6 +900,7 @@ def __init__( compression_type=CompressionTypes.AUTO, validate=True, allow_malformed_records=False, + annotation_field=None, **kwargs): """Initialize the :class:`ReadFromVcf` transform. @@ -892,23 +913,29 @@ def __init__( underlying file_path's extension will be used to detect the compression. validate (bool): flag to verify that the files exist during the pipeline creation time. + annotation_field (str): If set, it is the field which will be treated as + annotation field, i.e., the description from header is split and copied + into the `VariantInfo.annotation_names` field of each variant. """ super(ReadFromVcf, self).__init__(**kwargs) self._source = _VcfSource( file_pattern, compression_type, validate=validate, - allow_malformed_records=allow_malformed_records) + allow_malformed_records=allow_malformed_records, + annotation_field=annotation_field) def expand(self, pvalue): return pvalue.pipeline | Read(self._source) def _create_vcf_source( - file_pattern=None, compression_type=None, allow_malformed_records=None): + file_pattern=None, compression_type=None, allow_malformed_records=None, + annotation_field=None): return _VcfSource(file_pattern=file_pattern, compression_type=compression_type, - allow_malformed_records=allow_malformed_records) + allow_malformed_records=allow_malformed_records, + annotation_field=annotation_field) class ReadAllFromVcf(PTransform): @@ -930,6 +957,7 @@ def __init__( desired_bundle_size=DEFAULT_DESIRED_BUNDLE_SIZE, compression_type=CompressionTypes.AUTO, allow_malformed_records=False, + annotation_field=None, **kwargs): """Initialize the :class:`ReadAllFromVcf` transform. @@ -945,11 +973,15 @@ def __init__( allow_malformed_records (bool): If true, malformed records from VCF files will be returned as :class:`MalformedVcfRecord` instead of failing the pipeline. + annotation_field (`str`): If set, that is the field which will be treated + as annotation field, i.e., the description from header is split and + copied into the `VariantInfo.annotation_names` field of each variant. """ super(ReadAllFromVcf, self).__init__(**kwargs) source_from_file = partial( _create_vcf_source, compression_type=compression_type, - allow_malformed_records=allow_malformed_records) + allow_malformed_records=allow_malformed_records, + annotation_field=annotation_field) self._read_all_files = filebasedsource.ReadAllFiles( True, # splittable CompressionTypes.AUTO, desired_bundle_size, diff --git a/gcp_variant_transforms/beam_io/vcfio_test.py b/gcp_variant_transforms/beam_io/vcfio_test.py index d3e88b8b9..924e2dd0d 100644 --- a/gcp_variant_transforms/beam_io/vcfio_test.py +++ b/gcp_variant_transforms/beam_io/vcfio_test.py @@ -78,8 +78,10 @@ def _get_sample_variant_1(): reference_name='20', start=1233, end=1234, reference_bases='C', alternate_bases=['A', 'T'], names=['rs123', 'rs2'], quality=50, filters=['PASS'], - info={'AF': vcfio.VariantInfo(data=[0.5, 0.1], field_count='A'), - 'NS': vcfio.VariantInfo(data=1, field_count='1')}) + info={'AF': vcfio.VariantInfo(data=[0.5, 0.1], field_count='A', + annotation_names=None), + 'NS': vcfio.VariantInfo(data=1, field_count='1', + annotation_names=None)}) variant.calls.append( vcfio.VariantCall(name='Sample1', genotype=[0, 0], info={'GQ': 48})) variant.calls.append( @@ -103,7 +105,8 @@ def _get_sample_variant_2(): reference_name='19', start=122, end=125, reference_bases='GTC', alternate_bases=[], names=['rs1234'], quality=40, filters=['q10', 's50'], - info={'NS': vcfio.VariantInfo(data=2, field_count='1')}) + info={'NS': vcfio.VariantInfo(data=2, field_count='1', + annotation_names=None)}) variant.calls.append( vcfio.VariantCall(name='Sample1', genotype=[1, 0], phaseset=vcfio.DEFAULT_PHASESET_VALUE, @@ -127,7 +130,8 @@ def _get_sample_variant_3(): variant = vcfio.Variant( reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=[''], quality=49, filters=['q10'], - info={'AF': vcfio.VariantInfo(data=[0.5], field_count='A')}) + info={'AF': vcfio.VariantInfo(data=[0.5], field_count='A', + annotation_names=None)}) variant.calls.append( vcfio.VariantCall(name='Sample1', genotype=[0, 1], phaseset='1', @@ -392,7 +396,8 @@ def test_no_samples(self): expected_variant = Variant( reference_name='19', start=122, end=123, reference_bases='G', alternate_bases=['A'], filters=['PASS'], - info={'AF': VariantInfo(data=[0.2], field_count='A')}) + info={'AF': VariantInfo(data=[0.2], field_count='A', + annotation_names=None)}) read_data = self._create_temp_file_and_read_records( _SAMPLE_HEADER_LINES[:-1] + [header_line, record_line]) self.assertEqual(1, len(read_data)) @@ -423,19 +428,27 @@ def test_info_numbers_and_types(self): variant_1 = Variant( reference_name='19', start=1, end=2, reference_bases='A', alternate_bases=['T', 'C'], - info={'HA': VariantInfo(data=['a1', 'a2'], field_count='A'), - 'HG': VariantInfo(data=[1, 2, 3], field_count='G'), - 'HR': VariantInfo(data=['a', 'b', 'c'], field_count='R'), - 'HF': VariantInfo(data=True, field_count='0'), - 'HU': VariantInfo(data=[0.1], field_count=None)}) + info={'HA': VariantInfo(data=['a1', 'a2'], field_count='A', + annotation_names=None), + 'HG': VariantInfo(data=[1, 2, 3], field_count='G', + annotation_names=None), + 'HR': VariantInfo(data=['a', 'b', 'c'], field_count='R', + annotation_names=None), + 'HF': VariantInfo(data=True, field_count='0', + annotation_names=None), + 'HU': VariantInfo(data=[0.1], field_count=None, + annotation_names=None)}) variant_1.calls.append(VariantCall(name='Sample1', genotype=[1, 0])) variant_1.calls.append(VariantCall(name='Sample2', genotype=[0, 1])) variant_2 = Variant( reference_name='19', start=123, end=124, reference_bases='A', alternate_bases=['T'], - info={'HG': VariantInfo(data=[3, 4, 5], field_count='G'), - 'HR': VariantInfo(data=['d', 'e'], field_count='R'), - 'HU': VariantInfo(data=[1.1, 1.2], field_count=None)}) + info={'HG': VariantInfo(data=[3, 4, 5], field_count='G', + annotation_names=None), + 'HR': VariantInfo(data=['d', 'e'], field_count='R', + annotation_names=None), + 'HU': VariantInfo(data=[1.1, 1.2], field_count=None, + annotation_names=None)}) variant_2.calls.append(VariantCall(name='Sample1', genotype=[0, 0])) variant_2.calls.append(VariantCall(name='Sample2', genotype=[0, 1])) read_data = self._create_temp_file_and_read_records( @@ -755,9 +768,12 @@ def test_info_list(self): def test_info_field_count(self): coder = self._get_coder() variant = Variant() - variant.info['NS'] = VariantInfo(data=3, field_count='1') - variant.info['AF'] = VariantInfo(data=[0.333, 0.667], field_count='A') - variant.info['DB'] = VariantInfo(data=True, field_count='0') + variant.info['NS'] = VariantInfo(data=3, field_count='1', + annotation_names=None) + variant.info['AF'] = VariantInfo(data=[0.333, 0.667], field_count='A', + annotation_names=None) + variant.info['DB'] = VariantInfo(data=True, field_count='0', + annotation_names=None) expected = '. . . . . . . NS=3;AF=0.333,0.667;DB .\n' self._assert_variant_lines_equal(coder.encode(variant), expected) diff --git a/gcp_variant_transforms/libs/bigquery_vcf_schema.py b/gcp_variant_transforms/libs/bigquery_vcf_schema.py index 7b2375053..94f706a95 100644 --- a/gcp_variant_transforms/libs/bigquery_vcf_schema.py +++ b/gcp_variant_transforms/libs/bigquery_vcf_schema.py @@ -26,6 +26,7 @@ from apache_beam.io.gcp.internal.clients import bigquery from gcp_variant_transforms.beam_io import vcfio +from gcp_variant_transforms.libs import vcf_header_parser __all__ = ['generate_schema_from_header_fields', 'get_rows_from_variant', @@ -82,8 +83,15 @@ class _TableFieldConstants(object): _JSON_CONCATENATION_OVERHEAD_BYTES = 5 +# TODO(bashir2): Using type identifiers like ``HeaderFields`` does not seem +# to be picked up by tools, because they cannot resolve these type identifiers. +# We should either fix these or otherwise stop using the convention of using +# double ` when not recognized by tools. + + def generate_schema_from_header_fields(header_fields, variant_merger=None, - split_alternate_allele_info_fields=True): + split_alternate_allele_info_fields=True, + annotation_field=None): """Returns a ``TableSchema`` for the BigQuery table storing variants. Args: @@ -141,6 +149,28 @@ def generate_schema_from_header_fields(header_fields, variant_merger=None, type=_get_bigquery_type_from_vcf_type(field.type), mode=_TableFieldConstants.MODE_NULLABLE, description=_get_bigquery_sanitized_field(field.desc))) + if annotation_field: + annotation_names = [] + for key, field in header_fields.infos.iteritems(): + if key == annotation_field: + annotation_names = vcf_header_parser.extract_annotation_names( + field.desc) + break + if not annotation_names: + raise ValueError('Annotation field {} not found'.format(annotation_field)) + annotation_record = bigquery.TableFieldSchema( + name=_get_bigquery_sanitized_field(annotation_field), + type=_TableFieldConstants.TYPE_RECORD, + mode=_TableFieldConstants.MODE_REPEATED, + description='List of annotations for this alternate.') + for annotation in annotation_names: + annotation_record.fields.append(bigquery.TableFieldSchema( + name=_get_bigquery_sanitized_field(annotation), + type=_TableFieldConstants.TYPE_STRING, + mode=_TableFieldConstants.MODE_NULLABLE, + # TODO(bashir2): Add descriptions of known annotations, e.g., from VEP + description='')) + alternate_bases_record.fields.append(annotation_record) schema.fields.append(alternate_bases_record) schema.fields.append(bigquery.TableFieldSchema( @@ -202,7 +232,8 @@ def generate_schema_from_header_fields(header_fields, variant_merger=None, # END info is already included by modifying the end_position. if (key == vcfio.END_INFO_KEY or (split_alternate_allele_info_fields and - field.num == vcf.parser.field_counts[_FIELD_COUNT_ALTERNATE_ALLELE])): + field.num == vcf.parser.field_counts[_FIELD_COUNT_ALTERNATE_ALLELE]) or + key == annotation_field): continue schema.fields.append(bigquery.TableFieldSchema( name=_get_bigquery_sanitized_field_name(key), @@ -217,7 +248,7 @@ def generate_schema_from_header_fields(header_fields, variant_merger=None, # TODO: refactor this to use a class instead. def get_rows_from_variant(variant, split_alternate_allele_info_fields=True, - omit_empty_sample_calls=False): + omit_empty_sample_calls=False, annotation_field=None): """Yields BigQuery rows according to the schema from the given variant. There is a 10MB limit for each BigQuery row, which can be exceeded by having @@ -232,6 +263,8 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True, of the INFO fields. omit_empty_sample_calls (bool): If true, samples that don't have a given call will be omitted. + annotation_field (str): If provided, it is the name of the INFO field + that contains the annotation list. Yields: A dict representing a BigQuery row from the given variant. The row may have a subset of the calls if it exceeds the maximum allowed BigQuery row size. @@ -241,9 +274,11 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True, # TODO: Add error checking here for cases where the schema defined # by the headers does not match actual records. base_row = _get_base_row_from_variant( - variant, split_alternate_allele_info_fields) + variant, split_alternate_allele_info_fields, annotation_field) base_row_size_in_bytes = _get_json_object_size(base_row) row_size_in_bytes = base_row_size_in_bytes + # TODO(bashir2): It seems that BigQueryWriter buffers 1000 rows and this + # can cause BigQuery API exceptions. We need to fix this! row = copy.deepcopy(base_row) # Keep base_row intact. for call in variant.calls: call_record, empty = _get_call_record(call) @@ -287,7 +322,34 @@ def _get_call_record(call): return call_record, is_empty -def _get_base_row_from_variant(variant, split_alternate_allele_info_fields): +def _create_list_of_annotation_lists(alt, info): + """Extracts list of annotations for an alternate. + + Args: + alt (str): The alternate for which the annotation lists are extracted. + info (``VariantInfo``): The data for the annotation INFO field. + """ + annotation_record = [] + for data in info.data: + annotation_list = vcf_header_parser.extract_annotation_list_with_alt(data) + if len(annotation_list) != len(info.annotation_names) + 1: + # TODO(bashir2): This and several other annotation related checks should + # be made "soft", i.e., handled gracefully. We will do this as part of the + # bigger issue to make schema error checking more robust. + raise ValueError('Number of annotations does not match header') + # TODO(bashir2): The alternate allele format is not necessarily as simple + # as being equal to an 'alt', so this needs to be fixed to handle all + # possible formats. + if annotation_list[0] == alt: + annotation_dict = {} + for i in range(len(info.annotation_names)): + annotation_dict[info.annotation_names[i]] = annotation_list[i + 1] + annotation_record.append(annotation_dict) + return annotation_record + + +def _get_base_row_from_variant(variant, split_alternate_allele_info_fields, + annotation_field): """A helper method for ``get_rows_from_variant`` to get row without calls.""" row = { ColumnKeyConstants.REFERENCE_NAME: variant.reference_name, @@ -315,12 +377,22 @@ def _get_base_row_from_variant(variant, split_alternate_allele_info_fields): info_key, variant)) alt_record[_get_bigquery_sanitized_field_name(info_key)] = ( _get_bigquery_sanitized_field(info.data[alt_index])) + if annotation_field: + for info_key, info in variant.info.iteritems(): + if info_key == annotation_field: + if not info.annotation_names: + raise ValueError( + 'Annotation list not found for field {}'.format(info_key)) + alt_record[_get_bigquery_sanitized_field(annotation_field)] = ( + _create_list_of_annotation_lists(alt, info)) + break row[ColumnKeyConstants.ALTERNATE_BASES].append(alt_record) # Add info. for key, info in variant.info.iteritems(): if (info.data is not None and (not split_alternate_allele_info_fields or - info.field_count != _FIELD_COUNT_ALTERNATE_ALLELE)): + info.field_count != _FIELD_COUNT_ALTERNATE_ALLELE) and + key != annotation_field): row[_get_bigquery_sanitized_field_name(key)] = ( _get_bigquery_sanitized_field(info.data)) # Set calls to empty for now (will be filled later). diff --git a/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py b/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py index b32e959e6..597754209 100644 --- a/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py +++ b/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py @@ -203,15 +203,15 @@ class GetRowsFromVariantTest(unittest.TestCase): def _get_row_list_from_variant(self, variant, **kwargs): return list(bigquery_vcf_schema.get_rows_from_variant(variant, **kwargs)) - def test_all_fields(self): - variant = vcfio.Variant( + def _create_variant_with_two_alts(self): + return vcfio.Variant( reference_name='chr19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2, filters=['PASS'], - info={'AF': vcfio.VariantInfo([0.1, 0.2], 'A'), - 'AF2': vcfio.VariantInfo([0.2, 0.3], 'A'), - 'I1': vcfio.VariantInfo('some data', '1'), - 'I2': vcfio.VariantInfo(['data1', 'data2'], '2')}, + info={'AF': vcfio.VariantInfo([0.1, 0.2], 'A', None), + 'AF2': vcfio.VariantInfo([0.2, 0.3], 'A', None), + 'I1': vcfio.VariantInfo('some data', '1', None), + 'I2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}, calls=[ vcfio.VariantCall( name='Sample1', genotype=[0, 1], phaseset='*', @@ -221,6 +221,9 @@ def test_all_fields(self): info={'GQ': 10, 'FLAG1': True}), vcfio.VariantCall( name='Sample3', genotype=[vcfio.MISSING_GENOTYPE_VALUE])]) + + def test_all_fields(self): + variant = self._create_variant_with_two_alts() expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', ColumnKeyConstants.START_POSITION: 11, @@ -261,12 +264,105 @@ def test_all_fields(self): self._get_row_list_from_variant( variant, split_alternate_allele_info_fields=False)) + def test_annotation_field_no_split(self): + variant = self._create_variant_with_two_alts() + # PyVCF returns None for '.' and this is to simulate that behavior. + variant.info['CSQ'] = vcfio.VariantInfo( + data=['A|ugv1|M1|pc1', 'TT|ugv2|M2|pc2', 'A|ugv3|M3|pc3'], + field_count=None, + annotation_names=None) + # TODO(bashir2): Refactor expectations as well; this is left to be done with + # the bigger refactoring of the production code under test here. + expected_row = { + ColumnKeyConstants.REFERENCE_NAME: 'chr19', + ColumnKeyConstants.START_POSITION: 11, + ColumnKeyConstants.END_POSITION: 12, + ColumnKeyConstants.REFERENCE_BASES: 'C', + ColumnKeyConstants.ALTERNATE_BASES: [ + {ColumnKeyConstants.ALTERNATE_BASES_ALT: 'A', + 'AF': 0.1, 'AF2': 0.2}, + {ColumnKeyConstants.ALTERNATE_BASES_ALT: 'TT', + 'AF': 0.2, 'AF2': 0.3}], + ColumnKeyConstants.NAMES: ['rs1', 'rs2'], + ColumnKeyConstants.QUALITY: 2, + ColumnKeyConstants.FILTER: ['PASS'], + ColumnKeyConstants.CALLS: [ + {ColumnKeyConstants.CALLS_NAME: 'Sample1', + ColumnKeyConstants.CALLS_GENOTYPE: [0, 1], + ColumnKeyConstants.CALLS_PHASESET: '*', + 'GQ': 20, 'HQ': [10, 20]}, + {ColumnKeyConstants.CALLS_NAME: 'Sample2', + ColumnKeyConstants.CALLS_GENOTYPE: [1, 0], + ColumnKeyConstants.CALLS_PHASESET: None, + 'GQ': 10, 'FLAG1': True}, + {ColumnKeyConstants.CALLS_NAME: 'Sample3', + ColumnKeyConstants.CALLS_GENOTYPE: [vcfio.MISSING_GENOTYPE_VALUE], + ColumnKeyConstants.CALLS_PHASESET: None}], + 'I1': 'some data', + 'I2': ['data1', 'data2'], + 'CSQ': ['A|ugv1|M1|pc1', 'TT|ugv2|M2|pc2', 'A|ugv3|M3|pc3'], + } + self.assertEqual([expected_row], self._get_row_list_from_variant(variant)) + + def test_annotation_field(self): + variant = self._create_variant_with_two_alts() + # PyVCF returns None for '.' and this is to simulate that behavior. + variant.info['CSQ'] = vcfio.VariantInfo( + data=['A|ugv1|M1|pc1', 'TT|ugv2|M2|pc2', 'A|ugv3|M3|pc3'], + field_count=None, + annotation_names=vcf_header_parser.extract_annotation_names( + 'SOME_DESC|upstream_gene_variant|MODIFIER|protein_coding')) + expected_row = { + ColumnKeyConstants.REFERENCE_NAME: 'chr19', + ColumnKeyConstants.START_POSITION: 11, + ColumnKeyConstants.END_POSITION: 12, + ColumnKeyConstants.REFERENCE_BASES: 'C', + ColumnKeyConstants.ALTERNATE_BASES: [ + {ColumnKeyConstants.ALTERNATE_BASES_ALT: 'A', + 'AF': 0.1, 'AF2': 0.2, + 'CSQ': [{'upstream_gene_variant': 'ugv1', + 'MODIFIER': 'M1', + 'protein_coding': 'pc1'}, + {'upstream_gene_variant': 'ugv3', + 'MODIFIER': 'M3', + 'protein_coding': 'pc3'}, + ], + }, + {ColumnKeyConstants.ALTERNATE_BASES_ALT: 'TT', + 'AF': 0.2, 'AF2': 0.3, + 'CSQ': [{'upstream_gene_variant': 'ugv2', + 'MODIFIER': 'M2', + 'protein_coding': 'pc2'}, + ], + }], + ColumnKeyConstants.NAMES: ['rs1', 'rs2'], + ColumnKeyConstants.QUALITY: 2, + ColumnKeyConstants.FILTER: ['PASS'], + ColumnKeyConstants.CALLS: [ + {ColumnKeyConstants.CALLS_NAME: 'Sample1', + ColumnKeyConstants.CALLS_GENOTYPE: [0, 1], + ColumnKeyConstants.CALLS_PHASESET: '*', + 'GQ': 20, 'HQ': [10, 20]}, + {ColumnKeyConstants.CALLS_NAME: 'Sample2', + ColumnKeyConstants.CALLS_GENOTYPE: [1, 0], + ColumnKeyConstants.CALLS_PHASESET: None, + 'GQ': 10, 'FLAG1': True}, + {ColumnKeyConstants.CALLS_NAME: 'Sample3', + ColumnKeyConstants.CALLS_GENOTYPE: [vcfio.MISSING_GENOTYPE_VALUE], + ColumnKeyConstants.CALLS_PHASESET: None}], + 'I1': 'some data', + 'I2': ['data1', 'data2'], + } + self.assertEqual([expected_row], + self._get_row_list_from_variant( + variant, annotation_field='CSQ')) + def test_no_alternate_bases(self): variant = vcfio.Variant( reference_name='chr19', start=11, end=12, reference_bases='CT', alternate_bases=[], filters=['q10'], - info={'A1': vcfio.VariantInfo('some data', '1'), - 'A2': vcfio.VariantInfo(['data1', 'data2'], '2')}) + info={'A1': vcfio.VariantInfo('some data', '1', None), + 'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}) expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', ColumnKeyConstants.START_POSITION: 11, @@ -308,10 +404,10 @@ def test_null_repeated_fields(self): variant = vcfio.Variant( reference_name='chr19', start=11, end=12, reference_bases='CT', alternate_bases=[], filters=['q10'], - info={'AI': vcfio.VariantInfo([0, 1, None], '3'), - 'AB': vcfio.VariantInfo([True, None, False], '3'), - 'AF': vcfio.VariantInfo([0.1, 0.2, None, 0.4], '4'), - 'AS': vcfio.VariantInfo([None, 'data1', 'data2'], '3')}) + info={'AI': vcfio.VariantInfo([0, 1, None], '3', None), + 'AB': vcfio.VariantInfo([True, None, False], '3', None), + 'AF': vcfio.VariantInfo([0.1, 0.2, None, 0.4], '4', None), + 'AS': vcfio.VariantInfo([None, 'data1', 'data2'], '3', None)}) expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', ColumnKeyConstants.START_POSITION: 11, @@ -332,9 +428,9 @@ def test_unicode_fields(self): variant = vcfio.Variant( reference_name='chr19', start=11, end=12, reference_bases='CT', alternate_bases=[], filters=[sample_unicode_str, sample_utf8_str], - info={'AS1': vcfio.VariantInfo(sample_utf8_str, '1'), + info={'AS1': vcfio.VariantInfo(sample_utf8_str, '1', None), 'AS2': vcfio.VariantInfo( - [sample_unicode_str, sample_utf8_str], '2')}) + [sample_unicode_str, sample_utf8_str], '2', None)}) expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', ColumnKeyConstants.START_POSITION: 11, @@ -351,9 +447,10 @@ def test_nonstandard_float_values(self): variant = vcfio.Variant( reference_name='chr19', start=11, end=12, reference_bases='CT', alternate_bases=[], filters=[], - info={'F1': vcfio.VariantInfo(float('inf'), '1'), - 'F2': vcfio.VariantInfo([float('-inf'), float('nan'), 1.2], '3'), - 'F3': vcfio.VariantInfo(float('nan'), '1'),}) + info={'F1': vcfio.VariantInfo(float('inf'), '1', None), + 'F2': vcfio.VariantInfo([float('-inf'), float('nan'), 1.2], '3', + None), + 'F3': vcfio.VariantInfo(float('nan'), '1', None),}) null_replacement_value = -sys.maxint expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', @@ -371,8 +468,8 @@ def test_nonstandard_fields_names(self): variant = vcfio.Variant( reference_name='chr19', start=11, end=12, reference_bases='CT', alternate_bases=[], - info={'A-1': vcfio.VariantInfo('data1', '1'), - '_A': vcfio.VariantInfo('data2', '2')}) + info={'A-1': vcfio.VariantInfo('data1', '1', None), + '_A': vcfio.VariantInfo('data2', '2', None)}) expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', ColumnKeyConstants.START_POSITION: 11, @@ -389,9 +486,9 @@ def test_sharded_rows(self): reference_name='chr19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2, filters=['PASS'], - info={'AF': vcfio.VariantInfo([0.1, 0.2], 'A'), - 'AF2': vcfio.VariantInfo([0.2, 0.3], 'A'), - 'I1': vcfio.VariantInfo('some data', '1'),}, + info={'AF': vcfio.VariantInfo([0.1, 0.2], 'A', None), + 'AF2': vcfio.VariantInfo([0.2, 0.3], 'A', None), + 'I1': vcfio.VariantInfo('some data', '1', None),}, calls=[ vcfio.VariantCall( name='Sample1', genotype=[0, 1], phaseset='*', diff --git a/gcp_variant_transforms/libs/variant_merge/merge_with_non_variants_strategy_test.py b/gcp_variant_transforms/libs/variant_merge/merge_with_non_variants_strategy_test.py index e080c9e77..9c62ac143 100644 --- a/gcp_variant_transforms/libs/variant_merge/merge_with_non_variants_strategy_test.py +++ b/gcp_variant_transforms/libs/variant_merge/merge_with_non_variants_strategy_test.py @@ -32,8 +32,8 @@ def _get_sample_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2, filters=['PASS'], - info={'A1': vcfio.VariantInfo('some data', '1'), - 'A2': vcfio.VariantInfo(['data1', 'data2'], '2')}, + info={'A1': vcfio.VariantInfo('some data', '1', None), + 'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}, calls=[ vcfio.VariantCall(name='Sample1', genotype=[0, 1], info={'GQ': 20, 'HQ': [10, 20]}), @@ -43,8 +43,8 @@ def _get_sample_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs3'], quality=20, filters=['q10'], - info={'A1': vcfio.VariantInfo('some data2', '2'), - 'A3': vcfio.VariantInfo(['data3', 'data4'], '2')}, + info={'A1': vcfio.VariantInfo('some data2', '2', None), + 'A3': vcfio.VariantInfo(['data3', 'data4'], '2', None)}, calls=[ vcfio.VariantCall(name='Sample3', genotype=[1, 1]), vcfio.VariantCall(name='Sample4', genotype=[1, 0], @@ -87,9 +87,9 @@ def test_get_merged_variants_no_custom_options(self): self.assertItemsEqual(['A1', 'A2', 'A3'], merged_variant.info.keys()) self.assertTrue( merged_variant.info['A1'].data in ('some data', 'some data2')) - self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2'), + self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2', None), merged_variant.info['A2']) - self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2'), + self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2', None), merged_variant.info['A3']) def test_get_merged_variants_move_quality_and_filter_to_calls(self): @@ -135,9 +135,9 @@ def test_get_merged_variants_move_quality_and_filter_to_calls(self): self.assertItemsEqual(['A1', 'A2', 'A3'], merged_variant.info.keys()) self.assertTrue( merged_variant.info['A1'].data in ('some data', 'some data2')) - self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2'), + self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2', None), merged_variant.info['A2']) - self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2'), + self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2', None), merged_variant.info['A3']) def test_get_merged_variants_move_info_to_calls(self): @@ -170,9 +170,9 @@ def test_get_merged_variants_move_info_to_calls(self): info={'GQ': 20, 'A1': 'some data2'})], merged_variant.calls) self.assertItemsEqual(['A2', 'A3'], merged_variant.info.keys()) - self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2'), + self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2', None), merged_variant.info['A2']) - self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2'), + self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2', None), merged_variant.info['A3']) def test_get_merged_variants_move_everything_to_calls(self): diff --git a/gcp_variant_transforms/libs/variant_merge/move_to_calls_strategy_test.py b/gcp_variant_transforms/libs/variant_merge/move_to_calls_strategy_test.py index 60982b0cf..76b9c5ace 100644 --- a/gcp_variant_transforms/libs/variant_merge/move_to_calls_strategy_test.py +++ b/gcp_variant_transforms/libs/variant_merge/move_to_calls_strategy_test.py @@ -33,8 +33,8 @@ def _get_sample_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2, filters=['PASS'], - info={'A1': vcfio.VariantInfo('some data', '1'), - 'A2': vcfio.VariantInfo(['data1', 'data2'], '2')}, + info={'A1': vcfio.VariantInfo('some data', '1', None), + 'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}, calls=[ vcfio.VariantCall(name='Sample1', genotype=[0, 1], info={'GQ': 20, 'HQ': [10, 20]}), @@ -44,8 +44,8 @@ def _get_sample_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs3'], quality=20, filters=['q10'], - info={'A1': vcfio.VariantInfo('some data2', '2'), - 'A3': vcfio.VariantInfo(['data3', 'data4'], '2')}, + info={'A1': vcfio.VariantInfo('some data2', '2', None), + 'A3': vcfio.VariantInfo(['data3', 'data4'], '2', None)}, calls=[ vcfio.VariantCall(name='Sample3', genotype=[1, 1]), vcfio.VariantCall(name='Sample4', genotype=[1, 0], @@ -87,9 +87,9 @@ def test_get_merged_variants_no_custom_options(self): self.assertItemsEqual(['A1', 'A2', 'A3'], merged_variant.info.keys()) self.assertTrue( merged_variant.info['A1'].data in ('some data', 'some data2')) - self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2'), + self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2', None), merged_variant.info['A2']) - self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2'), + self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2', None), merged_variant.info['A3']) def test_get_merged_variants_move_quality_and_filter_to_calls(self): @@ -135,9 +135,9 @@ def test_get_merged_variants_move_quality_and_filter_to_calls(self): self.assertItemsEqual(['A1', 'A2', 'A3'], merged_variant.info.keys()) self.assertTrue( merged_variant.info['A1'].data in ('some data', 'some data2')) - self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2'), + self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2', None), merged_variant.info['A2']) - self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2'), + self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2', None), merged_variant.info['A3']) def test_get_merged_variants_move_info_to_calls(self): @@ -170,9 +170,9 @@ def test_get_merged_variants_move_info_to_calls(self): info={'GQ': 20, 'A1': 'some data2'})], merged_variant.calls) self.assertItemsEqual(['A2', 'A3'], merged_variant.info.keys()) - self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2'), + self.assertEqual(vcfio.VariantInfo(['data1', 'data2'], '2', None), merged_variant.info['A2']) - self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2'), + self.assertEqual(vcfio.VariantInfo(['data3', 'data4'], '2', None), merged_variant.info['A3']) def test_get_merged_variants_move_everything_to_calls(self): diff --git a/gcp_variant_transforms/libs/vcf_header_parser.py b/gcp_variant_transforms/libs/vcf_header_parser.py index 803fda00b..1c879155f 100644 --- a/gcp_variant_transforms/libs/vcf_header_parser.py +++ b/gcp_variant_transforms/libs/vcf_header_parser.py @@ -32,6 +32,42 @@ HeaderFields = namedtuple('HeaderFields', ['infos', 'formats']) +def extract_annotation_list_with_alt(annotation_str): + """Extracts annotations from an annotation INFO field. + + This works by dividing the ``annotation_str`` on '|'. The first element is + the alternate allele and the rest are the annotations. + + Args: + annotation_str (``str``): The content of annotation field for one alt. + + Returns: + The list of annotations with the first element being the alternate. + """ + return annotation_str.split('|') + + +def extract_annotation_names(description): + """Extracts annotation list from the description of an annotation INFO field. + + This is similar to extract_extract_annotation_list_with_alt with the + difference that it ignores everything before the first '|'. + + Args: + description (``str``): The "Description" part of the annotation INFO field + in the header of VCF. + + Returns: + The list of annotation names. + """ + annotation_names = extract_annotation_list_with_alt(description) + if len(annotation_names) < 2: + raise ValueError( + 'Expected at least one | in annotation description {}'.format( + description)) + return annotation_names[1:] + + def get_vcf_headers(input_file): """Returns VCF headers (FORMAT and INFO) from ``input_file``. diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index 86a220e2b..55ec4c44e 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -58,6 +58,13 @@ def add_arguments(self, parser): help=('If true, failed VCF record reads will not raise errors. ' 'Failed reads will be logged as warnings and returned as ' 'MalformedVcfRecord objects.')) + parser.add_argument( + '--annotation_field', + default=None, + help=('If set, it is the name of the annotation field. The content ' + 'of this INFO field will be broken into multiple columns in the ' + 'output BigQuery table and stored as repeated fields with ' + 'corresponding alternate alleles. [EXPERIMENTAL]')) parser.add_argument( '--optimize_for_large_inputs', type='bool', default=False, nargs='?', const=True, diff --git a/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json new file mode 100644 index 000000000..a8fd827eb --- /dev/null +++ b/gcp_variant_transforms/testing/integration/small_tests/valid_4_2_VEP.json @@ -0,0 +1,15 @@ +{ + "test_name": "valid-4-2-vep", + "table_name": "valid_4_2_VEP", + "input_pattern": "gs://gcp-variant-transforms-testfiles/small_tests/valid-4.2_VEP.vcf", + "annotation_field": "CSQ", + "runner": "DataflowRunner", + "validation_query": [ + "SELECT COUNT(DISTINCT CSQ.Feature) AS num_features ", + "FROM {TABLE_NAME} AS t, t.alternate_bases as alts, alts.CSQ as CSQ ", + "WHERE start_position = 1110695 AND alts.alt = 'G'" + ], + "expected_query_result": { + "num_features": 3 + } +} diff --git a/gcp_variant_transforms/transforms/filter_variants_test.py b/gcp_variant_transforms/transforms/filter_variants_test.py index f6bd2fc1e..1de558403 100644 --- a/gcp_variant_transforms/transforms/filter_variants_test.py +++ b/gcp_variant_transforms/transforms/filter_variants_test.py @@ -43,8 +43,8 @@ def _get_sample_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1'], quality=2, filters=['PASS'], - info={'A1': vcfio.VariantInfo('some data', '1'), - 'A2': vcfio.VariantInfo(['data1', 'data2'], '2')}, + info={'A1': vcfio.VariantInfo('some data', '1', None), + 'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}, calls=[ vcfio.VariantCall( name='Sample1', genotype=[0, 1], phaseset='*', @@ -58,8 +58,8 @@ def _get_sample_variants(self): reference_name='20', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1'], quality=20, filters=['q10'], - info={'A1': vcfio.VariantInfo('some data2', '2'), - 'A3': vcfio.VariantInfo(['data3', 'data4'], '2')}, + info={'A1': vcfio.VariantInfo('some data2', '2', None), + 'A3': vcfio.VariantInfo(['data3', 'data4'], '2', None)}, calls=[ vcfio.VariantCall(name='Sample3', genotype=[1, 1]), vcfio.VariantCall( diff --git a/gcp_variant_transforms/transforms/merge_variants_test.py b/gcp_variant_transforms/transforms/merge_variants_test.py index 16300de3d..c0dc77063 100644 --- a/gcp_variant_transforms/transforms/merge_variants_test.py +++ b/gcp_variant_transforms/transforms/merge_variants_test.py @@ -36,8 +36,8 @@ def _get_sample_merged_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1'], quality=2, filters=['PASS'], - info={'A1': vcfio.VariantInfo('some data', '1'), - 'A2': vcfio.VariantInfo(['data1', 'data2'], '2')}, + info={'A1': vcfio.VariantInfo('some data', '1', None), + 'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}, calls=[ vcfio.VariantCall( name='Sample1', genotype=[0, 1], phaseset='*', @@ -51,8 +51,8 @@ def _get_sample_merged_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1'], quality=20, filters=['q10'], - info={'A1': vcfio.VariantInfo('some data2', '2'), - 'A3': vcfio.VariantInfo(['data3', 'data4'], '2')}, + info={'A1': vcfio.VariantInfo('some data2', '2', None), + 'A3': vcfio.VariantInfo(['data3', 'data4'], '2', None)}, calls=[ vcfio.VariantCall(name='Sample3', genotype=[1, 1]), vcfio.VariantCall( @@ -64,8 +64,8 @@ def _get_sample_merged_variants(self): reference_name='19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1'], filters=['PASS', 'q10'], quality=20, - info={'A2': vcfio.VariantInfo(['data1', 'data2'], '2'), - 'A3': vcfio.VariantInfo(['data3', 'data4'], '2')}, + info={'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None), + 'A3': vcfio.VariantInfo(['data3', 'data4'], '2', None)}, calls=[ vcfio.VariantCall( name='Sample1', genotype=[0, 1], phaseset='*', diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index b05354378..92bb725e5 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -27,16 +27,18 @@ class _ConvertToBigQueryTableRow(beam.DoFn): """Converts a ``Variant`` record to a BigQuery row.""" def __init__(self, split_alternate_allele_info_fields=True, - omit_empty_sample_calls=False): + omit_empty_sample_calls=False, annotation_field=None): super(_ConvertToBigQueryTableRow, self).__init__() self._split_alternate_allele_info_fields = ( split_alternate_allele_info_fields) self._omit_empty_sample_calls = omit_empty_sample_calls + self._annotation_field = annotation_field def process(self, record): return bigquery_vcf_schema.get_rows_from_variant( record, self._split_alternate_allele_info_fields, - self._omit_empty_sample_calls) + self._omit_empty_sample_calls, + self._annotation_field) class VariantToBigQuery(beam.PTransform): @@ -44,7 +46,8 @@ class VariantToBigQuery(beam.PTransform): def __init__(self, output_table, header_fields, variant_merger=None, split_alternate_allele_info_fields=True, append=False, - omit_empty_sample_calls=False): + omit_empty_sample_calls=False, + annotation_field=None): """Initializes the transform. Args: @@ -63,6 +66,8 @@ def __init__(self, output_table, header_fields, variant_merger=None, overwritten. New records will be appended to those that already exist. omit_empty_sample_calls (bool): If true, samples that don't have a given call will be omitted. + annotation_field (str): If provided, it is the name of the INFO field + that contains the annotation list. """ self._output_table = output_table self._header_fields = header_fields @@ -71,19 +76,22 @@ def __init__(self, output_table, header_fields, variant_merger=None, split_alternate_allele_info_fields) self._append = append self._omit_empty_sample_calls = omit_empty_sample_calls + self._annotation_field = annotation_field def expand(self, pcoll): return (pcoll | 'ConvertToBigQueryTableRow' >> beam.ParDo( _ConvertToBigQueryTableRow( self._split_alternate_allele_info_fields, - self._omit_empty_sample_calls)) + self._omit_empty_sample_calls, + self._annotation_field)) | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( self._output_table, schema=bigquery_vcf_schema.generate_schema_from_header_fields( self._header_fields, self._variant_merger, - self._split_alternate_allele_info_fields), + self._split_alternate_allele_info_fields, + annotation_field=self._annotation_field), create_disposition=( beam.io.BigQueryDisposition.CREATE_IF_NEEDED), write_disposition=( diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery_test.py b/gcp_variant_transforms/transforms/variant_to_bigquery_test.py index e441cff1d..14c078cea 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery_test.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery_test.py @@ -37,10 +37,10 @@ def _get_sample_variant_1(self, split_alternate_allele_info_fields=True): reference_name='chr19', start=11, end=12, reference_bases='C', alternate_bases=['A', 'TT'], names=['rs1', 'rs2'], quality=2, filters=['PASS'], - info={'AF': vcfio.VariantInfo([0.1, 0.2], 'A'), - 'AF2': vcfio.VariantInfo([0.2, 0.3], 'A'), - 'A1': vcfio.VariantInfo('some data', '1'), - 'A2': vcfio.VariantInfo(['data1', 'data2'], '2')}, + info={'AF': vcfio.VariantInfo([0.1, 0.2], 'A', None), + 'AF2': vcfio.VariantInfo([0.2, 0.3], 'A', None), + 'A1': vcfio.VariantInfo('some data', '1', None), + 'A2': vcfio.VariantInfo(['data1', 'data2'], '2', None)}, calls=[ vcfio.VariantCall( name='Sample1', genotype=[0, 1], phaseset='*', @@ -84,7 +84,7 @@ def _get_sample_variant_2(self): variant = vcfio.Variant( reference_name='20', start=123, end=125, reference_bases='CT', alternate_bases=[], filters=['q10', 's10'], - info={'INTINFO': vcfio.VariantInfo(1234, '1')}) + info={'INTINFO': vcfio.VariantInfo(1234, '1', None)}) row = {ColumnKeyConstants.REFERENCE_NAME: '20', ColumnKeyConstants.START_POSITION: 123, ColumnKeyConstants.END_POSITION: 125, diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index 50ad00844..2bea1f6a7 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -106,11 +106,13 @@ def _read_variants(pipeline, known_args): [known_args.input_pattern]) | 'ReadAllFromVcf' >> vcfio.ReadAllFromVcf( allow_malformed_records=( - known_args.allow_malformed_records))) + known_args.allow_malformed_records), + annotation_field=known_args.annotation_field)) else: variants = pipeline | 'ReadFromVcf' >> vcfio.ReadFromVcf( known_args.input_pattern, - allow_malformed_records=known_args.allow_malformed_records) + allow_malformed_records=known_args.allow_malformed_records, + annotation_field=known_args.annotation_field) return variants @@ -224,7 +226,8 @@ def run(argv=None): variant_merger, known_args.split_alternate_allele_info_fields, append=known_args.append, - omit_empty_sample_calls=known_args.omit_empty_sample_calls)) + omit_empty_sample_calls=known_args.omit_empty_sample_calls, + annotation_field=known_args.annotation_field)) if __name__ == '__main__':