In [None]:
import copy
import random
from math import log
from collections import defaultdict
from itertools import islice
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from scipy import stats
from scipy.stats import (
    ttest_ind, t, norm, gaussian_kde, pearsonr, f_oneway
)
from scipy.interpolate import interp1d
from scipy.spatial.distance import pdist, squareform
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.decomposition import PCA
from Bio import Entrez, SeqIO, AlignIO
from Bio.Seq import Seq
from Bio.SeqUtils.ProtParam import ProteinAnalysis
from Bio.Data import CodonTable
from Bio.Phylo.TreeConstruction import DistanceCalculator, DistanceMatrix
import pickle

In [None]:
mit_for_translation = {"TTT": "F", "TTC": "F", "TTA": "L", "TTG": "L",
    "CTT": "L", "CTC": "L", "CTA": "L", "CTG": "L",
    "ATT": "B", "ATC": "I", "ATA": "M", "ATG": "M",   #ATT is B, provisionally
    "GTT": "V", "GTC": "V", "GTA": "V", "GTG": "V",
    "TCT": "S", "TCC": "S", "TCA": "S", "TCG": "S",
    "CCT": "P", "CCC": "P", "CCA": "P", "CCG": "P",
    "ACT": "T", "ACC": "T", "ACA": "T", "ACG": "T",
    "GCT": "A", "GCC": "A", "GCA": "A", "GCG": "A",
    "TAT": "Y", "TAC": "Y", "TAA": "*", "TAG": "*",
    "CAT": "H", "CAC": "H", "CAA": "Q", "CAG": "Q",
    "AAT": "N", "AAC": "N", "AAA": "K", "AAG": "K",
    "GAT": "D", "GAC": "D", "GAA": "E", "GAG": "E",
    "TGT": "C", "TGC": "C", "TGA": "W", "TGG": "W",
    "CGT": "R", "CGC": "R", "CGA": "R", "CGG": "R",
    "AGT": "S", "AGC": "S", "AGA": "R", "AGG": "R",
    "GGT": "G", "GGC": "G", "GGA": "G", "GGG": "G"}

mitochondrial_code_AGRR = {
    'A': ['GCT', 'GCC', 'GCA', 'GCG'],
    'C': ['TGT', 'TGC'],
    'D': ['GAT', 'GAC'],
    'E': ['GAA', 'GAG'],
    'F': ['TTT', 'TTC'],
    'G': ['GGT', 'GGC', 'GGA', 'GGG'],
    'H': ['CAT', 'CAC'],
    'I': ['ATT', 'ATC'],
    'K': ['AAA', 'AAG'],
    'L': ['TTA', 'TTG', 'CTT', 'CTC', 'CTA', 'CTG'],
    'M': ['ATG', 'ATT', 'ATA'],
    'N': ['AAT', 'AAC'],
    'P': ['CCT', 'CCC', 'CCA', 'CCG'],
    'Q': ['CAA', 'CAG'],
    'R': ['CGT', 'CGC', 'CGA', 'CGG'],
    'S': ['TCT', 'TCC', 'TCA', 'TCG', 'AGT', 'AGC'],
    'T': ['ACT', 'ACC', 'ACA', 'ACG'],
    'V': ['GTT', 'GTC', 'GTA', 'GTG'],
    'W': ['TGG', 'TGA'],
    'Y': ['TAT', 'TAC'],
    '*': ['TAA', 'TAG'],
    'X': [ 'GCT', 'GCC', 'GCA', 'GCG','TGT', 'TGC','GAT', 'GAC','GAA', 'GAG','TTT', 'TTC','GGT', 'GGC', 'GGA', 'GGG','CAT', 'CAC','ATT', 'ATC','AAA','AAG','TTA', 'TTG', 'CTT', 'CTC', 'CTA', 'CTG','ATG', 'ATT', 'ATA','AAT', 'AAC','CCT', 'CCC', 'CCA', 'CCG','CAA', 'CAG','CGT', 'CGC', 'CGA', 'CGG','TCT', 'TCC', 'TCA', 'TCG', 'AGT', 'AGC','ACT', 'ACC', 'ACA', 'ACG','GTT', 'GTC', 'GTA', 'GTG','TGG', 'TGA','TAT', 'TAC']
}


In [None]:
def translate(sequence, genetic_code, for_length = False):
    """
    Translates a nucleotide sequence into an amino acid sequence based on the given genetic code.
    """
    amino_acid_seq = ""
    # Loop through the sequence in codons (triplets)
    for i in range(0, len(sequence), 3):
        codon = sequence[i:i+3]  # Extract the codon (triplet)
        
        if len(codon) == 3:  # Ensure full codon length
            amino_acid = genetic_code.get(codon, 'X')  # Translate codon; 'X' for unknown codons
            if amino_acid == 'B' and for_length == False:  # we want to keep ATT as B if going to length_DSS
                if i == 0:
                    amino_acid = 'M'
                else: 
                    amino_acid = 'I'
            amino_acid_seq += amino_acid
    
    return amino_acid_seq

def calculate_CpG(sequences, z = 1, local = False):

    """
    sequences = primate sequences
    z = number of standard deviations around mean
    local means we just want the number of CpG for a single sequence
    
    returns a mean and a standard deviation * z
    """
    CpG = []
    for seq in sequences:
        count = sum(1 for i in range(len(seq) - 1) if seq[i:i+2] == 'CG')
        CpG.append(count / len(seq))
    mu, std = norm.fit(CpG)
    if local == True:
        return CpG
    return mu.round(4), (std*z).round(4)

def calculate_GC_content(sequences, z = 1, local = False):
    """
    sequences = primate sequences
    z = number of standard deviations around mean
    local means we just want the GC content for a single sequence

    returns a mean and a standard deviation * z
    """
    GC = []
    for seq in sequences:
        count = seq.count('G') + seq.count('C')
        GC.append(count / len(seq))
    mu, std = norm.fit(GC)
    if local == True:
        #print('GC in function', GC)
        return GC
    return mu.round(4), (std*z).round(4)

def calculate_codon_density(sequences, codons, frame, z):
    """
    sequences = primate sequences
    codons is the codon/s for which we want a value
    z = number of standard deviations around mean
    frame specifies the frame, 0 is reference (+1 in the article), 2 is mtaltco1 (+3 in the article)

    returns a mean and a standard deviation * z
    """
    densities = []
    for seq in sequences:
        shifted_frame = seq[frame:]  # Shift the frame
        count = sum(1 for i in range(0, len(shifted_frame) - 2, 3)
                    if shifted_frame[i:i+3] in codons)
        densities.append(count / (len(seq)//3))

    mu, std = norm.fit(densities)

    sigma = std*z
    
    return mu, sigma

class phylogenetic_stats:
    """
    gathers all of the listed stats for primates
    """

    def __init__(self, sequences, z=1):
        self.sequences = sequences
        self.z = z
        
        self.stop_codons = ['TAG', 'TAA']
        self.AGR_codons = ['AGA', 'AGG']
        
        self.CpG = calculate_CpG(self.sequences, z)
        self.GC_content = calculate_GC_content(self.sequences, z)
        self.AGR_RF2 = calculate_codon_density(self.sequences, self.AGR_codons, 1, z)
        self.AGR_RF3 = calculate_codon_density(self.sequences, self.AGR_codons, 2, z)
        self.stop_RF2 = calculate_codon_density(self.sequences, self.stop_codons, 1, z)
        self.stop_RF3 = calculate_codon_density(self.sequences, self.stop_codons, 2, z)

def plot_two_distributions(mu1, sigma1, mu2, sigma2, label1='', label2=''):
    """
    this is just a visual confirmation we're in the correct frame. We should expect:
    high AGR in +3, low AGR in +3, high stop in +2, low stop in +3.
    """

    x_min = min(mu1 - 4*sigma1, mu2 - 4*sigma2)
    x_max = max(mu1 + 4*sigma1, mu2 + 4*sigma2)
    x = np.linspace(x_min, x_max, 1000)

    y1 = norm.pdf(x, mu1, sigma1)
    y2 = norm.pdf(x, mu2, sigma2)

    plt.figure(figsize=(8, 5))
    plt.plot(x, y1, label=label1, color='blue')
    plt.plot(x, y2, label=label2, color='red')
    plt.fill_between(x, y1, alpha=0.3, color='blue')
    plt.fill_between(x, y2, alpha=0.3, color='red')
    plt.title('Making sure everything works')
    plt.xlabel('Value')
    plt.ylabel('Density')
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.show()

def local_codon_density(sequence, codons, frame_shift):

    """
    Outputs a given codon density in the given frame_shift of a nucleotide sequence (starts at frame 0)
    """
    
    shifted_seq = sequence[frame_shift:]
    count = 0
    for i in range(0, len(shifted_seq) - 2, 3):
        codon = shifted_seq[i:i+3]
        if codon in codons:
            count += 1
    return count / (len(sequence)//3)

def generate_DSS(fasta_dict, phylogenetic_stats, fasta_DSS, nb_iter, length = False):

    """
    fasta_dict = every primate nucleotide sequence, in mtCO1's reading frame
    phylogenetic stats = sequence constraints
    fasta_DSS = dictionary that contains true sequences, unto which this function adds generated sequences (amino acid)
    nb_iter = number of generated sequences per item in fasta_dict
    length = True or False, if length = True, we maintain ATT as B, if False, ATT is I
    """
    
    iteration = 0
    label = 'generated'
    for i in range(nb_iter):
        compte = 0
        for key, value in list(fasta_dict.items()): #every primate sequence. value is in nucleotide at this point
        
            prot_value = translate(value, mit_for_translation)
            combinaison, compteur = optimize_sequence(prot_value, mitochondrial_code_AGRR, phylogenetic_stats)
            #DSS_seq = translate(combinaison, mit)
            fasta_DSS[f"{label + str(compte)}_{iteration}"] = translate(combinaison[2:], mit_for_translation, for_length = length)
            compte += 1
        iteration += 1
        print(iteration,'/',nb_iter)
    return fasta_DSS

def optimize_sequence(protein_sequence, genetic_code, phylogenetic_stats, max_iterations=1000000):

    """
    protein_sequence = CO1 sequence (amino acid)
    genetic_code lists every codons that gives a given amino acids
    phylogenetic_stats = primate stats for monitored sequence constraints
    max_iter, if ever it gets stuck, which it should not (expect around 500 iterations per degenerate sequence)
    """

    stop_codons = phylogenetic_stats.stop_codons
    AGR_codons = phylogenetic_stats.AGR_codons

    min_stop_2 = phylogenetic_stats.stop_RF2[0] - phylogenetic_stats.stop_RF2[1]
    max_stop_2 = phylogenetic_stats.stop_RF2[0] + phylogenetic_stats.stop_RF2[1]
    min_AGR_3 = phylogenetic_stats.AGR_RF3[0] - phylogenetic_stats.AGR_RF3[1]
    max_AGR_3 = phylogenetic_stats.AGR_RF3[0] + phylogenetic_stats.AGR_RF3[1]
    min_GC_content = phylogenetic_stats.GC_content[0] - phylogenetic_stats.GC_content[1]
    max_GC_content = phylogenetic_stats.GC_content[0] + phylogenetic_stats.GC_content[1]
    min_CpG = phylogenetic_stats.CpG[0] - phylogenetic_stats.CpG[1]
    max_CpG = phylogenetic_stats.CpG[0] + phylogenetic_stats.CpG[1]

    
    current_sequence = ''.join(random.choice(genetic_code[aa]) for aa in protein_sequence)
    current_codons = [] 
    for i in range(0, len(current_sequence), 3):
        current_codons.append(current_sequence[i:i+3])
        
    #print(current_codons)
    improved = False
    iterations = 0
    
    def is_better(old_seq, trial_seq):

        """
        compares old and trial seq, if the trial seq is better for any metric, and the old-seq was not within range,
        it accepts the trial seq as the new current seq.
        """

        #old
        stop2_old = local_codon_density(old_seq, stop_codons, 1)
        AGR3_old = local_codon_density(old_seq, AGR_codons, 2)
        GC_content_old = calculate_GC_content([old_seq], local = True)
        CpG_old = calculate_CpG([old_seq], local = True)

        #trial
        stop2_trial = local_codon_density(trial_seq, stop_codons, 1)
        AGR3_trial = local_codon_density(trial_seq, AGR_codons, 2)
        GC_content_trial= calculate_GC_content([trial_seq], local = True)
        CpG_trial = calculate_CpG([trial_seq], local = True)

        #STOP2 (canonical stops in the +2 frame)
        if stop2_old < min_stop_2:
            if stop2_trial > stop2_old:
                return trial_seq

        if stop2_old > max_stop_2:
            if stop2_trial < stop2_old:
                return trial_seq

        #AGR3 (AGR codons in the +3 frame)
        if AGR3_old < min_AGR_3:
            if AGR3_trial > AGR3_old:
                return trial_seq
                
        if AGR3_old > max_AGR_3:
            if AGR3_trial < AGR3_old:
                return trial_seq

        #GC_content
        if GC_content_old < min_GC_content:
            if GC_content_trial > GC_content_old:
                return trial_seq

        if GC_content_old > max_GC_content:
            if GC_content_trial < GC_content_old:
                return trial_seq
        #CpG
        if CpG_old < min_CpG:
            if CpG_trial > CpG_old:
                return trial_seq

        if CpG_old > max_CpG:
            if CpG_trial < CpG_old:
                return trial_seq

        #print('not better')
        return old_seq
                
    def is_enough(seq):

        """
        if the current seq meets every criteria, it is good enough and returns True
        """
        
        stop2 = local_codon_density(seq, stop_codons, 1)
        AGR3 = local_codon_density(seq, AGR_codons, 2)
        GC_content = calculate_GC_content([seq], local = True)
        CpG = calculate_CpG([seq], local = True)
        
        return (min_stop_2 <= stop2 <= max_stop_2 and
                min_AGR_3 <= AGR3 <= max_AGR_3 and
                min_GC_content <= GC_content <= max_GC_content and
                min_CpG <= CpG <= max_CpG)

    while iterations < max_iterations:
        if is_enough(current_sequence):
            stop2 = local_codon_density(current_sequence, stop_codons, 1)
            AGR3 = local_codon_density(current_sequence, AGR_codons, 2)
            GC_content = calculate_GC_content([current_sequence], local = True)
            CpG = calculate_CpG([current_sequence], local = True)
            return ''.join(current_codons), iterations
        
        # Mutates one codon at a time
        idx = random.randint(0, len(protein_sequence) - 1)
        aa = protein_sequence[idx]
        alternatives = [c for c in genetic_code[aa] if c != current_codons[idx]] #is it already in use?
        
        # if no alternative, continue
        if not alternatives:
            iterations += 1
            continue

        # else try an alternative codon
        trial_codon = random.choice(alternatives)
        new_codons = copy.deepcopy(current_codons)
        new_codons[idx] = trial_codon
        trial_seq = ''.join(new_codons)
        
        current_sequence = is_better(trial_seq, current_sequence)
        current_codons = []
        
        for i in range(0, len(current_sequence), 3):
            current_codons.append(current_sequence[i:i+3])
        iterations += 1
        if iterations % 1000 == 0:
            print(f"Iteration: {iterations}")

    return ''.join(current_codons), iterations

# Import 27 primate sequences

In [None]:
fasta_dict = {} # nucleotide-level

for record in SeqIO.parse("../data/27_primates_mtaltCO1_nuc.fasta", "fasta"):
    fasta_dict[record.description] = str(record.seq[1:-1]) #to be in the CO1 frame

# Compile primate sequence constraints

In [None]:
fasta_DSS = {} # protein level, +3 RF

listes = []
label = 'true'
compteur = 0
for key, value in list(fasta_dict.items()):
    fasta_DSS[f"{label + str(compteur)}"] = translate(value[2:], mit_for_translation) #skip first two 
    listes.append(value)
    compteur += 1

primate_DSS = fasta_DSS
length_DSS = {}

stats_primates = phylogenetic_stats(listes, z = 1)

plot_two_distributions(stats_primates.AGR_RF2[0],stats_primates.AGR_RF2[1],stats_primates.AGR_RF3[0], stats_primates.AGR_RF3[1], label1 = 'AGR_+2', label2 = 'AGR_+3')
plot_two_distributions(stats_primates.stop_RF2[0],stats_primates.stop_RF2[1],stats_primates.stop_RF3[0], stats_primates.stop_RF3[1], label1 = 'stop_+2', label2 = 'stop_+3')

# Generate degenerate sequences for primate PCA

In [None]:
output_DSS = generate_DSS(fasta_dict, stats_primates, primate_DSS,20)

In [None]:
file_path = "../data/primate_DSS.fasta"

with open(file_path, "w") as f:
    for key, value in output_DSS.items():
        print(key,len(value))
        if 'X' not in value: #for benner74
            header = f"> {key}\n"
            sequence = value.replace("*", "*") + "\n"
            f.write(header + sequence)

# Generate homo sapiens degenerate sequences for length comparison

In [None]:
 # just make degenerate sequences from Homo sapiens' CO1 sequence (first in fasta_dict)
first = dict([next(iter(fasta_dict.items()))])

length_DSS = {}
length_DSS = generate_DSS(first, stats_primates, length_DSS,1000, length = True) 


In [None]:
file_path = "../data/length_DSS.fasta"

with open(file_path, "w") as f:
    for key, value in length_DSS.items():
        #print(key,len(value))
        if 'X' not in value: #for benner74
            header = f"> {key}\n"
            sequence = value.replace("*", "*") + "\n"
            f.write(header + sequence)

# Generate homo sapiens degenerate sequence for disorder predictions

In [None]:
file_path = "../data/disorder_DSS.fasta" 

# Ridao only handles 100 unknown samples at a time
max_sample = 100
sample  = 0

with open(file_path, "w") as f:
    for key, value in length_DSS.items():
        if sample < max_sample:
            #print(key,len(value))
            if 'X' not in value: #for benner74
                header = f"> {key}\n"
                sequence = value.replace("*", "") + "\n" # Ridao does not handle *
                sequence = value.replace("B", "I") + "\n" 
                f.write(header + sequence)
        sample += 1
        