In [41]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.base import BaseEstimator, RegressorMixin

from sklearn.metrics import make_scorer, mean_squared_error
import numpy as np
import pandas as pd
from skorch import NeuralNetRegressor
from sklearn.preprocessing import StandardScaler

from torch.utils.data import DataLoader, TensorDataset

import matplotlib.pyplot as plt

import captum.attr as attr
import seaborn as sns

from scipy.stats import mannwhitneyu
from sklearn.metrics import roc_auc_score

from sklearn.metrics import mean_absolute_error
import shap

from sklearn.metrics import make_scorer, mean_squared_error

from sklearn.metrics import r2_score

In [42]:
def filter_by_variance(data_name: pd.DataFrame, object: str, n: float) -> pd.DataFrame:
    # exclude object
    df_excluding_object = data_name.drop(columns=[object])
    # calculate var
    variance_values = df_excluding_object.var(axis=0)
    # calculate threshold
    threshold = np.percentile(variance_values, 100 - n)
    # select columns
    top_n_columns = variance_values[variance_values >= threshold].index.tolist()
    # create df
    new_df = data_name[top_n_columns + [object]]
    return new_df

In [43]:
class KernelSVRwithNN(nn.Module):
    class EarlyStopping:
        def __init__(self, patience=5, verbose=False):
            self.patience = patience    
            self.verbose = verbose     
            self.counter = 0            
            self.best_score = None     
            self.early_stop = False    
            self.val_loss_min = np.Inf   
        
        def __call__(self, val_loss, model):
            score = -val_loss

            if self.best_score is None: 
                self.best_score = score   
                self.checkpoint(val_loss, model)  
            elif score < self.best_score: 
                self.counter += 1   
                if self.verbose:  
                    print(f'EarlyStopping counter: {self.counter} out of {self.patience}')   
                if self.counter >= self.patience: 
                    self.early_stop = True
            else:  
                self.best_score = score  
                self.checkpoint(val_loss, model)  
                self.counter = 0  

        def checkpoint(self, val_loss, model):
            
            if self.verbose:  
                print(f'Validation loss decreased ({self.val_loss_min:.6f} --> {val_loss:.6f}).  Saving model ...')
            self.val_loss_min = val_loss  
    def __init__(self, input_features, nn_layer_size, kernel_svr_features, 
                 train_data_x, epsilon,epochs,lr,l2_reg,kernel='rbf', gamma_init=1.0, train_gamma=True,):
        super(KernelSVRwithNN, self).__init__()
        self.l2_reg = l2_reg
        # def NN
        
        self.nn_layers = nn.Sequential(
            nn.Linear(input_features, nn_layer_size),
            nn.BatchNorm1d(nn_layer_size),
            nn.ReLU(),
            
            nn.Linear(nn_layer_size, kernel_svr_features)
        )

        # parameter Kernel SVR
        self.kernel = kernel
        self.train_data_x = train_data_x
        if kernel == 'rbf':
            self.gamma = torch.nn.Parameter(torch.FloatTensor([gamma_init]),
                                         requires_grad=train_gamma)
        else:
            self.gamma = None  #linear

        self.w = torch.nn.Linear(in_features=train_data_x.size(0) if kernel == 'rbf' else kernel_svr_features, out_features=1)
        self.b = torch.nn.Parameter(torch.zeros(1))

    def rbf(self, x):
        y = self.nn_layers(self.train_data_x).repeat(x.size(0), 1, 1)  # embedding train_data_x via NN
        return torch.exp(-self.gamma * ((x[:, None, :] - y) ** 2).sum(dim=2))


    def linear(self, x):
        return x

    def kernel_func(self, x):
        if self.kernel == 'rbf':
            return self.rbf(x)
        elif self.kernel == 'linear':
            return self.linear(x)
        else:
            assert False, "Invalid kernel type"

    def forward(self, x):
        # NN
        
        x = self.nn_layers(x)

        # Kernel SVR
        x = self.kernel_func(x)
        x = self.w(x) + self.b
        return x

    def fit(self, X, y, epsilon, lr, epochs,batch_size):
        # epsilon-insensitive loss
        earlystopping = self.EarlyStopping(patience=3, verbose=False)
        loss_fn = lambda x, y: self.epsilon_loss(x, y, epsilon)
        optimizer = torch.optim.RAdam(self.parameters(), lr=lr)

        self.train() #train
        epoch_loss_lis = []
        for epoch in range(epochs):
            running_loss=0.0
            loss_lis = []
            for batch_idx, (inputs, targets) in enumerate(DataLoader(TensorDataset(X, y), batch_size, shuffle=True)):
                optimizer.zero_grad()
                outputs = self(inputs)
                loss = loss_fn(outputs, targets)
                l2_loss = 0.0
                for param in self.w.parameters():  # regularize self.w(SVR layer)
                    l2_loss += torch.norm(param, 2)**2  # calculate L2norm
                loss += self.l2_reg * l2_loss
                loss_lis.append(loss.item())
                loss.backward()
                optimizer.step()
                running_loss+=loss.item()
            earlystopping((running_loss / batch_idx), self)
            if earlystopping.early_stop: #if flag==True, break the for loop
                break
            avg_loss = sum(loss_lis) / len(loss_lis)
            epoch_loss_lis.append(avg_loss)

        self.eval()  # eval
    def predict(self, X):
        return self.forward(X)

    def epsilon_loss(self, x, y, epsilon):
        return torch.mean(torch.maximum(torch.abs(x - y) - epsilon, torch.zeros_like(y)))

In [44]:
def read_cells(path_data,medicine_name):
    df=pd.read_csv(path_data,index_col='rownames')
    filtered_df=df[df[medicine_name].notna()]
    if medicine_name in filtered_df.columns:
        filtered_df = filtered_df.copy()
    return filtered_df

In [45]:
if torch.cuda.is_available():
    d_type = "cuda:0"
else:
    d_type = "cpu"
#2
device = torch.device(d_type)

In [46]:
drug_df=read_cells("../../Data/Bortezomib/bortezomib_train_cell.csv",'Bortezomib')
#drug_df=read_cells("../../../SVM_1/resource/bortezomib/bortezomib_cells.csv",'Bortezomib')
drug_df_var=filter_by_variance(drug_df,"Bortezomib", 100)

In [47]:
x_ = drug_df_var.drop(columns=['Bortezomib'])
y = drug_df_var[['Bortezomib']]
row=x_.shape[1]
scaler = StandardScaler()
x = scaler.fit_transform(x_)
y=y.to_numpy().astype("float32")

In [49]:
x_train = torch.tensor(x, dtype=torch.float32).to(device)
y_train = torch.tensor(y, dtype=torch.float32).to(device)

In [51]:
test_df=read_cells("../../Data/Bortezomib/bortezomib_test_cell.csv",'Bortezomib')
X_test_original = test_df[drug_df_var.drop("Bortezomib", axis=1).columns] #exclude feature 
X_test = scaler.transform(X_test_original) # StandardScaler
y_test = test_df["Bortezomib"].values
X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)

In [111]:
nn_layers= 1
nn_layer_size=256
kernel_svr_features=16
kernel='rbf'
epochs=10000
epsilon= 0.1
lr=0.0001
batch_size=8
l2_reg=10

In [117]:
model_ = KernelSVRwithNN(row, nn_layer_size, kernel_svr_features,x_train,epsilon,epochs,lr,l2_reg)
model=model_.to(device)

In [118]:
model.fit(x_train, y_train, epsilon, lr, epochs,batch_size)

In [119]:
# predict
with torch.no_grad():
    y_pred_tensor = model(X_test_tensor)
    y_pred=y_pred_tensor.to('cpu').numpy().flatten()

In [120]:
mae = mean_absolute_error(y_test, y_pred)
mse=mean_squared_error(y_test, y_pred)
print(f"MAE: {mae}")
print(f"MSE: {mse}")

MAE: 0.7245645652744584
MSE: 0.9026066014273392


In [121]:
R2=r2_score(y_test, y_pred)
R2

0.06733340155842071