Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

from __future__ import absolute_import

import hashlib
import re

from gcp_variant_transforms.beam_io.vcfio import Variant
Expand Down Expand Up @@ -103,6 +104,17 @@ def get_merged_variants(self, variants, unused_key=None):
end=variant.end,
reference_bases=variant.reference_bases,
alternate_bases=variant.alternate_bases)
# Since we use a hash function in generating the merge key, there is
# a chance (extremely low though) to have variants with different
# `reference_bases` or `alternate_bases` here due to a collision in
# the hash function.
assert variant.reference_bases == merged_variant.reference_bases, (
'Cannot merge variants with different reference bases. {} vs {}'
.format(variant.reference_bases, merged_variant.reference_bases))
assert variant.alternate_bases == merged_variant.alternate_bases, (
'Cannot merge variants with different alternate bases. {} vs {}'
.format(variant.alternate_bases, merged_variant.alternate_bases))

merged_variant.names.extend(variant.names)
merged_variant.filters.extend(variant.filters)
merged_variant.quality = max(merged_variant.quality, variant.quality)
Expand All @@ -123,8 +135,8 @@ def get_merge_keys(self, variant):
variant.reference_name or '',
variant.start or '',
variant.end or '',
variant.reference_bases or '',
','.join(variant.alternate_bases or [])]])
self._get_hash(variant.reference_bases or ''),
self._get_hash(','.join(variant.alternate_bases or []))]])

def modify_bigquery_schema(self, schema, info_keys):
# Find the calls record so that it's easier to reference it below.
Expand Down Expand Up @@ -163,6 +175,9 @@ def modify_bigquery_schema(self, schema, info_keys):
updated_fields.append(field)
schema.fields = updated_fields

def _get_hash(self, value):
return hashlib.md5(value).hexdigest()

def _should_move_info_key_to_calls(self, info_key):
return bool(self._info_keys_to_move_to_calls_re and
self._info_keys_to_move_to_calls_re.match(info_key))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -225,19 +225,33 @@ def test_get_merged_variants_move_everything_to_calls(self):
def test_get_merge_keys(self):
  """Checks merge keys as fields of a variant are populated one by one.

  The reference and alternate bases are expected to appear in the key as
  hashes (via the strategy's `_get_hash`), not as raw strings.
  """
  strategy = move_to_calls_strategy.MoveToCallsStrategy(None, None, None)

  def get_expected_key(reference_name, start, end,
                       reference_bases, alternate_bases):
    # Mirrors the key layout: name, start and end verbatim; the two base
    # fields hashed. Missing values collapse to the empty string.
    return '%s:%s:%s:%s:%s' % (
        reference_name or '',
        str(start or ''),
        str(end or ''),
        strategy._get_hash(reference_bases or ''),
        strategy._get_hash(','.join(alternate_bases or [])))

  variant = vcfio.Variant()
  self.assertEqual(get_expected_key(None, None, None, None, None),
                   next(strategy.get_merge_keys(variant)))

  variant.reference_name = '19'
  self.assertEqual(get_expected_key('19', None, None, None, None),
                   next(strategy.get_merge_keys(variant)))

  variant.start = 123
  variant.end = 125
  variant.reference_bases = 'AT'
  self.assertEqual(get_expected_key('19', 123, 125, 'AT', None),
                   next(strategy.get_merge_keys(variant)))

  variant.alternate_bases = ['A', 'C']
  self.assertEqual(get_expected_key('19', 123, 125, 'AT', ['A', 'C']),
                   next(strategy.get_merge_keys(variant)))

def _get_base_schema(self, info_keys):
Expand Down