# Consensus and deviations builder script

In [52]:
import numpy as np
import pandas as pd
from Bio import Seq
import json

## Script

In [69]:
# Upper-case symbols counted at every site by consensus_builder: the bases
# A/T/C/G/U, the one-letter ambiguity codes and the gap symbol "-".
IUAPC_table = list("ATCGURYKMSWBDHVN-")

def dataframe_builder(seq_list):
    """
    Split every sequence into single characters and stack the results in a pandas DataFrame.

    Parameters:
        seq_list : list of sequences (string). Every sequence is expected to have the same length

    Returns:
        output : pandas DataFrame with one row per sequence and one column per site
    """
    site_rows = [list(sequence) for sequence in seq_list]
    return pd.DataFrame(site_rows)

def consensus_builder(seq_df):
    """
    Build the consensus sequence and the per-site symbol frequency table of the given sequences.

    Parameters:
        seq_df : pandas DataFrame with one row per sequence and one column per site

    Returns:
        consensus : consensus sequence (string), made of the most frequent symbol of every site
        maf_table : frequency table (dict). One entry per site (keyed by the column label), each entry
                    being a dict that maps every symbol of IUAPC_table to its frequency at that site
    """
    total_sequences = seq_df.shape[0]
    consensus_symbols = []
    maf_table = {}
    for site in seq_df.columns:
        counts = seq_df[site].value_counts()
        # value_counts() lists the most frequent symbol first; in case of a tie the symbol it lists
        # first is the one kept, so the choice may be biased
        consensus_symbols.append(counts.index[0])
        # symbols that are not part of IUAPC_table are counted for the consensus but get no entry here
        maf_table[site] = {
            symbol: counts.loc[symbol] / total_sequences if symbol in counts else 0.0
            for symbol in IUAPC_table
        }
    return "".join(consensus_symbols), maf_table

def _codon_for_translation(codon):
    """Upper-case a codon, turn gap ("-") and padding ("#") symbols into "N" and pad it with "N" up to 3 symbols."""
    return codon.upper().replace("-", "N").replace("#", "N").ljust(3, "N")


def observed_deviations_np_maf(seq_df,consensus,maf_table,coding,genetic_code,neighbours_nb=2):
    """
    List every observed deviation from the consensus. Each deviation is a dict {Seq_ID,Nucleotide_Pos,From,To,MAF},
    where "Seq_ID" is the row number of the sequence in seq_df, "Nucleotide_Pos" is the position in the sequence,
    "From" is the consensus around that position, "To" is the sequence around that position and "MAF" is the
    frequency of the observed symbol taken from maf_table. "From" and "To" are lower case except for the deviating
    site (format ..xxXxx..) and use "#" where a neighbour falls outside of the sequence.
    When coding is true, every dict also contains "Codon_Pos" (index of the codon), "Amino_Acid_From",
    "Amino_Acid_To", "o_c" (consensus codon), "n_c" (sequence codon) and "Synonymous".

    Parameters:
        seq_df : pandas DataFrame with the sequences (string) in its "Sequence" column
        consensus : consensus sequence (string), same length as every sequence
        maf_table : frequency table (dict), read as maf_table[position][upper case symbol]
        coding : if true, the codon and amino acid information is added to every deviation
        genetic_code : genetic code given to Bio.Seq.translate as its table (only used when coding is true)
        neighbours_nb: number(int) of desired neighbours in the output, to the left and to the right (default : 2)

    Returns:
        deviations : list of dictionnaries, one per deviation, ordered by sequence then by position

    Raises:
        ValueError : if the sequences do not all have the length of the consensus
        KeyError : if a deviating symbol has no entry in maf_table
    """
    raw_sequences = list(seq_df["Sequence"])
    if not raw_sequences:
        # without any sequence the array below would not be 2-dimensional
        return []

    padding = "#"*neighbours_nb  # pad sequences to take care of extremities
    window = 2*neighbours_nb+1
    sequences = np.asarray([list(padding+q.casefold()+padding) for q in raw_sequences])
    formated_consensus = padding+consensus.casefold()+padding
    if sequences.ndim != 2 or sequences.shape[1] != len(formated_consensus):
        raise ValueError("every sequence must have the same length as the consensus")

    # row (sequence) and column (padded position) of every site that differs from the consensus
    seq_indices, padded_positions = np.where(sequences != list(formated_consensus))

    deviations = []
    for seq_id, padded_pos in zip(seq_indices.tolist(), padded_positions.tolist()):
        # position in the unpadded sequence; it is also where the window starts in padded coordinates
        pos_in_seq = padded_pos-neighbours_nb
        dev_seq = "".join(sequences[seq_id, pos_in_seq:pos_in_seq+window])
        dev_seq = dev_seq[:neighbours_nb]+dev_seq[neighbours_nb:].capitalize() #format in ..xxXxx..
        origin_seq = formated_consensus[pos_in_seq:pos_in_seq+window]
        origin_seq = origin_seq[:neighbours_nb]+origin_seq[neighbours_nb:].capitalize() #format in ..xxXxx..
        if coding:
            codon_start = pos_in_seq//3
            # both codons are padded with "N" so that a trailing partial codon is translated the same way
            # for the consensus and for the sequence
            original_codon = _codon_for_translation(consensus[codon_start*3:(codon_start+1)*3])
            new_codon = _codon_for_translation("".join(sequences[seq_id, neighbours_nb+codon_start*3:neighbours_nb+(codon_start+1)*3]))
            aa_from = Seq.translate(original_codon,table=genetic_code)
            aa_to = Seq.translate(new_codon,table=genetic_code)
            synonymous = aa_from == aa_to
            deviations.append({"Seq_ID" : int(seq_id), "Codon_Pos" : codon_start, "Nucleotide_Pos" : pos_in_seq, "From" : origin_seq, "To" : dev_seq, "MAF" : maf_table[pos_in_seq][dev_seq[neighbours_nb]], "Amino_Acid_From" : aa_from, "Amino_Acid_To" : aa_to, "o_c" : original_codon, "n_c" : new_codon, "Synonymous" : synonymous})
        else:
            deviations.append({"Seq_ID" : int(seq_id), "Nucleotide_Pos" : pos_in_seq, "From" : origin_seq, "To" : dev_seq, "MAF" : maf_table[pos_in_seq][dev_seq[neighbours_nb]]})
    return deviations
    

def observed_deviations_df_maf(seq_df,consensus,maf_table,neighbours_nb=2):
    """
    List every observed deviation from the consensus, site by site. Each deviation is a dict {Seq_ID,Pos,From,To,MAF},
    where "Seq_ID" is the index label of the sequence in seq_df, "Pos" is the position in the sequence, "From" is the
    consensus around that position, "To" is the sequence around that position and "MAF" is the frequency of the
    observed symbol taken from maf_table. "From" and "To" are lower case except for the deviating site.

    Parameters:
        seq_df : pandas DataFrame with one row per sequence and one column per site
        consensus : consensus sequence (string)
        maf_table : frequency table (dict), read as maf_table[site][upper case symbol]
        neighbours_nb: number(int) of desired neighbours in the output, to the left and to the right (default : 2)

    Returns:
        deviations : list of dictionnaries, one per deviation, ordered by site then by sequence
    """
    site_count = seq_df.shape[1]
    deviations = []
    for col in seq_df.columns:
        # the window is cut at the borders of the sequence, it is not padded
        left_index = max(col-neighbours_nb, 0)
        right_index = min(col+neighbours_nb+1, site_count)
        mismatching_rows = seq_df[seq_df[col] != consensus[col]]
        # keep only the deviating site and the sites that are close to it
        deviations_candidates = mismatching_rows[list(range(left_index,right_index))]
        if deviations_candidates.empty:
            continue
        # negative offset of the deviating site, counted from the end of the window
        split_at = col-right_index
        lowered_consensus = consensus.casefold()
        consensus_subset = lowered_consensus[left_index:col] + lowered_consensus[col:right_index].capitalize()
        for seq_id, sites in zip(deviations_candidates.index, deviations_candidates.to_numpy()):
            neighbourhood = "".join(sites).casefold()
            # format the deviation in ..xxXxx.. where .. denote other possible neighbours
            neighbourhood = neighbourhood[:split_at] + neighbourhood[split_at:].capitalize()
            deviations.append({"Seq_ID" : seq_id, "Pos" : col, "From" : consensus_subset, "To" : neighbourhood, "MAF" : maf_table[col][neighbourhood[col-left_index]]})
    return deviations


def create_expected_deviations(consensus,genetic_code):
    """
    Create the expected deviations list based on the consensus sequence: every site holding A, T, C or G is
    replaced in turn by each of the three other nucleotides. The deviations are in the format
    {Codon_Pos,Nucleotide_Pos,From,To,Amino_Acid_From,Amino_Acid_To,Synonymous}, where "Codon_Pos" is the position
    of the first nucleotide of the codon, "Nucleotide_Pos" is the position in the consensus sequence, "From" is the
    consensus around that position, "To" is the same neighbourhood with the substituted nucleotide, and "Synonymous"
    indicates if the mutation is synonymous or not. "From" and "To" are lower case except for the deviating site.

    Sites whose consensus symbol is not A, T, C or G (gap, ambiguity code, ...) produce no deviation. Before
    translation the codons are upper-cased, gaps are replaced by "N" and a trailing partial codon is padded with "N".

    Parameters:
        consensus : consensus sequence (string)
        genetic_code : NCBI genetic code id (int) (can be found in https://www.ncbi.nlm.nih.gov/Taxonomy/Utils/wprintgc.cgi)

    Returns:
        deviations : list of dictionnaries containing every deviations from the consensus sequence
    """
    nucleotides = ("A","T","C","G")
    lowered = consensus.casefold()
    deviations = []
    for i in range(0,len(consensus),3):
        original_codon = consensus[i:i+3].upper().replace("-","N").ljust(3,"N")
        origin_translation = None  # translated only if the codon holds at least one substitutable site
        for j in range(min(3,len(consensus)-i)):
            pos = i+j
            reference = original_codon[j]
            if reference not in nucleotides:
                continue
            if origin_translation is None:
                origin_translation = Seq.translate(original_codon,genetic_code)
            # the neighbourhood is cut at the borders of the consensus, it is not padded
            left_index = max(pos-2,0)
            origin = lowered[left_index:pos]+lowered[pos:pos+3].capitalize()
            for nuc in nucleotides:
                if nuc == reference:
                    continue
                new_codon = original_codon[:j]+nuc+original_codon[j+1:]
                new_translation = Seq.translate(new_codon,genetic_code)
                synonymous = origin_translation == new_translation
                deviated = lowered[left_index:pos]+nuc+lowered[pos+1:pos+3]
                # NOTE(review): "Codon_Pos" is the nucleotide offset of the codon (0, 3, 6, ...) whereas
                # observed_deviations_np_maf stores the codon index - confirm which one is expected downstream
                deviation_dict = {"Codon_Pos" : i, "Nucleotide_Pos" : pos, "From" : origin, "To" : deviated, "Amino_Acid_From" : origin_translation, "Amino_Acid_To" : new_translation, "Synonymous" : synonymous}
                deviations.append(deviation_dict)
    return deviations

In [70]:
def sequence_loader(seq_file_name, log_file_name):
    """
    Build the consensus and the deviation lists of every query listed in the log file.

    Parameters:
        seq_file_name : path of the CSV file holding the sequences (its "Control_ID" and "Sequence" columns are read)
        log_file_name : path of the CSV file holding one row per query (its "CDS" and "Code" columns are read)

    Returns:
        deviations : dict with one entry per row of the log file, keyed by the row index (int). Each entry is a dict
                     with "Consensus" and "Observed_Deviations", plus "Expected_Deviations" when "CDS" equals 1
    """
    all_sequences = pd.read_csv(seq_file_name)
    query_log = pd.read_csv(log_file_name)
    deviations = {}
    for query_id in query_log.index: # one consensus and one deviations list per original query
        # the sequences of a query are the rows whose "Control_ID" equals the row index of the query in the log
        matching_seqs = all_sequences[all_sequences["Control_ID"] == query_id]
        site_table = dataframe_builder(list(matching_seqs["Sequence"]))
        consensus, maf_table = consensus_builder(site_table)
        log_row = query_log.loc[query_id]
        is_coding = log_row["CDS"] == 1
        gen_code = log_row["Code"]
        entry = {
            "Consensus" : consensus,
            "Observed_Deviations" : observed_deviations_np_maf(matching_seqs,consensus,maf_table,is_coding,gen_code),
        }
        if is_coding:
            entry["Expected_Deviations"] = create_expected_deviations(consensus,gen_code)
        deviations[int(query_id)] = entry
    return deviations

---

## Test Section

In [71]:
# Build the consensus and the deviation lists of every query from the sequences file and its log file
mut_dict = sequence_loader("../blast_query/sequences.csv","../blast_query/seq_log.csv")

In [73]:
# Save the deviations to "deviations.json" in the current working directory
with open("deviations.json", "w") as dev_out_file:
    json.dump(mut_dict, dev_out_file)

In [74]:
# Read the saved file back.
# NOTE: JSON object keys are always strings, so the integer query ids used as keys
# in mut_dict come back as strings in data.
with open("deviations.json", "r") as dev_in_file:
    data = json.load(dev_in_file)

---

### "Numpy style" observed deviations generator

The algorithm takes as input a list containing every sequence (in string format) and works like this:

1) First, lower-case the sequences and the consensus sequence and pad them on the left and on the right with the required number of neighbours: a mutation at position 0 has no neighbours on its left, so by convention the missing neighbours are written "#" (the small example below pads with "-" instead).

2) Then use numpy's function np.where to locate the indices where the sequences differ from the consensus  

3) Use the column indices that differ to make two arrays which will be used for slicing (thus we have the position n to the left and n to the right, where n is the number of neighbours (here 2))  

4) Use a mask to slice those columns (and in fact extract the sequences)  

5) Reshape to make an array whose rows are the deviation strings

In [221]:
# 1) lower-case the sequences and the consensus, and pad both sides with one "-" per neighbour
neighbours = 2 # number of neighbours kept on each side of a deviation
test_seq_list = ["ATCGATCG","ATCCATGG","ATCGAAAA"]
consensus_test = list("-"*neighbours+"ATCGATCG".casefold()+"-"*neighbours) # we use the first sequence as the consensus (valid only for this example)
arr = np.asarray([list("-"*neighbours+s.casefold()+"-"*neighbours) for s in test_seq_list])
# 2) row and column indices of every site that differs from the consensus
not_eq = np.where(arr != consensus_test)
# 3) first and last column of the window around each deviation
start = not_eq[1]-neighbours
end = not_eq[1]+neighbours
col_test = np.arange(arr.shape[1])
# 4) one mask row per deviation, true on the columns of its window
mask = (start[:,None] <= col_test) & (end[:,None] >= col_test)
# 5) one row per deviation; the row count is inferred so the example still works with other sequences
out = arr[not_eq[0]][mask].reshape(-1,2*neighbours+1)
["".join(s) for s in out]

['tccat', 'atgg-', 'gaaaa', 'aaaa-', 'aaa--']