In [None]:
hyperparameters = {
    "learning_rate": 0.0001,
    "batch_size": 64,
    "embed_dim": 128,
    "num_heads": 4,
    "num_layers": 2,
    "dropout": 0.1,
    "ff_dim": 1024,
    "epochs": 50
}

info = {
    "dataset_size": "50K",
    "precision": "FP16",
    "dir_name": "chromosome_model"
}

In [None]:
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pandas as pd

data_path = f"/content/drive/MyDrive/dataset/50K.csv"

data = pd.read_csv(data_path)
data.shape

In [None]:
x = data['sequence'].reset_index(drop=True)
y = data['label'].reset_index(drop=True)

In [None]:
import pickle

def get_codon(seq, k=3):
    return [seq[i:i+k] for i in range(len(seq) - k + 1)]

def get_tensor(text):
    return [vocab.get(codons.lower(), vocab['<UNK>']) for codons in get_codon(text)]

with open("vocab.pkl", "rb") as file:
  vocab = pickle.load(file)

In [None]:
import torch
from torch import nn, optim
from torch.utils.data import Dataset, DataLoader, random_split

class CustomDataset(Dataset):
  def __init__(self, x, y):
    self.x_frame = x
    self.y_frame = y

  def __len__(self):
    return len(self.x_frame)

  def __getitem__(self, index):
    x = torch.tensor(get_tensor(self.x_frame[index]), dtype=torch.long) 
    y = torch.tensor(self.y_frame[index], dtype=torch.long)
    return x, y

In [None]:
dataset = CustomDataset(x, y)

In [None]:
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size

train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

In [None]:
train_loader = DataLoader(train_dataset, batch_size=hyperparameters['batch_size'], shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=hyperparameters['batch_size'])

In [None]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, embed_dim, max_len=5000):
        super().__init__()

        pe = torch.zeros(max_len, embed_dim)
        position = torch.arange(0, max_len).unsqueeze(1)

        div_term = torch.exp((torch.arange(0, embed_dim, 2)) * (-math.log(10000.0) / embed_dim))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(0)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:, :x.size(1), :].to(x.device)
        return x

class Transformer(nn.Module):
    def __init__(self, embed_dim=512, num_heads=8, num_layers=6, ff_dim=2048, dropout=0.1, vocab_size=10000, max_len=5000):
        super(Transformer, self).__init__()
        self.embeddings = nn.Embedding(vocab_size, embed_dim)
        self.position_encoding = PositionalEncoding(embed_dim=embed_dim, max_len=max_len)

        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True
        )
        self.encoder = nn.TransformerEncoder(
            encoder_layer=encoder_layer,
            num_layers=num_layers
        )

        self.y_labels_out = nn.Linear(embed_dim, 12)

    def forward(self, x):
        x = self.embeddings(x)
        x = self.position_encoding(x)

        x = self.encoder(x)
        x = x.mean(dim=1)

        y_label_out = self.y_labels_out(x)
        return y_label_out

In [None]:
model = Transformer(
    embed_dim=hyperparameters['embed_dim'],
    num_heads=hyperparameters['num_heads'],
    num_layers=hyperparameters['num_layers'],
    ff_dim=hyperparameters['ff_dim'],
    dropout=hyperparameters['dropout'],
    vocab_size=len(vocab),
    max_len=200
)

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

In [None]:
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f"Total trainable parameters: {num_params}")

In [None]:
from transformers import get_linear_schedule_with_warmup

ce = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=hyperparameters['learning_rate'], weight_decay=1e-4)
scaler = torch.cuda.amp.GradScaler()

num_training_steps = len(train_loader) * hyperparameters['epochs']
num_warmup_steps = 500

scheduler = get_linear_schedule_with_warmup(
    optimizer,
    num_warmup_steps=num_warmup_steps,
    num_training_steps=num_training_steps
)

In [None]:
def train32(model, loader, ce, optimizer):
    model.train()

    running_loss, correct, total = 0.0, 0, 0

    for x, y in loader:
        optimizer.zero_grad()

        x = x.to(device)
        y = y.to(device)

        output = model(x)
        loss = ce(output, y)

        prediction = torch.argmax(output, dim=1)
        correct += (prediction == y).sum().item()
        total += len(x)

        loss.backward()

        optimizer.step()

        running_loss += loss.item() * len(x)

    accuracy = correct / total

    return (
        running_loss / len(loader.dataset),
        accuracy
    )

def train16(model, loader, ce, optimizer, scaler, scheduler):
    model.train()
    running_loss, correct, total = 0.0, 0, 0

    for x, y in loader:
        optimizer.zero_grad()
        x = x.to(device)
        y = y.to(device)

        with torch.cuda.amp.autocast(dtype=torch.float16):
            output = model(x)
            loss = ce(output, y)

        prediction = torch.argmax(output, dim=1)
        correct += (prediction == y).sum().item()
        total += len(x)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        scheduler.step()

        running_loss += loss.item() * len(x)

    accuracy = correct / total
    return (
        running_loss / len(loader.dataset),
        accuracy
    )

In [None]:
def validation(model, loader, ce):
    model.eval()

    running_loss, correct, total = 0.0, 0, 0

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            y = y.to(device)

            if info['precision'] == 'FP16':
              with torch.cuda.amp.autocast(dtype=torch.float16):
                  output = model(x)
                  loss = ce(output, y)
            else:
              output = model(x)
              loss = ce(output, y)

            running_loss += loss.item() * len(x)

            prediction = torch.argmax(output, dim=1)
            correct += (prediction == y).sum().item()

            total += len(x)

    accuracy = correct / total

    return (
        running_loss / len(loader.dataset),
        accuracy
    )

In [None]:
import os

patience = 10
best_val_loss = float('inf')
counter = 0
early_stop = False

save_dir = f"/content/drive/MyDrive/{info['dir_name']}"
os.makedirs(save_dir, exist_ok=True)

for epoch in range(hyperparameters['epochs']+1):
    if info['precision'] == 'FP16':
      train_loss, train_acc = train16(
            model,
            train_loader,
            ce,
            optimizer,
            scaler,
            scheduler
        )
    else:
      train_loss, train_acc = train32(
          model,
          train_loader,
          ce,
          optimizer
      )

    val_loss, val_acc = validation(
        model,
        test_loader,
        ce
    )

    print(f"Epoch ({epoch+1}/{hyperparameters['epochs']}): Train Loss = {train_loss:.4f}, Valitation Loss = {val_loss:.4f}, Train_acc = {train_acc:.4f}, Val_acc = {val_acc:.4f}")

    if (epoch+1) % 10 == 0:
      checkpoint_path = f"{save_dir}/model_epoch_{epoch+1}.pth"
      torch.save({
          'epoch': epoch+1,
          'model_state_dict': model.state_dict(),
          'optimizer_state_dict': optimizer.state_dict(),
          'train_losses': train_loss,
          'val_losses': val_loss
      }, checkpoint_path)
      print(f"Model saved at {checkpoint_path}")

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        counter = 0
        continue
    else:
        counter += 1
        print(f"No improvement in val loss Counter = {counter}/{patience}")
        if counter >= patience:
            print("Early stopping triggered!")
            early_stop = True
            break

In [None]:
from sklearn.metrics import classification_report

def get_predictions_and_labels(model, loader):
    model.eval()
    all_y_true = []

    all_y_pred = []

    with torch.no_grad():
        for x, y in loader:
            x = x.to(device)
            y = y.to(device)

            output = model(x)

            _, prediction = torch.max(output, 1)

            all_y_true.extend(y.cpu().numpy())

            all_y_pred.extend(prediction.cpu().numpy())

    return (all_y_true, all_y_pred)

y_true, y_pred = get_predictions_and_labels(model, test_loader)

print("Classification Report for Label:")
print(classification_report(y_true, y_pred))
print("-"*20)

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
import pandas as pd

epochs = list(range(1, len(train_loss_history) + 1))

data = pd.DataFrame({
    'Epoch': epochs * 2,
    'Loss': train_loss_history + val_loss_history,
    'Accuracy': train_acc_history + val_acc_history,
    'Type': ['Train'] * len(epochs) + ['Validation'] * len(epochs)
})

plt.figure(figsize=(12, 5))
sns.lineplot(data=data, x='Epoch', y='Loss', hue='Type', marker='o')
plt.title('Train vs Validation Loss')
plt.grid(True)
plt.tight_layout()
plt.show()
    
plt.figure(figsize=(12, 5))
sns.lineplot(data=data, x='Epoch', y='Accuracy', hue='Type', marker='o')
plt.title('Train vs Validation Accuracy')
plt.grid(True)
plt.tight_layout()
plt.show()