Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 28 additions & 7 deletions gcp_variant_transforms/libs/bigquery_vcf_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,11 @@ def generate_schema_from_header_fields(header_fields, variant_merger=None,


# TODO: refactor this to use a class instead.
def get_rows_from_variant(variant, split_alternate_allele_info_fields=True):
def get_rows_from_variant(variant, split_alternate_allele_info_fields=True,
omit_empty_sample_calls=False):
"""Yields BigQuery rows according to the schema from the given variant.

There is a 10MB limit for each BigQuqery row, which can exceed by having
There is a 10MB limit for each BigQuery row, which can be exceeded by having
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching and fixing these! :)

a large number of calls. This method may split up a row into multiple rows if
it exceeds 10MB.

Expand All @@ -229,6 +230,8 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True):
`Number=A` (i.e. one value for each alternate allele) will be stored under
the `alternate_bases` record. If false, they will be stored with the rest
of the INFO fields.
omit_empty_sample_calls (bool): If true, samples that don't have a given
call will be omitted.
Yields:
A dict representing a BigQuery row from the given variant. The row may have
a subset of the calls if it exceeds the maximum allowed BigQuery row size.
Expand All @@ -243,7 +246,10 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True):
row_size_in_bytes = base_row_size_in_bytes
row = copy.deepcopy(base_row) # Keep base_row intact.
for call in variant.calls:
call_record = _get_call_record(call)
call_record, empty = _get_call_record(call)
if omit_empty_sample_calls and empty:
continue

# Add a few bytes to account for surrounding characters when concatenating.
call_record_size_in_bytes = (
_get_json_object_size(call_record) + _JSON_CONCATENATION_OVERHEAD_BYTES)
Expand All @@ -258,17 +264,27 @@ def get_rows_from_variant(variant, split_alternate_allele_info_fields=True):


def _get_call_record(call):
"""A helper method for ``get_rows_from_variant`` to get a call as JSON."""
"""A helper method for ``get_rows_from_variant`` to get a call as JSON.

Args:
call (``VariantCall``): Variant call to convert.

Returns:
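A tuple of the BigQuery call value (dict) and a bool that is true when the call is empty, i.e. its genotype is absent or all-missing and every non-None info field is empty.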
"""
call_record = {
ColumnKeyConstants.CALLS_NAME: _get_bigquery_sanitized_field(call.name),
ColumnKeyConstants.CALLS_PHASESET: call.phaseset,
ColumnKeyConstants.CALLS_GENOTYPE: call.genotype or []
}
is_empty = (not call.genotype or
set(call.genotype) == set((vcfio.MISSING_GENOTYPE_VALUE,)))
for key, field in call.info.iteritems():
if field is not None:
call_record[_get_bigquery_sanitized_field_name(key)] = (
_get_bigquery_sanitized_field(field))
return call_record
sanitized = _get_bigquery_sanitized_field(field)
call_record[_get_bigquery_sanitized_field_name(key)] = sanitized
is_empty = is_empty and _is_empty_field(sanitized)
return call_record, is_empty


def _get_base_row_from_variant(variant, split_alternate_allele_info_fields):
Expand Down Expand Up @@ -468,5 +484,10 @@ def _is_alternate_allele_count(info_field):
return info_field.field_count == _FIELD_COUNT_ALTERNATE_ALLELE


def _is_empty_field(value):
  """Tells whether a sanitized field value carries no information.

  A value counts as empty when it is the missing-field marker, a one-element
  list holding only that marker, or any falsy value that does not compare
  equal to zero (e.g. ``None``, ``''`` or ``[]``). Values equal to zero, such
  as ``0``, ``0.0`` and ``False``, are therefore not considered empty.

  Args:
    value: The field value to inspect.

  Returns:
    A truthy result if ``value`` is empty, a falsy one otherwise.
  """
  missing = vcfio.MISSING_FIELD_VALUE
  if value in (missing, [missing]):
    return True
  # Falsy values are empty unless they are a legitimate zero.
  return not value and value != 0


def _get_json_object_size(obj):
return len(json.dumps(obj))
48 changes: 42 additions & 6 deletions gcp_variant_transforms/libs/bigquery_vcf_schema_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,8 @@ def test_variant_merger_modify_schema(self):
class GetRowsFromVariantTest(unittest.TestCase):
"""Test cases for the ``get_rows_from_variant`` library function."""

def _get_row_list_from_variant(
self, variant, split_alternate_allele_info_fields=True):
return list(bigquery_vcf_schema.get_rows_from_variant(
variant, split_alternate_allele_info_fields))
def _get_row_list_from_variant(self, variant, **kwargs):
return list(bigquery_vcf_schema.get_rows_from_variant(variant, **kwargs))

def test_all_fields(self):
variant = vcfio.Variant(
Expand All @@ -220,7 +218,9 @@ def test_all_fields(self):
info={'GQ': 20, 'HQ': [10, 20]}),
vcfio.VariantCall(
name='Sample2', genotype=[1, 0],
info={'GQ': 10, 'FLAG1': True})])
info={'GQ': 10, 'FLAG1': True}),
vcfio.VariantCall(
name='Sample3', genotype=[vcfio.MISSING_GENOTYPE_VALUE])])
expected_row = {
ColumnKeyConstants.REFERENCE_NAME: 'chr19',
ColumnKeyConstants.START_POSITION: 11,
Expand All @@ -242,7 +242,10 @@ def test_all_fields(self):
{ColumnKeyConstants.CALLS_NAME: 'Sample2',
ColumnKeyConstants.CALLS_GENOTYPE: [1, 0],
ColumnKeyConstants.CALLS_PHASESET: None,
'GQ': 10, 'FLAG1': True}],
'GQ': 10, 'FLAG1': True},
{ColumnKeyConstants.CALLS_NAME: 'Sample3',
ColumnKeyConstants.CALLS_GENOTYPE: [vcfio.MISSING_GENOTYPE_VALUE],
ColumnKeyConstants.CALLS_PHASESET: None}],
'I1': 'some data',
'I2': ['data1', 'data2']}
self.assertEqual([expected_row], self._get_row_list_from_variant(variant))
Expand Down Expand Up @@ -453,3 +456,36 @@ def test_sharded_rows(self):
self.assertEqual(expected_rows, self._get_row_list_from_variant(variant))
finally:
bigquery_vcf_schema._MAX_BIGQUERY_ROW_SIZE_BYTES = original_max_row_size

def test_omit_empty_sample_calls(self):
  """Checks that empty calls are dropped with ``omit_empty_sample_calls``."""
  missing_genotype = vcfio.MISSING_GENOTYPE_VALUE
  sample_calls = [
      # No genotype and only a missing-valued field.
      vcfio.VariantCall(
          name='Sample1', info={'GQ': vcfio.MISSING_FIELD_VALUE}),
      # A real genotype with a real field value.
      vcfio.VariantCall(name='Sample2', genotype=[1, 0], info={'GQ': 10}),
      # A genotype made up solely of missing values.
      vcfio.VariantCall(
          name='Sample3', genotype=[missing_genotype, missing_genotype]),
  ]
  variant = vcfio.Variant(
      reference_name='chr19', start=11, end=12, reference_bases='C',
      alternate_bases=[], names=['rs1', 'rs2'], quality=2,
      filters=['PASS'],
      info={},
      calls=sample_calls)

  # Only Sample2 is expected to remain in the output row.
  remaining_call = {
      ColumnKeyConstants.CALLS_NAME: 'Sample2',
      ColumnKeyConstants.CALLS_GENOTYPE: [1, 0],
      ColumnKeyConstants.CALLS_PHASESET: None,
      'GQ': 10,
  }
  expected_row = {
      ColumnKeyConstants.REFERENCE_NAME: 'chr19',
      ColumnKeyConstants.START_POSITION: 11,
      ColumnKeyConstants.END_POSITION: 12,
      ColumnKeyConstants.REFERENCE_BASES: 'C',
      ColumnKeyConstants.ALTERNATE_BASES: [],
      ColumnKeyConstants.NAMES: ['rs1', 'rs2'],
      ColumnKeyConstants.QUALITY: 2,
      ColumnKeyConstants.FILTER: ['PASS'],
      ColumnKeyConstants.CALLS: [remaining_call],
  }

  actual_rows = self._get_row_list_from_variant(
      variant, omit_empty_sample_calls=True)
  self.assertEqual([expected_row], actual_rows)
4 changes: 4 additions & 0 deletions gcp_variant_transforms/options/variant_transform_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ def add_arguments(self, parser):
help=('If true, existing records in output_table will not be '
'overwritten. New records will be appended to those that '
'already exist.'))
parser.add_argument(
'--omit_empty_sample_calls',
type='bool', default=False, nargs='?', const=True,
help=("If true, samples that don't have a given call will be omitted."))

def validate(self, parsed_args, client=None):
output_table_re_match = re.match(
Expand Down
16 changes: 12 additions & 4 deletions gcp_variant_transforms/transforms/variant_to_bigquery.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,25 @@
class _ConvertToBigQueryTableRow(beam.DoFn):
"""Converts a ``Variant`` record to a BigQuery row."""

def __init__(self, split_alternate_allele_info_fields=True):
def __init__(self, split_alternate_allele_info_fields=True,
omit_empty_sample_calls=False):
super(_ConvertToBigQueryTableRow, self).__init__()
self._split_alternate_allele_info_fields = (
split_alternate_allele_info_fields)
self._omit_empty_sample_calls = omit_empty_sample_calls

def process(self, record):
return bigquery_vcf_schema.get_rows_from_variant(
record, self._split_alternate_allele_info_fields)
record, self._split_alternate_allele_info_fields,
self._omit_empty_sample_calls)


class VariantToBigQuery(beam.PTransform):
"""Writes PCollection of ``Variant`` records to BigQuery."""

def __init__(self, output_table, header_fields, variant_merger=None,
split_alternate_allele_info_fields=True, append=False):
split_alternate_allele_info_fields=True, append=False,
omit_empty_sample_calls=False):
"""Initializes the transform.

Args:
Expand All @@ -57,19 +61,23 @@ def __init__(self, output_table, header_fields, variant_merger=None,
the rest of the INFO fields.
append (bool): If true, existing records in output_table will not be
overwritten. New records will be appended to those that already exist.
omit_empty_sample_calls (bool): If true, samples that don't have a given
call will be omitted.
"""
self._output_table = output_table
self._header_fields = header_fields
self._variant_merger = variant_merger
self._split_alternate_allele_info_fields = (
split_alternate_allele_info_fields)
self._append = append
self._omit_empty_sample_calls = omit_empty_sample_calls

def expand(self, pcoll):
return (pcoll
| 'ConvertToBigQueryTableRow' >> beam.ParDo(
_ConvertToBigQueryTableRow(
self._split_alternate_allele_info_fields))
self._split_alternate_allele_info_fields,
self._omit_empty_sample_calls))
| 'WriteToBigQuery' >> beam.io.Write(beam.io.BigQuerySink(
self._output_table,
schema=bigquery_vcf_schema.generate_schema_from_header_fields(
Expand Down
3 changes: 2 additions & 1 deletion gcp_variant_transforms/vcf_to_bq.py
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,8 @@ def run(argv=None):
header_fields,
variant_merger,
known_args.split_alternate_allele_info_fields,
append=known_args.append))
append=known_args.append,
omit_empty_sample_calls=known_args.omit_empty_sample_calls))


if __name__ == '__main__':
Expand Down