In [9]:
import random
import numpy as np
import matplotlib as plt
import Levenshtein
import itertools
import RNA

nucleotides = {'A':0,'G':1,'U':2,'C':3}
nucleotides_text = ['A','G','U','C']

**Parameters**

In [27]:
initial_rna = "ACCC"
target = "AGCCCGA"
global_enviornmental_factors = []
generations = 3
base_mutation = np.array([[0.925, 0.025, 0.025, 0.025], [0.025, 0.925, 0.025, 0.025], [0.025, 0.025, 0.925, 0.025], [0.025, 0.025, 0.025, 0.925]])

**Update Parameters**

In [11]:
def calculate_environmental_factors (rna_population, enviornmental_factors):
    return enviornmental_factors

def calculate_mutation_rate (factors, mutation_rate = base_mutation):
    return mutation_rate

**Find Indel Probability**

In [12]:
def calculate_insertion_rate (sequence, factors, insertion_rate = 0.01):
    return insertion_rate

def calculate_deletion_rate (sequence, factors, deletion_rate = 0.01):
    return deletion_rate

**Transition Matrix Calculation**

In [13]:
def mutation_chance_matrix(mutation_matrix, mutation_rate):
    """Takes in matrix of mutations, outputs matrix of probabilities of each mutation"""
    probability_matrix = np.ones((len(mutation_matrix), len(mutation_matrix[0])))

    for i, row in enumerate(mutation_matrix):
        original = row[0]
        o_length = len(original)
        for j, possibility in enumerate(row):
            for k in range(o_length):
                original_index = nucleotides[original[k]]
                possibility_index = nucleotides[possibility[k]]
                probability_matrix[i, j] *= mutation_rate[original_index, possibility_index]

    probability_matrix[:, 0] = np.maximum(0, 1 - np.sum(probability_matrix[:, 1:], axis=1))

    return probability_matrix

def generate_mutation_matrix(n):
    sequences = [''.join(seq) for seq in itertools.product(nucleotides, repeat=n)]
    matrix = [[seq] + [s for s in sequences if s != seq] for seq in sequences]
    return np.array(matrix)


In [14]:
def optimized_mutation_chance_matrix(mutation_matrix, mutation_rate):
    nucleotides = {'A':0,'G':1,'U':2,'C':3}

    num_rows, num_cols = mutation_matrix.shape
    seq_length = len(mutation_matrix[0, 0])
    
    probability_matrix = np.ones((num_rows, num_cols))

    mutation_indices = np.zeros((num_rows, num_cols, seq_length), dtype=int)
    for i in range(num_rows):
        for j in range(num_cols):
            for k in range(seq_length):
                mutation_indices[i, j, k] = nucleotides[mutation_matrix[i, j][k]]
    
    # Calculate probabilities using the mutation rate matrix
    for k in range(seq_length):
        # Extract the k-th nucleotide for each sequence
        original_indices = mutation_indices[:, 0, k]  # The original sequence nucleotides at position k
        mutation_indices_k = mutation_indices[:, :, k]  # All possible mutations at position k

        # Perform element-wise multiplication for each mutation in parallel (vectorized)
        probability_matrix *= mutation_rate[original_indices[:, np.newaxis], mutation_indices_k]

    # Adjust the first column for "no mutation" probability
    probability_matrix[:, 0] = np.maximum(0, 1 - np.sum(probability_matrix[:, 1:], axis=1))

    return probability_matrix

In [15]:
def compute_probabilities_for_batch(batch, mutation_rate, seq_length):
    batch_probabilities = []
    for row in batch:
        row_probabilities = np.ones(len(row))
        original_seq = row[0]
        original_indices = [nucleotides[nt] for nt in original_seq]
        for j, possible_mutation in enumerate(row):
            mutation_indices = [nucleotides[nt] for nt in possible_mutation]
            for k in range(seq_length):
                row_probabilities[j] *= mutation_rate[original_indices[k], mutation_indices[k]]
        batch_probabilities.append(row_probabilities)
    return batch_probabilities

def parallel_mutation_chance_matrix(mutation_matrix, mutation_rate, batch_size=10):
    num_rows, num_cols = mutation_matrix.shape
    seq_length = len(mutation_matrix[0, 0])

    # Split the mutation matrix into batches of rows
    batches = [mutation_matrix[i:i + batch_size] for i in range(0, num_rows, batch_size)]

    with Pool() as pool:
        probability_batches = pool.starmap(
            compute_probabilities_for_batch,
            [(batch, mutation_rate, seq_length) for batch in batches]
        )

    # Flatten the list of batches back into a full probability matrix
    probability_matrix = np.vstack([np.array(batch) for batch in probability_batches])

    # Adjust the first column for "no mutation" probability
    probability_matrix[:, 0] = np.maximum(0, 1 - np.sum(probability_matrix[:, 1:], axis=1))

    return probability_matrix

In [16]:
%load_ext cython

In [17]:
%%cython
import numpy as np
cimport numpy as np

# Defining C types for efficiency
cdef int num_rows, num_cols, seq_length, i, j, k
cdef int[:] mutation_indices

# Nucleotide mapping
cdef dict nucleotides = {'A': 0, 'G': 1, 'U': 2, 'C': 3}

def optimized_mutation_chance_matrix2(np.ndarray mutation_matrix, np.ndarray mutation_rate):
    """
    Optimized function to compute a matrix of probabilities for each mutation using Cython.
    """
    num_rows = mutation_matrix.shape[0]
    num_cols = mutation_matrix.shape[1]
    seq_length = len(mutation_matrix[0, 0])  # Length of each nucleotide sequence
    
    # Initialize probability matrix
    probability_matrix = np.ones((num_rows, num_cols))

    # Convert the mutation_matrix to a numerical matrix of nucleotide indices
    mutation_indices = np.zeros((num_rows, num_cols, seq_length), dtype=np.int32)
    
    # Populate mutation_indices by converting nucleotide sequences to index values
    for i in range(num_rows):
        for j in range(num_cols):
            for k in range(seq_length):
                mutation_indices[i, j, k] = nucleotides[mutation_matrix[i, j][k]]
    
    # Calculate probabilities using the mutation rate matrix
    for k in range(seq_length):
        # Extract the k-th nucleotide for each sequence
        original_indices = mutation_indices[:, 0, k]  # The original sequence nucleotides at position k
        mutation_indices_k = mutation_indices[:, :, k]  # All possible mutations at position k

        # Perform element-wise multiplication for each mutation in parallel (vectorized)
        probability_matrix *= mutation_rate[original_indices[:, np.newaxis], mutation_indices_k]

    # Adjust the first column for "no mutation" probability
    probability_matrix[:, 0] = np.maximum(0, 1 - np.sum(probability_matrix[:, 1:], axis=1))

    return probability_matrix


**Fitness**

In [18]:
def fitness (target_distance, target_mfe, sequence):
    sequence_distance = Levenshtein.distance(sequence, target)
    if (sequence_distance > target_distance):
        return False
    sequence_mfe = RNA.fold(sequence)[1]
    if (sequence_mfe > target_mfe):
        return False

    return True
    

**Simulation**

In [34]:
def replicate_rna (initial_rna, mutation_rate, generations, target):

    rna_population = [initial_rna]
    full_population = [initial_rna]
    
    target_mfe = abs(RNA.fold(target)[1]-RNA.fold(initial_rna)[1])
    target_distance = Levenshtein.distance(initial_rna, target)

    enviornmental_factors = calculate_environmental_factors(rna_population, global_enviornmental_factors)

    for _ in range(generations):

        enviornmental_factors = calculate_environmental_factors(rna_population, enviornmental_factors)
        mutation_rate = calculate_mutation_rate(enviornmental_factors, mutation_rate)
        
        new_population = []

        for rna in rna_population:
            old_rna = rna
            o_len = len(old_rna)
            new_rna = ""
            nucleotides_gain = 0
            
            insertion_rate = calculate_insertion_rate(rna, enviornmental_factors)
            deletion_rate = calculate_deletion_rate(rna, enviornmental_factors)

            while (insertion_rate > random.random()):
                nucleotides_gain += 1
            while (deletion_rate > random.random()):
                nucleotides_gain -= 1

            if (nucleotides_gain > 0):
                for _ in range(nucleotides_gain):
                    index = random.randint(0, o_len)
                    old_rna = old_rna[:index] + random.choice(nucleotides_text) + old_rna[index:]
                    o_len += 1
            elif (nucleotides_gain < 0):
                for _ in range(-nucleotides_gain):
                    index = random.randint(0, o_len)
                    old_rna = old_rna[:index] + old_rna[index+1:]
                    o_len -= 1

            mutation_matrix = generate_mutation_matrix(o_len)
            mutation_chance = optimized_mutation_chance_matrix2(mutation_matrix, mutation_rate)

            mutation_list = mutation_matrix[mutation_matrix[:, 0] == old_rna][0]
            mutation_chance_list = mutation_chance[mutation_matrix[:, 0] == old_rna][0]
            new_rna = mutation_list[np.random.choice(len(mutation_chance_list), p=mutation_chance_list/mutation_chance_list.sum())]
            
            new_population.append(str(new_rna))

        full_population += new_population.copy()
        rna_population += [rna for rna in new_population if fitness(target_distance, target_mfe, rna)]
    
    return rna_population, full_population

In [None]:
mutation_rate = calculate_mutation_rate(global_enviornmental_factors)

print(replicate_rna(initial_rna, mutation_rate, generations, target))

**Data Analysis**