# Setting Up

In [68]:
from collections import Counter
import itertools
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import random
from scipy.linalg import expm


In [69]:
from cogent3 import make_unaligned_seqs
from cogent3 import make_seq
from cogent3.core.sequence import Sequence
from cogent3.util.dict_array import DictArrayTemplate

# Notes

* Defaut DNA bases coding: T,C,A,G -> 0,1,2,3

# Functions

### generate_ancestor(n, pi=None)

In [70]:
def generate_ancestor(n, pi=None):
    """
    Generate an ancestor DNA sequence of length n with customizable probability distribution.

    Input: 
        n (integer): The desired length of the ancestor DNA sequence.
        pi (list, optional): List of probabilities for each nucleotide. If None, a random distribution is used.

    Output: 
        list: The randomly generated DNA sequence of length n in list format.
    """
    nucleotides = ['0', '1', '2', '3']  # T, C, A, G

    if pi is None:
        # If no probability distribution is provided, use a random distribution, equal probabilities of 0.25
        return list(random.choices(nucleotides, k=n))
    else:
        # If a custom probability distribution is provided, use it
        if len(pi) != 4:
            raise ValueError("Probability distribution must contain exactly 4 values.")
        
        # Check if the probabilities sum to 1
        if abs(sum(pi) - 1) > 1e-10:
            raise ValueError("Probabilities must sum to 1.")
        
        return list(random.choices(nucleotides, weights=pi, k=n))


In [71]:
def generate_ancestor_cogent3(n: int, pi=None) -> Sequence:
    """
    Generate an ancestor DNA sequence of length n with customizable probability distribution.

    Input: 
        n (int): The desired length of the ancestor DNA sequence.
        pi (list, optional): Array of probabilities for each nucleotide. 
            If None, a random distribution is used.

    Output: 
        cogent3.core.alignment.Alignment: The randomly generated DNA sequence of length n.
    """
    nucleotides = ['0', '1', '2', '3']  # T, C, A, G
    if pi is None:
        # If no probability distribution is provided, use a random distribution, equal probabilities of 0.25
        seq_number = list(random.choices(nucleotides, weights=pi, k=n))
        number_to_base = {'0': 'T', '1': 'C', '2': 'A', '3': 'G'}
        ances_seq_join_alpha = ''.join(number_to_base[number] for number in seq_number)

        return make_seq(ances_seq_join_alpha, 'dna')
    else:
        # If a custom probability distribution is provided, use it
        if len(pi) != 4:
            raise ValueError("Probability distribution must contain exactly 4 values.")
        
        # Check if the probabilities sum to 1
        if abs(sum(pi) - 1) > 1e-10:
            raise ValueError("Probabilities must sum to 1.")
        
        seq_number = list(random.choices(nucleotides, weights=pi, k=n))
        number_to_base = {'0': 'T', '1': 'C', '2': 'A', '3': 'G'}
        ances_seq_join_alpha = ''.join(number_to_base[number] for number in seq_number)

        return make_seq(seq = ances_seq_join_alpha, moltype = 'dna')

### generate_rate_matrix()

In [72]:
def generate_rate_matrix():
    """
    Generate a single 4 by 4 rate matrix.

    Input: 
        None

    Output: 
        rate_matrix (array): A single rate matrix.

    Example output: 
    [[-0.8, 0.1, 0.4, 0.3],
    [0.3, -0.7, 0.2, 0.2],
    [0.1, 0.2, -0.6, 0.3],
    [0.1, 0.2, 0.4, -0.7]]
    """
    matrix = np.zeros((4, 4))
    for i in range(4):
        row_sum = 0 # sum of non-diagonal elements of current row
        for j in range(4):
            if i != j: # fill up non-diagonal elements of current row
                element = np.random.uniform(0.01, 1.0)  # Non-diagonal elements are between 0.01 and 1.0
                row_sum += element
                matrix[i, j] = element
        matrix[i,i] = -row_sum # Ensure every row adds up to 0 

    rate_matrix = matrix.tolist()
    return rate_matrix


In [73]:
def generate_rate_matrix_cogent3() -> DictArrayTemplate: 
    """
    Generate a single 4 by 4 rate matrix.

    Output: 
        DictArray: A single rate matrix.
    """
    matrix = np.zeros((4, 4))
    for i in range(4):
        row_sum = 0  # sum of non-diagonal elements of current row
        for j in range(4):
            if i != j:  # fill up non-diagonal elements of current row
                element = np.random.uniform(0.01, 1.0)  # Non-diagonal elements are between 0.01 and 1.0
                row_sum += element
                matrix[i, j] = element
        matrix[i, i] = -row_sum  # Ensure every row adds up to 0 

    template = DictArrayTemplate(['T', 'C', 'G', 'A'], ['T', 'C', 'G', 'A'])
    return template.wrap(matrix)


### generate_rate_matrices(markov_order)

In [74]:
def generate_rate_matrices(markov_order:int):
    """
    Input:
        markov_order (int): The Markov order.

    Output: 
        rate_matrices (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.

    Example output: 
    generate_rate_matrices(1):
        {('0','0') :[[-0.8, 0.1, 0.4, 0.3],
                    [0.3, -0.7, 0.2, 0.2],
                    [0.1, 0.2, -0.6, 0.3],
                    [0.1, 0.2, 0.4, -0.7]],
        ('0','1') : [[...]...], ...}
    """
    rate_matrices = {}

    if markov_order == 0:
        rate_matrix = generate_rate_matrix()
        rate_matrices['0'] = rate_matrix # For standardizing so that output is always a dictionary of matrices.
        return rate_matrices
    
    else:
        nucleotides = ['0', '1', '2', '3']
        one_side_neighbours = [''.join(p) for p in itertools.product(nucleotides, repeat=markov_order)]
        permutations = list(itertools.product(one_side_neighbours, repeat=2))

        for perm in permutations:
            rate_matrix = generate_rate_matrix()
            rate_matrices[perm] = rate_matrix

        return rate_matrices

# # Example usage
# rate_matrices_dict = generate_rate_matrices(2)
# for key, value in rate_matrices_dict.items():
#     print(f"{key}: {value}")
# len(rate_matrices_dict)

In [75]:
import itertools
from cogent3.util.dict_array import DictArrayTemplate

def generate_rate_matrices_cogent3(markov_order: int) -> dict:
    """
    Generate a dictionary of rate matrices for context-dependent DNA substitution.

    Input:
        markov_order (int): The Markov order.

    Output: 
        dict: A dictionary of rate matrices in DictArray. 
            The key is a tuple of left and right neighbors as strings from permutations with ['T', 'C', 'G', 'A']. 
            The value is the corresponding rate matrix.
    """
    rate_matrices = {}

    nucleotides = ['T', 'C', 'G', 'A']
    one_side_neighbours = [''.join(p) for p in itertools.product(nucleotides, repeat=markov_order)]
    permutations = list(itertools.product(one_side_neighbours, repeat=2))

    template = DictArrayTemplate(nucleotides, nucleotides)

    for perm in permutations:
        rate_matrix = np.zeros((4, 4))
        for i in range(4):
            row_sum = 0
            for j in range(4):
                if i != j:
                    element = np.random.uniform(0.01, 1.0)
                    row_sum += element
                    rate_matrix[i, j] = element
            rate_matrix[i, i] = -row_sum
        rate_matrices[perm] = template.wrap(rate_matrix)

    return rate_matrices


### transition_matrix(Q, t)

In [76]:
def transition_matrix(Q, t):
    """
    Calculate the transition matrix P from rate matrix Q for a given time t.

    Input:
        Q: Rate matrix (2D array)
        t: Time passed (float)

    Output:
        P: Transition matrix (2D array)
    """
    Qt = np.dot(Q, t)
    P = expm(Qt)
    return P

# # Example usage:
# rate_matrices_dict = generate_rate_matrices(0)
# rate_mat_Q = rate_matrices_dict['0']
# trans_mat_P = transition_matrix(rate_mat_Q, max_time)
# print(trans_mat_P)

In [77]:
def transition_matrix_cogent3(Q: DictArrayTemplate, t: int):
    """
    Calculate the transition matrix P from rate matrix Q for a given time t.

    Input:
        Q: Rate matrix (2D array)
        t: Time passed (float)

    Output:
        P: Transition matrix (2D array)
    """
    Qt = Q.array*t
    P = expm(Qt)
    template = DictArrayTemplate(['T', 'C', 'G', 'A'], ['T', 'C', 'G', 'A'])

    return template.wrap(P)

# # Example usage:
# rate_matrices_dict = generate_rate_matrices(0)
# rate_mat_Q = rate_matrices_dict['0']
# trans_mat_P = transition_matrix(rate_mat_Q, max_time)
# print(trans_mat_P)

### get_context(seq_index, DNA_seq, markov_order, anchor_base = '0')

In [78]:
def get_context(seq_index, DNA_seq, markov_order, anchor_base = '0'):
    """
    Given the index of a base in the given DNA sequence list and order, obtain the context i.e. left and right neighbours in a tuple.

    Input: 
        seq_index (int): The index of a base in the given DNA sequence list.
        DNA_seq (list): The DNA sequence as a list.
        markov_order (int): The Markov order.
        anchor_base (string): The anchoring base beyond the DNA sequence to address incomplete nucleotide contexts at the left and right ends of the sequence.

    Output:
        context (tuple): The left and right neighbours of the base at the given seq_index.

    Example output: 
    get_context(1, ['3', '2', '3', '1', '2'], 2) = ('03', '31')
    """

    DNA_seq = ''.join(DNA_seq) # convert list DNA_seq to string 
    
    if markov_order == 0:
        return '0' # needed to access single independent matrix in rate_matrices_dict
    
    elif markov_order > 0:

        if ((seq_index < markov_order) & (len(DNA_seq) - (seq_index+1) < markov_order)): # handles incomplete left and right neighbours
            left_base = ''.join([(anchor_base * (markov_order-seq_index)), DNA_seq[:seq_index]])
            right_base =''.join([DNA_seq[seq_index+1:], (anchor_base * (markov_order - (len(DNA_seq)-seq_index-1)))])
            context = (left_base, right_base)

        elif seq_index < markov_order: # handles incomplete left neighbour
            left_base = ''.join([(anchor_base * (markov_order-seq_index)), DNA_seq[:seq_index]])
            right_base = DNA_seq[seq_index+1:seq_index+1+markov_order]
            context = (left_base, right_base)

        elif (len(DNA_seq) - (seq_index+1) < markov_order): # handles incomplete right neighbour
            left_base = DNA_seq[seq_index-markov_order:seq_index]
            right_base = ''.join([DNA_seq[seq_index+1:], (anchor_base * (markov_order - (len(DNA_seq)-seq_index-1)))])
            context = (left_base, right_base)

        else:
            left_base = DNA_seq[seq_index-markov_order:seq_index]
            right_base = DNA_seq[seq_index+1:seq_index+1+markov_order]
            context = (left_base, right_base)
        return context

# # Example usage:
# ancestor_sequence = generate_ancestor(5)
# print(ancestor_sequence)
# print(get_context(1,ancestor_sequence,0))
# print(get_context(1,ancestor_sequence,2))
# print(get_context(1,ancestor_sequence,4))

In [79]:
import itertools

def get_context_cogent3(seq_index: int, DNA_seq: Sequence, markov_order: int, anchor_base: str = 'T') -> tuple:
    """
    Given the index of a base in the given DNA sequence list and order, obtain the context i.e. left and right neighbors in a tuple.

    Input: 
        seq_index (int): The index of a base in the given DNA sequence list.
        DNA_seq (Sequence): The DNA sequence as a Sequence.
        markov_order (int): The Markov order.
        anchor_base (str): The anchoring base beyond the DNA sequence to address incomplete nucleotide contexts at the left and right ends of the sequence.

    Output:
        tuple: The left and right neighbors of the base at the given seq_index.
    """
    DNA_seq = ''.join(DNA_seq)  # Convert list DNA_seq to string 
    
    if markov_order == 0:
        return 'T'  # Needed to access single independent matrix in rate_matrices_dict
    
    elif markov_order > 0:

        if ((seq_index < markov_order) & (len(DNA_seq) - (seq_index + 1) < markov_order)):  # Handles incomplete left and right neighbors
            left_base = anchor_base * (markov_order - seq_index) + DNA_seq[:seq_index]
            right_base = DNA_seq[seq_index + 1:] + anchor_base * (markov_order - (len(DNA_seq) - seq_index - 1))
            context = (left_base, right_base)

        elif seq_index < markov_order:  # Handles incomplete left neighbor
            left_base = anchor_base * (markov_order - seq_index) + DNA_seq[:seq_index]
            right_base = DNA_seq[seq_index + 1:seq_index + 1 + markov_order]
            context = (left_base, right_base)

        elif (len(DNA_seq) - (seq_index + 1) < markov_order):  # Handles incomplete right neighbor
            left_base = DNA_seq[seq_index - markov_order:seq_index]
            right_base = DNA_seq[seq_index + 1:] + anchor_base * (markov_order - (len(DNA_seq) - seq_index - 1))
            context = (left_base, right_base)

        else:
            left_base = DNA_seq[seq_index - markov_order:seq_index]
            right_base = DNA_seq[seq_index + 1:seq_index + 1 + markov_order]
            context = (left_base, right_base)
        return context


### initialize_waiting_times(DNA_seq, Q_dict, markov_order)

In [80]:
def initialize_waiting_times(DNA_seq, Q_dict, markov_order):
    """
    Input:
        DNA_seq (list): The DNA sequence as a list.
        Q_dict (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
        markov_order (int): The Markov order.

    Output: 
        waiting_times (array): A 2D numpy array with dimensions (length of DNA sequence) x 4. Every row contains the 4 
            waiting times of 4 possible bases. The current base waiting time is set to inf.
        min_position (tuple): A tuple containing the index of the base in the DNA sequence with the smallest
            substitution time and the next base that it will be substituted with. 
        min_time (float): The minimum time needed for next substitution to take place in the DNA sequence.
    """
    waiting_times = np.full((len(DNA_seq),4), float('inf'))
    min_time = float('inf')
    min_position = None

    for seq_index in range(len(DNA_seq)):
        curr_base = int(DNA_seq[seq_index])
        curr_context = get_context(seq_index, DNA_seq, markov_order)

        for next_base in range(4):
            if next_base != curr_base:
                rate = 1/(Q_dict[curr_context][curr_base, next_base])
                time = np.random.exponential(rate)
                waiting_times[seq_index, next_base] = time
                if time < min_time:
                    min_time = time
                    min_position = (seq_index, next_base)

    return waiting_times, min_position, min_time

# # Example usage:
# ancestor_seq = generate_ancestor(5)
# print(f"Ancestor sequence is {ancestor_seq}")
# Q_dict = generate_rate_matrices(1)
# waiting_times, min_position, min_time = initialize_waiting_times(ancestor_seq, Q_dict, 1)
# print(waiting_times)

In [81]:
import numpy as np

def initialize_waiting_times_iid_cogent3(DNA_seq: Sequence, Q: DictArrayTemplate) -> tuple:
    """
    Initialize waiting times for DNA sequence substitution.

    Input:
        DNA_seq (Sequence): The DNA sequence as a cogent3.Sequence.
        Q_dict (DictArray): A DictArray of rate matrices for context-dependent DNA substitution.

    Output:
        tuple: A tuple containing the waiting times, the position of the base with the smallest substitution time,
               and the minimum time needed for the next substitution.
    """
    waiting_times = np.full((len(DNA_seq), 4), np.inf)
    min_time = np.inf
    min_position = None
    nucleotides = ['T', 'C', 'G', 'A']

    for seq_index in range(len(DNA_seq)):
        curr_base = str(DNA_seq[seq_index])
        for next_base in nucleotides:
            if next_base != curr_base:
                next_base_index = nucleotides.index(next_base)
                rate = 1 / Q[curr_base][next_base]
                time = np.random.exponential(rate)
                waiting_times[seq_index, next_base_index] = time
                if time < min_time:
                    min_time = time
                    min_position = (seq_index, next_base)

    return waiting_times, min_position, min_time


### get_neighbour_indices(DNA_seq, seq_index, markov_order)

In [82]:
def get_neighbour_indices(DNA_seq, seq_index, markov_order):

    """
    Input:
        DNA_seq (list): The DNA sequence as a list.
        seq_index (int): The index of a base in the given DNA sequence list.
        markov_order (int): The Markov order.

    Output:
        neighbours (array): An array of indices of the left and right neighbours of the given base at the given
            index in the DNA sequence, including the given index.
    """
    neighbours = []
    sequence_length = len(DNA_seq)

    if ((seq_index < 0) | (seq_index >= sequence_length)):
        print("Error: Index out of bounds")
    
    else:

        for index in range(max(0, seq_index - markov_order), min(sequence_length, seq_index + markov_order + 1)):
            neighbours.append(index)

        return neighbours

# # Example usage:
# sequence = '123012301'
# index = 6
# order = 2
# result = get_neighbour_indices(sequence, index, order)
# print(result)


### update_waiting_times(DNA_seq, waiting_times, Q_dict, min_position, min_time, markov_order)

In [83]:
def update_waiting_times(DNA_seq, Q_dict, waiting_times, min_position, min_time, markov_order):
    """
    Input:
        DNA_seq (list): The DNA sequence as a list.
        Q_dict (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
        waiting_times (array): A 2D numpy array with dimensions (length of DNA sequence) x 4. Every row contains the 4 
            waiting times of 4 possible bases. The current base waiting time is set to inf.
        min_position (tuple): A tuple containing the index of the base in the DNA sequence with the smallest
            substitution time and the next base that it will be substituted with. 
        min_time (float): The minimum time needed for next substitution to take place in the DNA sequence.
        markov_order (int): The Markov order.

    Output:
        waiting_times (array): A 2D numpy array with dimensions (length of DNA sequence) x 4. Every row contains the 4 
            waiting times of 4 possible bases. The current base waiting time is set to inf.
        min_position (tuple): A tuple containing the index of the base in the DNA sequence with the smallest
            substitution time and the next base that it will be substituted with. 
        min_time (float): The minimum time needed for next substitution to take place in the DNA sequence.
    """

    seq_index = min_position[0] # obtain the index of the current base that is about to be substituted in the sequence

    new_base = min_position[1] # obtain the new base that is going to substitute the current base
    
    DNA_seq[seq_index] = f"{new_base}" # Substitue with new base in DNA sequence 

    # Regenerate all waiting times
    waiting_times, min_position, min_time = initialize_waiting_times(DNA_seq, Q_dict, markov_order)

    return waiting_times, min_position, min_time

# # Example usage:
# ancestor_seq = generate_ancestor(3)
# print(f"Ancestor sequence is {ancestor_seq}")
# Q_dict = generate_rate_matrices(1)
# waiting_times, min_position, min_time = initialize_waiting_times(ancestor_seq, Q_dict, 1)
# update_waiting_times(ancestor_seq, Q_dict, waiting_times, min_position, min_time, 1)

In [84]:
def update_waiting_times_iid_cogent3(DNA_seq: Sequence, Q: DictArrayTemplate, waiting_times: np.ndarray, min_position: tuple, min_time: float) -> tuple:
    """
    Update waiting times for DNA sequence substitution.

    Input:
        DNA_seq (cogent3.core.sequence.Sequence): The DNA sequence.
        Q (DictArray): A DictArrayTemplate representing the rate matrix for DNA substitution.
        waiting_times (np.ndarray): An array containing waiting times for DNA sequence substitution.
        min_position (tuple): A tuple containing the position of the base with the smallest substitution time.
        min_time (float): The minimum time needed for the next substitution.

    Output:
        tuple: A tuple containing the updated waiting times, the position of the base with the new smallest substitution time,
               and the new minimum time needed for the next substitution.
    """

    seq_index = min_position[0] # obtain the index of the current base that is about to be substituted in the sequence
    new_base = min_position[1] # obtain the new base that is going to substitute the current base
    
    seq_list = list(DNA_seq)
    seq_list[seq_index] = new_base
    DNA_seq = make_seq(str(''.join(seq_list)))
    
    waiting_times, min_position, min_time = initialize_waiting_times_iid_cogent3(DNA_seq, Q)

    return waiting_times, min_position, min_time


### simulate_seq(ancestor_seq, max_time, rate_matrices_dict, markov_order)

In [85]:
def simulate_seq(ancestor_seq, max_time, rate_matrices_dict, markov_order):
    """
    Input:
        ancestor_seq (list): The initial non-substituted generated DNA sequence of length n.
        max_time (float): The maximum time allowed for substitution(s) to take place in the simulation.
        rate_matrices (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
        markov_order (int): The Markov order.

    Output:
        tuple: A tuple containing the final DNA sequence at the end of the simulation as well as an array
        containing the history of substituted DNA sequences throughout the simulation.
        
    """
    history = [ancestor_seq,]

    time_passed = 0

    DNA_seq = ancestor_seq.copy()
    Q_dict = rate_matrices_dict
    waiting_times, min_position, min_time = initialize_waiting_times(DNA_seq, Q_dict, markov_order)

    while time_passed <= max_time:
        seq_index = min_position[0]
        new_base = min_position[1]
        substitution_time = min_time 
        waiting_times, min_position, min_time = update_waiting_times(DNA_seq, Q_dict, waiting_times, min_position, min_time, markov_order)
        DNA_seq[seq_index] = f"{new_base}" # Substitue with new base in DNA sequence
        history.append(DNA_seq.copy())
        time_passed += substitution_time

    if len(history) <= 2: # meaning time for any substitution to occur exceeded the max_time, no substitution took place
        return (ancestor_seq, history[:-1])
    else:
        return (history[-2], history[:-1])
    
# # Example usage:
# ancestor_seq = generate_ancestor(3)
# print(f"Ancestor sequence is {ancestor_seq}")
# order = 1
# max_time = 0.2
# rate_matrices_dict = generate_rate_matrices(order)
# simulate_seq(ancestor_seq,max_time,rate_matrices_dict,order)

In [86]:
def simulate_seq_iid_cogent3(ancestor_seq: Sequence, max_time : float, Q: DictArrayTemplate):
    """
    Input:
        ancestor_seq (cogent3.Sequence): The initial non-substituted generated DNA sequence of length n.
        max_time (float): The maximum time allowed for substitution(s) to take place in the simulation.
        rate_matrices (DictArray): A substitution rate matrix. 

    Output:
        tuple: A tuple containing the final DNA sequence at the end of the simulation as well as an array
        containing the history of substituted DNA sequences throughout the simulation.
        
    """
    history = [ancestor_seq,]

    time_passed = 0

    DNA_seq = ancestor_seq.copy()
    waiting_times, min_position, min_time = initialize_waiting_times_iid_cogent3(DNA_seq, Q)

    while time_passed <= max_time:
        seq_index = min_position[0]
        new_base = min_position[1]
        substitution_time = min_time 
        waiting_times, min_position, min_time = update_waiting_times_iid_cogent3(DNA_seq, Q, waiting_times, min_position, min_time)
        seq_list = list(DNA_seq)
        seq_list[seq_index] = new_base
        DNA_seq = make_seq(str(''.join(seq_list)))
        history.append(DNA_seq.copy())
        time_passed += substitution_time

    if len(history) <= 2: # meaning time for any substitution to occur exceeded the max_time, no substitution took place
        aln_history = make_unaligned_seqs(history[:-1])
        return aln_history
    else:
        aln_history = make_unaligned_seqs(history[:-1])
        return aln_history


# Simulation Study

In [87]:
def generate_stats(theo_df, sim_df):
    """ 
    
    Input: 
        theo_df (dataframe): Dataframe containing all the theoretical propbabilities of each transition.
        sim_df (dataframe): Dataframe containing all the simulated proportions of each transition, each row representing one repeat.
        
    Output:
        None        
    """
    # Calculate mean, standard deviation, and standard error for each column
    mean_sim = sim_df.mean()
    std_sim = sim_df.std()
    se_sim = std_sim / np.sqrt(len(sim_df))

    # Compare with theoretical values
    stats_dict = {}
    for col in sim_df.columns:
        print(f"Mean Simulated Proportion for {col}: {mean_sim[col]}")
        print(f"Theoretical Probability for {col}: {theo_df[col].iloc[0]}")
        print(f"Standard Error for {col}: {se_sim[col]}")
        print(f"Z-score for {col}: {(mean_sim[col] - theo_df[col].iloc[0])/se_sim[col]}")
        stats_dict[col] = [mean_sim[col], theo_df[col].iloc[0], se_sim[col], (mean_sim[col] - theo_df[col].iloc[0])/se_sim[col]]


    return stats_dict

## Simulation 1: Start with sequence of all A, independent substitution.

In [88]:
def A_simulation(repeat, ancestor_seq, max_time, rate_matrices_dict, markov_order):
    """ 
    Input:
        repeat (int): Number of times to repeat the simulation.
        ancestor_seq (list): Starting DNA sequence of each repeat.
        max_time (float): Maximum time allowed for DNA substitutions to take place for each repeat.
        rate_matrices_dict (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
        markov_order (int): Markov order.

    Output:
        theo_probs_df (dataframe): Dataframe containing all the theoretical propbabilities of each transition.
        sim_props_df (dataframe): Dataframe containing all the simulated proportions of each transition, each row representing one repeat.
    """

    rate_mat_Q = rate_matrices_dict['0']
    trans_mat_P = transition_matrix(rate_mat_Q, max_time)
    theo_probs = trans_mat_P[2] # Only consider with the row of probabilities where starting base is A / '2'
    theo_probs_dict = {str(i): theo_probs[i] for i in range(4)}

    simulated_props = {}

    for i in range(repeat):
        simulation_results = simulate_seq(ancestor_seq,max_time,rate_matrices_dict,markov_order)
        final_seq = simulation_results[0]
        nucleotide_counts = {'0': 0, '1': 0, '2': 0, '3': 0}
        nucleotide_counts.update(Counter(final_seq)) # Update counts based on the final sequence
        seq_length = len(final_seq) 
        sim_prop = {nucleotide: count / seq_length for nucleotide, count in nucleotide_counts.items()}
        simulated_props[i+1] = sim_prop

    # Convert dictionaries to DataFrames
    theo_probs_df = pd.DataFrame([theo_probs_dict])
    sim_props_df = pd.DataFrame.from_dict(simulated_props, orient='index')
        
    return theo_probs_df, sim_props_df

In [89]:
num_repeat = 100
pi = [0,0,1,0]
ancestor_seq = generate_ancestor(1000,pi=pi)
max_time = 0.1
order = 0 # independent
rate_matrices_dict = {'0': np.array([[-0.6, 0.1, 0.2, 0.3],
                                     [0.1, -0.65, 0.15, 0.4],
                                     [0.2, 0.1, -0.8, 0.5],
                                     [0.3, 0.2, 0.1, -0.6]])}

In [90]:
# A_theo_df, A_sim_df = A_simulation(num_repeat, ancestor_seq, max_time, rate_matrices_dict, order)

In [91]:

def plot_histograms(theo_df, sim_df):
    nucleotides = ['0', '1', '2', '3']
    bins = 20

    # Set up a 2x2 grid for subplots
    fig, axs = plt.subplots(2, 2, figsize=(10, 8))

    for i, nucleotide in enumerate(nucleotides):
        row, col = divmod(i, 2)
        ax = axs[row, col]

        ax.hist(sim_df[nucleotide], bins=bins, alpha=0.5, label='Simulated', color='blue')
        ax.axvline(x=theo_df[nucleotide][0], color='red', linestyle='dashed', linewidth=1, label='Theoretical')
        ax.axvline(x=sim_df[nucleotide].mean(), color='black', linestyle='solid', linewidth=1, label='Simulated Mean')
        ax.set_xlabel('Proportion')
        ax.set_ylabel('Frequency')
        ax.set_title(f'Distribution of {nucleotide} Proportions')
        ax.legend()

    # Adjust layout 
    plt.tight_layout()
    plt.show()

    return

In [92]:
# plot_histograms(A_theo_df, A_sim_df)

### Simulation statistics:

In [93]:
# A_theo_df

In [94]:
# A_sim_df

In [95]:
# generate_stats(A_theo_df, A_sim_df)

In [96]:
#generate_z_score(A_theo_df, A_sim_df)

## Simulation 2: Different starting sequence, independent

In [97]:
def count_change_props(starting_seq, final_seq):
    """ 
    Input:
        starting_seq (list): Starting DNA sequence.
        final_seq (list): Final DNA sequence after one run of simulation.

    Output:
        changes_proportions (dict): A dictionary of proportions of each possible transition.
    """
    changes_counter = Counter(zip(starting_seq, final_seq))
    changes_proportions = {}

    nucleotide_counts = {'0': 0, '1': 0, '2': 0, '3': 0}
    nucleotide_counts.update(Counter(starting_seq)) # Update counts based on the starting sequence

    seq_length = len(starting_seq)

    for from_nucleotide in '0123':
        for to_nucleotide in '0123':
            key = f"{from_nucleotide}_to_{to_nucleotide}"
            count = changes_counter[(from_nucleotide, to_nucleotide)]
            proportion = count / seq_length
            changes_proportions[key] = proportion

    return changes_proportions

# # Example usage:
# starting_seq = '0000000000'
# final_seq = '0123020010'
# proportions = count_change_props(starting_seq, final_seq)
# print(proportions)


In [98]:
def simulation2(repeat, n, pi, max_time, rate_matrices_dict, markov_order):
    """ 
    Input:
        repeat (int): Number of times to repeat the simulation.
        n (int): Length of DNA sequence.
        pi (list): List of probabilities for each nucleotide. 
        max_time (float): Maximum time allowed for DNA substitutions to take place for each repeat.
        rate_matrices_dict (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
        markov_order (int): Markov order.

    Output:
        theo_probs_df (dataframe): Dataframe containing all the theoretical propbabilities of each transition.
        sim_props_df (dataframe): Dataframe containing all the simulated proportions of each transition, each row representing one repeat.
    """

    rate_mat_Q = rate_matrices_dict['0']
    trans_mat_P = transition_matrix(rate_mat_Q, max_time)
    pi_diag = np.diag(pi)
    joint_mat_J = np.dot(pi_diag, trans_mat_P) 
    theo_probs_dict = {}
    for i in range(4):
        for j in range(4):
            theo_probs_dict[f'{i}_to_{j}'] = joint_mat_J[i,j]

    simulated_props = {}

    for i in range(repeat):
        ancestor_seq = generate_ancestor(n, pi) # new ancestor every run
        simulation_results = simulate_seq(ancestor_seq,max_time,rate_matrices_dict,markov_order)
        final_seq = simulation_results[0]
        change_props = count_change_props(ancestor_seq, final_seq)
        simulated_props[i+1] = change_props

    theo_probs_df = pd.DataFrame([theo_probs_dict])
    sim_props_df = pd.DataFrame.from_dict(simulated_props, orient='index')
        
    return theo_probs_df, sim_props_df

In [99]:
# num_repeat = 100
# n = 1000
# pi = [0.1,0.4,0.3,0.2]
# max_time = 0.2
# order = 0 # independent
# rate_matrices_dict = {'0': np.array([[-0.6, 0.1, 0.2, 0.3],
#                                      [0.1, -0.65, 0.15, 0.4],
#                                      [0.2, 0.1, -0.8, 0.5],
#                                      [0.3, 0.2, 0.1, -0.6]])}


# mix_theo_df, mix_sim_df = simulation2(num_repeat, n, pi, max_time, rate_matrices_dict, order)


In [100]:
def plot_histograms2(theo_probs_df, simulated_props_df):
    columns = []

    for from_nucleotide in '0123':
        for to_nucleotide in '0123':
            col_name = f"{from_nucleotide}_to_{to_nucleotide}"
            columns.append(col_name)

    bins = 20

    # Set up a 4x4 grid for subplots
    fig, axs = plt.subplots(4, 4, figsize=(12, 12))

    for i, col in enumerate(columns):
        row, col_num = divmod(i, 4)
        ax = axs[row, col_num]

        ax.hist(simulated_props_df[col], bins=bins, alpha=0.5, label='Simulated', color='pink')
        ax.axvline(x=theo_probs_df[col][0], color='red', linestyle='dashed', linewidth=1, label='Theoretical')
        ax.axvline(x=simulated_props_df[col].mean(), color='black', linestyle='solid', linewidth=1, label='Simulated Mean')
        # ax.set_xlim(0, 1)  # Set x-axis range from 0 to 1
        ax.set_xlabel('Proportion')
        ax.set_ylabel('Frequency')
        ax.set_title(f'Distribution of {col} Proportions')
        ax.legend()

    # Adjust layout for better spacing
    plt.tight_layout()
    plt.show()

    return

In [101]:
# plot_histograms2(mix_theo_df, mix_sim_df)

### Simulation Statistics

In [102]:
# mix_theo_df

In [103]:
# mix_sim_df

In [104]:
# generate_stats(mix_theo_df, mix_sim_df)

In [105]:
def simulate_probability_distirbution():
    # Generate random numbers
    random_probs = np.random.rand(4)
    
    # Ensure each probability is at least min_prob
    random_probs = np.max([random_probs, np.full(4, 0.01)], axis=0) #minimal proportion is 0.01
    
    # Normalize the sum to 1
    total_sum = np.sum(random_probs)
    normalized_probs = random_probs / total_sum
    
    # Ensure the sum is exactly 1 by normalizing a second time if necessary
    if np.sum(normalized_probs) != 1:
        normalized_probs[-1] = 1 - np.sum(normalized_probs[:-1])
    
    return normalized_probs

In [106]:
def multiple_independent_simulations(simulation, repeat, n, max_time):
    simulation_dict = {}
    for i in range(simulation):
        pi = simulate_probability_distirbution()
        Q_dict = {'0': generate_rate_matrix()}
        markov_order = 0
        theo_probs_df, sim_props_df = simulation2(repeat, n, pi, max_time, Q_dict, markov_order)
        simulation_dict[i] = [theo_probs_df, sim_props_df]

    return(simulation_dict)



In [107]:
# simulation_time = 5
# repeat_time = 100
# n1 = 1000
# max_time1 = 0.2
# simulation_dict1 = multiple_independent_simulations(simulation_time, repeat_time, n1, max_time1)
# simulation_stat_dict = {}
# for key, value in simulation_dict1.items():
#     stats = generate_stats(value['theo_probs_df'], value['sim_props_df'])
#     simulation_stat_dict[key] = stats

# stat_dict = {}
# for key, value in simulation_dict1.items():
#     stat_dict[key] = generate_stats(value['theo_probs_df'], value['sim_props_df'])

In [108]:
# simulation_time2 = 20
# repeat_time2 = 100
# n2 = 1000
# max_time2 = 0.2
# simulation_dict2 = multiple_independent_simulations(simulation_time2, repeat_time2, n2, max_time2)

## Simple context dependent, CpG effect

In [109]:
def CpG_rate_matrices(rate_matrix, markov_order = 1):
    """
    Input:
        rate_matrix (array): A single rate matrix.

    Output: 
        rate_matrices (dict): A dictionary of rate matrices for CpG context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
    """
    rate_matrices = {}
    
    nucleotides = ['0', '1', '2', '3']
    one_side_neighbours = [''.join(p) for p in itertools.product(nucleotides, repeat=markov_order)]
    permutations = list(itertools.product(one_side_neighbours, repeat=2))
    
    modified_rate_matrix = rate_matrix.copy()
    modified_rate_matrix[1,0] *= 10 # third row, first column contains rate for C changing to T

    for perm in permutations:
        if perm[1] == '3':
            # rates multiplied by 10 if right neighbour is G ( coded '3')
            rate_matrices[perm] = modified_rate_matrix
        else:
            rate_matrices[perm] = rate_matrix

    return rate_matrices

In [110]:
# rate_matrix = np.array([[-0.6, 0.1, 0.2, 0.3],
#                         [0.1, -0.65, 0.15, 0.4],
#                         [0.2, 0.1, -0.8, 0.5],
#                         [0.3, 0.2, 0.1, -0.6]])

# rate_matrices_dict = CpG_rate_matrices(rate_matrix)
# rate_matrices_dict

In [111]:
def CpG_change_props(starting_seq, final_seq):
    """ 
    Input:
        starting_seq (list): Starting DNA sequence.
        final_seq (list): Final DNA sequence after one run of simulation.

    Output:
        CpG_props_dict (dict): A dictionary of proportions of each possible transition for CpG context.
        non_CpG_props_dict (dict): A dictionary of proportions of each possible transition for non_CpG context.
    """

    seq_length = len(starting_seq)

    start_context_dict = {} # Dictionary of context for all nucleotides in starting DNA sequence.
    for i in range(seq_length):
        start_context_dict[i] = get_context(i,starting_seq,markov_order=1) 

    final_context_dict = {} # Dictionary of context for all nucleotides in final DNA sequence.
    for i in range(seq_length):
        final_context_dict[i] = get_context(i,final_seq,markov_order=1) 

    # Initializing matrices to store simulated probabilities
    CpG_props = np.zeros((4,4))
    non_CpG_props = np.zeros((4,4))

    for i in range(seq_length):
        if (start_context_dict[i] == final_context_dict[i]): # Contexts that appears to remain unchanged 
            start_base = int(starting_seq[i])
            final_base = int(final_seq[i])
            if start_context_dict[i][1] == '3': # Contexts with G / '3' as right neighbour
                CpG_props[start_base, final_base] += 1
            else:
                non_CpG_props[start_base, final_base] += 1

    # Normalizing the matrices to obtain the proportions
    CpG_props /= np.sum(CpG_props)
    non_CpG_props /= np.sum(non_CpG_props)

    # Creating dictionaries to store simulated proportions to prepare for pandas dataframe
    CpG_props_dict = {}
    non_CpG_props_dict = {}
    for i in range(4):
        for j in range(4):
            CpG_props_dict[f'CpG_{i}_to_{j}'] = CpG_props[i,j]
            non_CpG_props_dict[f'non_CpG_{i}_to_{j}'] = non_CpG_props[i,j]

    return CpG_props_dict, non_CpG_props_dict

In [112]:
def CpG_simulation(repeat, n, pi, max_time, rate_matrices_dict, markov_order = 1):
    """ 
    Input:
        repeat (int): Number of times to repeat the simulation.
        n (int): Length of DNA sequence.
        pi (list): List of probabilities for each nucleotide. 
        max_time (float): Maximum time allowed for DNA substitutions to take place for each repeat.
        rate_matrices_dict (dict): A dictionary of rate matrices for context-dependent DNA substitution. 
            The key is a tuple of left and right neighbours as strings from permutations with [0, 1, 2, 3] (=[T, C, A, G]). 
            The value is the corresponding rate matrix.
        markov_order (int): Markov order.

    Output:
        CpG_theo_probs_df (dataframe): Dataframe containing all the theoretical propbabilities of each transition for CpG context.
        CpG_sim_props_df (dataframe): Dataframe containing all the simulated proportions of each transition, each row representing one repeat, for CpG context.
        non_CpG_theo_probs_df (dataframe): Dataframe containing all the theoretical propbabilities of each transition for non-CpG context.
        non_CpG_sim_props_df (dataframe): Dataframe containing all the simulated proportions of each transition, each row representing one repeat, for non-CpG context.
    """

    ancestor_seq = generate_ancestor(n,pi=pi)
    
    # To simplify the number rate matrices since in the 16 matrices, there are actually only 2 different types.
    # One for CpG contexts, another non_CpG
    CpG_context = ('0', '3')
    non_CpG_context = ('0', '0')
    
    CpG_Q = rate_matrices_dict[CpG_context]
    CpG_P = transition_matrix(CpG_Q, max_time)

    non_CpG_Q = rate_matrices_dict[non_CpG_context]
    non_CpG_P = transition_matrix(non_CpG_Q, max_time)
    
    pi_diag = np.diag(pi)

    CpG_J = np.dot(pi_diag, CpG_P) 
    non_CpG_J = np.dot(pi_diag, non_CpG_P)

    CpG_theo_probs_dict = {}
    non_CpG_theo_probs_dict = {}

    for i in range(4):
        for j in range(4):
            CpG_theo_probs_dict[f'CpG_{i}_to_{j}'] = CpG_J[i,j]
            non_CpG_theo_probs_dict[f'non_CpG_{i}_to_{j}'] = non_CpG_J[i,j]

    CpG_sim_props_dict = {}
    non_CpG_sim_props_dict = {}

    for i in range(repeat):
        simulation_results = simulate_seq(ancestor_seq,max_time,rate_matrices_dict,markov_order)
        final_seq = simulation_results[0]
        CpG_sim_props, non_CpG_sim_props = CpG_change_props(ancestor_seq, final_seq)
        CpG_sim_props_dict[i+1] = CpG_sim_props
        non_CpG_sim_props_dict[i+1] = non_CpG_sim_props

    CpG_theo_probs_df = pd.DataFrame([CpG_theo_probs_dict])
    non_CpG_theo_probs_df = pd.DataFrame([non_CpG_theo_probs_dict])
    CpG_sim_props_df = pd.DataFrame.from_dict(CpG_sim_props_dict, orient='index')
    non_CpG_sim_props_df = pd.DataFrame.from_dict(non_CpG_sim_props_dict, orient='index')
        
    return CpG_theo_probs_df, CpG_sim_props_df, non_CpG_theo_probs_df, non_CpG_sim_props_df

In [113]:
# num_repeat = 100
# n = 1000
# pi = [0.1,0.4,0.3,0.2]
# max_time = 0.1
# order = 1 # default for CpG
# rate_matrix = np.array([[-0.6, 0.1, 0.2, 0.3],
#                         [0.1, -0.65, 0.15, 0.4],
#                         [0.2, 0.1, -0.8, 0.5],
#                         [0.3, 0.2, 0.1, -0.6]])
# rate_matrices_dict = CpG_rate_matrices(rate_matrix) # 16 rate matrices, of which 12 baselines and 4 test

In [114]:
# CpG_theo_df, CpG_sim_df, non_CpG_theo_df, non_CpG_sim_df = CpG_simulation(num_repeat, n, pi, max_time, rate_matrices_dict)

In [115]:
def CpG_histograms(hypo_probs_df, simulated_props_df):
    columns = []

    for from_nucleotide in '0123':
        for to_nucleotide in '0123':
            col_name = f"CpG_{from_nucleotide}_to_{to_nucleotide}"
            columns.append(col_name)

    bins = 20

    # Set up a 4x4 grid for subplots
    fig, axs = plt.subplots(4, 4, figsize=(12, 12))

    for i, col in enumerate(columns):
        row, col_num = divmod(i, 4)
        ax = axs[row, col_num]

        ax.hist(simulated_props_df[col], bins=bins, alpha=0.5, label='Simulated', color='pink')
        ax.axvline(x=hypo_probs_df[col][0], color='red', linestyle='dashed', linewidth=1, label='Theoretical')
        ax.axvline(x=simulated_props_df[col].mean(), color='black', linestyle='solid', linewidth=1, label='Simulated Mean')
        # ax.set_xlim(0, 1)  # Set x-axis range from 0 to 1
        ax.set_xlabel('Proportion')
        ax.set_ylabel('Frequency')
        ax.set_title(f'{col} Proportions')
        ax.legend()

    # Adjust layout for better spacing
    plt.tight_layout()
    plt.show()

    return

In [116]:
def non_CpG_histograms(hypo_probs_df, simulated_props_df):
    columns = []

    for from_nucleotide in '0123':
        for to_nucleotide in '0123':
            col_name = f"non_CpG_{from_nucleotide}_to_{to_nucleotide}"
            columns.append(col_name)

    bins = 20

    # Set up a 4x4 grid for subplots
    fig, axs = plt.subplots(4, 4, figsize=(12, 12))

    for i, col in enumerate(columns):
        row, col_num = divmod(i, 4)
        ax = axs[row, col_num]

        ax.hist(simulated_props_df[col], bins=bins, alpha=0.5, label='Simulated', color='pink')
        ax.axvline(x=hypo_probs_df[col][0], color='red', linestyle='dashed', linewidth=1, label='Theoretical')
        ax.axvline(x=simulated_props_df[col].mean(), color='black', linestyle='solid', linewidth=1, label='Simulated Mean')
        # ax.set_xlim(0, 1)  # Set x-axis range from 0 to 1
        ax.set_xlabel('Proportion')
        ax.set_ylabel('Frequency')
        ax.set_title(f'{col} Proportions')
        ax.legend()

    # Adjust layout for better spacing
    plt.tight_layout()
    plt.show()

    return

In [117]:
# CpG_histograms(CpG_theo_df, CpG_sim_df)

In [118]:
# non_CpG_histograms(non_CpG_theo_df, non_CpG_sim_df)

### Simulation Statistics

CpG Statistics

In [119]:
# generate_stats(CpG_theo_df, CpG_sim_df)

non_CpG Statistics

In [120]:
# generate_stats(non_CpG_theo_df, non_CpG_sim_df)