This script converts data to the format that is expected by further steps. 

It is also normalizes variants (converts them to the minimal form) and genotypes (converts them from multialleic form (e.g. 2/3) to biallelic form - 0/1, 1/0, 1/1).

# Imorts and functions definition

In [1]:
import csv
import gzip
import numpy as np
import glob
import tqdm

In [2]:
# minimal representation of the variant

def get_minimal_representation(pos, ref, alt): 
    """
    Creates minimal representation for alleles. 
    
    Taken from here:
        http://www.cureffi.org/2014/04/24/converting-genetic-variants-to-their-minimal-representation/
    """
    # If it's a simple SNV, don't remap anything
    if len(ref) == 1 and len(alt) == 1: 
        return pos, ref, alt
    else:
        # strip off identical suffixes (from the end)
        while(alt[-1] == ref[-1] and min(len(alt),len(ref)) > 1):
            alt = alt[:-1]
            ref = ref[:-1]
            
        # strip off identical prefixes (from the start) and increment position
        while(alt[0] == ref[0] and min(len(alt),len(ref)) > 1):
            alt = alt[1:]
            ref = ref[1:]
            pos += 1
            
        return pos, ref, alt 

In [3]:
def get_new_genotype(old_genotype, allele):
    """
        Converts genotype from Hail's multi-allelic vcf representation 
        to biallelic 0/1, 1/0, 1/1
    """
    result_genotype = np.array([0, 0])
    result_genotype[np.array(old_genotype) == allele] = 1
    
    return "/".join(map(str, result_genotype.tolist()))

# alleles parsing (includes application of the minimal representation)
def parse_alleles_from_hail(row):
    """
        Splits multi-allelic variants (e.g. 1/2) into biallelic 1/0, 0/1, 1/1
        Gets minimal representation of the variants
    """
    
    # cast types 
    row['locus.position'] = int(row['locus.position'])
    alleles = eval(row['alleles'])
    genotype = eval(row['GT.alleles'])
    
    result_rows = []
        
    for gt in set(genotype):
        
        if gt == 0:
            continue
        
        result_row = row.copy()
        
        # parse alleles 
        result_row['ref'] = alleles[0]
        result_row['alt'] = alleles[gt]
        
        # parse genotype
        result_row['GT'] = get_new_genotype(genotype, gt)

        # save old variant representation
        result_row['raw pos'], result_row['raw ref'], result_row['raw alt'] = (
            result_row['locus.position'], result_row['ref'], result_row['alt']
        )

        # get minimal representation
        result_row['pos'], result_row['ref'], result_row['alt'] = get_minimal_representation(
            result_row['locus.position'],
            result_row['ref'], 
            result_row['alt']) 
        
        result_row['chrom'] = result_row['locus.contig']
        
        # delete unnecessary info
        del result_row['']
        del result_row['locus.contig']
        del result_row['locus.position']
        del result_row['GT.phased']
        
        result_rows.append(result_row)
        
    return result_rows

This function iterates over all variants in an input file and does 2 things: 
1. Gets minimal representation of the variant

2. Converts genotypes grom multi-allelic of gVCF form to bi-allelic form of VCF. 


In [25]:
# processing of a file
def process_and_save_vcf(input_path, output_path=None):
    """
        Applies all transformation to every row of the input file 
        and saves the result as gzipped csv
    """
    
    if output_path is None:
        output_path = input_path.replace('.csv.gz', '.norm.csv.gz')

    with gzip.open(input_path, 'rt') as in_csvfile, gzip.open(output_path, 'wt') as out_csvfile:
        
        # define output csv file field names
        fieldnames = ['chrom', 'pos', 'ref', 'alt', 'GT', 's', 
                      'raw pos', 'raw ref', 'raw alt', 'alleles', 'GT.alleles']
        
        # create writer
        processed_data = csv.DictWriter(out_csvfile, fieldnames=fieldnames)
        processed_data.writeheader()
        
        # create reader
        data = csv.DictReader(in_csvfile)
        parsed_data = []
        
        # process every line of the input file
        for row in data:
            for processed_row in parse_alleles_from_hail(row):
                processed_data.writerow(processed_row)

# Processing

First, we collect all the file names for future processing, that contain variants derived from RAP

In [26]:
chromosomes = glob.glob(f".../450k_data/chr*[0-9]")

chromosomes

['.../450k_data/chr18',
 '.../450k_data/chr3',
 '.../450k_data/chr20',
 '.../450k_data/chr17',
 '.../450k_data/chr8',
 '.../450k_data/chr19',
 '.../450k_data/chr15',
 '.../450k_data/chr10',
 '.../450k_data/chr12',
 '.../450k_data/chr16',
 '.../450k_data/chr14',
 '.../450k_data/chr5',
 '.../450k_data/chr22',
 '.../450k_data/chr2',
 '.../450k_data/chr6',
 '.../450k_data/chr7',
 '.../450k_data/chr1',
 '.../450k_data/chr9',
 '.../450k_data/chr13',
 '.../450k_data/chr21',
 '.../450k_data/chr4',
 '.../450k_data/chr11']

Then we process all input files by using function defined earlier. We do it in parallel for each file, parallelizing it for 8 cores. 

In [28]:
import multiprocessing as mp

pool = mp.Pool(8)

for chromosome in chromosomes:
    print ("Processing:", chromosome, flush=True)
    
    parts = glob.glob(f"{chromosome}/*.part*[0-9].csv.gz")
    
    pool.map(
        process_and_save_vcf,
        parts
    )
    
pool.close()

Processing: .../450k_data/chr18
Processing: .../450k_data/chr3
Processing: .../450k_data/chr20
Processing: .../450k_data/chr17
Processing: .../450k_data/chr8
Processing: .../450k_data/chr19
Processing: .../450k_data/chr15
Processing: .../450k_data/chr10
Processing: .../450k_data/chr12
Processing: .../450k_data/chr16
Processing: .../450k_data/chr14
Processing: .../450k_data/chr5
Processing: .../450k_data/chr22
Processing: .../450k_data/chr2
Processing: .../450k_data/chr6
Processing: .../450k_data/chr7
Processing: .../450k_data/chr1
Processing: .../450k_data/chr9
Processing: .../450k_data/chr13
Processing: .../450k_data/chr21
Processing: .../450k_data/chr4
Processing: .../450k_data/chr11


Let's check if we have no less rows than in original file:

In [31]:
!less $path | wc -l

3414492


In [32]:
!less $processed_path | wc -l

3414511


# Number of unique variants for the paper:

In [49]:
import pandas as pd
from collections import defaultdict

parts = glob.glob(f".../450k_data/chr*[0-9]/*.part*[0-9].norm.csv.gz")

print ("Total amount of files:", len(parts))

parts[:3]

Total amount of files: 952


['.../450k_data/chr18/output.chr18.part14.norm.csv.gz',
 '.../450k_data/chr18/output.chr18.part7.norm.csv.gz',
 '.../450k_data/chr18/output.chr18.part1.norm.csv.gz']

In [61]:
import multiprocessing as mp

def get_unique_variants_count(filename):
    df = pd.read_csv(filename, compression='gzip')[['chrom', 'pos', 'ref', 'alt']].drop_duplicates()
    return filename, df.shape[0]

# Multi-processing

pool = mp.Pool(8)

result = pool.map(
    get_unique_variants_count,
    parts
)
pool.close()

In [72]:
print (f"Number of unique variants: {sum(map(lambda x: x[1], result))}")

Number of unique variants: 1544703
