In [None]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import os
import tensorflow as tf

from sklearn.metrics import r2_score
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, Flatten
from tensorflow.keras.layers import Conv2D, MaxPooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping

from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

In [None]:
# onehot encoding script for TESR sequence data set
def onehot(seq):
        module = np.array([[1,0,0,0],[0,1,0,0],[0,0,1,0],[0,0,0,1]])
        i = 0
        onehot_result = []
        while i < len(seq):
            seqlist = []
            for base in seq[i]:
                if base == 'a' or base == 'A':
                    seqlist.append(module[0])
                elif base == 't' or base == 'T':
                    seqlist.append(module[1])
                elif base == 'g' or base == 'G':
                    seqlist.append(module[2])
                elif base == 'c' or base == 'C':
                    seqlist.append(module[3])
                else:
                    seqlist.append([0,0,0,0])
            onehot_result.append(seqlist)
            i = i + 1
        result = np.zeros((len(seq),9,1,4))
        result = np.float32(result)
        i = 0
        while i < len(seq):
            j = 0
            while j < len(seq[0]):
                result[i,j,0,:] = onehot_result[i][j]
                j = j + 1
            i = i + 1
        
        return result

# Matching the codon usages for each TESR sequence with codon usage data file
def usage_matcher(path, sheet_name, host):    
    usage_data = pd.read_excel("c:/Users/FMB/Documents/python/MLDL/CDSanalysis/codon_usage.xlsx", 
                               sheet_name = host)
    codon_list = usage_data['codon']
    usage = usage_data.set_index('codon').to_dict(orient = 'index')
    
    sequence = pd.read_excel(path, sheet_name = sheet_name)
    seq_list = sequence["sequence"]
    
    first_usage = []
    second_usage = []
    third_usage = []
    
    for i in range(len(seq_list)):
        first_codon = seq_list[i][0:3]
        second_codon = seq_list[i][3:6]
        third_codon = seq_list[i][6:9]
        first = usage[first_codon]['frequency']
        second = usage[second_codon]['frequency']
        third = usage[third_codon]['frequency']
        first_usage.append(first)
        second_usage.append(second)
        third_usage.append(third)
        
    df_first = pd.DataFrame(first_usage)
    df_second = pd.DataFrame(second_usage)
    df_third = pd.DataFrame(third_usage)
    
    result = pd.concat([sequence[["sequence", "score avg"]], df_first, df_second, df_third ],
                       axis = 1, ignore_index=True)
    
    return result

# codon usage min-max scaler
def Usage_MinMaxscaler(df):
    usage_1st = df['1st_usage'].values
    usage_1st = usage_1st.reshape(-1,1)
    
    usage_2nd = df['2nd_usage'].values
    usage_2nd = usage_2nd.reshape(-1,1)
    
    usage_3rd = df['3rd_usage'].values
    usage_3rd = usage_3rd.reshape(-1,1)
    
    scaler = MinMaxScaler()
    
    scaled_1 = pd.DataFrame(scaler.fit_transform(usage_1st))
    scaled_2 = pd.DataFrame(scaler.fit_transform(usage_2nd))
    scaled_3 = pd.DataFrame(scaler.fit_transform(usage_3rd))
    
    scaled_result = pd.concat([scaled_1, scaled_2, scaled_3], axis = 1)
    
    scaled_result.columns = ['1st_usage', '2nd_usage', '3rd_usage']
    
    return scaled_result

# Separate and process the feature and label data from dataframe of training dataset.
def Data_processing(df):
    
    sequence_array = onehot(df['sequence'])
    
    label = df['score avg']
    
    usage_df = df[['1st_usage', '2nd_usage', '3rd_usage']]
    
    usage_array = Usage_MinMaxscaler(usage_df)
    
    return sequence_array, label, usage_array


def evaluate_model(trainX_seq, trainy, testX_seq, testy):
    
    input_seq = keras.Input(shape = (9,1,4,), name = 'X_train_seq_hot')
    y = keras.layers.Conv2D(128, (3, 1), activation = 'relu', padding = 'same')(input_seq)
    y = keras.layers.Conv2D(128, (3, 1), activation = 'relu', padding = 'same')(y)
    y = keras.layers.MaxPooling2D(pool_size = (4,1))(y)
    y = keras.layers.Conv2D(256, (2, 1), activation = 'relu', padding = 'same')(y)
    y = keras.layers.Conv2D(256, (2, 1), activation = 'relu', padding = 'same')(y)
    y = keras.layers.MaxPooling2D(pool_size = (2,1))(y)
    y = keras.layers.Flatten()(y)
    output_seq = keras.layers.Dense(9, activation = 'relu')(y)
    
    z = keras.layers.Dense(1024, activation = 'relu')(output_seq)
    z = keras.layers.Dense(512, activation = 'relu')(z)
    z = keras.layers.Dense(256, activation = 'relu')(z)
    z = keras.layers.BatchNormalization()(z)
    z = keras.layers.Dense(64, activation = 'relu')(z)
    z = keras.layers.Dense(16, activation = 'relu')(z)
    z = keras.layers.Dense(1, activation = 'linear')(z)
    
    
    model = keras.Model(inputs = input_seq, outputs = z)
    model.compile(loss = 'mean_absolute_error', optimizer = 'adam', metrics = ['mean_absolute_error'])
    
    MODEL_DIR = './model_Func/'
    if not os.path.exists(MODEL_DIR):
        os.mkdir(MODEL_DIR)
    
    model_save_path = "./model/{epoch:02d}-{val_loss:.4f}.hdf5"
    checkpointer = ModelCheckpoint(filepath=model_save_path, monitor='val_loss', 
                                   verbose=0, save_best_only=True)
    early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience=10)
    
    print("Training model...")

    history = model.fit(trainX_seq, trainy,
                        epochs = 100, batch_size = 64, validation_split = 0.9,
                        verbose = 0, callbacks = [early_stopping_callback,checkpointer])
    
    test_MAE, _ = model.evaluate(testX_seq, testy, verbose=0)

    return model, test_MAE

# Ensemble prediction with model set
def ensemble_predictions(model_set, testX_seq):
    
    yhats = [model.predict(testX_seq) for model in model_set]
    yhats = np.array(yhats)
    
    result = np.sum(yhats, axis=0) / len(model_set)
    return result

# Partial evaluation of ensemble set
def partial_evaluation(model_set, number, testX_seq, testy):
    
    model_subset = model_set[:number]
    
    yhat = ensemble_predictions(model_subset,testX_seq)
    
    return mean_absolute_error(testy, yhat)

In [None]:
# Loading and processing training data

Total_dataset = usage_matcher("C:/Users/FMB/Documents/python/MLDL/Ecoli codon variant transformer.xlsx",
                             "score 1", "Ecoli")
Total_dataset.columns = ['sequence', 'score avg', '1st_usage', '2nd_usage', '3rd_usage']

Total_array, Total_label, Total_usage = Data_processing(Total_dataset)

print(Total_dataset.head())
print(Total_array.shape)
print(Total_label.head())
print(Total_usage.head())

In [None]:
# Train - Test data split for model training(train : test = 9 : 1)
trainX_seq, testX_seq, trainX_usage, testX_usage, trainy, testy = train_test_split(Total_array, 
                                                                                   Total_usage, Total_label, test_size = 0.1)

In [None]:
# Training of single model with same architecture of evaluate_model function

input_seq = keras.Input(shape = (9,1,4,), name = 'X_train_seq_hot')
y = keras.layers.Conv2D(128, (3, 1), activation = 'relu', padding = 'same')(input_seq)
y = keras.layers.Conv2D(128, (3, 1), activation = 'relu', padding = 'same')(y)
y = keras.layers.MaxPooling2D(pool_size = (4,1))(y)
y = keras.layers.Conv2D(256, (2, 1), activation = 'relu', padding = 'same')(y)
y = keras.layers.Conv2D(256, (2, 1), activation = 'relu', padding = 'same')(y)
y = keras.layers.MaxPooling2D(pool_size = (2,1))(y)
y = keras.layers.Flatten()(y)
output_seq = keras.layers.Dense(9, activation = 'relu')(y)

z = keras.layers.Dense(1024, activation = 'relu')(output_seq)
z = keras.layers.Dense(512, activation = 'relu')(z)
z = keras.layers.Dense(256, activation = 'relu')(z)
z = keras.layers.BatchNormalization()(z)
z = keras.layers.Dense(64, activation = 'relu')(z)
z = keras.layers.Dense(16, activation = 'relu')(z)
z = keras.layers.Dense(1, activation = 'linear')(z)


model = keras.Model(inputs = input_seq, outputs = z, name = 'sDeepTESR')
model.compile(loss = 'mean_absolute_error', optimizer = 'adam', metrics = ['mean_absolute_error'])
model.summary()

MODEL_DIR = './model_Func/'
if not os.path.exists(MODEL_DIR):
    os.mkdir(MODEL_DIR)
    
modelpath = "./model/{epoch:02d}-{val_loss:.4f}.hdf5"
checkpointer = ModelCheckpoint(filepath=modelpath, monitor='val_loss', verbose=1, save_best_only=True)
early_stopping_callback = EarlyStopping(monitor = 'val_loss', patience=10)

print("Training model...")

history = model.fit(trainX_seq, trainy, 
                    epochs = 100, batch_size = 64, validation_split = 0.9,
                    verbose = 1, callbacks = [early_stopping_callback,checkpointer])
y_vloss = history.history['val_loss']
y_err = history.history['mean_absolute_error']
x_len = np.arange(len(y_err))

In [None]:
# Visualization of training result from sDeepTESR

plt.figure(figsize = (7,7), dpi = 300)
plt.plot(x_len[:20], y_vloss[:20], "o", linestyle = "solid", c="red", markersize=0, label = "validation loss")
plt.plot(x_len[:20], y_err[:20], "o", linestyle = "solid", c="blue", markersize=0, label = "training loss")
plt.legend(loc = (0.55, 0.83), fontsize = 15)
plt.title("sDeepTESR training result", fontsize = 15)
plt.xlabel("Epochs", fontsize = 15)
plt.xticks([0, 5, 10, 15, 20])
plt.ylabel("Mean absolute error", fontsize = 15)
plt.savefig("C:/Users/FMB/Desktop/Training_result_only_seq_TESR.jpg")
plt.show()

In [None]:
# Visulization of test result from sDeepTESR
Test_Predict = model.predict(testX_seq)
test_loss, test_acc = model.evaluate(testX_seq, testy)

plt.figure(figsize = (5,5), dpi = 300)
plt.scatter(testy, Test_Predict, alpha = 0.03)
plt.title("GFP score(Test set) - TESR score(Prediction)", fontsize = 10)
plt.xlabel("GFP score(Test set)", fontsize = 10)
plt.ylabel("TESR score(Prediction)", fontsize = 10)
plt.xlim(1.0, 5.0)
plt.xticks([1, 2, 3, 4, 5])
plt.ylim(1.0, 5.0)
plt.yticks([1, 2, 3, 4, 5])
plt.savefig("C:/Users/FMB/Desktop/GFP_TESR_sDeepTESR_only_seq.jpg")
plt.show()

print("r2_score = ", r2_score(testy, Test_Predict))

In [None]:
# Bagging ensemble training for eDeepTESR

n_splits = 20
scores, trained_models = list(), list()

for _ in range(n_splits):
    print("train-test split...")
    train_ens_usage, test_ens_usage, train_ens_seq, test_ens_seq, train_ens_y, test_ens_y = train_test_split(trainX_usage, trainX_seq, trainy, 
                                                                                                             test_size = 0.2)
    # evaluate model
    model, test_MAE = evaluate_model(train_ens_usage, train_ens_seq, train_ens_y, 
                                     test_ens_usage, test_ens_seq, test_ens_y)
    print('Mean_absolute_error=%.5f' % test_MAE)
    scores.append(test_MAE)
    trained_models.append(model)
    
# Evaluation of eDeepTESR

evaluation_result = list()

for i in range(1, n_splits+1):
    n_models_evaluation = partial_evaluation(trained_models, i, testX_usage, testX_seq, testy)
    print('> %d: ensemble=%.5f' % (i, n_models_evaluation))
    evaluation_result.append(n_models_evaluation)

In [None]:
# Load the best model

from keras.models import load_model

best_models = []

for i in range(20):
    model_load = load_model('C:/Users/FMB/Documents/python/MLDL/Short_translational_ramp_DL/model_Func_ensemble/Ensemble_reg_set_5(n=20_filter0.5_80000)/model_set_%d.hdf5'%i)
    best_models.append(model_load)

In [None]:
# Evaluation of eDeepTESR

evaluation_result = list()

for i in range(1, len(best_models)+1):
    n_models_evaluation = partial_evaluation(best_models, i, testX_usage, testX_seq, testy)
    print('> %d: ensemble=%.5f' % (i, n_models_evaluation))
    evaluation_result.append(n_models_evaluation)


# Visualizatino of results from eDeepTESR evaulation

x_axis = [i for i in range(1, len(best_models)+1)]
plt.figure(figsize = (8,8), dpi = 300)
plt.plot(x_axis, evaluation_result)
plt.title("Evaluation of eDeepTESR", fontsize = 15)
plt.xlabel("Number of model training", fontsize = 10)
plt.xticks([0 , 5, 10 , 15, 20])
plt.ylabel("Mean absolute error", fontsize = 10)
plt.xlim(0, len(best_models)+1)
plt.show()

In [None]:
# Visulization of test result from eDeepTESR
best_test_Predict = ensemble_predictions(best_models, testX_usage, testX_seq)
best_test_loss = partial_evaluation(best_models, 20, testX_usage, testX_seq, testy)

plt.figure(figsize = (5,5), dpi = 300)
plt.scatter(testy, best_test_Predict, alpha = 0.03)
plt.title("GFP score(Test set) - TESR score(Prediction)", fontsize = 10)
plt.xlabel("GFP score(Test set)", fontsize = 10)
plt.xlim(1.0, 5.0)
plt.xticks([1, 2, 3, 4, 5])
plt.ylim(1.0, 5.0)
plt.yticks([1, 2, 3, 4, 5])
plt.ylabel("TESR score(Prediction)", fontsize = 10)
plt.show()

print("Mean absolute error = ", best_test_loss)
print("r2_score = ", r2_score(testy, best_test_Predict))