# Libraries

In [None]:
import pandas as pd
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import models
from torchsummary import summary
import matplotlib.pyplot as plt
from sklearn.metrics import accuracy_score, confusion_matrix
from sklearn.ensemble import RandomForestClassifier
import seaborn as sns
from sklearn.model_selection import RandomizedSearchCV, cross_val_score, train_test_split, StratifiedKFold, KFold
from xgboost import XGBClassifier
from sklearn.manifold import SpectralEmbedding
from torch.utils.data import DataLoader, TensorDataset
import warnings
warnings.filterwarnings("ignore")

# Loading Data

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read by path.
# This import is only available inside a Google Colab runtime.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Read the CSV file from the mounted Drive into a DataFrame.
# Per the df.head() preview in this notebook, each row carries a subject_id,
# a drug_label, and the standards_* / deviants_* feature columns.
df = pd.read_csv('/content/drive/MyDrive/Colab Notebooks/5Drugs_Python.csv')

In [None]:
# Preview the first rows to sanity-check the column layout before slicing by position.
df.head()

Unnamed: 0,subject_id,drug_label,standards_1,standards_2,standards_3,standards_4,standards_5,standards_6,standards_7,standards_8,...,deviants_8748,deviants_8749,deviants_8750,deviants_8751,deviants_8752,deviants_8753,deviants_8754,deviants_8755,deviants_8756,deviants_8757
0,101,Biperdine,-0.721011,0.905316,-0.268513,0.872179,-0.437147,0.258573,-0.155697,0.862296,...,1.632576,-0.022835,0.915061,-0.499025,0.667729,0.635308,-1.262281,-0.214111,1.228925,1.30234
1,102,Placebo,1.715061,1.055126,-0.86964,-0.454077,-0.749553,-0.32107,-0.750834,-0.725049,...,-0.09766,-1.25429,0.469221,-1.562479,-0.437813,-1.482711,-2.151189,-1.790298,-1.038511,-0.989457
2,103,Amisulpride,0.865764,1.049517,-0.069502,-0.472154,-0.650986,-0.775461,-0.49749,-1.040329,...,1.08465,-0.648157,0.922619,-1.046926,-0.600638,-1.221202,-0.168258,-1.239763,-1.819047,-1.004539
3,104,Biperdine,0.861297,0.965113,-0.145852,-0.03872,-0.31821,-0.373414,-0.713712,-0.593783,...,0.382564,2.847351,-0.711737,1.096458,-0.612047,-0.134576,-2.426027,-1.084767,-0.44618,-0.844795
4,105,Biperdine,-0.871517,0.12764,-0.010109,0.37332,-0.437701,-0.135828,-0.256461,-0.605408,...,0.034907,-1.714166,0.884205,-1.34448,-1.194164,-0.761948,-3.5415,-2.257122,-1.308842,-0.375088


In [None]:
def map_drug_to_number(drug_name):
    """Translate a drug name into its integer class label.

    Recognised names and their codes: Placebo -> 0, Biperdine -> 1,
    Amisulpride -> 2, Levodopa -> 3, Galantamine -> 4.

    Returns:
        The integer code for a recognised name, or None for anything else.
    """
    # Pairs are scanned in a fixed order with plain equality, so no hashing
    # of the argument is required.
    name_to_code = (
        ('Amisulpride', 2),
        ('Biperdine', 1),
        ('Levodopa', 3),
        ('Galantamine', 4),
        ('Placebo', 0),
    )
    for known_name, code in name_to_code:
        if drug_name == known_name:
            return code
    return None

In [None]:
# Split the flat feature columns into per-condition arrays of shape
# (subjects, 63, 139) and encode the drug names as integer class labels.
n_channels, n_timepoints = 63, 139
n_features = n_channels * n_timepoints  # 8757 values per subject and condition
first_feature_col = 2  # assumes columns 0-1 are subject_id and drug_label — confirm against the CSV

# The subject axis is inferred (-1) so the cell keeps working if rows are
# added to or removed from the CSV.
standards = df.iloc[:, first_feature_col:first_feature_col + n_features].values.reshape((-1, n_channels, n_timepoints))
deviants = df.iloc[:, first_feature_col + n_features:].values.reshape((-1, n_channels, n_timepoints))

# map_drug_to_number returns None for unknown names; fail loudly here rather
# than letting a missing label be cast to an arbitrary integer later on.
label_series = df['drug_label'].apply(map_drug_to_number)
if label_series.isna().any():
    unknown_names = sorted(set(df.loc[label_series.isna(), 'drug_label'].astype(str)))
    raise ValueError(f"Unmapped drug_label values: {unknown_names}")
labels = label_series.values.astype(np.int64)

# Model

In [None]:
class TNU(nn.Module):
    """Two-stream classifier over paired "standards" and "deviants" inputs.

    Both inputs go through the same shared-weight pipeline: channel-wise
    attention -> Conv2d + BatchNorm + ELU -> max-pooling -> dropout ->
    2-layer LSTM -> feature-wise attention -> dropout. The two resulting
    feature vectors are concatenated, re-weighted by a further attention
    layer and projected onto ``num_classes`` outputs.

    Inputs are expected as ``[batch_size, 63, 139]`` tensors; the sizes of the
    attention weights, the convolution kernel and the LSTM input are fixed
    for that shape.
    """

    def __init__(self):
        super().__init__()

        # Channel - Wise Attention Layer (63 -> r -> 63)
        self.r = 32
        self.W1 = nn.Parameter(torch.randn(63, self.r))
        self.b1 = nn.Parameter(torch.zeros(self.r))
        self.W2 = nn.Parameter(torch.randn(self.r, 63))
        self.b2 = nn.Parameter(torch.zeros(63))

        # Convolution Layer: the kernel spans all 63 rows, so the output
        # height collapses to 1.
        self.kernel_height = 63
        self.kernel_width = 45
        self.out_channels = 40
        self.in_channels = 1
        self.conv = nn.Conv2d(self.in_channels, self.out_channels, kernel_size=(self.kernel_height, self.kernel_width))
        self.bn = nn.BatchNorm2d(self.out_channels)

        # MaxPooling Layer (the vertical stride is irrelevant because the
        # feature map height is already 1 at this point)
        self.pool_height = 1
        self.pool_width = 75
        self.pool_stride = 10
        self.max_pool = nn.MaxPool2d(kernel_size=(self.pool_height, self.pool_width), stride=(self.pool_stride, self.pool_stride))

        # Dropout Layer
        self.dropout = nn.Dropout(p=0.5)

        # LSTM Layer: input_size 120 = 40 feature maps * 1 * 3 pooled positions
        self.lstm_units = 139
        self.lstm = nn.LSTM(input_size=120, hidden_size=self.lstm_units, num_layers=2, dropout=0.5, batch_first=True)

        # LSTM - Attention Layer (139 -> 512 -> 139)
        self.attention_size = 512
        self.W3 = nn.Parameter(torch.randn(self.lstm_units, self.attention_size))
        self.b3 = nn.Parameter(torch.randn(self.attention_size))
        self.W4 = nn.Parameter(torch.randn(self.attention_size, self.lstm_units))
        self.b4 = nn.Parameter(torch.randn(self.lstm_units))

        # Combined STD/DVT - Attention Layer (278 -> 512 -> 278)
        self.W5 = nn.Parameter(torch.randn(self.lstm_units*2, self.attention_size))
        self.b5 = nn.Parameter(torch.randn(self.attention_size))
        self.W6 = nn.Parameter(torch.randn(self.attention_size, self.lstm_units*2))
        self.b6 = nn.Parameter(torch.randn(self.lstm_units*2))

        # Prediction Layer
        self.num_classes = 5
        self.softmax_weights = nn.Parameter(torch.randn(2 * self.lstm_units, self.num_classes))
        self.softmax_biases = nn.Parameter(torch.randn(self.num_classes))

    def _encode_stream(self, stream_input):
        """Run one input stream through the shared feature pipeline.

        Args:
            stream_input: Tensor of shape [batch_size, 63, 139].

        Returns:
            Tensor of shape [batch_size, lstm_units] holding the
            attention-weighted LSTM features after dropout.
        """
        # Channel-wise attention: average over the last axis, squeeze to r
        # units, expand back to 63 and normalise into per-channel weights.
        channel_means = torch.mean(stream_input, dim=2)  # Shape: [batch_size, 63]
        channel_hidden = torch.tanh(torch.matmul(channel_means, self.W1) + self.b1)  # Shape: [batch_size, r]
        channel_scores = torch.matmul(channel_hidden, self.W2) + self.b2  # Shape: [batch_size, 63]
        channel_weights = F.softmax(channel_scores, dim=1)  # Shape: [batch_size, 63]
        weighted = stream_input * channel_weights.unsqueeze(-1)  # Shape: [batch_size, 63, 139]

        # Add the singleton image-channel axis expected by Conv2d.
        conv_input = weighted.unsqueeze(1)  # Shape: [batch_size, 1, 63, 139]

        # Convolution -> batch norm -> ELU
        conv_features = F.elu(self.bn(self.conv(conv_input)))  # Shape: [batch_size, 40, 1, 95]

        # Max-pooling, flatten, dropout
        pooled = self.max_pool(conv_features)  # Shape: [batch_size, 40, 1, 3]
        flat = self.dropout(pooled.flatten(start_dim=1))  # Shape: [batch_size, 120]

        # The LSTM sees each sample as a sequence of length 1.
        lstm_out, _ = self.lstm(flat.unsqueeze(1))  # Shape: [batch_size, 1, 139]
        lstm_flat = lstm_out.reshape(-1, self.lstm_units)  # Shape: [batch_size, 139]

        # Feature-wise attention over the LSTM output (139 -> 512 -> 139)
        att_hidden = torch.tanh(torch.matmul(lstm_flat, self.W3) + self.b3)  # Shape: [batch_size, 512]
        att_scores = torch.matmul(att_hidden, self.W4) + self.b4  # Shape: [batch_size, 139]
        att_weights = F.softmax(att_scores, dim=1)  # Shape: [batch_size, 139]

        return self.dropout(torch.mul(lstm_flat, att_weights))  # Shape: [batch_size, 139]

    def forward(self, standards_input, deviants_input, labels=None):
        """Compute class probabilities for a batch of paired inputs.

        Args:
            standards_input: Tensor of shape [batch_size, 63, 139].
            deviants_input: Tensor of shape [batch_size, 63, 139].
            labels: Unused; kept so existing callers that pass it still work.

        Returns:
            Tensor of shape [batch_size, num_classes] whose rows are softmax
            probabilities. Because the output is already normalised, a loss
            that expects raw logits (such as ``nn.CrossEntropyLoss``) would
            apply softmax a second time; use a negative log-likelihood on the
            log of these probabilities instead.
        """
        # Both streams share every weight; the standards stream is always
        # processed first so BatchNorm running statistics update in a fixed order.
        standards_features = self._encode_stream(standards_input)  # Shape: [batch_size, 139]
        deviants_features = self._encode_stream(deviants_input)  # Shape: [batch_size, 139]

        # Concatenate the attention-weighted features
        combined_features = torch.cat([standards_features, deviants_features], dim=1)  # Shape: [batch_size, 278]

        # Attention over the concatenated vector (278 -> 512 -> 278)
        combined_hidden = torch.tanh(torch.matmul(combined_features, self.W5) + self.b5)  # Shape: [batch_size, 512]
        combined_scores = torch.matmul(combined_hidden, self.W6) + self.b6  # Shape: [batch_size, 278]
        combined_weights = F.softmax(combined_scores, dim=1)  # Shape: [batch_size, 278]
        combined_weighted = combined_features * combined_weights  # Shape: [batch_size, 278]

        # Calculate logits and predicted probabilities
        logits = torch.matmul(combined_weighted, self.softmax_weights) + self.softmax_biases
        y_prob = F.softmax(logits, dim=1)  # Shape: [batch_size, 5]

        return y_prob

In [None]:
# Set the device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Seed every RNG in play so fold assignment, weight initialisation, batch
# shuffling and dropout are repeatable.
# NOTE(review): `seed` was referenced below but never defined in the visible
# notebook; 42 is a placeholder — confirm the value used for reported results.
seed = 42
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)

# Set hyperparameters
epochs = 200
batch_size = 11
learning_rate = 0.0001
log_eps = 1e-8  # keeps log() finite when a predicted probability underflows to 0

# Convert the data to tensors once, with explicit dtypes (class indices must
# be int64 for the loss). as_tensor is a no-op if this cell is re-run.
standards = torch.as_tensor(standards, dtype=torch.float32)
deviants = torch.as_tensor(deviants, dtype=torch.float32)
labels = torch.as_tensor(labels, dtype=torch.long)

# Stratified folds keep the class proportions similar in every split.
# The splitter only needs the sample count and the labels, so hand it plain
# NumPy arrays instead of the full feature tensors.
skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=seed)
fold_targets = labels.cpu().numpy()
fold_placeholder = np.zeros(len(fold_targets))

# Perform the cross-validation
accuracy_history = [] #accuracy per fold
cross_val_accuracy = 0 #mean accuracy per fold

for fold, (train_index, test_index) in enumerate(skf.split(fold_placeholder, fold_targets)):

    # A fresh model and optimizer per fold so no weights leak between folds.
    model = TNU().to(device)
    # The model already returns softmax probabilities, so use NLL on their
    # log; nn.CrossEntropyLoss expects raw logits and would apply softmax twice.
    criterion = nn.NLLLoss()
    optimizer = optim.Adam(model.parameters(), lr=learning_rate)

    # Split the data into training and test sets
    standards_train, standards_test = standards[train_index], standards[test_index]
    deviants_train, deviants_test = deviants[train_index], deviants[test_index]
    labels_train, labels_test = labels[train_index], labels[test_index]

    # Create DataLoader for training and test sets. Training batches are
    # reshuffled every epoch; evaluation order does not matter.
    train_dataset = TensorDataset(standards_train, deviants_train, labels_train)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_dataset = TensorDataset(standards_test, deviants_test, labels_test)
    test_loader = DataLoader(test_dataset, batch_size=batch_size)

    avg_loss = float('nan')
    for epoch in range(epochs):
        model.train()
        total_loss = 0

        for batch_standards, batch_deviants, batch_labels in train_loader:
            optimizer.zero_grad()

            batch_standards = batch_standards.to(device)
            batch_deviants = batch_deviants.to(device)
            batch_labels = batch_labels.to(device)

            outputs = model(batch_standards, batch_deviants)

            loss = criterion(torch.log(outputs + log_eps), batch_labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()

        avg_loss = total_loss/len(train_loader)

    print(f'Fold {fold + 1} final training loss: {avg_loss:.4f}')

    # Evaluation loop
    model.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for batch_standards, batch_deviants, batch_labels in test_loader:
            batch_standards = batch_standards.to(device)
            batch_deviants = batch_deviants.to(device)
            batch_labels = batch_labels.to(device)

            outputs = model(batch_standards, batch_deviants)
            predicted = outputs.argmax(dim=1)

            total += batch_labels.size(0)
            correct += (predicted == batch_labels).sum().item()

    accuracy = correct / total
    print(f'Fold {fold + 1} accuracy: {accuracy}')
    accuracy_history.append(accuracy)

# Print cross-validation accuracy
cross_val_accuracy = np.mean(accuracy_history)
print(f"Cross-Validation Accuracy: {cross_val_accuracy}")

Cross-Validation Accuracy: 0.4904761904761905
