## Imports

In [1]:
import pandas as pd
import numpy as np
import time
import math

import torch
import torch.nn as nn

from torch.utils.data import Dataset, DataLoader

from torch.amp import GradScaler
from torch.fx import symbolic_trace

dtype = torch.float
device = "cuda" if torch.cuda.is_available() else "cpu"
torch.set_default_device(device)
torch.get_default_device()
torch.backends.cudnn.benchmark = True

## Data Load

In [2]:
base_map = {
    "A":[1.0, 0.0, 0.0, 0.0],
    "T":[0.0, 1.0, 0.0, 0.0],
    "G":[0.0, 0.0, 1.0, 0.0],
    "C":[0.0, 0.0, 0.0, 1.0],

    'W':[0.5, 0.5, 0.0, 0.0],
    'S':[0.0, 0.0, 0.5, 0.5],
    'M':[0.5, 0.0, 0.0, 0.5],
    'K':[0.0, 0.5, 0.5, 0.0],
    'R':[0.5, 0.0, 0.5, 0.0],
    'Y':[0.0, 0.5, 0.0, 0.5],
    
    'B':[0.0, 0.3, 0.3, 0.3],
    'D':[0.3, 0.3, 0.3, 0.0],
    'H':[0.3, 0.3, 0.0, 0.3],
    'V':[0.3, 0.0, 0.3, 0.3],

    'N':[0.25, 0.25, 0.25, 0.25],
}

def encode_sequence(sequence):
    encoded_seq = []

    for base in sequence:
        encoded_seq.append(base_map[base])
    
    return torch.tensor(encoded_seq)

In [3]:
class SequenceDataset(Dataset):
    def __init__(self, train, test, level):

        self.classes = pd.concat([train[level], test[level]]).unique().tolist()
        self.classes.sort()
        self.level = level
        self.labels = train[level]
        self.encoded_labels = SequenceDataset.__encoded_labels__(self.classes, self.labels)
        self.sequences = SequenceDataset.__sequences__(train)

        self.test = SequenceDatasetTest(
            labels = test[level],
            classes = self.classes,
            encoded_labels = SequenceDataset.__encoded_labels__(self.classes, test[level]),
            sequences = SequenceDataset.__sequences__(test)
            )

    def __encoded_labels__(classes, labels):
        return torch.nn.functional.one_hot(torch.tensor([classes.index(l) for l in labels]), len(classes)).type(torch.cuda.FloatTensor)
    
    def __sequences__(ds):
        sequences = []
        for _, row in ds.iterrows():
            sequences.append(encode_sequence(row["truncated_sequence"]))        
        return torch.stack(sequences, dim=0)
    
    def __len__(self):
        return len(self.labels)
    
    def __getitem__(self, idx):
        return   self.sequences[idx], self.encoded_labels[idx]
    
    def __getitems__(self, ids):
        idx = torch.tensor(ids, device=torch.device('cuda:0'))
        return   list(zip(torch.index_select(self.sequences, 0, idx), torch.index_select(self.encoded_labels, 0, idx)))
    
    def get_test(self):
        return self.test

class SequenceDatasetTest(SequenceDataset):    
    def __init__(self, labels, classes, encoded_labels, sequences):
        self.labels = labels
        self.classes = classes
        self.encoded_labels = encoded_labels
        self.sequences = sequences

In [4]:
def loaders_generator(ds_train, ds_test, bs = 128):
    train_loader = DataLoader(ds_train, batch_size=bs, shuffle=True, generator=torch.Generator(device='cuda'))
    test_loader = DataLoader(ds_test, batch_size=bs, shuffle=True, generator=torch.Generator(device='cuda'))

    return train_loader, test_loader

## Models

In [5]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_seq_length=900):
        super().__init__()
        
        position = torch.arange(max_seq_length).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))
        pe = torch.zeros(1, max_seq_length, d_model)
        pe[0, :, 0::2] = torch.sin(position * div_term)
        pe[0, :, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)

    def forward(self, x):
        return x + self.pe[:, :x.size(1)]

In [6]:
class TransformerClassifier(nn.Module):
    def __init__(self, num_classes, input_dim=4, d_model=64, nhead=8, num_layers=3,
                 dim_feedforward=128, dropout=0.1, max_seq_length=900):
        super().__init__()
        
        # Input projection
        self.input_projection = nn.Linear(input_dim, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_seq_length)
        
        # Transformer encoder layers
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(d_model * max_seq_length, 128),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(128, num_classes)
        )
        
    def forward(self, src, src_mask=None):
        # Input shape: (batch_size, seq_length, input_dim)
        
        # Project input to d_model dimensions
        x = self.input_projection(src)
        
        # Add positional encoding
        x = self.pos_encoder(x)
        
        # Pass through transformer encoder
        x = self.transformer_encoder(x, src_mask)
        
        # Flatten and classify
        x = x.reshape(x.shape[0], -1)
        output = self.classifier(x)
        
        return output

In [7]:
class TransformerClassifier2(nn.Module):
    def __init__(self, num_classes, input_dim=4, d_model=16, nhead=8, num_layers=3,
                 dim_feedforward=256, dropout=0.1, max_seq_length=900):
        super().__init__()
        
        self.cls_token = nn.Parameter(torch.randn(1, 1, d_model))

        # Input projection
        self.input_projection = nn.Linear(input_dim, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_seq_length+1)
        
        # Transformer encoder layers
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True,
            activation="gelu"
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(d_model * max_seq_length, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, num_classes)
        )
        
    def forward(self, src, src_mask=None):
        # Input shape: (batch_size, seq_length, input_dim)
        
        # Project input to d_model dimensions
        x = self.input_projection(src)
        
        # Add positional encoding
        x = self.pos_encoder(x)
        
        # Pass through transformer encoder
        x = self.transformer_encoder(x, src_mask)
        
        # Flatten and classify
        x = x.reshape(x.shape[0], -1)
        output = self.classifier(x)
        
        return output

In [8]:
class FasterTransformerClassifier(nn.Module):
    def __init__(self, num_classes, input_dim=4, d_model=4, nhead=4, num_layers=2,
                 dim_feedforward=256, dropout=0.1, max_seq_length=900):
        super().__init__()
        
        # Input projection
        self.input_projection = nn.Linear(input_dim, d_model)
        
        # Positional encoding
        self.pos_encoder = PositionalEncoding(d_model, max_seq_length)
        
        # Transformer encoder layers
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model,
            nhead=nhead,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=True
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        
        # Classification head
        self.classifier = nn.Sequential(
            nn.Linear(d_model * max_seq_length, 256),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(256, num_classes)
        )
        
    def forward(self, src, src_mask=None):
        # Input shape: (batch_size, seq_length, input_dim)
        
        # Project input to d_model dimensions
        x = src#self.input_projection(src)
        
        # Add positional encoding
        x = self.pos_encoder(x)
        
        # Pass through transformer encoder
        x = self.transformer_encoder(x, src_mask)
        
        # Flatten and classify
        x = x.reshape(x.shape[0], -1)
        output = self.classifier(x)
        
        return output

## Test Params

In [9]:
levels = [
    # "genus",
    # "species",
    # "order", 
    # "family", 
    "class", 
]

In [10]:
batch_sizes = [
    # 64,
    # 128,
    # 256,
    # 512,
    # 2048,
    "dynamic"
]

In [11]:
epochs = [
    # 2,
    # 5,
    # 50,
    # 100,
    # 150,
    # 200,
    500
]

In [12]:
models_list = [
    # SimpleCNNClassifier,
    FasterTransformerClassifier,
    TransformerClassifier2,
    TransformerClassifier,
]

In [13]:
loss_functions = {
    "CrossEntropyLoss":{
        "function":nn.CrossEntropyLoss,
        "params":{},
        "function_params":{}
    },
}

In [14]:
learning_rates = [
    # 1e-2,
    # 5e-2,
    # 1e-3,
    # 5e-3,
    1e-4,
    # 5e-4,
]

In [15]:
optimizers = [
    {
        "optim":torch.optim.AdamW,
        "params":{
            "weight_decay":1e-3,
            # "amsgrad":True
        }
    },
]

In [16]:
hiperparams = {
    "batch_size": batch_sizes,
    "epochs": epochs,
    "model": models_list,
    "loss_function": loss_functions,
    "learning_rate": learning_rates,
    "optimizer": optimizers    
}

print(hiperparams)

{'batch_size': ['dynamic'], 'epochs': [500], 'model': [<class '__main__.FasterTransformerClassifier'>, <class '__main__.TransformerClassifier2'>, <class '__main__.TransformerClassifier'>], 'loss_function': {'CrossEntropyLoss': {'function': <class 'torch.nn.modules.loss.CrossEntropyLoss'>, 'params': {}, 'function_params': {}}}, 'learning_rate': [0.0001], 'optimizer': [{'optim': <class 'torch.optim.adamw.AdamW'>, 'params': {'weight_decay': 0.001}}]}


In [17]:
hiperparams

{'batch_size': ['dynamic'],
 'epochs': [500],
 'model': [__main__.FasterTransformerClassifier,
  __main__.TransformerClassifier2,
  __main__.TransformerClassifier],
 'loss_function': {'CrossEntropyLoss': {'function': torch.nn.modules.loss.CrossEntropyLoss,
   'params': {},
   'function_params': {}}},
 'learning_rate': [0.0001],
 'optimizer': [{'optim': torch.optim.adamw.AdamW,
   'params': {'weight_decay': 0.001}}]}

## Batch Execution

In [18]:
def Train_Test(
        model, 
        loss_fn, 
        optimizer, 
        epochs, 
        learning_rate, 
        batch_size, 
        train_data,
        test_data,
        id="", 
        ):
    
    print("Model: \t\t\t"+(model._get_name() if not model._get_name() == "OptimizedModule" else model.__dict__["_modules"]["_orig_mod"].__class__.__name__))
    print("  Loss Func.: \t\t"+loss_fn._get_name())
    print("  Optimizer: \t\t"+type(optimizer).__name__)
    print("  Epochs: \t\t"+str(epochs))
    print("  Learning Rate: \t"+str(learning_rate))

    print("\nModel Arch: ")
    print(str(model))
    print("\n\n\n")

    
    if torch.cuda.get_device_capability() < (7, 0):
        print("Exiting because torch.compile is not supported on this device.")
        import sys
        sys.exit(0)

    epochs_results = []
    current = {
        "model":(model._get_name() if not model._get_name() == "OptimizedModule" else model.__dict__["_modules"]["_orig_mod"].__class__.__name__),
        "loss_function":loss_fn._get_name(),
        "epoch":None,
        "learning_rate":learning_rate,
        "batch_size":None,
        "train_size":None,
        "test_size":None,
        "optimizer":type(optimizer).__name__,
        "train_acc":None,
        "train_loss":None,
        "test_acc":None,
        "test_loss":None,
    }

    if batch_size == "dynamic":
        bss = [1024, 64, 1024, 64, 1024]
    else:
        bss = [batch_size]
    if len(bss) > epochs:
        bss = bss[0:epochs]
    print("Batch Sizes List: "+str(bss))
    batch_lim = int(epochs/len(bss))
    
    
    t_start = time.time()
    
    best = {
        "epoch":0,
        "train_acc":0,
        "train_loss":10000000,
        "test_acc":0,
        "test_loss":10000000,
    }
    
    train_loader = None
    test_loader = None


    scaler = GradScaler(device=device)
    scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
        optimizer,
        T_0=2,  # First restart
        T_mult=2,  # Period multiplier
        eta_min=1e-8,  # Minimum learning rate
    )
    # fn = torch.compile(optimizer.step)

    # Epochs
    for epoch in range(epochs):
        print(f"Epoch {epoch+1}\n-------------------------------")

        if epoch%batch_lim == 0 and len(bss) > 0:
            if train_loader:
                del train_loader
            if test_loader:
                del test_loader

            batch_size = bss.pop(0)
            train_loader, test_loader = loaders_generator(train_data, test_data, batch_size)

        print("Batch Size: "+str(batch_size))

        # Train --------------------------------------------------------------------------
        model.train()
        train_loss = 0
        train_acc = 0
        
        optimizer.zero_grad()
        for batch, (X, y) in enumerate(train_loader):
            with torch.autocast(device_type=device, dtype=torch.float16):
                # Compute prediction and loss
                pred = model(X)
                loss = loss_fn(pred, y)

            # Backpropagation
            # loss.backward()
            # fn()
            scaler.scale(loss).backward()

            scaler.unscale_(optimizer)
            torch.nn.utils.clip_grad_norm_(model.parameters(), 1)
            
            scaler.step(optimizer)
            scaler.update()
            
            scheduler.step(epoch + batch / len(train_loader))
            
            optimizer.zero_grad()

            # Update results
            train_loss += loss.item()
            train_acc += (pred.argmax(1) == y.argmax(1)).type(torch.float).sum().item()


        # Train results
        train_loss /= len(train_loader)
        train_acc /= len(train_loader.dataset)
        print("Last Learning Rate: "+str(scheduler.get_last_lr()))
        print(f"Train Error: \n Accuracy: {(100*train_acc):>0.1f}%, Avg loss: {train_loss:>8f} \n")

        # Test
        model.eval()
        test_loss = 0
        test_acc = 0

        with torch.no_grad():
            for X, y in test_loader:
                pred = model(X)
                test_loss += loss_fn(pred, y).item()
                test_acc += (pred.argmax(1) == y.argmax(1)).type(torch.float).sum().item()

        # Test results
        test_loss /= len(test_loader)
        test_acc /= len(test_loader.dataset)
        print(f"Test Error: \n Accuracy: {(100*test_acc):>0.1f}%, Avg loss: {test_loss:>8f} \n")


        # Update Results
        if best["test_acc"] < test_acc or (best["test_acc"] == test_acc and best["train_acc"] < train_acc):
            best["epoch"] = epoch+1
            best["test_acc"] = test_acc
            best["test_loss"] = test_loss
            best["train_acc"] = train_acc
            best["train_loss"] = train_loss

            # if test_acc > 0.5:
                # torch.save(model.state_dict(), "/media/stark/Models/Gustavo/"+train_data.level+"/"+str(id)+"_"+current["model"]+".pth")
                                
            
        
        current["epoch"] = epoch+1
        current["batch_size"] = batch_size
        current["learning_rate"] = scheduler.get_last_lr()[0]
        current["train_size"] = train_loader.dataset.__len__()
        current["test_size"] = test_loader.dataset.__len__()
        current["train_acc"] = train_acc
        current["train_loss"] = train_loss
        current["test_acc"] = test_acc
        current["test_loss"] = test_loss

    
        epochs_results.append(current.copy())

    pd.DataFrame(epochs_results).to_csv("./results/epochs/"+str(id)+"__"+current["model"]+"_train_test.csv")
    
    print("\n\n")
    print(f"Best Epoch:{best['epoch']} \n\tAccuracy: {(100*best['test_acc']):>0.1f}%, Avg loss: {best['test_loss']:>8f} \n")
    print("Train and Test execution time: "+str(format(time.time()-t_start, '.4f'))+"s")
    print("Done!")

    return best

In [None]:
import gc


_model_ = None
_lossfunction_ = None
_optimizer_ = None

def clear():
    global _model_, _lossfunction_, _optimizer_
    
    torch.cuda.empty_cache()

    torch.compiler.reset()
    torch._dynamo.reset()

    if _model_:
        del _model_
        _model_ = None
    if _lossfunction_:
        del _lossfunction_
        _lossfunction_ = None
    if _optimizer_:
        del _optimizer_
        _optimizer_ = None
    torch.cuda.empty_cache()
    gc.collect()


results = []
current = {}

id = 0
time_id = str(int(time.time()))
print("Time ID: "+str(time_id))

for level in levels:
    clear()

    train_data = pd.read_csv("../new_data/StratifiedSplit/"+level+"/train_dataset.csv")
    test_data = pd.read_csv("../new_data/StratifiedSplit/"+level+"/test_dataset.csv")
    print(level)
    print(train_data.shape)
    print(test_data.shape)

    dataset = SequenceDataset(
        train=train_data, 
        test=test_data, 
        level=level)


    for batch_size in hiperparams["batch_size"]:
        for epochs in hiperparams["epochs"]:
            for model in hiperparams["model"]:
                for loss_function_name, loss_function in hiperparams["loss_function"].items():
                    for learning_rate in hiperparams["learning_rate"]:
                        for optimizer in hiperparams["optimizer"]:
                            
                            optim = optimizer["optim"]
                            optim_params = optimizer["params"] if "params" in optimizer.keys() else {}

                            current = {
                                    "id": id,
                                    "start_time":time.time(),
                                    "end_time": None,
                                    "level": level,
                                    "batch_size": batch_size,
                                    "epochs": epochs,
                                    "model": model.__name__,
                                    "loss_function": loss_function_name+" ("+str(loss_function["function"])+")",
                                    "learning_rate": learning_rate,
                                    "optimizer": optim.__name__+" (params: "+str(optim_params)+")",
                                    "obs": "9:1",
                                    "error": None
                                }


                            try:                                
                                clear()
                                # torch.set_float32_matmul_precision('high')
                                
                                # _model_ = torch.compile(model(dataset.encoded_labels.shape[1]))
                                _model_ = model(dataset.encoded_labels.shape[1])
                                _lossfunction_ = loss_function["function"](**{func:params[0](*params[1:]) for func,params in loss_function["function_params"].items()})
                                _optimizer_ = optim(_model_.parameters(), lr=learning_rate, **optim_params)


                                # ---- Run ----
                                result = Train_Test(
                                    model=_model_,
                                    loss_fn=_lossfunction_,
                                    optimizer=_optimizer_,
                                    epochs=epochs,
                                    learning_rate=learning_rate,
                                    batch_size=batch_size,
                                    train_data=dataset,
                                    test_data=dataset.get_test(),
                                    id=time_id+"_"+str(id),
                                    )
                                    
                                current["end_time"] = time.time()
                                current["best_epoch"] = result["epoch"]
                                current["train_acc_best_epoch"] = result["train_acc"]
                                current["train_loss_best_epoch"] = result["train_loss"]
                                current["test_acc_best_epoch"] = result["test_acc"]
                                current["test_loss_best_epoch"] = result["test_loss"]

                                
                                clear()                                
                                
                            except Exception as e:
                                print(e)
                                current["error"] = str(e)
                            
                            results.append(current)
                            pd.DataFrame(results).to_csv("./results/summarized/"+str(time_id)+"_models_train_test_"+str(len(results))+".csv")
                            
                            id = id+1

clear()

Time ID: 1732307833


class
(111138, 11)
(12349, 11)
Model: 			FasterTransformerClassifier
  Loss Func.: 		CrossEntropyLoss
  Optimizer: 		AdamW
  Epochs: 		500
  Learning Rate: 	0.0001

Model Arch: 
FasterTransformerClassifier(
  (input_projection): Linear(in_features=4, out_features=4, bias=True)
  (pos_encoder): PositionalEncoding()
  (transformer_encoder): TransformerEncoder(
    (layers): ModuleList(
      (0-1): 2 x TransformerEncoderLayer(
        (self_attn): MultiheadAttention(
          (out_proj): NonDynamicallyQuantizableLinear(in_features=4, out_features=4, bias=True)
        )
        (linear1): Linear(in_features=4, out_features=256, bias=True)
        (dropout): Dropout(p=0.1, inplace=False)
        (linear2): Linear(in_features=256, out_features=4, bias=True)
        (norm1): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
        (norm2): LayerNorm((4,), eps=1e-05, elementwise_affine=True)
        (dropout1): Dropout(p=0.1, inplace=False)
        (dropout2): Dropout(p=0.1, inplace=Fals