diff --git a/gcp_variant_transforms/libs/bigquery_vcf_schema.py b/gcp_variant_transforms/libs/bigquery_vcf_schema.py index 5e5e7a16e..7b2375053 100644 --- a/gcp_variant_transforms/libs/bigquery_vcf_schema.py +++ b/gcp_variant_transforms/libs/bigquery_vcf_schema.py @@ -216,10 +216,11 @@ def generate_schema_from_header_fields(header_fields, variant_merger=None, # TODO: refactor this to use a class instead. -def get_rows_from_variant(variant, split_alternate_allele_info_fields=True): +def get_rows_from_variant(variant, split_alternate_allele_info_fields=True, + omit_empty_sample_calls=False): """Yields BigQuery rows according to the schema from the given variant. - There is a 10MB limit for each BigQuqery row, which can exceed by having + There is a 10MB limit for each BigQuery row, which can be exceeded by having a large number of calls. This method may split up a row into multiple rows if it exceeds 10MB. @@ -229,6 +230,8 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True): `Number=A` (i.e. one value for each alternate allele) will be stored under the `alternate_bases` record. If false, they will be stored with the rest of the INFO fields. + omit_empty_sample_calls (bool): If true, samples that don't have a given + call will be omitted. Yields: A dict representing a BigQuery row from the given variant. The row may have a subset of the calls if it exceeds the maximum allowed BigQuery row size. @@ -243,7 +246,10 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True): row_size_in_bytes = base_row_size_in_bytes row = copy.deepcopy(base_row) # Keep base_row intact. for call in variant.calls: - call_record = _get_call_record(call) + call_record, empty = _get_call_record(call) + if omit_empty_sample_calls and empty: + continue + # Add a few bytes to account for surrounding characters when concatenating. call_record_size_in_bytes = ( _get_json_object_size(call_record) + _JSON_CONCATENATION_OVERHEAD_BYTES) @@ -258,17 +264,27 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True): def _get_call_record(call): - """A helper method for ``get_rows_from_variant`` to get a call as JSON.""" + """A helper method for ``get_rows_from_variant`` to get a call as JSON. + + Args: + call (``VariantCall``): Variant call to convert. + + Returns: + BigQuery call value (dict). + """ call_record = { ColumnKeyConstants.CALLS_NAME: _get_bigquery_sanitized_field(call.name), ColumnKeyConstants.CALLS_PHASESET: call.phaseset, ColumnKeyConstants.CALLS_GENOTYPE: call.genotype or [] } + is_empty = (not call.genotype or + set(call.genotype) == set((vcfio.MISSING_GENOTYPE_VALUE,))) for key, field in call.info.iteritems(): if field is not None: - call_record[_get_bigquery_sanitized_field_name(key)] = ( - _get_bigquery_sanitized_field(field)) - return call_record + sanitized = _get_bigquery_sanitized_field(field) + call_record[_get_bigquery_sanitized_field_name(key)] = sanitized + is_empty = is_empty and _is_empty_field(sanitized) + return call_record, is_empty def _get_base_row_from_variant(variant, split_alternate_allele_info_fields): @@ -468,5 +484,10 @@ def _is_alternate_allele_count(info_field): return info_field.field_count == _FIELD_COUNT_ALTERNATE_ALLELE +def _is_empty_field(value): + return (value in (vcfio.MISSING_FIELD_VALUE, [vcfio.MISSING_FIELD_VALUE]) or + (not value and value != 0)) + + def _get_json_object_size(obj): return len(json.dumps(obj)) diff --git a/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py b/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py index 86ca295fa..b32e959e6 100644 --- a/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py +++ b/gcp_variant_transforms/libs/bigquery_vcf_schema_test.py @@ -200,10 +200,8 @@ def test_variant_merger_modify_schema(self): class GetRowsFromVariantTest(unittest.TestCase): """Test cases for the ``get_rows_from_variant`` library function.""" - def _get_row_list_from_variant( - self, variant, split_alternate_allele_info_fields=True): - return list(bigquery_vcf_schema.get_rows_from_variant( - variant, split_alternate_allele_info_fields)) + def _get_row_list_from_variant(self, variant, **kwargs): + return list(bigquery_vcf_schema.get_rows_from_variant(variant, **kwargs)) def test_all_fields(self): variant = vcfio.Variant( @@ -220,7 +218,9 @@ def test_all_fields(self): info={'GQ': 20, 'HQ': [10, 20]}), vcfio.VariantCall( name='Sample2', genotype=[1, 0], - info={'GQ': 10, 'FLAG1': True})]) + info={'GQ': 10, 'FLAG1': True}), + vcfio.VariantCall( + name='Sample3', genotype=[vcfio.MISSING_GENOTYPE_VALUE])]) expected_row = { ColumnKeyConstants.REFERENCE_NAME: 'chr19', ColumnKeyConstants.START_POSITION: 11, @@ -242,7 +242,10 @@ def test_all_fields(self): {ColumnKeyConstants.CALLS_NAME: 'Sample2', ColumnKeyConstants.CALLS_GENOTYPE: [1, 0], ColumnKeyConstants.CALLS_PHASESET: None, - 'GQ': 10, 'FLAG1': True}], + 'GQ': 10, 'FLAG1': True}, + {ColumnKeyConstants.CALLS_NAME: 'Sample3', + ColumnKeyConstants.CALLS_GENOTYPE: [vcfio.MISSING_GENOTYPE_VALUE], + ColumnKeyConstants.CALLS_PHASESET: None}], 'I1': 'some data', 'I2': ['data1', 'data2']} self.assertEqual([expected_row], self._get_row_list_from_variant(variant)) @@ -453,3 +456,36 @@ def test_sharded_rows(self): self.assertEqual(expected_rows, self._get_row_list_from_variant(variant)) finally: bigquery_vcf_schema._MAX_BIGQUERY_ROW_SIZE_BYTES = original_max_row_size + + def test_omit_empty_sample_calls(self): + variant = vcfio.Variant( + reference_name='chr19', start=11, end=12, reference_bases='C', + alternate_bases=[], names=['rs1', 'rs2'], quality=2, + filters=['PASS'], + info={}, + calls=[ + vcfio.VariantCall( + name='Sample1', info={'GQ': vcfio.MISSING_FIELD_VALUE}), + vcfio.VariantCall( + name='Sample2', genotype=[1, 0], + info={'GQ': 10}), + vcfio.VariantCall( + name='Sample3', genotype=[vcfio.MISSING_GENOTYPE_VALUE, + vcfio.MISSING_GENOTYPE_VALUE])]) + expected_row = { + ColumnKeyConstants.REFERENCE_NAME: 'chr19', + ColumnKeyConstants.START_POSITION: 11, + ColumnKeyConstants.END_POSITION: 12, + ColumnKeyConstants.REFERENCE_BASES: 'C', + ColumnKeyConstants.ALTERNATE_BASES: [], + ColumnKeyConstants.NAMES: ['rs1', 'rs2'], + ColumnKeyConstants.QUALITY: 2, + ColumnKeyConstants.FILTER: ['PASS'], + ColumnKeyConstants.CALLS: [ + {ColumnKeyConstants.CALLS_NAME: 'Sample2', + ColumnKeyConstants.CALLS_GENOTYPE: [1, 0], + ColumnKeyConstants.CALLS_PHASESET: None, + 'GQ': 10}]} + self.assertEqual( + [expected_row], + self._get_row_list_from_variant(variant, omit_empty_sample_calls=True)) diff --git a/gcp_variant_transforms/options/variant_transform_options.py b/gcp_variant_transforms/options/variant_transform_options.py index dece77703..86a220e2b 100644 --- a/gcp_variant_transforms/options/variant_transform_options.py +++ b/gcp_variant_transforms/options/variant_transform_options.py @@ -101,6 +101,10 @@ def add_arguments(self, parser): help=('If true, existing records in output_table will not be ' 'overwritten. New records will be appended to those that ' 'already exist.')) + parser.add_argument( + '--omit_empty_sample_calls', + type='bool', default=False, nargs='?', const=True, + help=("If true, samples that don't have a given call will be omitted.")) def validate(self, parsed_args, client=None): output_table_re_match = re.match( diff --git a/gcp_variant_transforms/transforms/variant_to_bigquery.py b/gcp_variant_transforms/transforms/variant_to_bigquery.py index b201a39e0..b05354378 100644 --- a/gcp_variant_transforms/transforms/variant_to_bigquery.py +++ b/gcp_variant_transforms/transforms/variant_to_bigquery.py @@ -26,21 +26,25 @@ class _ConvertToBigQueryTableRow(beam.DoFn): """Converts a ``Variant`` record to a BigQuery row.""" - def __init__(self, split_alternate_allele_info_fields=True): + def __init__(self, split_alternate_allele_info_fields=True, + omit_empty_sample_calls=False): super(_ConvertToBigQueryTableRow, self).__init__() self._split_alternate_allele_info_fields = ( split_alternate_allele_info_fields) + self._omit_empty_sample_calls = omit_empty_sample_calls def process(self, record): return bigquery_vcf_schema.get_rows_from_variant( - record, self._split_alternate_allele_info_fields) + record, self._split_alternate_allele_info_fields, + self._omit_empty_sample_calls) class VariantToBigQuery(beam.PTransform): """Writes PCollection of ``Variant`` records to BigQuery.""" def __init__(self, output_table, header_fields, variant_merger=None, - split_alternate_allele_info_fields=True, append=False): + split_alternate_allele_info_fields=True, append=False, + omit_empty_sample_calls=False): """Initializes the transform. Args: @@ -57,6 +61,8 @@ def __init__(self, output_table, header_fields, variant_merger=None, the rest of the INFO fields. append (bool): If true, existing records in output_table will not be overwritten. New records will be appended to those that already exist. + omit_empty_sample_calls (bool): If true, samples that don't have a given + call will be omitted. """ self._output_table = output_table self._header_fields = header_fields @@ -64,12 +70,14 @@ def __init__(self, output_table, header_fields, variant_merger=None, self._split_alternate_allele_info_fields = ( split_alternate_allele_info_fields) self._append = append + self._omit_empty_sample_calls = omit_empty_sample_calls def expand(self, pcoll): return (pcoll | 'ConvertToBigQueryTableRow' >> beam.ParDo( _ConvertToBigQueryTableRow( - self._split_alternate_allele_info_fields)) + self._split_alternate_allele_info_fields, + self._omit_empty_sample_calls)) | 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink( self._output_table, schema=bigquery_vcf_schema.generate_schema_from_header_fields( diff --git a/gcp_variant_transforms/vcf_to_bq.py b/gcp_variant_transforms/vcf_to_bq.py index daa5128fc..50ad00844 100644 --- a/gcp_variant_transforms/vcf_to_bq.py +++ b/gcp_variant_transforms/vcf_to_bq.py @@ -223,7 +223,8 @@ def run(argv=None): header_fields, variant_merger, known_args.split_alternate_allele_info_fields, - append=known_args.append)) + append=known_args.append, + omit_empty_sample_calls=known_args.omit_empty_sample_calls)) if __name__ == '__main__':