Skip to content

Commit

Permalink
Merge pull request #508 from NBISweden/feature/updated-variant-parser
Browse files Browse the repository at this point in the history
Feature/updated variant parser
  • Loading branch information
norling authored Feb 13, 2019
2 parents 34d55a0 + 16a9826 commit 500b3b0
Showing 1 changed file with 84 additions and 17 deletions.
101 changes: 84 additions & 17 deletions scripts/importer/data_importer/raw_data_importer.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,18 @@ def get_minimal_representation(pos, ref, alt):
return pos, ref, alt
else:
# strip off identical suffixes
while(alt[-1] == ref[-1] and min(len(alt),len(ref)) > 1):
while(alt[-1] == ref[-1] and min(len(alt), len(ref)) > 1):
alt = alt[:-1]
ref = ref[:-1]
# strip off identical prefixes and increment position
while(alt[0] == ref[0] and min(len(alt),len(ref)) > 1):
while(alt[0] == ref[0] and min(len(alt), len(ref)) > 1):
alt = alt[1:]
ref = ref[1:]
pos += 1
return pos, ref, alt


class RawDataImporter( DataImporter ):

class RawDataImporter(DataImporter):
def __init__(self, settings):
super().__init__(settings)
self.dataset_version = None
Expand All @@ -68,7 +67,7 @@ def _select_dataset_version(self):
datasets = []

try:
ds = db.Dataset.get(short_name = self.settings.dataset)
ds = db.Dataset.get(short_name=self.settings.dataset)
except db.Dataset.DoesNotExist:
print("Select a Dataset to use with this data")
for dataset in db.Dataset.select():
Expand Down Expand Up @@ -204,8 +203,8 @@ def _insert_coverage(self):
# re-format coverage for batch
for i, item in enumerate(batch):
batch[i]['coverage'] = [item['cov1'], item['cov5'], item['cov10'],
item['cov15'],item['cov20'],item['cov25'],
item['cov30'],item['cov50'],item['cov100']]
item['cov15'], item['cov20'], item['cov25'],
item['cov30'], item['cov50'], item['cov100']]
del batch[i]['cov1']
del batch[i]['cov5']
del batch[i]['cov10']
Expand Down Expand Up @@ -235,18 +234,27 @@ def _insert_coverage(self):
logging.info("Inserted {} coverage records in {}".format(counter, self._time_since(start)))

def _insert_variants(self):
"""
Insert variants from a VCF file
"""
logging.info("Inserting variants")
header = [("chrom",str), ("pos", int), ("rsid", str), ("ref", str),
header = [("chrom", str), ("pos", int), ("rsid", str), ("ref", str),
("alt", str), ("site_quality", float), ("filter_string", str)]
start = time.time()
batch = []
genes = []
transcripts = []

last_progress = 0.0
counter = 0
vep_field_names = None
dp_mids = None
gq_mids = None
with db.database.atomic():
for filename in self.settings.variant_file:
# gene/transctipt dbids; need to add support for version
refgenes = {gene.gene_id:gene.id for gene in db.Gene.select(db.Gene.id, db.Gene.gene_id)}
reftranscripts = {tran.transcript_id:tran.id for tran in db.Transcript.select(db.Transcript.id, db.Transcript.transcript_id)}
for line in self._open(filename):
line = bytes(line).decode('utf8').strip()

Expand Down Expand Up @@ -304,18 +312,17 @@ def _insert_variants(self):
if self.settings.beacon_only and 'AC_Adj' not in info:
ac = 'AC'

data['allele_num'] = int(info[an])
data['allele_freq'] = None
data['allele_num'] = int(info[an])
data['allele_freq'] = None
data['allele_count'] = int(info[ac].split(',')[i])
if 'AF' in info and data['allele_num'] > 0:
data['allele_freq'] = data['allele_count']/float(info[an])


if not self.settings.beacon_only:
data['vep_annotations'] = vep_annotations

data['genes'] = list({annotation['Gene'] for annotation in vep_annotations})
data['transcripts'] = list({annotation['Feature'] for annotation in vep_annotations})
genes.append(list({annotation['Gene'] for annotation in vep_annotations}))
transcripts.append(list({annotation['Feature'] for annotation in vep_annotations}))

data['orig_alt_alleles'] = [
'{}-{}-{}-{}'.format(data['chrom'], *get_minimal_representation(base['pos'], base['ref'], x)) for x in alt_alleles
Expand All @@ -324,27 +331,70 @@ def _insert_variants(self):
data['hom_count'] = int(info['AC_Hom'])
except KeyError:
pass # null is better than 0, as 0 has a meaning
data['variant_id'] = '{}-{}-{}-{}'.format(data['chrom'], data['pos'], data['ref'], data['alt'])
data['quality_metrics'] = dict([(x, info[x]) for x in METRICS if x in info])
except ValueError:
data['hom_count'] = int(info['AC_Hom'].split(',')[0]) # parsing Swegen sometimes give e.g. 14,0
data['variant_id'] = '{}-{}-{}-{}'.format(data['chrom'], data['pos'], data['ref'], data['alt'])
data['quality_metrics'] = dict([(x, info[x]) for x in METRICS if x in info])
batch += [data]

counter += 1

if len(batch) >= self.settings.batch_size:
if not self.settings.dry_run:
if not self.settings.beacon_only:
try:
curr_id = db.Variant.select(db.Variant.id).order_by(db.Variant.id.desc()).limit(1).get().id
except db.Variant.DoesNotExist:
# assumes next id will be 1 if table is empty
curr_id = 0

db.Variant.insert_many(batch).execute()

if not self.settings.beacon_only:
last_id = db.Variant.select(db.Variant.id).order_by(db.Variant.id.desc()).limit(1).get().id
if last_id-curr_id == len(batch):
indexes = list(range(curr_id+1, last_id+1))
else:
indexes = []
for entry in batch:
indexes.append(db.Variant.select(db.Variant.id).where(db.Variant.variant_id == entry['variant_id']).get().id)
self.add_variant_genes(indexes, genes, refgenes)
self.add_variant_transcripts(indexes, transcripts, reftranscripts)

genes = []
transcripts = []
batch = []
# Update progress
if self.counter['variants'] != None:
progress = counter / self.counter['variants']
while progress > last_progress + 0.01:
if not last_progress:
logging.info("Estimated time to completion: {}".format(self._time_to(start, progress)))
self._print_progress_bar()
self._tick()
last_progress += 0.01

if batch and not self.settings.dry_run:
db.Variant.insert_many(batch).execute()
if not self.settings.dry_run:
if not self.settings.beacon_only:
try:
curr_id = db.Variant.select(db.Variant.id).order_by(db.Variant.id.desc()).limit(1).get().id
except db.Variant.DoesNotExist:
# assumes next id will be 1 if table is empty
curr_id = 0

db.Variant.insert_many(batch).execute()

if not self.settings.beacon_only:
last_id = db.Variant.select(db.Variant.id).order_by(db.Variant.id.desc()).limit(1).get().id
if last_id-curr_id == len(batch):
indexes = list(range(curr_id+1, last_id+1))
else:
indexes = []
for entry in batch:
indexes.append(db.Variant.select(db.Variant.id).where(db.Variant.variant_id == entry['variant_id']).get().id)
self.add_variant_genes(indexes, genes, refgenes)
self.add_variant_transcripts(indexes, transcripts, reftranscripts)

self.dataset_version.num_variants = counter
self.dataset_version.save()
if self.counter['variants'] != None:
Expand Down Expand Up @@ -384,3 +434,20 @@ def start_import(self):
self._insert_variants()
if not self.settings.beacon_only:
self._insert_coverage()

def add_variant_genes(self, variant_indexes:list, genes_to_add:list, refgenes:dict):
batch = []
for i in range(len(variant_indexes)):
connected_genes = [{'variant':variant_indexes[i], 'gene':refgenes[gene]} for gene in genes_to_add[i] if gene]
batch += connected_genes
if not self.settings.dry_run:
db.VariantGenes.insert_many(batch).execute()

def add_variant_transcripts(self, variant_indexes:list, transcripts_to_add:list, reftranscripts:dict):
batch = []
for i in range(len(variant_indexes)):
connected_transcripts = [{'variant':variant_indexes[i], 'transcript':reftranscripts[transcript]}
for transcript in transcripts_to_add[i] if transcript and transcript[:4] == 'ENST']
batch += connected_transcripts
if not self.settings.dry_run:
db.VariantTranscripts.insert_many(batch).execute()

0 comments on commit 500b3b0

Please sign in to comment.