<a href="https://colab.research.google.com/github/Bobisreallyme/TestRepo/blob/main/Implemented_TransformerModel.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import pandas as pd
import numpy as np
import sqlite3
from torch.utils.data import Dataset
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt

You can remove this if you are not running in colab

In [None]:
# Mount Google Drive when running inside Colab. The import is guarded so the
# notebook still survives "Restart & Run All" on a local Jupyter kernel,
# where the google.colab package does not exist.
try:
    from google.colab import drive
except ImportError:
    drive = None

if drive is not None:
    drive.mount('/content/drive')

Import Data (change to your directory)

In [None]:
# Folder holding the pickled embedding frames -- change this one line to
# point at your own copy of the data.
DATA_DIR = '/content/drive/MyDrive/SQLDATABASES'

# NOTE(review): unpickling can execute arbitrary code, so only load pickle
# files that you created yourself or otherwise trust.
# The Embedded dataset defined below reads the 'label', 'Embeddings' and
# 'Mask' columns of these frames.
testing_embeddings = pd.read_pickle(f'{DATA_DIR}/testing_embeddings_0612024_scaled_time.pkl')
training_embeddings = pd.read_pickle(f'{DATA_DIR}/training_embeddings_0612024_scaled_time.pkl')


Create Dataset Object

In [None]:
#CREATE DATASET AND LOAD INTO BATCHES
class Embedded(Dataset):
    """Dataset over a DataFrame with 'label', 'Embeddings' and 'Mask' columns.

    The per-row 'Embeddings' entries are stacked into one array up front;
    labels and masks are kept as the column's underlying values and only
    converted to tensors when an item is requested.
    """

    def __init__(self, df):
        self.labels = df['label'].values
        self.embeddings = np.stack(df['Embeddings'].values)
        self.mask = df['Mask'].values

    def __len__(self):
        # One sample per stacked embedding.
        return self.embeddings.shape[0]

    def __getitem__(self, index):
        """Return (embedding float32, label int64, bool flags) for one sample.

        The boolean tensor is True wherever the stored 'Mask' value equals 0.
        """
        features = torch.tensor(self.embeddings[index], dtype=torch.float32)
        label = torch.tensor(self.labels[index], dtype=torch.long)
        is_zero = torch.tensor(self.mask[index] == 0, dtype=torch.bool)
        return features, label, is_zero

# Wrap both splits in the Dataset class so DataLoader can batch them later.
dataset_test, dataset_training = (
    Embedded(split_frame) for split_frame in (testing_embeddings, training_embeddings)
)


Define the model. You can change the number of attention heads (`n_heads`) and the number of attention blocks applied in `TransformerModel.forward`.


In [None]:
import torch
from torch import nn, optim
from torch.utils.data import DataLoader

#DEFINE MODEL
#
class AddCLS(nn.Module):
    """Prepend one learnable token (randomly initialised) to every sequence in a batch."""

    def __init__(self, dim_data):
        super().__init__()
        # Stored once with shape (1, 1, dim_data); broadcast per batch in forward.
        self.cls_token = nn.Parameter(torch.randn(1, 1, dim_data))

    def forward(self, x):
        """Return `x` with the CLS token inserted at index 0 of dimension 1."""
        n_items = x.size(0)
        # expand() creates a view, so every batch row shares the same parameter.
        cls_rows = self.cls_token.expand(n_items, -1, -1)
        return torch.cat([cls_rows, x], dim=1)

class DimensionalityReduction(nn.Module):
    """Single linear projection from `dim_data` features to `dim_latent` features."""

    def __init__(self, dim_data, dim_latent):
        super().__init__()
        self.dim_red = nn.Linear(dim_data, dim_latent)

    def forward(self, x):
        """Apply the projection to the last dimension of `x`."""
        projected = self.dim_red(x)
        return projected

class MultiheadAttentionBlock(nn.Module):
    """Self-attention followed by a two-layer feed-forward net.

    Each of the two sub-layers is wrapped as ``LayerNorm(x + sublayer(x))``,
    i.e. the normalisation is applied after the residual sum.
    """

    def __init__(self, dim_latent, dim_latent_2, n_heads):
        super().__init__()
        self.multihead_attn = nn.MultiheadAttention(dim_latent, n_heads, batch_first=True)
        self.layer_norm1 = nn.LayerNorm(dim_latent)
        self.layer_norm2 = nn.LayerNorm(dim_latent)
        # Feed-forward: dim_latent -> dim_latent_2 -> dim_latent.
        self.linear_1 = nn.Linear(dim_latent, dim_latent_2)
        self.linear_2 = nn.Linear(dim_latent_2, dim_latent)
        # Constructed but not applied anywhere in forward().
        self.dropout = nn.Dropout(0.1)
        self.relu = nn.ReLU()

    def forward(self, x, mask):
        """Run one block; `mask` is handed to the attention as `key_padding_mask`."""
        # Self-attention: queries, keys and values are all `x`.
        attended = self.multihead_attn(x, x, x, key_padding_mask=mask)[0]
        hidden = self.layer_norm1(x + attended)

        expanded = self.relu(self.linear_1(hidden))
        return self.layer_norm2(hidden + self.linear_2(expanded))

class Classification(nn.Module):
    """Classification head that reads only the first token of each sequence.

    Args:
        dim_latent: Size of the last dimension of the incoming tensor.
        nonlinear: If True (default, the original behaviour) a ReLU is applied
            to the first token before the linear layer. Pass False for a
            purely linear head.
        n_classes: Number of output logits per sample (default 2, as before).
    """

    def __init__(self, dim_latent, nonlinear=True, n_classes=2):
        super(Classification, self).__init__()
        self.nonlinear = nonlinear
        # Kept even for the linear variant so the module layout does not
        # depend on the flag; ReLU has no parameters.
        self.relu = nn.ReLU()
        self.classifier = nn.Linear(dim_latent, n_classes)

    def forward(self, x):
        """Return logits of shape (x.shape[0], n_classes) from token 0 of `x`."""
        first_token = x[:, 0, :]
        if self.nonlinear:
            first_token = self.relu(first_token)
        return self.classifier(first_token)

class TransformerModel(nn.Module):
    """CLS token -> linear projection -> stack of attention blocks -> classifier.

    Args:
        dim_data: Feature size of the raw input tokens.
        dim_latent: Feature size after the linear projection.
        dim_latent_2: Hidden size of the feed-forward part of each block.
        n_heads: Number of attention heads per block.
        n_layers: How many times an attention block is applied (default 8,
            matching the eight hand-written calls this replaces).
        share_weights: If True (default, the original behaviour) the *same*
            block -- one set of weights -- is applied `n_layers` times.
            If False, each layer gets its own independently initialised block.

    With the default arguments the parameter set and state_dict keys are the
    same as before, so existing checkpoints still load.
    """

    def __init__(self, dim_data, dim_latent, dim_latent_2, n_heads, n_layers=8, share_weights=True):
        super(TransformerModel, self).__init__()
        if n_layers < 1:
            raise ValueError(f"n_layers must be >= 1, got {n_layers}")
        self.n_layers = n_layers
        self.add_cls = AddCLS(dim_data)
        self.dim_red = DimensionalityReduction(dim_data, dim_latent)
        # First block; in shared-weight mode it is the only block.
        self.attn_block = MultiheadAttentionBlock(dim_latent, dim_latent_2, n_heads)
        self.classifier = Classification(dim_latent)
        # Additional independent blocks; empty when the weights are shared,
        # in which case it contributes nothing to the state_dict.
        n_extra = 0 if share_weights else n_layers - 1
        self.extra_attn_blocks = nn.ModuleList(
            [MultiheadAttentionBlock(dim_latent, dim_latent_2, n_heads) for _ in range(n_extra)]
        )

    def forward(self, x, mask):
        """Return the classifier logits for a batch.

        `mask` is passed to every block as the attention `key_padding_mask`.
        It may be None, already include a column for the CLS token, or cover
        only the original tokens; in the last case a False column (CLS is
        never ignored) is prepended so the widths agree.
        """
        x = self.add_cls(x)
        # NOTE(review): whether the stored masks already account for the CLS
        # position is not visible here; masks of matching width pass through
        # untouched.
        if mask is not None and mask.dim() == 2 and mask.shape[1] + 1 == x.shape[1]:
            cls_column = mask.new_zeros((mask.shape[0], 1))
            mask = torch.cat((cls_column, mask), dim=1)
        x = self.dim_red(x)

        if len(self.extra_attn_blocks) == 0:
            blocks = [self.attn_block] * self.n_layers
        else:
            blocks = [self.attn_block, *self.extra_attn_blocks]
        for block in blocks:
            x = block(x, mask)

        return self.classifier(x)

Do NOT run this cell! It only shows the two alternatives for the final classifier: you can swap the classifier defined above between a linear and a non-linear layer, depending on the desired training speed/interpretability.

In [None]:
#YOU CAN ADD NON-LINEARITY TO THE FINAL CLASSIFICATION LAYER; IT ACCELERATES TRAINING SLIGHTLY
#Non-linear classifier
class Classification(nn.Module):
    """Non-linear head: ReLU on the first token, then a linear layer with 2 outputs."""

    def __init__(self, dim_latent):
        super().__init__()
        self.relu = nn.ReLU()
        self.classifier = nn.Linear(dim_latent, 2)

    def forward(self, x):
        """Return 2 logits per sample computed from token 0 of `x`."""
        first_token = x[:, 0, :]
        return self.classifier(self.relu(first_token))
#Linear classifier
class Classification(nn.Module):
    """Linear head: a single linear layer with 2 outputs applied to the first token."""

    def __init__(self, dim_latent):
        super().__init__()
        self.classifier = nn.Linear(dim_latent, 2)

    def forward(self, x):
        """Return 2 logits per sample computed from token 0 of `x`."""
        first_token = x[:, 0, :]
        logits = self.classifier(first_token)
        return logits

Function to save model with top accuracy. CHANGE file_path ACCORDING TO YOUR NEEDS!

In [None]:
def save_checkpoint(model, optimizer, epoch, val_loss, accuracy, file_path='/content/drive/MyDrive/ModelTrained/ModelClassifyDonors/Modelmaxvalaccuracy.pth'):
    """Write a training checkpoint to `file_path`, overwriting any existing file.

    Args:
        model: Module whose `state_dict()` is stored.
        optimizer: Optimizer whose `state_dict()` is stored.
        epoch: Epoch identifier stored and printed as given.
        val_loss: Validation loss to record alongside the weights.
        accuracy: Validation accuracy to record alongside the weights.
        file_path: Destination file; missing parent directories are created.
    """
    from pathlib import Path

    # torch.save does not create directories, so a fresh Drive/local setup
    # would otherwise fail at the first improvement after training started.
    Path(file_path).parent.mkdir(parents=True, exist_ok=True)

    state = {
        'epoch': epoch,
        'model_state_dict': model.state_dict(),
        'optimizer_state_dict': optimizer.state_dict(),
        'val_loss': val_loss,
        'accuracy': accuracy
    }
    torch.save(state, file_path)
    print(f'Model saved at epoch {epoch} with accuracy {accuracy:.4f}')

Run this cell to train model and save best model along with all relevant data

In [None]:
# REPRODUCIBILITY
# Seed torch before anything random happens: the CLS token and layer weights
# are randomly initialised, and DataLoader(shuffle=True) draws its order from
# the torch RNG. (Some GPU kernels may still be non-deterministic.)
SEED = 42
torch.manual_seed(SEED)

# SETUP MODEL
dim_data = 770       # feature size of each input token
dim_latent = 256     # feature size after the linear projection
dim_latent_2 = 512   # hidden size of the feed-forward part of the attention block
n_heads = 1
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = TransformerModel(dim_data, dim_latent, dim_latent_2, n_heads)
# Move the model first so the optimizer is built on the on-device parameters.
model.to(device)

# LOAD DATA
train_loader = DataLoader(dataset_training, batch_size=10, shuffle=True)
test_loader = DataLoader(dataset_test, batch_size=10, shuffle=False)

# SET UP TRAINING LOOP FOR MODEL
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.000001)
n_epochs = 200
# Per-epoch history, filled by the training loop and read by the plotting cells.
all_loss = []
all_acc = []
all_val_loss = []
all_val_acc = []

def calculate_accuracy(outputs, targets):
    """Return the fraction of rows in `outputs` whose arg-max column equals `targets`.

    Args:
        outputs: 2-D tensor of per-class scores, one row per sample.
        targets: 1-D tensor of class indices, one per sample.

    Returns:
        A Python float: number of matching rows divided by `targets.size(0)`.
    """
    predicted = torch.max(outputs, dim=1).indices
    n_correct = (predicted == targets).sum().item()
    return n_correct / targets.size(0)

#RUN
def evaluate(model, loader, loss_fn, device):
    """Return (mean loss, accuracy) of `model` over every sample in `loader`.

    Both numbers are weighted by batch size, so a short final batch does not
    count as much as a full one. `loss_fn` is assumed to return the batch
    mean, which is what nn.CrossEntropyLoss does by default.
    The model is switched to eval mode; gradients are not tracked.
    """
    model.eval()
    loss_sum = 0.0
    correct_sum = 0.0
    n_samples = 0
    with torch.no_grad():
        for inputs, targets, mask in loader:
            inputs, targets, mask = inputs.to(device), targets.to(device), mask.to(device)
            outputs = model(inputs, mask)
            n_batch = targets.size(0)
            loss_sum += loss_fn(outputs, targets).item() * n_batch
            correct_sum += calculate_accuracy(outputs, targets) * n_batch
            n_samples += n_batch
    # max(..., 1) only guards against an empty loader.
    return loss_sum / max(n_samples, 1), correct_sum / max(n_samples, 1)


#ADDITIONAL VALIDATION ACCURACY STEP AS A CHECK
# Scores the untrained model; this becomes entry 0 of the validation history
# and the baseline a checkpoint has to beat.
val_loss, val_acc = evaluate(model, test_loader, loss_fn, device)
all_val_loss.append(val_loss)
all_val_acc.append(val_acc)
max_val_acc = val_acc
print(f"Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")


#TRAINING LOOP
for epoch in range(n_epochs):
    model.train()
    running_loss = 0.0
    running_correct = 0.0
    n_seen = 0
    for batch in train_loader:
        inputs, targets, mask = batch[0].to(device), batch[1].to(device), batch[2].to(device)
        optimizer.zero_grad()
        outputs = model(inputs,mask)
        loss_value = loss_fn(outputs, targets)
        loss_value.backward()
        optimizer.step()
        # Weight by batch size so the epoch metrics are true per-sample means.
        n_batch = targets.size(0)
        running_loss += loss_value.item() * n_batch
        running_correct += calculate_accuracy(outputs, targets) * n_batch
        n_seen += n_batch
    epoch_loss = running_loss / max(n_seen, 1)
    epoch_acc = running_correct / max(n_seen, 1)
    all_loss.append(epoch_loss)
    all_acc.append(epoch_acc)

    # Validation step
    # NOTE(review): the best checkpoint is selected on `test_loader`, so the
    # reported best accuracy is not an unbiased estimate -- confirm whether a
    # separate held-out split exists.
    val_loss, val_acc = evaluate(model, test_loader, loss_fn, device)
    all_val_loss.append(val_loss)
    all_val_acc.append(val_acc)

    print(f"Epoch {epoch+1}/{n_epochs}, Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}, Val Loss: {val_loss:.4f}, Val Accuracy: {val_acc:.4f}")

    if val_acc > max_val_acc:
        max_val_acc = val_acc
        # `epoch` is the 0-based loop index, i.e. one less than the number
        # printed in the line above.
        save_checkpoint(model, optimizer, epoch, val_loss, val_acc)

print("Training completed.")

Validation Loss and accuracy

In [None]:
def _draw_metric(ax, values, color, ylabel):
    """Plot `values` on `ax`, colouring the y-label and y tick labels like the line."""
    ax.plot(values, color)
    ax.set_ylabel(ylabel, color=color, fontsize=15)
    ax.tick_params(axis='y', labelcolor=color)


# Validation loss (left axis, red) and accuracy (right axis, blue) on a shared
# x axis; only the first 100 recorded entries are shown.
fig, ax1 = plt.subplots()
_draw_metric(ax1, all_val_loss[:100], 'r', 'Validation Loss (cross entropy)')
ax1.set_xlabel('Epoch')

ax2 = ax1.twinx()
_draw_metric(ax2, all_val_acc[:100], 'b', 'Validation Accuracy')


In [None]:
Plot just loss

In [None]:
# Request the high resolution for this figure only. Writing to
# plt.rcParams['figure.dpi'] would change every figure drawn afterwards,
# including the previous plot cell if it is re-run.
fig, ax = plt.subplots(dpi=1000)
# Entry 0 is the validation pass recorded before training started; only the
# first 100 recorded entries are shown.
ax.plot(all_val_loss[:100])
ax.set_xlabel('Epoch', fontsize=15)
ax.set_ylabel('Validation Loss (cross entropy)', fontsize=15)
plt.show()
