# Importing necessary libraries

In [14]:
import warnings

from sklearn.exceptions import ConvergenceWarning
warnings.filterwarnings(action='ignore', category=ConvergenceWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 

import sklearn
from sklearn import linear_model
from sklearn import decomposition
from sklearn.neural_network import MLPRegressor
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import OneHotEncoder
from sklearn.model_selection import GridSearchCV
from smt.sampling_methods import LHS

import time
import datetime
import random
import csv
import os
import copy
import json
from tqdm import tqdm



# Utility functions

In [15]:
def latin_sampling(num):
    xlimits = np.array([[0,1.0]]*10)
    sampling = LHS(xlimits=xlimits,)
    x = sampling(num)
    sequence_list=[]
    for i in range(num):
        oh=""
        for j in range(10):
            if x[i][j]<0.25:
                oh=oh+"A"
            elif x[i][j]<0.5:
                oh=oh+"C"
            elif x[i][j]<0.75:
                oh=oh+"G"
            elif x[i][j]<1:
                oh=oh+"T"             
        sequence_list.append(oh)
    return sequence_list

In [16]:
def random_sampling(num):
    x= np.random.random_sample((num,10))
    sequence_list=[]
    for i in range(num):
        oh=""
        for j in range(10):
            if x[i][j]<0.25:
                oh=oh+"A"
            elif x[i][j]<0.5:
                oh=oh+"C"
            elif x[i][j]<0.75:
                oh=oh+"G"
            elif x[i][j]<1:
                oh=oh+"T"             
        sequence_list.append(oh)
    return sequence_list

In [17]:
import numpy as np

def convert_input_type(seq, target_type):
    """
    Convert DNA sequence between string and one-hot encoded representations.

    Args:
        seq: The input DNA sequence(s).
        target_type: The target representation type ("one_hot" or "strings").

    Returns:
        The converted DNA sequence(s).
    """
    if target_type == "one_hot":
        # Convert string to one-hot encoded numpy array
        mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
        one_hot = np.array([mapping[base] for base in seq])
        return one_hot
    elif target_type == "strings":
        # Convert one-hot encoded numpy array to string
        mapping = {0: 'A', 1: 'C', 2: 'G', 3: 'T'}
        strings = ''.join([mapping[np.argmax(base)] for base in seq])
        return strings
    else:
        raise ValueError("Unsupported target_type. Use 'one_hot' or 'strings'.")

In [None]:
# Gradient Descent

import numpy as np

from typing import Optional, Callable, List

def convert_input_type(seq, target_type):
    mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
    if target_type == "one_hot":
        one_hot = np.array([mapping[base] for base in seq]).flatten()
        return one_hot
    elif target_type == "strings":
        reverse_mapping = {tuple(v): k for k, v in mapping.items()}
        reshaped_seq = seq.reshape(-1, 4)
        strings = ''.join([reverse_mapping[tuple(base)] for base in reshaped_seq])
        return strings
    else:
        raise ValueError("Unsupported target_type. Use 'one_hot' or 'strings'.")

def softmax(x):
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=-1, keepdims=True)

def closest_one_hot(x):
    one_hot = np.zeros_like(x)
    one_hot[np.argmax(x)] = 1
    return one_hot

def gradient_descent_optimization(
    seq: str,
    model: MLPRegressor,
    loss_func: Callable,
    learning_rate: float = 1,
    max_iter: int = 200,
    positions: Optional[List[int]] = None,
    grad_clip: float = 1.0,
):
    """
    Sequence design using gradient descent optimization with MLPRegressor

    Args:
        seq: an initial DNA sequence as a string.
        model: A trained MLPRegressor object
        loss_func: A function to compute the loss
        learning_rate: Learning rate for gradient descent
        max_iter: Number of iterations
        positions: Positions to mutate. If None, all positions will be mutated
        grad_clip: Gradient clipping value

    Returns:
        Optimized DNA sequence as a string and its prediction value.
    """
    # Convert sequence into a one-hot encoded tensor
    X = convert_input_type(seq, "one_hot").reshape(1, -1).astype(float)
    # print(f"Initial shape of X: {X.shape}")

    best_X = X.copy()
    best_y = model.predict(X)
    best_loss = loss_func(best_y)
    # print(f"Initial loss: {best_loss}")

    # Gradient descent optimization
    for i in range(max_iter):
        # Forward pass
        y_pred = model.predict(X)
        
        # Compute loss
        loss = loss_func(y_pred)
        
        # Update best sequence if current loss is better
        if loss < best_loss:
            best_loss = loss
            best_X = X.copy()
            best_y = y_pred
        
        # Compute gradient (using numerical approximation)
        grad = np.zeros_like(X)
        epsilon = 1e-3
        for j in range(X.shape[1]):
            X_plus = X.copy()
            X_plus[0, j] += epsilon
            y_pred_plus = model.predict(X_plus)
            loss_plus = loss_func(y_pred_plus)
            grad[0, j] = (loss_plus - loss) / epsilon
        
        # Clip gradients
        grad = np.clip(grad, -grad_clip, grad_clip)
        
        # Update X using gradient descent
        X -= learning_rate * grad
        
        # Apply softmax normalization to each 4-element segment
        if np.mean(X)>2:
            for k in range(0, X.shape[1], 4):
                X[0, k:k+4] = softmax(X[0, k:k+4])
        
        # Print best sequence and prediction every 2 iterations
        if i % 2 == 0:
            # Convert best_X to closest one-hot vectors
            best_X_closest = best_X.copy()
            for k in range(0, best_X_closest.shape[1], 4):
                best_X_closest[0, k:k+4] = closest_one_hot(best_X_closest[0, k:k+4])
            best_seq = convert_input_type(best_X_closest.reshape(-1, 4), "strings")
            best_y_pred = model.predict(best_X_closest)  # Get the prediction of the best_X_closest
            # print(f"Iteration {i}, Best sequence: {best_seq}, Best prediction value: {best_y_pred}")

    # Convert best one-hot encoded array back to closest one-hot vectors
    for k in range(0, best_X.shape[1], 4):
        best_X[0, k:k+4] = closest_one_hot(best_X[0, k:k+4])
    
    best_y_pred = model.predict(best_X)  # Get the prediction of the best_X_closest
    # print(f"Final loss: {best_y_pred}")


    # Convert optimized one-hot encoded array back to DNA sequence
    optimized_seq = convert_input_type(best_X.reshape(-1, 4), "strings")
    return optimized_seq, best_y_pred



In [None]:
#SSWM
import numpy as np
from sklearn.neural_network import MLPRegressor
from typing import Callable, List
import random

def convert_input_type(seq, target_type):
    mapping = {'A': [1, 0, 0, 0], 'C': [0, 1, 0, 0], 'G': [0, 0, 1, 0], 'T': [0, 0, 0, 1]}
    if target_type == "one_hot":
        one_hot = np.array([mapping[base] for base in seq]).flatten()
        return one_hot
    elif target_type == "strings":
        reverse_mapping = {tuple(v): k for k, v in mapping.items()}
        reshaped_seq = seq.reshape(-1, 4)
        strings = ''.join([reverse_mapping[tuple(base)] for base in reshaped_seq])
        return strings
    else:
        raise ValueError("Unsupported target_type. Use 'one_hot' or 'strings'.")

def mutate_sequence(seq: str, mutation_rate: float = 0.1) -> str:
    bases = ['A', 'C', 'G', 'T']
    mutated_seq = list(seq)
    for i in range(len(seq)):
        if random.random() < mutation_rate:
            mutated_seq[i] = random.choice(bases)
    return ''.join(mutated_seq)

def SSWM_optimization(
    seq: str,
    model: MLPRegressor,
    loss_func: Callable,
    population_size: int = 10,
    generations: int = 200,
    mutation_rate: float = 0.1,
    selection_size: int = 5  # Number of top sequences to retain
):
    """
    Sequence design using SSWM with MLPRegressor

    Args:
        seq: an initial DNA sequence as a string.
        model: A trained MLPRegressor object
        loss_func: A function to compute the loss
        population_size: Number of sequences in the population
        generations: Number of generations to evolve
        mutation_rate: Probability of mutating each base in the sequence
        selection_size: Number of top sequences to retain

    Returns:
        Optimized DNA sequence as a string and its prediction value.
    """
    # Initialize population with the initial sequence
    population = [seq] * population_size
    best_seq = seq
    best_y = model.predict(convert_input_type(seq, "one_hot").reshape(1, -1))
    best_loss = loss_func(best_y)
    # print(f"Initial sequence: {seq}, Initial loss: {best_loss}")
    
    for generation in range(generations):
        # Mutate population
        new_population = [mutate_sequence(seq, mutation_rate) for seq in population]
        
        # Evaluate fitness of the new population
        fitness_scores = []
        for new_seq in new_population:
            X = convert_input_type(new_seq, "one_hot").reshape(1, -1).astype(float)
            y_pred = model.predict(X)
            loss = loss_func(y_pred)
            fitness_scores.append((new_seq, y_pred, loss))
        
        # Select the best sequences in the current generation
        fitness_scores.sort(key=lambda x: x[2])  # Sort by loss (ascending)
        selected_population = fitness_scores[:selection_size]  # Select top sequences
        
        # Update global best sequence if current best is better
        current_best_seq, current_best_y, current_best_loss = selected_population[0]
        if current_best_loss < best_loss:
            best_seq = current_best_seq
            best_y = current_best_y
            best_loss = current_best_loss
        
        # Update population with the top sequences
        population = [seq for seq, _, _ in selected_population]
        
        # Fill the rest of the population with mutated versions of the best sequences
        while len(population) < population_size:
            population.append(mutate_sequence(best_seq, mutation_rate))
        
        # print(f"Generation {generation}, Best sequence: {best_seq}, Best prediction value: {best_y}, Best loss: {best_loss}")

    return best_seq, best_y

# Get optimal sequences

In [None]:
import pandas as pd
import numpy as np

class NKModel:
    def __init__(self, data_path):
        self.df = pd.read_csv(data_path)
        self.model = None

    def get_sampling(self, n_samples):
#         sample_X = latin_sampling(n_samples)
        sample_X = initial_sampling[:n_samples]
        return sample_X

    def build_model(self, sample_X):
        onehot_X = [convert_input_type(i, "one_hot") for i in sample_X]
        y = self.NK_surrogate(sample_X)
        
        # Define the parameter grid
        param_grid = {
            'hidden_layer_sizes': [
                (10,),
                (40, 10),
                (100, 100, 20),
                (10, 100, 100, 20)
            ]
        }

        
        # Initialize the MLPRegressor
        mlp = MLPRegressor(max_iter=1000, random_state=0)
        
        # Use GridSearchCV to perform 5-fold cross-validation
        grid_search = GridSearchCV(estimator=mlp, param_grid=param_grid, cv=5, scoring='r2')
        
        # Fit the model
        grid_search.fit(onehot_X, y)
        
        # Select the best model
        self.model = grid_search.best_estimator_

    def NK_surrogate(self, input):
        genotype_to_phenotype = dict(zip(self.df['Genotype'], self.df['Phenotype']))
        ordered_phenotypes = [genotype_to_phenotype.get(genotype) for genotype in input]
        return np.array(ordered_phenotypes)

    def loss_func(self, y_pred):
        return -np.mean(y_pred)

    def get_model(self, n_samples):
        samples = self.get_sampling(n_samples)
        self.build_model(samples)
        return self.model

    def random_screening(self, sample_for_model=1000, n_screens=100000):
        model = self.get_model(sample_for_model)
        sequences = random_sampling(n_screens)
        evaluations = [(seq, model.predict(convert_input_type(seq, "one_hot").reshape(1, -1))) for seq in sequences]
        sorted_evaluations = sorted(evaluations, key=lambda x: x[1], reverse=True)

        top_100_sequences = sorted_evaluations[:100]
        top_sequences = [seq for seq, score in top_100_sequences]
        top_scores = [score[0] for seq, score in top_100_sequences]
        return top_sequences, top_scores

    def get_best_batch(self, sample_for_model=1000, batch_size=1000, method="SSWM"):
        batch = []
        model = self.get_model(sample_for_model)
        j=1
        while j < batch_size:
            seq = random_sampling(1)[0]
            if method == "gradient":
                optimized_seq, best_y_pred = gradient_descent_optimization(seq, model, self.loss_func)
            elif method == "SSWM":
                optimized_seq, best_y_pred = SSWM_optimization(seq, model, self.loss_func)
            if optimized_seq not in [item[0] for item in batch]:
                batch.append((optimized_seq, best_y_pred))
            j = j+1
        sequences = [item[0] for item in batch]
        expression_levels = np.array([item[1][0] for item in batch])
        return sequences, expression_levels




In [86]:
with open('initial_LHS_2000.txt', 'r') as f:
    initial_sampling=json.load(f)

In [87]:
# Sample sizes to iterate over
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK0.csv')


# Loop through sample sizes and collect data
for i in tqdm(sample_size):
    sequences, expression_levels = nk_model.get_best_batch(sample_for_model=i, batch_size=100, method="gradient")
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences = pd.DataFrame()
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
    dataframe_sequences.to_csv('NK0_gd_'+str(i)+'lhs.csv', index=False)
    


 25%|██▌       | 1/4 [01:11<03:35, 71.92s/it]

0.651414283


 50%|█████     | 2/4 [02:36<02:38, 79.36s/it]

0.6135825030425532


 75%|███████▌  | 3/4 [04:33<01:36, 96.52s/it]

0.6324833867254902


100%|██████████| 4/4 [06:29<00:00, 97.47s/it] 

0.6045631339302326





In [88]:
# Sample sizes to iterate over
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK1.csv')


# Loop through sample sizes and collect data
for i in tqdm(sample_size):
    sequences, expression_levels = nk_model.get_best_batch(sample_for_model=i, batch_size=100, method="gradient")
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences = pd.DataFrame()
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
    
    dataframe_sequences.to_csv('NK1_gd_'+str(i)+'lhs.csv', index=False)

 25%|██▌       | 1/4 [01:22<04:07, 82.64s/it]

0.5882566535833332


 50%|█████     | 2/4 [02:47<02:47, 83.97s/it]

0.5969569039814814


 75%|███████▌  | 3/4 [04:10<01:23, 83.60s/it]

0.5909328331607143


100%|██████████| 4/4 [05:25<00:00, 81.30s/it]

0.6774279239444445





In [89]:
# Sample sizes to iterate over
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK2.csv')


# Loop through sample sizes and collect data
for i in tqdm(sample_size):
    sequences, expression_levels = nk_model.get_best_batch(sample_for_model=i, batch_size=100, method="gradient")
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences = pd.DataFrame()
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
    
    dataframe_sequences.to_csv('NK2_gd_'+str(i)+'lhs.csv', index=False)

 25%|██▌       | 1/4 [01:24<04:14, 84.98s/it]

0.49042211283529413


 50%|█████     | 2/4 [02:49<02:49, 84.69s/it]

0.48231086713698634


 75%|███████▌  | 3/4 [04:13<01:24, 84.52s/it]

0.5257241310684931


100%|██████████| 4/4 [05:28<00:00, 82.08s/it]

0.5546363916777779





In [90]:
# Sample sizes to iterate over
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK3.csv')


# Loop through sample sizes and collect data
for i in tqdm(sample_size):
    sequences, expression_levels = nk_model.get_best_batch(sample_for_model=i, batch_size=100, method="gradient")
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences = pd.DataFrame()
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
    
    dataframe_sequences.to_csv('NK3_gd_'+str(i)+'lhs.csv', index=False)

 25%|██▌       | 1/4 [01:24<04:14, 84.90s/it]

0.5280128250967742


 50%|█████     | 2/4 [02:48<02:48, 84.24s/it]

0.52778175975


 75%|███████▌  | 3/4 [04:11<01:23, 83.63s/it]

0.5207579315357143


100%|██████████| 4/4 [05:37<00:00, 84.26s/it]

0.5219012153258427





In [34]:
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK0.csv')
dataframe_sequences = pd.DataFrame()

for i in sample_size:
    sequences, expression_levels = nk_model.random_screening(sample_for_model=i)
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
dataframe_sequences.to_csv('NK0_random_rd1.csv', index=False)

0.6655472177900001
0.6826985334099999
0.6857966013699999
0.6798443576200001


In [35]:
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK1.csv')
dataframe_sequences = pd.DataFrame()

for i in sample_size:
    sequences, expression_levels = nk_model.random_screening(sample_for_model=i)
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
dataframe_sequences.to_csv('NK1_random_rd1.csv', index=False)

0.64034856123
0.71114601482
0.7224730678
0.7233445157299999


In [36]:
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK2.csv')
dataframe_sequences = pd.DataFrame()

for i in sample_size:
    sequences, expression_levels = nk_model.random_screening(sample_for_model=i)
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
dataframe_sequences.to_csv('NK2_random_rd1.csv', index=False)

0.56615744338
0.6043814477599999
0.5993531395399999
0.60061798551


In [37]:
sample_size = [1000, 1100, 1200, 1300]
nk_model = NKModel('NK3.csv')
dataframe_sequences = pd.DataFrame()

for i in sample_size:
    sequences, expression_levels = nk_model.random_screening(sample_for_model=i)
    real_expression_levels = nk_model.NK_surrogate(sequences)
    dataframe_sequences["sequence"+str(i)] = sequences
    dataframe_sequences["evaluation"+str(i)] = expression_levels
    dataframe_sequences["real"+str(i)] = real_expression_levels
    print(np.mean(real_expression_levels))
dataframe_sequences.to_csv('NK3_random_rd1.csv', index=False)

0.55733073215
0.5544379528400001
0.54615851903
0.55444800694
