In [1]:
import numpy as np
import pickle
import pandas as pd
import os
from os.path import join
import warnings
warnings.filterwarnings("ignore")
from sklearn.metrics import r2_score
from sklearn.linear_model import ElasticNet, LinearRegression
from sklearn.ensemble import RandomForestRegressor
from sklearn import preprocessing
from scipy import stats
import xgboost as xgb
from hyperopt import fmin, tpe, rand, hp, Trials

from tensorflow.keras import regularizers, initializers, optimizers, models, layers
from tensorflow.keras.losses import MSE
from tensorflow.keras.activations import relu

import matplotlib.pyplot as plt
from matplotlib.pyplot import figure
import matplotlib as mpl
plt.style.use('CCB_plot_style_0v4.mplstyle')
c_styles      = mpl.rcParams['axes.prop_cycle'].by_key()['color']   # fetch the defined color styles
high_contrast = ['#004488', '#DDAA33', '#BB5566', '#000000']


Bad key text.latex.preview in file CCB_plot_style_0v4.mplstyle, line 55 ('text.latex.preview  : False')
You probably need to get an updated matplotlibrc file from
https://github.com/matplotlib/matplotlib/blob/v3.5.3/matplotlibrc.template
or from the matplotlib source distribution

Bad key mathtext.fallback_to_cm in file CCB_plot_style_0v4.mplstyle, line 63 ('mathtext.fallback_to_cm : True ## When True, use symbols from the Computer Modern fonts')
You probably need to get an updated matplotlibrc file from
https://github.com/matplotlib/matplotlib/blob/v3.5.3/matplotlibrc.template
or from the matplotlib source distribution


## Loading training and test data:

In [2]:
data_train = pd.read_pickle(join("..", "..", "data", "kcat_data", "splits", "train_df_kcat.pkl"))
data_test = pd.read_pickle(join("..", "..", "data", "kcat_data", "splits", "test_df_kcat.pkl"))


data_train.rename(columns = {"geomean_kcat" :"log10_kcat"}, inplace = True)
data_test.rename(columns = {"geomean_kcat" :"log10_kcat"}, inplace = True)
len(data_train), len(data_test)

(3421, 850)

In [3]:
train_indices = list(np.load(join("..", "..", "data", "kcat_data", "splits", "CV_train_indices.npy"), allow_pickle = True))
test_indices = list(np.load(join("..", "..", "data", "kcat_data", "splits", "CV_test_indices.npy"), allow_pickle = True))

In [4]:
train_X = np.array(list(data_train["DRFP"]))
train_X = np.concatenate([train_X, np.array(list(data_train["ESM1b_ts"]))], axis = 1)
train_Y = np.array(list(data_train["log10_kcat"]))

test_X = np.array(list(data_test["DRFP"]))
test_X = np.concatenate([test_X, np.array(list(data_test["ESM1b_ts"]))], axis = 1)
test_Y = np.array(list(data_test["log10_kcat"]))

In [5]:
mean_y, std_y = np.mean(train_Y), np.std(train_Y)
train_Y = (train_Y-mean_y)/std_y
test_Y = (test_Y-mean_y)/std_y

scaler = preprocessing.StandardScaler().fit(train_X[:, 2048:])
train_X[:, 2048:] = scaler.transform(train_X[:, 2048:])
test_X[:, 2048:] = scaler.transform(test_X[:, 2048:])

## 1. Training and validation machine learning models

### (a) Linear Regression

#### (i) Performing hyperparameter optimization

In [6]:
def cross_validation_neg_r2_linear_regression(param):
    R2 = []
    for i in range(5):
        train_index, test_index  = train_indices[i], test_indices[i]

        reg = ElasticNet(alpha = param["alpha"], l1_ratio = param["l1_ratio"]).fit(train_X[train_index], train_Y[train_index])
        y_valid_pred = reg.predict(train_X[test_index])
        R2.append(r2_score(np.reshape(train_Y[test_index], (-1)),  y_valid_pred))
    return(-np.mean(R2))


#Defining search space for hyperparameter optimizationhp.uniform("reg_alpha", 0, 5)
space_linear_regression = {'alpha': hp.uniform('alpha', 0,5),
                            'l1_ratio': hp.uniform('l1_ratio', 0,1)}
    

In [7]:
'''trials = Trials()
best = fmin(fn = cross_validation_neg_r2_linear_regression, space = space_linear_regression,
            algo=rand.suggest, max_evals = 2000, trials=trials)''';

#### Best set of hyperparameters:

In [8]:
#param = trials.argmin

In [9]:
param = {'alpha': 0.3960857176137572, 'l1_ratio': 0.003735725013911728}

#### (ii) Training and validating the final model
Training the model and validating it on the test set:

In [10]:
test_Y = (test_Y+mean_y)*std_y

In [11]:
reg = ElasticNet(alpha = param["alpha"], l1_ratio = param["l1_ratio"]).fit(train_X, train_Y)
y_test_pred = reg.predict(test_X)
y_test_pred = (y_test_pred+mean_y)*std_y

MSE_dif_fp_test = np.mean(abs(np.reshape(test_Y, (-1)) - y_test_pred)**2)
R2_dif_fp_test = r2_score(np.reshape(test_Y, (-1)), y_test_pred)
Pearson = stats.pearsonr(np.reshape(test_Y, (-1)), y_test_pred)

print(np.round(Pearson[0],3) , np.round(MSE_dif_fp_test,3), np.round(R2_dif_fp_test,3))

0.542 1.014 0.293


### (b) Random forest

In [12]:
#create input matrices:
train_X = np.array(list(data_train["DRFP"]))
train_X = np.concatenate([train_X, np.array(list(data_train["ESM1b_ts"]))], axis = 1)
train_Y = np.array(list(data_train["log10_kcat"]))

test_X = np.array(list(data_test["DRFP"]))
test_X = np.concatenate([test_X, np.array(list(data_test["ESM1b_ts"]))], axis = 1)
test_Y = np.array(list(data_test["log10_kcat"]))


scaler = preprocessing.StandardScaler().fit(train_X)
train_X = scaler.transform(train_X)
test_X = scaler.transform(test_X)

In [13]:
def cross_validation_neg_r2_random_forest(param):
    R2 = []
    for i in range(5):
        train_index, test_index  = train_indices[i], test_indices[i]

        reg = RandomForestRegressor(max_depth = param["max_depth"],
                                    min_samples_leaf = param["min_samples_leaf"],
                                    n_estimators = param["n_estimators"]).fit(train_X[train_index], train_Y[train_index])
        y_valid_pred = reg.predict(train_X[test_index])
        R2.append(r2_score(np.reshape(train_Y[test_index], (-1)),  y_valid_pred))
    return(-np.mean(R2))

#Defining search space for hyperparameter optimization
space_random_forest = {'n_estimators': hp.choice('n_estimators', [50, 100, 200]),
                      'max_depth': hp.choice('max_depth', [5,6,7,8,9,10,11,12,13,14,15,16]),
                       'min_samples_leaf': hp.choice('min_samples_leaf', [1,2,5,10,20])}

In [14]:
'''trials = Trials()
best = fmin(fn = cross_validation_neg_r2_random_forest, space = space_random_forest,
            algo=rand.suggest, max_evals = 2000, trials=trials)''';

Best set of hyperparameters:

In [15]:
#trials.argmin

In [16]:
param = {'max_depth': 15, 'min_samples_leaf': 1, 'n_estimators': 100}

#### (ii) Training and validating the final model
Training the model and validating it on the test set:

In [17]:
reg = RandomForestRegressor(max_depth = param["max_depth"],
                                    min_samples_leaf = param["min_samples_leaf"],
                                    n_estimators = param["n_estimators"]).fit(train_X, train_Y)
y_test_pred = reg.predict(test_X)

MSE_dif_fp_test = np.mean(abs(np.reshape(test_Y, (-1)) - y_test_pred)**2)
R2_dif_fp_test = r2_score(np.reshape(test_Y, (-1)), y_test_pred)
Pearson = stats.pearsonr(np.reshape(test_Y, (-1)), y_test_pred)

print(np.round(Pearson[0],3) ,np.round(MSE_dif_fp_test,3), np.round(R2_dif_fp_test,3))

0.622 0.911 0.364


### (c) Neural Network

In [18]:
train_X = np.array(list(data_train["DRFP"]))
train_X = np.concatenate([train_X, np.array(list(data_train["ESM1b_ts"]))], axis = 1)
train_Y = np.array(list(data_train["log10_kcat"]))

test_X = np.array(list(data_test["DRFP"]))
test_X = np.concatenate([test_X, np.array(list(data_test["ESM1b_ts"]))], axis = 1)
test_Y = np.array(list(data_test["log10_kcat"]))

mean_y, std_y = np.mean(train_Y), np.std(train_Y)
train_Y = (train_Y-mean_y)/std_y
test_Y = (test_Y-mean_y)/std_y

scaler = preprocessing.StandardScaler().fit(train_X[:, 2048:])
train_X[:, 2048:] = scaler.transform(train_X[:, 2048:])
test_X[:, 2048:] = scaler.transform(test_X[:, 2048:])

In [19]:
def build_model(learning_rate=0.001, decay =10e-6, momentum=0.9, l2_parameter= 0.1, hidden_layer_size1 = 256,
               hidden_layer_size2 = 64, input_dim = 1024, third_layer = True): 
    model = models.Sequential()
    model.add(layers.Dense(units = hidden_layer_size1,
                           kernel_regularizer=regularizers.l2(l2_parameter),
                           kernel_initializer = initializers.TruncatedNormal(
                               mean=0.0, stddev= np.sqrt(2./ input_dim), seed=None),
                           activation='relu', input_shape=(input_dim,)))
    model.add(layers.BatchNormalization())
    model.add(layers.Dense(units= hidden_layer_size2,
                           kernel_regularizer=regularizers.l2(l2_parameter),
                           kernel_initializer = initializers.TruncatedNormal(
                               mean=0.0, stddev = np.sqrt(2./ hidden_layer_size1), seed=None),
                           activation='relu'))
    model.add(layers.BatchNormalization())
    if third_layer == True:
        model.add(layers.Dense(units= 16,
                               kernel_regularizer=regularizers.l2(l2_parameter),
                               kernel_initializer = initializers.TruncatedNormal(
                                   mean=0.0, stddev = np.sqrt(2./ hidden_layer_size2), seed=None),
                               activation='relu'))
        model.add(layers.BatchNormalization())
     
    model.add(layers.Dense(1, kernel_regularizer=regularizers.l2(l2_parameter),
                           kernel_initializer = initializers.TruncatedNormal(
                               mean=0.0, stddev = np.sqrt(2./ 16), seed=None)))
    model.compile(optimizer=optimizers.SGD(learning_rate=learning_rate,  momentum=momentum, nesterov=True),
                  loss='mse',  metrics=['mse'])
    return model



def cross_validation_neg_r2_fcnn(param):
    
    param["num_epochs"] = int(np.round(param["num_epochs"]))

    
    R2 = []
    for i in range(5):
        train_index, test_index  = train_indices[i], test_indices[i]
        model = build_model(input_dim = 1280+2048, 
                            learning_rate= param["learning_rate"],
                            decay = param["decay"],
                            momentum = param["momentum"], 
                            l2_parameter = param["l2_parameter"],
                            hidden_layer_size1 = param["hidden_layer_size1"],
                            hidden_layer_size2 = param["hidden_layer_size2"]) 

        model.fit(np.array(train_X[train_index]), np.array(train_Y[train_index]),
                            epochs = param["num_epochs"],
                            batch_size = param["batch_size"],
                            verbose=0)

        R2.append(r2_score( np.reshape(train_Y[test_index], (-1)),
                           model.predict(np.array(train_X[test_index])).reshape(-1) ))
    return(-np.mean(R2))

In [20]:
'''space = {"learning_rate": hp.uniform("learning_rate", 1e-6, 1e-2),
        "hidden_layer_size1": hp.choice("hidden_layer_size1", [256,128,64]),
        "hidden_layer_size2": hp.choice("hidden_layer_size2", [128,64,32]),
        "batch_size": hp.choice("batch_size", [8,16,32,64,96]),
        "decay": hp.uniform("decay", 1e-9, 1e-5),
        "l2_parameter": hp.uniform("l2_parameter", 0, 0.01),
        "momentum": hp.uniform("momentum", 0.1, 1),
        "num_epochs": hp.uniform("num_epochs", 20, 100)}
    
trials = Trials()
best = fmin(fn = cross_validation_neg_r2_fcnn, space = space, algo=rand.suggest, max_evals= 500, trials=trials)''';

In [21]:
param = {'batch_size': 96,
         'decay': 8.925865617547346e-06,
         'hidden_layer_size1': 128,
         'hidden_layer_size2': 64,
         'l2_parameter': 0.0033008915899278156,
         'learning_rate': 0.006808549614442447,
         'momentum': 0.9054104435951468,
         'num_epochs': 62.68663708309369}

In [26]:
model = build_model(input_dim = 1280+2048, 
                            learning_rate = param["learning_rate"],
                            decay = param["decay"],
                            momentum = param["momentum"], 
                            l2_parameter = param["l2_parameter"], 
                            hidden_layer_size1 = param["hidden_layer_size1"],
                            hidden_layer_size2 = param["hidden_layer_size2"]) 

model.fit(np.array(train_X), np.array(train_Y),
                    epochs = 50,# int(np.round(param["num_epochs"])),
                    batch_size = param["batch_size"],
                    verbose=1)

y_test_pred = model.predict(np.array(test_X))
r2_score(test_Y, y_test_pred.reshape(-1))

Epoch 1/50
Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50


0.3237192981547061

In [46]:
y_test_pred = (y_test_pred.reshape(-1) + mean_y)*std_y
test_Y = (test_Y + mean_y)*std_y

MSE_dif_fp_test = np.mean(abs(np.reshape(test_Y, (-1)) - y_test_pred.reshape(-1))**2)
R2_dif_fp_test = r2_score(np.reshape(test_Y, (-1)), y_test_pred.reshape(-1))
Pearson = stats.pearsonr(np.reshape(test_Y, (-1)), y_test_pred.reshape(-1))

print(np.round(Pearson[0],3) ,np.round(MSE_dif_fp_test,3), np.round(R2_dif_fp_test,3))

0.593 0.969 0.324
