diff --git a/scripts/importer/data_importer/raw_data_importer.py b/scripts/importer/data_importer/raw_data_importer.py index 9155d1ca6..a2f26cb7c 100644 --- a/scripts/importer/data_importer/raw_data_importer.py +++ b/scripts/importer/data_importer/raw_data_importer.py @@ -302,142 +302,143 @@ def _get_genes_transcripts(self): .where(db.Gene.reference_set == ref_set))} return genes, transcripts - def _insert_variants(self): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_field_names: list): # pylint: disable=too-many-locals + """ + Parse a VCF row for a position (potentially multiple variants). + + Data is added in-place. + + Args: + line (str): the raw text row + batch_cont (dict): should contain batch, genes, transcripts + headers (list): (header, type) + vep_field_names (list): VEP field names + + """ + base = self.parse_baseinfo(headers, line) + info = parse_info(line) + + if is_non_chromosome(base["chrom"]): + return + + consequence_array = info['CSQ'].split(',') if 'CSQ' in info else [] + + alt_alleles = base['alt'].split(",") + rsids = [int(rsid.strip('rs')) + for rsid in base['rsid'].split(';') + if rsid.startswith('rs')] + if not rsids: + rsids = [None] + + try: + hom_counts = [int(info['AC_Hom'])] + except KeyError: + hom_counts = [] # null is better than 0, as 0 has a meaning + except ValueError: + # multiple variants on same row + hom_counts = [int(count) for count in info['AC_Hom'].split(',')] + + base['orig_alt_alleles'] = [f'{base["chrom"]}-{base["pos"]}-{base["ref"]}-{x}' + for x in alt_alleles] + + for i, alt in enumerate(alt_alleles): + data = dict(base) + data['alt'] = alt + + data['rsid'] = rsids[i] if i < len(rsids) else rsids[-1] + + data['allele_num'] = int((info['AN_Adj'] if 'AN_Adj' in info else info['AN'])) + data['allele_freq'] = None + + data['allele_count'] = int((info['AC_Adj'] if 'AC_Adj' in info + else info['AC']).split(',')[i]) + if 'AF' in info and data['allele_num'] > 0: + data['allele_freq'] = data['allele_count']/data['allele_num'] + + if not self.settings.beacon_only: + annotations = [dict(zip(vep_field_names, x.split('|'))) + for x in consequence_array + if len(vep_field_names) == len(x.split('|'))] + data['vep_annotations'] = [ann for ann in annotations + if int(ann['ALLELE_NUM']) == i + 1] + batch_cont['genes'].append(list({annotation['Gene'] + for annotation in data['vep_annotations'] + if annotation['Gene'][:4] == 'ENSG'})) + batch_cont['transcripts'].append(list({annotation['Feature'] + for annotation in data['vep_annotations'] + if annotation['Feature'][:4] == 'ENST'})) + + data['hom_count'] = hom_counts[i] if hom_counts else None + + data['variant_id'] = '{}-{}-{}-{}'.format(data['chrom'], + data['pos'], + data['ref'], + data['alt']) + data['quality_metrics'] = {x: info[x] for x in METRICS if x in info} + batch_cont['batch'] += [data] + if self.settings.count_calls: + self.get_callcount(data) # count calls (one per reference) + self.counter['beaconvariants'] += 1 # count variants (one/alternate) + + def _insert_variants(self): """Import variants from a VCF file.""" logging.info(f"Inserting variants{' (dry run)' if self.settings.dry_run else ''}") - header = [("chrom", str), ("pos", int), ("rsid", str), ("ref", str), - ("alt", str), ("site_quality", float), ("filter_string", str)] start = time.time() - batch = [] - genes = [] - transcripts = [] + headers = [("chrom", str), ("pos", int), ("rsid", str), ("ref", str), + ("alt", str), ("site_quality", float), ("filter_string", str)] + batch_container = {'batch': [], + 'genes': [], + 'transcripts': []} + + vep_field_names = None last_progress = -1.0 counter = 0 samples = 0 - vep_field_names = None + + references = dict(zip(('genes', 'transcripts'), self._get_genes_transcripts())) + with db.database.atomic(): - for filename in self.settings.variant_file: # pylint: disable=too-many-nested-blocks - references = dict(zip(('genes', 'transcripts'), self._get_genes_transcripts())) + for filename in self.settings.variant_file: for line in self._open(filename, binary=False): line = line.strip() if line.startswith("#"): # Check for some information that we need if line.startswith('##INFO=').split('|') + vep_field_names = (line.split('Format: ')[-1].strip('">').split('|')) if line.startswith('#CHROM'): samples = len(line.split('\t')[9:]) continue - if not self.settings.beacon_only: - if vep_field_names is None: - logging.error("VEP_field_names is empty. " + - "Make sure VCF header is present.") - sys.exit(1) - - base = self.parse_baseinfo(header, line) - info = parse_info(line) - - if is_non_chromosome(base["chrom"]): - continue + if not self.settings.beacon_only and not vep_field_names: + logging.error("VEP_field_names is empty. " + + "Make sure VCF header is present.") + sys.exit(1) - consequence_array = info['CSQ'].split(',') if 'CSQ' in info else [] - if not self.settings.beacon_only: - annotations = [dict(zip(vep_field_names, x.split('|'))) - for x in consequence_array - if len(vep_field_names) == len(x.split('|'))] - - alt_alleles = base['alt'].split(",") - rsids = [int(rsid.strip('rs')) - for rsid in base['rsid'].split(';') - if rsid.startswith('rs')] - if not rsids: - rsids = [None] - - try: - hom_counts = [int(info['AC_Hom'])] - except KeyError: - hom_counts = None # null is better than 0, as 0 has a meaning - except ValueError: - # multiple variants on same row - hom_counts = [int(count) for count in info['AC_Hom'].split(',')] - - fmt_alleles = [f'{base["chrom"]}-{base["pos"]}-{base["ref"]}-{x}' - for x in alt_alleles] - - for i, alt in enumerate(alt_alleles): - if not self.settings.beacon_only: - vep_annotations = [ann for ann in annotations - if int(ann['ALLELE_NUM']) == i + 1] - - data = dict(base) - data['pos'], data['ref'], data['alt'] = base['pos'], base['ref'], alt - data['orig_alt_alleles'] = fmt_alleles - - if len(rsids) <= i: - data['rsid'] = rsids[-1] # same id as the last alternate - else: - data['rsid'] = rsids[i] - - an, ac = 'AN_Adj', 'AC_Adj' - if 'AN_Adj' not in info: - an = 'AN' - if 'AC_Adj' not in info: - ac = 'AC' - - data['allele_num'] = int(info[an]) - data['allele_freq'] = None - if 'NS' in info and not samples: - # save this unless we already know the sample size - samples = int(info['NS']) - - data['allele_count'] = int(info[ac].split(',')[i]) - if 'AF' in info and data['allele_num'] > 0: - data['allele_freq'] = data['allele_count']/float(info[an]) - - if not self.settings.beacon_only: - data['vep_annotations'] = vep_annotations - - genes.append(list({annotation['Gene'] - for annotation in vep_annotations - if annotation['Gene'][:4] == 'ENSG'})) - transcripts.append(list({annotation['Feature'] - for annotation in vep_annotations - if annotation['Feature'][:4] == 'ENST'})) - - data['hom_count'] = hom_counts[i] if hom_counts else None - - data['variant_id'] = '{}-{}-{}-{}'.format(data['chrom'], - data['pos'], - data['ref'], - data['alt']) - data['quality_metrics'] = {x: info[x] for x in METRICS if x in info} - batch += [data] - if self.settings.count_calls: - self.get_callcount(data) # count calls (one per reference) - self.counter['beaconvariants'] += 1 # count variants (one/alternate) + self._parse_variant_row(line, batch_container, headers, vep_field_names) counter += 1 # count variants (one per vcf row) - if len(batch) >= self.settings.batch_size: + if len(batch_container['batch']) >= self.settings.batch_size: if not self.settings.dry_run: - self._add_variants_to_db(batch, - genes, - transcripts, + self._add_variants_to_db(batch_container['batch'], + batch_container['genes'], + batch_container['transcripts'], references) - genes = [] - transcripts = [] - batch = [] + + batch_container['genes'] = [] + batch_container['transcripts'] = [] + batch_container['batch'] = [] # Update progress if self.counter['variants'] is not None: last_progress = self._update_progress_bar(counter, self.counter['variants'], last_progress) - if batch and not self.settings.dry_run: - self._add_variants_to_db(batch, - genes, - transcripts, + if batch_container['batch'] and not self.settings.dry_run: + self._add_variants_to_db(batch_container['batch'], + batch_container['genes'], + batch_container['transcripts'], references) if self.settings.set_vcf_sampleset_size and samples: