In [1]:
# Optional Google Colab setup: uncomment the two lines below to mount Google Drive.
#from google.colab import drive
#drive.mount('/content/drive')

# Path to the file inside Google Drive (Colab only)
#file_path = '/content/drive/MyDrive/AML_project/embeddings.pkl'

# Local path to the pickled embeddings, relative to the working directory.
file_path = 'embeddings.pkl'

# Read the pickle file into `data`; a later cell indexes data["fitness"].
# NOTE(review): pickle.load can execute arbitrary code -- only open trusted files.
import pickle
with open(file_path, 'rb') as f:
    data = pickle.load(f)

  data = pickle.load(f)


In [2]:
# Install PyTorch into the environment of the running kernel.
# NOTE(review): the version is not pinned, so installs may differ between machines.
%pip install torch

Note: you may need to restart the kernel to use updated packages.


In [None]:
import pickle
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import f1_score, mean_squared_error, r2_score
import os
from tqdm import tqdm

# --- CONFIGURATION ---
class Config:
    """Static settings shared by the data pipeline and both training runs."""

    # Location of the pickled dataset -- edit to match your environment.
    data_path = 'embeddings.pkl'

    # Training schedule, read by both training loops / data loaders.
    epochs = 10
    batch_size = 256

    # Settings for the hyperbolic classifier.
    hyp_lr = 1e-4
    hyp_temp = 0.1
    hyp_dim = 128

    # Settings for the Euclidean regressor.
    euc_lr = 1e-3
    euc_hidden = [1024, 512]

    # Hold-out fraction and generator seed for the train/validation split.
    val_frac = 0.2
    seed = 42

    # Use the GPU when PyTorch reports one, otherwise fall back to the CPU.
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# --- UTILS & DATASET ---

def discretize_fitness(fitness_array):
    """Map continuous fitness values onto five ordinal classes (0..4).

    Class boundaries are left-closed / right-open:

        0: x < -3
        1: -3 <= x < -1
        2: -1 <= x < 1
        3:  1 <= x < 3
        4:  x >= 3

    Args:
        fitness_array: scalar or array-like of fitness values.

    Returns:
        Integer class index (or array of indices) with the same shape as the
        input, always within 0..4 for finite and infinite inputs alike.
    """
    # Only the four interior edges are passed to np.digitize. Listing -inf/+inf
    # as explicit outer edges (and subtracting one) would place +inf into a
    # sixth bin, i.e. an out-of-range class 5.
    interior_edges = np.array([-3.0, -1.0, 1.0, 3.0])
    return np.digitize(fitness_array, interior_edges)

class MutationDataset(Dataset):
    """In-memory dataset of zero-padded, flattened, z-scored embeddings.

    The pickle must expose ``'fitness'``, ``'embedding'`` and ``'file_name'``
    entries, either as DataFrame columns or as dictionary keys.

    Attributes set by ``__init__``:
        fitness:    raw fitness values as loaded from the pickle.
        labels:     ``discretize_fitness(fitness)``.
        embeddings: float32 array of shape (num_samples, input_dim).
        input_dim:  length of each flattened, padded embedding.
        file_names: file names as loaded from the pickle.
        proteins:   text before the first ``'_'`` of each file name.
    """

    def __init__(self, pkl_path):
        """Load the pickle at ``pkl_path`` and prepare model-ready arrays.

        Raises:
            ValueError: if the pickle contains no embeddings.
        """
        # NOTE(security): pickle.load can execute arbitrary code; only open
        # files that come from a trusted source.
        with open(pkl_path, 'rb') as f:
            data = pickle.load(f)

        # 1. Handle different pkl structures (DataFrame vs Dict)
        if hasattr(data, 'loc'):
            self.fitness = data['fitness'].values
            embeddings_raw = data['embedding'].values
            self.file_names = data['file_name'].values
        else:
            self.fitness = np.array(data['fitness'])
            # Keep the embeddings as a plain list: np.array() on ragged
            # (variable-length) entries raises on recent NumPy versions,
            # and the entries are only iterated over below.
            embeddings_raw = list(data['embedding'])
            self.file_names = np.array(data['file_name'])

        if len(embeddings_raw) == 0:
            raise ValueError(f"No embeddings found in {pkl_path!r}")

        self.labels = discretize_fitness(self.fitness)

        # 2. Zero-pad every embedding to a common (max_h, max_w) shape.
        matrices = [self._as_matrix(raw) for raw in embeddings_raw]
        max_h = max(m.shape[0] for m in matrices)
        max_w = max(m.shape[1] for m in matrices)

        print(f"Dataset Stats: Max Height={max_h}, Max Width={max_w}")

        # Allocate the final flattened array once and fill it through a 3-D
        # view, so no second full-size copy of the padded data is created.
        self.embeddings = np.zeros((len(matrices), max_h * max_w), dtype=np.float32)
        padded_view = self.embeddings.reshape(len(matrices), max_h, max_w)
        for i, m in enumerate(matrices):
            padded_view[i, :m.shape[0], :m.shape[1]] = m

        # 3. Z-score each feature column (the epsilon avoids division by zero
        #    for constant columns such as all-padding positions).
        # NOTE(review): the statistics are computed over the whole dataset,
        # i.e. before any train/validation split performed by the caller.
        mean = self.embeddings.mean(axis=0)
        std = self.embeddings.std(axis=0) + 1e-6
        self.embeddings -= mean
        self.embeddings /= std

        self.input_dim = self.embeddings.shape[1]

        # Protein identifier = part of the file name before the first '_'.
        self.proteins = np.array([str(fn).split('_')[0] for fn in self.file_names])

    @staticmethod
    def _as_matrix(raw):
        """Return ``raw`` as a 2-D float32 array of shape (rows, cols).

        1-D input of length n becomes (n, 1); input with more than two
        dimensions has its trailing dimensions merged into one.
        """
        arr = np.atleast_1d(np.asarray(raw, dtype=np.float32))
        rows = arr.shape[0]
        cols = arr.size // rows if rows else 0
        return arr.reshape(rows, cols)

    def __len__(self):
        """Number of samples."""
        return len(self.embeddings)

    def __getitem__(self, idx):
        """Return one sample as a dict of tensors plus its protein id."""
        return {
            'embedding': torch.tensor(self.embeddings[idx]),
            'fitness': torch.tensor(self.fitness[idx], dtype=torch.float32),
            'label': torch.tensor(self.labels[idx], dtype=torch.long),
            'protein': self.proteins[idx]
        }

def collate_fn(batch):
    """Merge per-sample dicts into batched tensors plus a list of protein ids.

    Args:
        batch: sequence of dicts with keys 'embedding', 'fitness', 'label'
            (tensors) and 'protein'.

    Returns:
        Tuple ``(embeddings, fitness, labels, proteins)`` where the first three
        are stacked along a new leading dimension and ``proteins`` is a list.
    """
    emb_rows, fit_vals, cls_ids, protein_ids = [], [], [], []
    for sample in batch:
        emb_rows.append(sample['embedding'])
        fit_vals.append(sample['fitness'])
        cls_ids.append(sample['label'])
        protein_ids.append(sample['protein'])

    return (
        torch.stack(emb_rows),
        torch.stack(fit_vals),
        torch.stack(cls_ids),
        protein_ids,
    )

# ==========================================
# 1. FIXED HYPERBOLIC MODEL (Classification)
# ==========================================

# Make float64 the process-wide default floating-point dtype (intended to
# prevent NaN in the hyperbolic model). This is global state: it affects
# modules/tensors created after this line, and train_euclidean() below
# switches the default to float32.
torch.set_default_dtype(torch.float64) 

class HyperbolicUtils:
    """Stateless numerical helpers used by the hyperbolic model."""

    # Margin that keeps artanh arguments strictly inside (-1, 1).
    EPS = 1e-5

    @staticmethod
    def artanh(x):
        """Inverse hyperbolic tangent; the argument is clamped to [-1+EPS, 1-EPS]."""
        lower = -1.0 + HyperbolicUtils.EPS
        upper = 1.0 - HyperbolicUtils.EPS
        clipped = x.clamp(min=lower, max=upper)
        numerator = 1 + clipped
        denominator = 1 - clipped
        return 0.5 * torch.log(numerator / denominator)

    @staticmethod
    def exp_map_zero(v):
        """Rescale ``v`` along its last dimension to length ``tanh(||v||)``.

        The norm is floored at 1e-6 so that zero vectors do not divide by zero;
        the result therefore always has norm below 1.
        """
        length = v.norm(p=2, dim=-1, keepdim=True).clamp_min(1e-6)
        direction = v / length
        return torch.tanh(length) * direction

class HyperbolicClassifier(nn.Module):
    """MLP encoder followed by a tangent-space projection and a linear classifier.

    Args:
        input_dim: size of each flattened input vector.
        hidden_dims: iterable of hidden layer widths for the encoder.
        hyp_dim: dimensionality of the tangent / hyperbolic embedding.
        num_classes: number of output classes (default 5).
    """

    def __init__(self, input_dim, hidden_dims, hyp_dim, num_classes=5):
        super().__init__()

        layers = []
        in_d = input_dim

        # Euclidean encoder: Linear -> LayerNorm -> ReLU -> Dropout per width.
        for h_dim in hidden_dims:
            layers.extend([
                nn.Linear(in_d, h_dim),
                nn.LayerNorm(h_dim),
                nn.ReLU(),
                nn.Dropout(0.1),
            ])
            in_d = h_dim

        self.encoder = nn.Sequential(*layers)
        self.to_hyp = nn.Linear(in_d, hyp_dim)
        self.classifier = nn.Linear(hyp_dim, num_classes)

        # forward() casts its input to float64, so pin the parameters to
        # float64 as well instead of relying on the global default dtype
        # (which other code in this notebook changes to float32).
        self.double()

    def forward(self, x):
        """Return ``(hyp_emb, logits)`` for a batch ``x`` of shape (batch, input_dim).

        ``hyp_emb`` is the norm-clipped tangent vector mapped through
        ``HyperbolicUtils.exp_map_zero``; ``logits`` are computed from the
        clipped tangent vector itself.
        """
        x = x.double()
        feat = self.encoder(x)
        tangent = self.to_hyp(feat)

        # Tangent clipping: cap the vector norm at 5.0 so tanh() in the
        # exponential map does not saturate.
        norm = torch.norm(tangent, p=2, dim=-1, keepdim=True)
        scale = torch.clamp(norm, max=5.0) / (norm + 1e-6)
        tangent = tangent * scale

        # Map to hyperbolic space (kept for potential metric learning).
        hyp_emb = HyperbolicUtils.exp_map_zero(tangent)

        # Logits come from the tangent-space vector.
        logits = self.classifier(tangent)

        return hyp_emb, logits

# ==========================================
# 2. EUCLIDEAN MODEL (Regression)
# ==========================================

class EuclideanRegressor(nn.Module):
    """Plain MLP that predicts one scalar per input row.

    Args:
        input_dim: size of each flattened input vector.
        hidden_dims: iterable of hidden layer widths (only read, never mutated).
    """

    def __init__(self, input_dim, hidden_dims=[1024, 512]):
        super().__init__()

        layers = []
        in_d = input_dim

        # Linear -> BatchNorm -> ReLU -> Dropout per hidden width.
        for h_dim in hidden_dims:
            layers.extend([
                nn.Linear(in_d, h_dim),
                nn.BatchNorm1d(h_dim),
                nn.ReLU(),
                nn.Dropout(0.1),
            ])
            in_d = h_dim

        # Output dimension 1 for regression.
        layers.append(nn.Linear(in_d, 1))
        self.model = nn.Sequential(*layers)

        # forward() casts its input to float32, so pin the parameters to
        # float32 too; otherwise the model only works when the global default
        # dtype happens to be float32 at construction time.
        self.float()

    def forward(self, x):
        """Return predictions of shape (batch,) for ``x`` of shape (batch, input_dim)."""
        x = x.float()  # float32 for speed
        # Drop only the trailing size-1 feature axis. A bare squeeze() would
        # also drop the batch axis for a batch of one and return a 0-d tensor.
        return self.model(x).squeeze(-1)

# ==========================================
# 3. TRAINING LOOPS
# ==========================================

def train_hyperbolic(config, train_loader, val_loader, input_dim):
    """Train the hyperbolic classifier and return its best validation F1.

    Args:
        config: object exposing ``device``, ``hyp_dim``, ``hyp_lr`` and ``epochs``.
        train_loader: yields ``(embeddings, fitness, labels, proteins)`` batches.
        val_loader: same batch layout, used for evaluation after every epoch.
        input_dim: size of each flattened input vector.

    Returns:
        Highest weighted F1 score observed on the validation loader.
    """
    print("\n--- Training Hyperbolic Model (Classification) ---")
    device = config.device
    net = HyperbolicClassifier(input_dim, [512], config.hyp_dim).to(device)
    opt = torch.optim.AdamW(net.parameters(), lr=config.hyp_lr)

    best_f1 = 0.0

    for epoch_idx in range(config.epochs):
        # ---- optimisation pass ----
        net.train()
        running_loss = 0

        for batch_embs, _, batch_labels, _ in train_loader:
            batch_embs = batch_embs.to(device)
            batch_labels = batch_labels.to(device)

            logits = net(batch_embs)[1]
            batch_loss = F.cross_entropy(logits, batch_labels)

            opt.zero_grad()
            batch_loss.backward()
            # Gradient-norm clipping at 1.0 before every optimizer step.
            torch.nn.utils.clip_grad_norm_(net.parameters(), 1.0)
            opt.step()
            running_loss += batch_loss.item()

        # ---- validation pass ----
        net.eval()
        pred_chunks = []
        label_chunks = []
        with torch.no_grad():
            for batch_embs, _, batch_labels, _ in val_loader:
                logits = net(batch_embs.to(device))[1]
                pred_chunks.append(logits.argmax(dim=-1).cpu().numpy())
                label_chunks.append(batch_labels.numpy())

        y_pred = np.concatenate(pred_chunks)
        y_true = np.concatenate(label_chunks)
        f1 = f1_score(y_true, y_pred, average='weighted', zero_division=0)

        mean_loss = running_loss / len(train_loader)
        print(f"Epoch {epoch_idx+1} | Loss: {mean_loss:.4f} | Val F1: {f1:.4f}")
        best_f1 = max(best_f1, f1)

    return best_f1

def train_euclidean(config, train_loader, val_loader, input_dim):
    """Train the Euclidean regressor and return its best class-equivalent F1.

    Predictions and targets are binned with ``discretize_fitness`` after each
    epoch so the regression model can be scored with the same weighted F1 as
    the classifier.

    Args:
        config: object exposing ``device``, ``euc_hidden``, ``euc_lr`` and ``epochs``.
        train_loader: yields ``(embeddings, fitness, labels, proteins)`` batches.
        val_loader: same batch layout, used for evaluation after every epoch.
        input_dim: size of each flattened input vector.

    Returns:
        Highest weighted F1 (on discretized values) seen on the validation loader.
    """
    print("\n--- Training Euclidean Model (Regression) ---")

    # The regressor is built and trained under a float32 default dtype. The
    # previous default is restored on exit so this function does not leave
    # global state behind that breaks later float64 model construction.
    previous_dtype = torch.get_default_dtype()
    torch.set_default_dtype(torch.float32)
    try:
        model = EuclideanRegressor(input_dim, config.euc_hidden).to(config.device)
        optimizer = torch.optim.Adam(model.parameters(), lr=config.euc_lr)
        criterion = nn.MSELoss()

        best_reg_f1 = 0.0

        for epoch in range(config.epochs):
            model.train()
            total_loss = 0

            for embs, fitness, _, _ in train_loader:
                embs = embs.to(config.device).float()
                fitness = fitness.to(config.device).float().reshape(-1)

                # reshape(-1) keeps predictions 1-D even for a batch of one,
                # so the loss never broadcasts a 0-d prediction.
                pred = model(embs).reshape(-1)
                loss = criterion(pred, fitness)

                optimizer.zero_grad()
                loss.backward()
                optimizer.step()
                total_loss += loss.item()

            # Validation
            model.eval()
            all_preds, all_fitness = [], []
            with torch.no_grad():
                for embs, fitness, _, _ in val_loader:
                    embs = embs.to(config.device).float()
                    pred = model(embs).reshape(-1)
                    all_preds.append(pred.cpu().numpy())
                    all_fitness.append(fitness.reshape(-1).numpy())

            y_pred = np.concatenate(all_preds)
            y_true = np.concatenate(all_fitness)

            # Convert regression output to classification bins for a fair comparison.
            y_pred_cls = discretize_fitness(y_pred)
            y_true_cls = discretize_fitness(y_true)
            f1_as_cls = f1_score(y_true_cls, y_pred_cls, average='weighted', zero_division=0)

            print(f"Epoch {epoch+1} | MSE: {mean_squared_error(y_true, y_pred):.4f} | Equivalent F1: {f1_as_cls:.4f}")

            if f1_as_cls > best_reg_f1:
                best_reg_f1 = f1_as_cls

        return best_reg_f1
    finally:
        torch.set_default_dtype(previous_dtype)

# --- MAIN ---

def main():
    """Load the dataset, train both models and print a weighted-F1 comparison.

    Returns early (after printing a message) when ``Config.data_path`` does
    not exist.
    """
    if not os.path.exists(Config.data_path):
        print(f"File not found: {Config.data_path}. Please check the path.")
        return

    # Seed PyTorch's global RNG so weight initialisation, dropout and loader
    # shuffling are repeatable; the split below already uses its own seeded
    # generator. (Some GPU kernels may still be non-deterministic.)
    torch.manual_seed(Config.seed)

    # Data Loading
    print("Loading Dataset...")
    dataset = MutationDataset(Config.data_path)

    train_size = int((1 - Config.val_frac) * len(dataset))
    val_size = len(dataset) - train_size
    train_ds, val_ds = torch.utils.data.random_split(
        dataset, [train_size, val_size],
        generator=torch.Generator().manual_seed(Config.seed)
    )

    train_loader = DataLoader(train_ds, batch_size=Config.batch_size, shuffle=True, collate_fn=collate_fn)
    val_loader = DataLoader(val_ds, batch_size=Config.batch_size, shuffle=False, collate_fn=collate_fn)

    print(f"Dataset Loaded. Input Dim (Flattened): {dataset.input_dim}")

    # 1. Train Hyperbolic (Classification)
    hyp_f1 = train_hyperbolic(Config, train_loader, val_loader, dataset.input_dim)

    # 2. Train Euclidean (Regression)
    euc_f1 = train_euclidean(Config, train_loader, val_loader, dataset.input_dim)

    print("\n================ FINAL COMPARISON ================")
    print(f"Hyperbolic (Classification) Weighted F1: {hyp_f1:.4f}")
    print(f"Euclidean (Regression -> Class) Weighted F1: {euc_f1:.4f}")

    # Report ties explicitly instead of crediting the hyperbolic model.
    if euc_f1 > hyp_f1:
        print(">> Euclidean Regression performed better.")
    elif hyp_f1 > euc_f1:
        print(">> Hyperbolic Classification performed better.")
    else:
        print(">> Both models reached the same weighted F1.")

# Run the full comparison when this cell/script is executed directly.
if __name__ == '__main__':
    main() 

Loading Dataset...


  data = pickle.load(f)


Dataset Stats: Max Height=768, Max Width=384


In [3]:
# Display a CUDA device handle as a quick check; the training code itself uses Config.device.
torch.device("cuda")

device(type='cuda')

In [5]:
# Apply the discretization to the fitness values loaded in the first cell
# (relies on `data`, `np` and `discretize_fitness` already being defined).
labels = discretize_fitness(data["fitness"])

# Count the number of values in each class (0..4)
unique, counts = np.unique(labels, return_counts=True)
for cls, count in zip(unique, counts):
    print(f"Classe {cls}: {count} valori")

Classe 0: 824 valori
Classe 1: 977 valori
Classe 2: 1800 valori
Classe 3: 208 valori
Classe 4: 107 valori
