In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import random_split, DataLoader, Dataset, Subset
import torch.optim as optim
from transformers import RobertaModel, RobertaConfig, RobertaForSequenceClassification, AutoModel, AutoTokenizer, RobertaTokenizer
import os
import pandas as pd
import time

In [None]:
# Filesystem locations, relative to the notebook's working directory.
_DATA_DIR = "../data/processed"
_MODEL_DIR = "../model"
DATASET_PATH = os.path.join(_DATA_DIR, "dataset_vulnerabilita_cleaned.csv")
BEST_MODEL_PATH = os.path.join(_MODEL_DIR, "best_model.pth")
CHECKPOINT_PATH = os.path.join(_MODEL_DIR, "checkpoint.pth")

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Load the dataset CSV into a DataFrame.
df = pd.read_csv(DATASET_PATH)

In [None]:
class RobertaClassificationHead(nn.Module):
    """Classification head: dropout -> dense -> tanh -> dropout -> output projection.

    Args:
        args: object exposing ``n_classes`` (width of the output layer).
        config: object exposing ``hidden_size`` and ``hidden_dropout_prob``.
    """

    def __init__(self, args, config):
        super().__init__()
        width = config.hidden_size
        self.dense = nn.Linear(width, width)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.out_proj = nn.Linear(width, args.n_classes)

    def forward(self, x):
        """Return class logits for features whose last dimension is ``hidden_size``."""
        hidden = torch.tanh(self.dense(self.dropout(x)))
        return self.out_proj(self.dropout(hidden))

In [None]:
class Projection(nn.Module):
    """Bias-free projection to ``d_out`` features with a residual GELU branch.

    The input is linearly mapped to ``d_out``; a second linear layer applied to
    the GELU of that result (followed by dropout) is added back, and the sum is
    layer-normalised.

    Args:
        d_in: size of the input feature dimension.
        d_out: size of the output feature dimension.
        p: dropout probability applied to the residual branch.
    """

    def __init__(self, d_in: int, d_out: int, p: float=0.1) -> None:
        super().__init__()
        self.linear1 = nn.Linear(d_in, d_out, bias=False)
        self.linear2 = nn.Linear(d_out, d_out, bias=False)
        self.layer_norm = nn.LayerNorm(d_out)
        self.dropout = nn.Dropout(p)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        base = self.linear1(x)
        residual = self.linear2(F.gelu(base))
        residual = self.dropout(residual)
        return self.layer_norm(base + residual)

In [None]:
class CodeEncoder(nn.Module):
    """Encoder for source code.

    Wraps a pretrained model loaded with ``AutoModel.from_pretrained`` and
    projects the hidden state at sequence position 0 to ``projection_dim``
    features through a ``Projection`` module.

    Args:
        model_name: identifier passed to ``AutoModel.from_pretrained``.
        projection_dim: output size of the projection.
    """

    def __init__(self, model_name="microsoft/codebert-base", projection_dim=128):
        super().__init__()
        self.encoder = AutoModel.from_pretrained(model_name)
        encoder_width = self.encoder.config.hidden_size
        self.projection = Projection(d_in=encoder_width, d_out=projection_dim, p=0.1)

    def forward(self, input_ids, attention_mask):
        """Return the projected embedding of the first token of each sequence."""
        hidden_states = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
        ).last_hidden_state
        # Position 0 of the sequence axis is used as the sequence-level embedding.
        first_token = hidden_states[:, 0, :]
        return self.projection(first_token)


class DescEncoder(nn.Module):
    """Encoder for CWE descriptions.

    Wraps a pretrained ``RobertaModel`` and projects the hidden state at
    sequence position 0 into the shared embedding space through a
    ``Projection`` module.

    Args:
        model_name: identifier passed to ``RobertaModel.from_pretrained``.
        projection_dim: output size of the projection.
    """

    def __init__(self, model_name="roberta-base", projection_dim=128):
        super().__init__()
        self.encoder = RobertaModel.from_pretrained(model_name)
        encoder_width = self.encoder.config.hidden_size
        # Projection into the shared space
        self.projection = Projection(d_in=encoder_width, d_out=projection_dim, p=0.1)

    def forward(self, input_ids, attention_mask):
        """Return the projected embedding of the first token of each description."""
        hidden_states = self.encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
        ).last_hidden_state
        # The token at position 0 (CLS) is taken as the description embedding.
        first_token = hidden_states[:, 0, :]
        # Project into the shared space.
        return self.projection(first_token)

In [None]:
class AlignModel(nn.Module):
    """Joint model: CWE classification of code plus code/description alignment.

    A ``CodeEncoder`` and a ``DescEncoder`` map their inputs into a shared
    space of ``projection_dim`` features. A classification head predicts the
    CWE class from the code embedding, and a symmetric cross-entropy loss over
    scaled cosine similarities aligns each code embedding with the description
    embedding at the same batch position.

    Args:
        source_config: object exposing ``n_classes`` (number of CWE classes).
        desc_config: object exposing ``hidden_size`` and
            ``hidden_dropout_prob`` for the classification head. Because the
            head is applied to the code embedding, ``hidden_size`` has to match
            ``projection_dim``.
        projection_dim: size of the shared embedding space.
    """

    def __init__(self, source_config, desc_config, projection_dim=256):
        super().__init__()
        # Encoders
        self.code_encoder = CodeEncoder(projection_dim=projection_dim)
        self.desc_encoder = DescEncoder(projection_dim=projection_dim)

        # Learnable log-temperature for the similarity logits; it is
        # exponentiated in compute_similarity.
        self.logit_scale = nn.Parameter(torch.ones([]) * np.log(1 / 0.07))

        # Number of CWE classes
        self.num_labels = source_config.n_classes

        # Classification head. Its output width is taken from source_config so
        # that it always agrees with self.num_labels.
        self.classification_head = RobertaClassificationHead(args=source_config, config=desc_config)

    def compute_similarity(self, code_embeddings, desc_embeddings):
        """Return scaled cosine-similarity logits (code->desc, desc->code).

        Embeddings are L2-normalised along the last dimension with
        ``F.normalize``, which clamps the norm away from zero so an all-zero
        embedding yields zeros instead of NaN.
        """
        code_embeddings = F.normalize(code_embeddings, dim=-1)
        desc_embeddings = F.normalize(desc_embeddings, dim=-1)
        logit_scale = self.logit_scale.exp()
        code_logits = (logit_scale * code_embeddings) @ desc_embeddings.T
        desc_logits = code_logits.T
        return code_logits, desc_logits

    def compute_alignment_loss(self, code_embeddings, desc_embeddings):
        """Symmetric cross-entropy treating position ``i`` in both inputs as the positive pair.

        NOTE(review): if several samples in a batch carry the same description,
        the identical off-diagonal entries are treated as negatives here --
        confirm whether that is intended.
        """
        code_logits, desc_logits = self.compute_similarity(code_embeddings, desc_embeddings)
        # Target for row i is column i (the matching pair sits on the diagonal).
        targets = torch.arange(code_embeddings.size(0), device=code_embeddings.device)
        code_to_desc = F.cross_entropy(code_logits, targets)
        desc_to_code = F.cross_entropy(desc_logits, targets)
        return (code_to_desc + desc_to_code) / 2

    def forward(self, code_input_ids, code_attention_mask,
                desc_input_ids, desc_attention_mask,
                cwe_labels=None):
        """Encode both inputs, classify the code and optionally compute the loss.

        Args:
            code_input_ids, code_attention_mask: tokenised code batch.
            desc_input_ids, desc_attention_mask: tokenised description batch.
            cwe_labels: optional class indices; when ``None`` no loss is computed.

        Returns:
            dict with keys ``"logits"`` (CWE logits), ``"loss"`` (classification
            loss + alignment loss, or ``None`` without labels), ``"code_embed"``
            and ``"desc_embed"``.
        """
        # Embeddings
        code_embed = self.code_encoder(code_input_ids, code_attention_mask)
        desc_embed = self.desc_encoder(desc_input_ids, desc_attention_mask)

        # Logits for CWE classification, expected shape [batch_size, num_classes]
        cwe_logits = self.classification_head(code_embed)

        loss = None
        if cwe_labels is not None:
            # CWE classification loss
            class_loss = F.cross_entropy(cwe_logits, cwe_labels)

            # Embedding alignment loss
            align_loss = self.compute_alignment_loss(code_embed, desc_embed)

            # Total loss is the unweighted sum of the two terms.
            loss = class_loss + align_loss

        return {
            "logits": cwe_logits,
            "loss": loss,
            "code_embed": code_embed,
            "desc_embed": desc_embed
        }


In [None]:
class VulnerabilityDataset(Dataset):
    """Dataset yielding tokenised code, tokenised CWE description and label index.

    Args:
        df: DataFrame with the columns ``code``, ``CWE_desc`` and ``CWE_ID``.
        code_tokenizer: callable used to tokenise the ``code`` column.
        desc_tokenizer: callable used to tokenise the ``CWE_desc`` column.
        max_length: length every sequence is truncated/padded to.
    """

    def __init__(self, df, code_tokenizer, desc_tokenizer, max_length=256):
        self.df = df
        self.code_tokenizer = code_tokenizer
        self.desc_tokenizer = desc_tokenizer
        self.max_length = max_length
        # Encode labels as integers: sorted unique CWE ids -> 0..n-1.
        unique_ids = sorted(df['CWE_ID'].unique())
        self.label_map = {cwe_id: position for position, cwe_id in enumerate(unique_ids)}

    def __len__(self):
        return len(self.df)

    def _encode(self, tokenizer, text):
        """Tokenise ``text`` to a fixed ``max_length`` and return the tokenizer output."""
        return tokenizer(text, return_tensors='pt', truncation=True, padding='max_length', max_length=self.max_length)

    def __getitem__(self, idx):
        # Positional lookup, done once per sample.
        row = self.df.iloc[idx]
        code_text = str(row['code'])
        desc_text = str(row['CWE_desc'])
        label_index = self.label_map[row['CWE_ID']]

        code_enc = self._encode(self.code_tokenizer, code_text)
        desc_enc = self._encode(self.desc_tokenizer, desc_text)

        # squeeze(0) drops the leading dimension of size 1 added by return_tensors='pt'.
        sample = {}
        sample['code_input_ids'] = code_enc['input_ids'].squeeze(0)
        sample['code_attention_mask'] = code_enc['attention_mask'].squeeze(0)
        sample['desc_input_ids'] = desc_enc['input_ids'].squeeze(0)
        sample['desc_attention_mask'] = desc_enc['attention_mask'].squeeze(0)
        sample['labels'] = torch.tensor(label_index, dtype=torch.long)
        return sample


In [None]:
# Seed the RNGs so that the data split, loader shuffling and the randomly
# initialised layers are repeatable across "Restart & Run All" executions.
SEED = 42
torch.manual_seed(SEED)

# Tokenizers
code_tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base")
desc_tokenizer = RobertaTokenizer.from_pretrained("roberta-base")

dataset = VulnerabilityDataset(df, code_tokenizer, desc_tokenizer)
# Optional: uncomment to work on a random subset and iterate faster.
#indices = np.random.choice(len(dataset), 500, replace=False)
#dataset = Subset(dataset, indices)
total_size = len(dataset)

# Fractions for train / val / test
train_ratio = 0.7
val_ratio = 0.15
test_ratio = 0.15

train_size = int(train_ratio * total_size)
val_size = int(val_ratio * total_size)
# The test split takes the remainder so the three sizes always sum to
# total_size (test_ratio above is informational only).
test_size = total_size - train_size - val_size

# A dedicated, seeded generator keeps the split identical between runs.
train_dataset, val_dataset, test_dataset = random_split(
    dataset,
    [train_size, val_size, test_size],
    generator=torch.Generator().manual_seed(SEED),
)

# DataLoaders
batch_size = 16
num_workers = 0
train_loader = DataLoader(train_dataset, batch_size, num_workers=num_workers, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size, num_workers=num_workers, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size, num_workers=num_workers, shuffle=False)

# Configuration. The RobertaConfig object is only read for hidden_size,
# hidden_dropout_prob and n_classes by AlignModel / RobertaClassificationHead.
# hidden_size must equal projection_dim because the classification head is
# applied to the projected code embedding, so both use the same constant.
PROJECTION_DIM = 256
num_classes = len(df['CWE_ID'].unique())
config = RobertaConfig(
    hidden_size=PROJECTION_DIM,
    hidden_dropout_prob=0.1,
    n_classes=num_classes
)

# Model and optimizer
model = AlignModel(source_config=config,
                   desc_config=config,
                   projection_dim=PROJECTION_DIM)
model.to(device)
optimizer = optim.AdamW(model.parameters(), lr=2e-5)

In [None]:
def classification_report_per_class(y_true, y_pred, label_map, num_classes):
    """Compute precision, recall, F1 and support for every class.

    Counts for all classes are obtained with column-wise sums and moved to
    Python in four transfers, rather than four ``.item()`` calls per class.

    Args:
        y_true: 1-D integer tensor with the true class indices.
        y_pred: 1-D integer tensor with the predicted class indices.
        label_map: dict ``{class index: class name}`` (e.g. ``'CWE-xxx'``).
        num_classes: total number of classes.

    Returns:
        dict mapping each class name to a dict with the keys ``"precision"``,
        ``"recall"``, ``"f1"`` (rounded to 3 decimals) and ``"support"``.
    """
    true_onehot = F.one_hot(y_true, num_classes=num_classes)
    pred_onehot = F.one_hot(y_pred, num_classes=num_classes)

    # Per-class counts, one list entry per class index.
    tp_counts = (true_onehot * pred_onehot).sum(dim=0).tolist()
    fp_counts = ((1 - true_onehot) * pred_onehot).sum(dim=0).tolist()
    fn_counts = (true_onehot * (1 - pred_onehot)).sum(dim=0).tolist()
    supports = true_onehot.sum(dim=0).tolist()

    report = {}
    for class_idx in range(num_classes):
        tp = tp_counts[class_idx]
        fp = fp_counts[class_idx]
        fn = fn_counts[class_idx]

        # The 1e-8 term keeps the ratios defined when a class has no
        # predictions or no true samples.
        precision = tp / (tp + fp + 1e-8)
        recall = tp / (tp + fn + 1e-8)
        f1 = 2 * precision * recall / (precision + recall + 1e-8)

        report[label_map[class_idx]] = {
            "precision": round(precision, 3),
            "recall": round(recall, 3),
            "f1": round(f1, 3),
            "support": int(supports[class_idx])
        }

    return report

In [None]:
def top_k_accuracy(output, target, k):
    """Fraction of samples whose true label is among the k highest-scoring classes.

    Args:
        output: model logits, shape (batch_size, num_classes).
        target: true labels, shape (batch_size,).
        k: number of top predictions to consider; capped at num_classes.

    Returns:
        Python float in [0, 1].
    """
    # Never ask for more candidates than there are classes.
    n_candidates = min(k, output.size(1))
    top_indices = output.topk(n_candidates, dim=1, largest=True, sorted=True).indices
    # Compare every candidate column against the true label of its row.
    hits = top_indices == target.view(-1, 1)
    n_hits = hits.sum().item()
    return n_hits / target.size(0)

In [None]:
def train_model(model, optimizer, loaders, dataset_sizes, label_map, num_epochs, device):
    """Train ``model`` and evaluate it on the val and test loaders every epoch.

    For each epoch the 'train', 'val' and 'test' phases are run in that order.
    The first validation result of the run, and every later one whose accuracy
    exceeds the best so far, is saved to ``BEST_MODEL_PATH`` (weights only) and
    ``CHECKPOINT_PATH`` (epoch, model/optimizer state and validation epoch
    loss). At the end the best weights saved during *this* run are reloaded.

    Args:
        model: module returning a dict with the keys ``'loss'`` and ``'logits'``.
        optimizer: optimizer updating ``model``'s parameters.
        loaders: dict with the keys 'train', 'val', 'test' mapping to iterables
            of batches (dicts with the keys read below).
        dataset_sizes: dict with the same keys giving the number of samples per
            phase; used as the denominator of the epoch averages.
        label_map: dict ``{class index: class name}``.
        num_epochs: number of epochs to run.
        device: device the batches are moved to.

    Returns:
        ``(model, history_loss, history_acc, history_metrics)``; each history is
        a dict of per-epoch lists keyed by phase.

    Raises:
        ValueError: if a batch contains a label outside ``[0, len(label_map))``.
    """
    since = time.time()
    best_acc = 0.0
    # Set once this run has written BEST_MODEL_PATH, so a stale file from an
    # earlier run is never loaded at the end.
    best_saved = False

    # Loop-invariant; also needed after the batch loop for the metrics.
    num_classes = len(label_map)

    # Make sure the target directories exist before the first torch.save.
    for out_path in (BEST_MODEL_PATH, CHECKPOINT_PATH):
        out_dir = os.path.dirname(out_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    history_loss = {'train': [], 'val': [], 'test': []}
    history_acc = {'train': [], 'val': [], 'test': []}
    history_metrics = {'train': [], 'val': [], 'test': []}

    for epoch in range(num_epochs):
        print(f"Epoch {epoch+1}/{num_epochs}")
        print("-----")

        for phase in ['train', 'val', 'test']:
            is_train = phase == 'train'
            # train(True) enables training mode, train(False) is eval mode.
            model.train(is_train)

            running_loss = 0.0
            running_corrects = 0
            running_topk = 0.0

            all_labels = []
            all_preds = []

            # Gradients are only tracked during the training phase.
            with torch.set_grad_enabled(is_train):
                for batch in loaders[phase]:
                    code_input_ids = batch['code_input_ids'].to(device)
                    code_attention_mask = batch['code_attention_mask'].to(device)
                    desc_input_ids = batch['desc_input_ids'].to(device)
                    desc_attention_mask = batch['desc_attention_mask'].to(device)

                    labels = batch['labels'].to(device)

                    # Fail early with a readable message instead of an opaque
                    # indexing error inside the loss.
                    if (labels < 0).any() or (labels >= num_classes).any():
                        raise ValueError(f"Labels fuori range trovate: {labels}")

                    if is_train:
                        optimizer.zero_grad()
                    outputs = model(code_input_ids, code_attention_mask, desc_input_ids, desc_attention_mask, cwe_labels=labels)
                    loss = outputs['loss']

                    logits = outputs['logits']
                    if isinstance(logits, tuple):
                        logits = logits[0]  # unwrap tuple outputs
                    if logits.dim() > 2:
                        logits = logits.squeeze(1)

                    _, preds = torch.max(logits, dim=1)
                    if is_train:
                        loss.backward()
                        optimizer.step()

                    # Weight the batch loss by its size so the epoch value is
                    # a per-sample average.
                    running_loss += loss.item() * code_input_ids.size(0)
                    running_corrects += torch.sum(preds == labels).item()

                    # top-k accuracy (k=3), again weighted by batch size
                    running_topk += top_k_accuracy(logits, labels, k=3) * labels.size(0)

                    # Keep the per-batch results on the CPU so they do not
                    # accumulate in device memory over the epoch.
                    all_labels.append(labels.detach().cpu())
                    all_preds.append(preds.detach().cpu())

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects / dataset_sizes[phase]
            epoch_topk_acc = running_topk / dataset_sizes[phase]

            # Concatenate all labels/predictions to compute per-class metrics.
            all_labels_tensor = torch.cat(all_labels)
            all_preds_tensor = torch.cat(all_preds)
            metrics = classification_report_per_class(all_labels_tensor, all_preds_tensor, label_map, num_classes)

            print(f"{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")
            print(f"{phase} Top-3 Acc: {epoch_topk_acc:.4f}")

            print(f"{phase} Metrics per class:")
            for class_name, class_metrics in metrics.items():
                print(f"  {class_name}: {class_metrics}")

            history_loss[phase].append(epoch_loss)
            history_acc[phase].append(epoch_acc)
            history_metrics[phase].append(metrics)

            if phase == 'val' and (not best_saved or epoch_acc > best_acc):
                print(f"New best val acc: {epoch_acc:.4f}, saving model.")
                best_acc = epoch_acc
                best_saved = True
                torch.save(model.state_dict(), BEST_MODEL_PATH)
                torch.save({
                    'epoch': epoch,
                    'model_state_dict': model.state_dict(),
                    'optimizer_state_dict': optimizer.state_dict(),
                    # Validation loss averaged over the epoch (a plain float),
                    # not the loss tensor of the last batch.
                    'loss': epoch_loss,
                }, CHECKPOINT_PATH)

        print()

    time_elapsed = time.time() - since
    print(f"Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s")
    print(f"Best val Acc: {best_acc:.4f}")

    # Reload the best weights, but only if this run actually saved them
    # (e.g. not when num_epochs is 0).
    if best_saved:
        model.load_state_dict(torch.load(BEST_MODEL_PATH, map_location=device))
    model.to(device)
    return model, history_loss, history_acc, history_metrics


In [None]:
# Training
NUM_EPOCHS = 10

loaders = dict(train=train_loader, val=val_loader, test=test_loader)
dataset_sizes = dict(train=train_size, val=val_size, test=test_size)
# class index -> CWE id, built from the sorted unique CWE ids of df
label_map = dict(enumerate(sorted(df['CWE_ID'].unique())))

trained_model, history_loss, history_acc, history_metrics = train_model(
    model=model,
    optimizer=optimizer,
    loaders=loaders,
    dataset_sizes=dataset_sizes,
    label_map=label_map,
    num_epochs=NUM_EPOCHS,
    device=device,
)
