From 8a28e7ceb44231f300c2cc981506f5c516e057ea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20=C3=96stberg?= Date: Mon, 19 Aug 2019 15:18:00 +0200 Subject: [PATCH 1/2] Move VCF row parsing to separate function. --- .../data_importer/raw_data_importer.py | 212 +++++++++--------- 1 file changed, 111 insertions(+), 101 deletions(-) diff --git a/scripts/importer/data_importer/raw_data_importer.py b/scripts/importer/data_importer/raw_data_importer.py index 9155d1ca6..70fdb0af7 100644 --- a/scripts/importer/data_importer/raw_data_importer.py +++ b/scripts/importer/data_importer/raw_data_importer.py @@ -302,30 +302,119 @@ def _get_genes_transcripts(self): .where(db.Gene.reference_set == ref_set))} return genes, transcripts + def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_field_names: list): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + """ + Parse a VCF row for a position (potentially multiple variants). + + Data is added in-place. + + Args: + line (str): the raw text row + batch_cont (dict): should contain batch, genes, transcripts + + """ + base = {'dataset_version': self.dataset_version} + for i, item in enumerate(line.strip().split("\t")): + if i < 7: + base[headers[i][0]] = headers[i][1](item) + elif i == 7 or not self.settings.beacon_only: + # only parse column 7 (maybe also for non-beacon-import?) + info = {a[0]:a[1] for a in map(lambda x: x.split('=', 1) if '=' in x + else (x, x), re.split(r';(?=\w)', item))} + + if base["chrom"].startswith('GL') or base["chrom"].startswith('MT'): + return + + consequence_array = info['CSQ'].split(',') if 'CSQ' in info else [] + + alt_alleles = base['alt'].split(",") + rsids = [int(rsid.strip('rs')) + for rsid in base['rsid'].split(';') + if rsid.startswith('rs')] + if not rsids: + rsids = [None] + + try: + hom_counts = [int(info['AC_Hom'])] + except KeyError: + hom_counts = None # null is better than 0, as 0 has a meaning + except ValueError: + # multiple variants on same row + hom_counts = [int(count) for count in info['AC_Hom'].split(',')] + + fmt_alleles = [f'{base["chrom"]}-{base["pos"]}-{base["ref"]}-{x}' + for x in alt_alleles] + + for i, alt in enumerate(alt_alleles): + data = dict(base) + data['alt'] = alt + data['orig_alt_alleles'] = fmt_alleles + + if len(rsids) <= i: + data['rsid'] = rsids[-1] # same id as the last alternate + else: + data['rsid'] = rsids[i] + + data['allele_num'] = int((info['AN_Adj'] if 'AN_Adj' in info else info['AN'])) + data['allele_freq'] = None + + data['allele_count'] = int((info['AC_Adj'] if 'AC_Adj' in info + else info['AC']).split(',')[i]) + if 'AF' in info and data['allele_num'] > 0: + data['allele_freq'] = data['allele_count']/data['allele_num'] + + if not self.settings.beacon_only: + annotations = [dict(zip(vep_field_names, x.split('|'))) + for x in consequence_array + if len(vep_field_names) == len(x.split('|'))] + data['vep_annotations'] = [ann for ann in annotations + if int(ann['ALLELE_NUM']) == i + 1] + batch_cont['genes'].append(list({annotation['Gene'] + for annotation in data['vep_annotations'] + if annotation['Gene'][:4] == 'ENSG'})) + batch_cont['transcripts'].append(list({annotation['Feature'] + for annotation in data['vep_annotations'] + if annotation['Feature'][:4] == 'ENST'})) + + data['hom_count'] = hom_counts[i] if hom_counts else None + + data['variant_id'] = '{}-{}-{}-{}'.format(data['chrom'], + data['pos'], + data['ref'], + data['alt']) + data['quality_metrics'] = {x: info[x] for x in METRICS if x in info} + batch_cont['batch'] += [data] + if self.settings.count_calls: + self.get_callcount(data) # count calls (one per reference) + self.counter['beaconvariants'] += 1 # count variants (one/alternate) + def _insert_variants(self): # pylint: disable=too-many-locals,too-many-branches,too-many-statements """Import variants from a VCF file.""" logging.info(f"Inserting variants{' (dry run)' if self.settings.dry_run else ''}") - header = [("chrom", str), ("pos", int), ("rsid", str), ("ref", str), - ("alt", str), ("site_quality", float), ("filter_string", str)] start = time.time() - batch = [] - genes = [] - transcripts = [] + headers = [("chrom", str), ("pos", int), ("rsid", str), ("ref", str), + ("alt", str), ("site_quality", float), ("filter_string", str)] + batch_container = {'batch': [], + 'genes': [], + 'transcripts': []} + + vep_field_names = None last_progress = -1.0 counter = 0 samples = 0 - vep_field_names = None + + references = dict(zip(('genes', 'transcripts'), self._get_genes_transcripts())) + with db.database.atomic(): - for filename in self.settings.variant_file: # pylint: disable=too-many-nested-blocks - references = dict(zip(('genes', 'transcripts'), self._get_genes_transcripts())) + for filename in self.settings.variant_file: for line in self._open(filename, binary=False): line = line.strip() if line.startswith("#"): # Check for some information that we need if line.startswith('##INFO=').split('|') + vep_field_names = (line.split('Format: ')[-1].strip('">').split('|')) if line.startswith('#CHROM'): samples = len(line.split('\t')[9:]) continue @@ -336,108 +425,29 @@ def _insert_variants(self): # pylint: disable=too-many-locals,too-many-branches "Make sure VCF header is present.") sys.exit(1) - base = self.parse_baseinfo(header, line) - info = parse_info(line) - - if is_non_chromosome(base["chrom"]): - continue - - consequence_array = info['CSQ'].split(',') if 'CSQ' in info else [] - if not self.settings.beacon_only: - annotations = [dict(zip(vep_field_names, x.split('|'))) - for x in consequence_array - if len(vep_field_names) == len(x.split('|'))] - - alt_alleles = base['alt'].split(",") - rsids = [int(rsid.strip('rs')) - for rsid in base['rsid'].split(';') - if rsid.startswith('rs')] - if not rsids: - rsids = [None] - - try: - hom_counts = [int(info['AC_Hom'])] - except KeyError: - hom_counts = None # null is better than 0, as 0 has a meaning - except ValueError: - # multiple variants on same row - hom_counts = [int(count) for count in info['AC_Hom'].split(',')] - - fmt_alleles = [f'{base["chrom"]}-{base["pos"]}-{base["ref"]}-{x}' - for x in alt_alleles] - - for i, alt in enumerate(alt_alleles): - if not self.settings.beacon_only: - vep_annotations = [ann for ann in annotations - if int(ann['ALLELE_NUM']) == i + 1] - - data = dict(base) - data['pos'], data['ref'], data['alt'] = base['pos'], base['ref'], alt - data['orig_alt_alleles'] = fmt_alleles - - if len(rsids) <= i: - data['rsid'] = rsids[-1] # same id as the last alternate - else: - data['rsid'] = rsids[i] - - an, ac = 'AN_Adj', 'AC_Adj' - if 'AN_Adj' not in info: - an = 'AN' - if 'AC_Adj' not in info: - ac = 'AC' - - data['allele_num'] = int(info[an]) - data['allele_freq'] = None - if 'NS' in info and not samples: - # save this unless we already know the sample size - samples = int(info['NS']) - - data['allele_count'] = int(info[ac].split(',')[i]) - if 'AF' in info and data['allele_num'] > 0: - data['allele_freq'] = data['allele_count']/float(info[an]) - - if not self.settings.beacon_only: - data['vep_annotations'] = vep_annotations - - genes.append(list({annotation['Gene'] - for annotation in vep_annotations - if annotation['Gene'][:4] == 'ENSG'})) - transcripts.append(list({annotation['Feature'] - for annotation in vep_annotations - if annotation['Feature'][:4] == 'ENST'})) - - data['hom_count'] = hom_counts[i] if hom_counts else None - - data['variant_id'] = '{}-{}-{}-{}'.format(data['chrom'], - data['pos'], - data['ref'], - data['alt']) - data['quality_metrics'] = {x: info[x] for x in METRICS if x in info} - batch += [data] - if self.settings.count_calls: - self.get_callcount(data) # count calls (one per reference) - self.counter['beaconvariants'] += 1 # count variants (one/alternate) + self._parse_variant_row(line, batch_container, headers, vep_field_names) counter += 1 # count variants (one per vcf row) - if len(batch) >= self.settings.batch_size: + if len(batch_container['batch']) >= self.settings.batch_size: if not self.settings.dry_run: - self._add_variants_to_db(batch, - genes, - transcripts, + self._add_variants_to_db(batch_container['batch'], + batch_container['genes'], + batch_container['transcripts'], references) - genes = [] - transcripts = [] - batch = [] + + batch_container['genes'] = [] + batch_container['transcripts'] = [] + batch_container['batch'] = [] # Update progress if self.counter['variants'] is not None: last_progress = self._update_progress_bar(counter, self.counter['variants'], last_progress) - if batch and not self.settings.dry_run: - self._add_variants_to_db(batch, - genes, - transcripts, + if batch_container['batch'] and not self.settings.dry_run: + self._add_variants_to_db(batch_container['batch'], + batch_container['genes'], + batch_container['transcripts'], references) if self.settings.set_vcf_sampleset_size and samples: From bc70b18af86e2af19b45bfe8dc4a343180c7ed8c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20=C3=96stberg?= Date: Tue, 20 Aug 2019 13:09:09 +0200 Subject: [PATCH 2/2] Use functions for parsing, simplify code. --- .../data_importer/raw_data_importer.py | 41 ++++++++----------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/scripts/importer/data_importer/raw_data_importer.py b/scripts/importer/data_importer/raw_data_importer.py index 70fdb0af7..a2f26cb7c 100644 --- a/scripts/importer/data_importer/raw_data_importer.py +++ b/scripts/importer/data_importer/raw_data_importer.py @@ -302,7 +302,7 @@ def _get_genes_transcripts(self): .where(db.Gene.reference_set == ref_set))} return genes, transcripts - def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_field_names: list): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_field_names: list): # pylint: disable=too-many-locals """ Parse a VCF row for a position (potentially multiple variants). @@ -311,18 +311,14 @@ def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_fie Args: line (str): the raw text row batch_cont (dict): should contain batch, genes, transcripts + headers (list): (header, type) + vep_field_names (list): VEP field names """ - base = {'dataset_version': self.dataset_version} - for i, item in enumerate(line.strip().split("\t")): - if i < 7: - base[headers[i][0]] = headers[i][1](item) - elif i == 7 or not self.settings.beacon_only: - # only parse column 7 (maybe also for non-beacon-import?) - info = {a[0]:a[1] for a in map(lambda x: x.split('=', 1) if '=' in x - else (x, x), re.split(r';(?=\w)', item))} - - if base["chrom"].startswith('GL') or base["chrom"].startswith('MT'): + base = self.parse_baseinfo(headers, line) + info = parse_info(line) + + if is_non_chromosome(base["chrom"]): return consequence_array = info['CSQ'].split(',') if 'CSQ' in info else [] @@ -337,23 +333,19 @@ def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_fie try: hom_counts = [int(info['AC_Hom'])] except KeyError: - hom_counts = None # null is better than 0, as 0 has a meaning + hom_counts = [] # null is better than 0, as 0 has a meaning except ValueError: # multiple variants on same row hom_counts = [int(count) for count in info['AC_Hom'].split(',')] - fmt_alleles = [f'{base["chrom"]}-{base["pos"]}-{base["ref"]}-{x}' - for x in alt_alleles] + base['orig_alt_alleles'] = [f'{base["chrom"]}-{base["pos"]}-{base["ref"]}-{x}' + for x in alt_alleles] for i, alt in enumerate(alt_alleles): data = dict(base) data['alt'] = alt - data['orig_alt_alleles'] = fmt_alleles - if len(rsids) <= i: - data['rsid'] = rsids[-1] # same id as the last alternate - else: - data['rsid'] = rsids[i] + data['rsid'] = rsids[i] if i < len(rsids) else rsids[-1] data['allele_num'] = int((info['AN_Adj'] if 'AN_Adj' in info else info['AN'])) data['allele_freq'] = None @@ -388,7 +380,7 @@ def _parse_variant_row(self, line: str, batch_cont: dict, headers: list, vep_fie self.get_callcount(data) # count calls (one per reference) self.counter['beaconvariants'] += 1 # count variants (one/alternate) - def _insert_variants(self): # pylint: disable=too-many-locals,too-many-branches,too-many-statements + def _insert_variants(self): """Import variants from a VCF file.""" logging.info(f"Inserting variants{' (dry run)' if self.settings.dry_run else ''}") start = time.time() @@ -419,11 +411,10 @@ def _insert_variants(self): # pylint: disable=too-many-locals,too-many-branches samples = len(line.split('\t')[9:]) continue - if not self.settings.beacon_only: - if vep_field_names is None: - logging.error("VEP_field_names is empty. " + - "Make sure VCF header is present.") - sys.exit(1) + if not self.settings.beacon_only and not vep_field_names: + logging.error("VEP_field_names is empty. " + + "Make sure VCF header is present.") + sys.exit(1) self._parse_variant_row(line, batch_container, headers, vep_field_names) counter += 1 # count variants (one per vcf row)