In [1]:
import pandas as pd
import re
import os
import datetime

In [2]:
def write(path, content):
    """
    Create (or truncate) the file at ``path`` and write ``content`` into it.

    Args:
        path: str or path-like, the file to (over)write
        content: str, the text that becomes the whole content of the file
    """
    # Plain "w" is enough: the file was just truncated, so there is nothing
    # to read back, and the former read into an unused local was dead code.
    with open(path, "w") as f:
        f.write(content)
        
def append(path, content):
    """
    Add ``content`` to the end of the file at ``path``, creating the file if needed.

    Args:
        path: str or path-like, the file to extend
        content: str, the text written after the existing content
    """
    # Plain "a" is enough: nothing is read back, and the former read into an
    # unused local was dead code.
    with open(path, "a") as f:
        f.write(content)

In [3]:
def extract_ID(SampleID):
    """
    Reduce a sample name to the first run of digits it contains.

    Args:
        SampleID: String, e.g. "washei49194.BQSR.recaled.bam"
    Return:
        String, the first group of consecutive digits found in SampleID
    Raises:
        IndexError: when SampleID contains no digit at all
    """
    digit_runs = re.findall(r"[0-9]+", SampleID)
    first_run = digit_runs[0]
    return first_run

def get_file(input_path, bam_name):
    '''
    Load the variantsAnnotate.txt table the Coassin pipeline wrote for one BAM file.

    Args:
        input_path: str, the folder that contains one output folder per BAM
        bam_name: str, the name of the original bam file (used as folder name)
    Return:
        pandas.DataFrame instance, the tab-separated variantsAnnotate.txt content
    '''
    annotation_file = f'{input_path}/{bam_name}/variantsAnnotate/variantsAnnotate.txt'
    table = pd.read_csv(annotation_file, sep = "\t")
    return table

def data_cleaning(df):
    """
    Replace every entry of the SampleID column by its numeric ID (see extract_ID).

    The frame passed in is modified in place and is also the object returned.

    Args:
        df: pandas.DataFrame instance with a "SampleID" column
    Returns:
        df: the same pandas.DataFrame instance, SampleID column cleaned
    """
    numeric_ids = df.SampleID.apply(extract_ID)
    df.SampleID = numeric_ids
    return df

def data_load_wrapper(input_path, bam_list):
    """
    Load and clean the variant table of every requested BAM and stack them.

    Args:
        input_path: str, the folder that holds all Coassin pipeline output
        bam_list: list[str], the BAM names whose output should be loaded
    Return:
        df: pandas.DataFrame instance, the rows of all loaded tables in
            bam_list order, re-indexed from 0
    """
    cleaned_tables = []
    for bam_name in bam_list:
        raw_table = get_file(input_path = input_path, bam_name = bam_name)
        cleaned_tables.append(data_cleaning(raw_table))
    return pd.concat(cleaned_tables, axis = "index", ignore_index = True)

In [4]:
def generate_meta(file_format = "VCFv4.2", 
                  file_date = 'today', 
                  source = "lpa-analysis",
                  reference = "https://raw.githubusercontent.com/seppinho/mutation-server/76e865ece25cf792d1534b0288b2c28bc1b3d013/test-data/dna/lpa-sample/reference/kiv2_6.fasta"
                 ):
    """
    Build the block of "##" meta-information lines that opens the output file.

    Args:
        file_format: str, value of the ##fileformat line
        file_date: str, value of the ##fileDate line; the literal 'today'
            is replaced by the current local date formatted as YYYYMMDD
        source: str, value of the ##source line
        reference: str, value of the ##reference line
    Return:
        str, nine newline-terminated lines: the four above, the phasing line
        and the FORMAT declarations for GT, VL, TC and TB
    """
    if file_date == "today":
        date_stamp = datetime.datetime.now().strftime("%Y%m%d")
    else:
        date_stamp = file_date

    # One entry per output line. The blank that ends the last five entries is
    # part of the text this function has always produced and is kept as is.
    meta_lines = [
        f"##fileformat={file_format}",
        f"##fileDate={date_stamp}",
        f"##source={source}",
        f"##reference={reference}",
        "##phasing=partial ",
        '##FORMAT=<ID=GT,Number=1,Type=String,Description="Genotype"> ',
        '##FORMAT=<ID=VL,Number=1,Type=Float,Description="Variant Level"> ',
        '##FORMAT=<ID=TC,Number=1,Type=Integer,Description="Total Coverage"> ',
        '##FORMAT=<ID=TB,Number=1,Type=String,Description="TypeB"> ',
    ]
    return "".join(line + "\n" for line in meta_lines)

def generate_header(sample_id_sorted_list):
    """
    Build the tab-separated column header line of the output file.

    Args:
        sample_id_sorted_list: list[str], one column name per sample, in output order
    Return:
        str, the eight fixed column names followed by the sample columns,
        terminated by a newline
    """
    fixed_columns = ['#CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'FORMAT']
    all_columns = fixed_columns + sample_id_sorted_list
    return "\t".join(all_columns) + "\n"

def new_index(l, i, default = "0/0:::"):
    """
    Return ``l[i]``, or a placeholder when ``i`` lies beyond the end of ``l``.

    Args:
        l: list, the values to pick from
        i: int, the wanted position
        default: the value returned when position ``i`` does not exist;
            defaults to the "0/0:::" sample literal
    Return:
        ``l[i]`` when it exists, otherwise ``default``
    """
    try:
        return l[i]
    # Only a missing position is expected here; anything else (wrong argument
    # type, KeyboardInterrupt, ...) is a real problem and must not be hidden.
    except IndexError:
        return default
    
def generate_line(pos, REF, ALT, sample_literal, chrom = "6"):
    """
    Build the data row(s) of one position.

    Args:
        pos: the position, written to the POS column via str()
        REF: str, content of the REF column
        ALT: str, content of the ALT column
        sample_literal: list[list[str]], for every sample (in column order)
            the list of its literals at this position
        chrom: content of the CHROM column, "6" by default
    Return:
        str, one newline-terminated row per literal of the sample holding the
        most literals (at least one row). A sample with fewer literals than
        that is padded through new_index. ID, QUAL and FILTER are always "."
        and FORMAT is always "GT:VL:TC:TB".
    """
    fixed_fields = [str(chrom), str(pos), '.', REF, ALT, '.', '.', "GT:VL:TC:TB"]

    # Number of rows needed. default=0 keeps max() from failing when there is
    # no sample at all, and at least one row is always written so that a
    # position never degenerates into an empty line.
    repeat = max((len(literals) for literals in sample_literal), default = 0)
    repeat = max(repeat, 1)

    # One code path for single and multiple rows: new_index pads every sample
    # that has no literal for row i, including a sample with none at all.
    rows = [
        "\t".join(fixed_fields + [new_index(literals, i) for literals in sample_literal])
        for i in range(repeat)
    ]
    return "\n".join(rows) + "\n"

In [5]:
def get_positional_data(df):
    """
    Collect the values shared by all records of one position.

    df is expected to be one group of a frame grouped by position ("Pos"),
    with the columns "Ref", "Variant" and "Coverage-Total".

    Return:
        tuple (REF, ALT, TC, alt_list):
            REF: the first distinct value of the Ref column
            ALT: str, the distinct variants joined by ","
            TC: the first distinct value of the Coverage-Total column
            alt_list: list, the distinct variants in order of first appearance
    """
    def _first_distinct(column):
        # unique() keeps the order of appearance, so this is the first value seen
        return column.unique()[0]

    reference_allele = _first_distinct(df.Ref)
    # the order of this list defines the allele numbering used for genotypes
    variants_in_order = [*df.Variant.unique()]
    alt_field = ','.join(variants_in_order)
    total_coverage = _first_distinct(df['Coverage-Total'])
    return reference_allele, alt_field, total_coverage, variants_in_order

In [6]:
def _get_genotype_literal(row, ref, alt_list):
    '''
    Translate the "Major/Minor" entry of one record into allele numbers.

    args:
        row: one variant record; row['Major/Minor'] is a "<major>/<minor>" string
        ref: str, the reference allele (numbered 0)
        alt_list: list[str], the alternative alleles (numbered 1, 2, ... in list order)
    return:
        str, "<major number>/<minor number>"
    '''
    # An allele that appears more than once keeps the number of its last occurrence.
    allele_number = {}
    for number, allele in enumerate([ref] + alt_list):
        allele_number[allele] = number

    major_allele, minor_allele = row['Major/Minor'].split('/')
    return "/".join(str(allele_number[allele]) for allele in (major_allele, minor_allele))

def _get_genotype(df, ref, alt_list):
    """
    Compute the genotype literal of every record in df.

    args:
        df: pandas.DataFrame object, the variant records
        ref: str, the reference allele
        alt_list: list[str], all the alternative alleles, in numbering order
    return:
        dict, maps each index label of df to the literal that
        _get_genotype_literal produces for that row
    """
    def _literal_for(record):
        return _get_genotype_literal(row = record, ref = ref, alt_list = alt_list)

    # axis=1 hands the rows, one at a time, to the helper
    literal_per_row = df.apply(_literal_for, axis = 1)
    return dict(literal_per_row)

def _extract_data(df, ref, alt_list, TC):
    """
    Build the "GT:VL:TC:TB" literal of every record in df.

    df is designed to be one group (get_group) of a pandas.DataFrameGroupBy
    object, i.e. the records of one sample at one position.

    args:
        df: pandas.DataFrame object with the columns "Major/Minor",
            "Variant-Level" and "TypeB", and normally "Coverage-Total"
        ref: str, the reference allele
        alt_list: list[str], all the alternative alleles, in numbering order
        TC: coverage used only when df has no "Coverage-Total" column
    return:
        list[str], one literal per row of df, in row order; the variant level
        is written with four decimals
    """
    # extract the genotype notation string
    GT = _get_genotype(df = df, ref = ref, alt_list = alt_list)
    # get the variant level information
    VL = dict(df['Variant-Level'])
    # get the typeB information
    TB = dict(df["TypeB"])
    # TC is a per-sample FORMAT field, so each record reports the coverage of
    # its own row instead of one value borrowed from whichever record came
    # first at this position. When the coverage really is the same for every
    # record of a position the output is unchanged.
    if 'Coverage-Total' in df.columns:
        coverage = dict(df['Coverage-Total'])
    else:
        coverage = {index: TC for index in df.index}
    # build the notation literals
    notation = [
        f"{GT[index]}:{VL[index]:.4f}:{coverage[index]}:{TB[index]}"
        for index in df.index
    ]

    return notation

def get_sample_data(df_group_by_id, SampleID, ref, alt_list, TC):
    """
    Build the literals of one sample at one position.

    args:
        df_group_by_id: the records of one position, grouped by sample ID
        SampleID: the group key of the wanted sample
        ref, alt_list, TC: passed on unchanged to _extract_data
    return:
        list[str]: what _extract_data returns for the records of the sample,
        or ["0/0:::"] when the sample has no group (get_group raises KeyError)
    """
    placeholder = ["0/0:::"]
    try:
        sample_records = df_group_by_id.get_group(SampleID)
    except KeyError:
        # the sample has no record at this position
        return placeholder
    else:
        return _extract_data(df = sample_records, ref = ref, alt_list = alt_list, TC = TC)

In [7]:
def write_VCF(input_path = "data", output_path = "output.vcf", bam_list = ["*"]):
    """
    Wrapper function for the complete process: take Coassin pipeline output
    as input and write one multi-sample VCF file at output_path.

    args:
        input_path: str, the folder with all the Coassin pipeline output
            inside, one sub-folder per BAM
        output_path: str, the path of the file to write (overwritten if present)
        bam_list: list[str], the BAM names to load. ["*"] (the default) or
            None selects every entry of input_path that contains
            variantsAnnotate/variantsAnnotate.txt

    output:
        a file at <output_path>: the meta-information lines, the header line,
        then the row(s) of every position in ascending position order

    raises:
        ValueError: when no pipeline output is selected or found
    """
    # Resolve the wildcard. The result has to replace bam_list itself,
    # otherwise the literal "*" is later used as a folder name. Entries
    # without an annotation file (stray files, unrelated folders) are skipped.
    # The default list is only compared, never mutated, so sharing it is safe.
    if bam_list is None or list(bam_list) == ["*"]:
        bam_list = sorted(
            entry for entry in os.listdir(input_path)
            if os.path.isfile(os.path.join(input_path, entry,
                                           "variantsAnnotate", "variantsAnnotate.txt"))
        )
    if len(bam_list) == 0:
        raise ValueError(f"no Coassin pipeline output to load from {input_path!r}")

    # Load the complete dataset into pandas
    df = data_load_wrapper(input_path, bam_list)

    # Sample IDs are digit strings after data_load_wrapper's cleaning. Plain
    # string sorting would put "100" before "99", so sort by numeric value
    # (the string itself only breaks ties such as "7" / "07").
    sample_id_sorted_list = sorted(
        df.SampleID.drop_duplicates(),
        key = lambda sample_id: (int(sample_id), sample_id))

    # Open the output once instead of re-opening it for every line.
    with open(output_path, "w") as vcf_file:
        vcf_file.write(generate_meta())
        vcf_file.write(generate_header(sample_id_sorted_list = sample_id_sorted_list))

        # Grouping by the column name (not a one-element list) and iterating
        # the groups yields scalar keys in ascending order; this avoids the
        # scalar get_group lookup on a list-keyed grouping that newer pandas
        # versions deprecate. The result is still produced position by
        # position, so only one row set is held at a time.
        for pos, df_group_pos in df.groupby("Pos", sort = True):

            # the positional data, shared by all samples
            REF, ALT, TC, alt_list = get_positional_data(df_group_pos)

            # group the records by sample id ("SampleID"), sort = False to improve efficiency
            df_group_by_id = df_group_pos.groupby("SampleID", sort = False)

            # get data from each individual sample, in column order
            sample_literal = [get_sample_data(
                df_group_by_id = df_group_by_id,
                SampleID = sample_id,
                ref = REF,
                alt_list = alt_list,
                TC = TC
            ) for sample_id in sample_id_sorted_list]

            # complete the row with sample data
            vcf_file.write(generate_line(pos = pos,
                                         REF = REF,
                                         ALT = ALT,
                                         sample_literal = sample_literal))

In [8]:
# Convert the pipeline output of the five example samples under "data" into test.vcf
write_VCF(
    input_path = "data",
    output_path = "test.vcf",
    bam_list = [
        f"washei{sample_number}.BQSR.recaled.bam"
        for sample_number in (49194, 62376, 45948, 55083, 50433)
    ],
)