In [3]:
import accelerate, transformers
print("accelerate:", accelerate.__version__)
print("transformers:", transformers.__version__)

  from .autonotebook import tqdm as notebook_tqdm


accelerate: 1.12.0
transformers: 4.57.3


In [4]:
import pandas as pd
import numpy as np
import json
import torch , os
import matplotlib.pyplot as plt
import seaborn as sns
from torch.utils.data import Dataset, DataLoader
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, matthews_corrcoef
from transformers import ( RobertaTokenizerFast, RobertaModel, DataCollatorWithPadding, TrainingArguments, Trainer )
from transformers.modeling_outputs import SequenceClassifierOutput
import torch.nn as nn
import torch.nn.functional as F
from typing import Optional

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cpu


In [6]:
class ESALayer(nn.Module):
    """
    ESA = standard self-attention + learnable feature scaling vector S ∈ R^H.

    Input:
      E_input: [B, L, H]  (RoBERTa last_hidden_state)
      attention_mask: [B, L] (1=real, 0=pad)

    Output:
      Z_scaled: [B, L, H]
      attn_probs: [B, L, L]  (optional, useful for debugging/visualization)
    """
    def __init__(self, hidden_dim: int, num_heads: int = 2, max_len: int = 512):
        super().__init__()
        self.hidden_dim = hidden_dim
        self.num_heads = num_heads
        self.max_len = max_len

        # Learned positional encoding P ∈ R[max_len, H]
        self.pos_emb = nn.Embedding(max_len, hidden_dim)

        # Multi-head self-attention (Transformer-style)
        self.mha = nn.MultiheadAttention(embed_dim=hidden_dim, num_heads=num_heads, batch_first=True)

        # Learnable scaling vector S ∈ R[H]
        self.S = nn.Parameter(torch.ones(hidden_dim))

    def forward(self, E: torch.Tensor, attention_mask: Optional[torch.Tensor] = None):
        """
        E: [B, L, H]  (e.g., roberta_outputs.last_hidden_state)
        attention_mask: [B, L] with 1=real token, 0=pad
        """
        B, L, H = E.shape
        assert H == self.hidden_dim
        assert L <= self.max_len, f"Sequence length {L} exceeds max_len {self.max_len}"

        # Build positions [L] and expand to [B, L] for embedding lookup
        positions = torch.arange(L, device=E.device).unsqueeze(0).expand(B, L)  # [B, L]
        P = self.pos_emb(positions)  # [B, L, H]

        # (1) E_input = E + P
        E_input = E + P

        # Prepare key padding mask for MHA: True means "ignore"
        key_padding_mask = None
        if attention_mask is not None:
            key_padding_mask = (attention_mask == 0)  # [B, L] boolean

        # (2) Standard attention
        Z, attn_weights = self.mha(
            E_input, E_input, E_input,
            key_padding_mask=key_padding_mask,
            need_weights=True,
            average_attn_weights=False  # returns per-head weights (closer to attention analysis)
        )  # Z: [B, L, H]

        # (3) Emotion-specific scaling: Z_scaled = Z ⊙ S
        Z_scaled = Z * self.S  # broadcasts [H] -> [B, L, H]

        # (5) Re-add positional encoding: Z_final = Z_scaled + P
        Z_final = Z_scaled + P

        return Z_final, attn_weights

In [17]:
class RobertaESAClassifier(nn.Module):
    def __init__(self, model_name, num_labels, dropout, id2label=None, label2id=None, dense_dim=256):
        super().__init__()

        self.num_labels = num_labels
        self.roberta = RobertaModel.from_pretrained(model_name)

        cfg = self.roberta.config
        hidden_size = self.roberta.config.hidden_size

        self.esa_layer = ESALayer(
            hidden_dim=hidden_size, 
            num_heads=cfg.num_attention_heads, 
            max_len=cfg.max_position_embeddings
        )

        self.pre_dropout = nn.Dropout(dropout)
        self.dense = nn.Sequential(
            nn.Linear(2 *hidden_size, dense_dim),
            nn.ReLU(),
            nn.Dropout(dropout)
        )

        self.classifier = nn.Linear(dense_dim, num_labels)
        self.loss_fn = nn.CrossEntropyLoss()

        if id2label is not None:
            self.roberta.config.id2label = id2label
        if label2id is not None:
            self.roberta.config.label2id = label2id


    def forward(self, input_ids=None, attention_mask=None, labels=None, inputs_embeds=None, **kwargs):
        outputs = self.roberta(
            input_ids=input_ids,
            attention_mask=attention_mask,
            inputs_embeds=inputs_embeds,
            return_dict=True
        )

        E = outputs.last_hidden_state  # [B, L, H]
        Z_final, _ = self.esa_layer(E, attention_mask=attention_mask)

        # CLS pooling
        cls_pool = Z_final[:, 0, :] 

        # Mean pooling (masked)
        mask = attention_mask.unsqueeze(-1).type_as(Z_final)
        sum_pool = (Z_final * mask).sum(dim=1)
        len_pool = mask.sum(dim=1).clamp(min=1e-9)
        mean_pool = sum_pool / len_pool      # [B, H]

        # Concatenate pooling outputs
        pooled = torch.cat([cls_pool, mean_pool], dim=-1)  # [B, 2H]
        X = self.dense(pooled)
        logits = self.classifier(X)

        loss = None
        if labels is not None:
            loss = self.loss_fn(logits, labels)

        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
        )

In [18]:
BASELINE_CONFIGURATION = {
    'model_name': 'roberta-base',
    'learning_rate': 1e-5,
    'batch_size': 16,
    'num_epochs': 10,
    'dropout': 0.3,
    'weight_decay': 0.01,
    'warmup_ratio': 0.1,
    'max_length': 128,
    'num_labels': 6
}

device = "cuda" if torch.cuda.is_available() else "cpu"

id2label = {0: 'anger', 1: 'fear', 2: 'joy', 3: 'love', 4: 'sadness', 5: 'surprise'}
label2id = {v: k for k, v in id2label.items()}

os.makedirs('models/baseline_roberta', exist_ok=True)
os.makedirs('reports', exist_ok=True)

model = RobertaESAClassifier(
    model_name=BASELINE_CONFIGURATION["model_name"],
    num_labels=BASELINE_CONFIGURATION["num_labels"],
    dropout=BASELINE_CONFIGURATION["dropout"],
    id2label=id2label,
    label2id=label2id
).to(device)

total_params = sum(p.numel() for p in model.parameters())
trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f"\nDONE -- Model încărcat pe {device}")
print(f"   Total parametri: {total_params:,}")
print(f"   Parametri antrenabili: {trainable_params:,}")



Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['pooler.dense.bias', 'pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.



DONE -- Model încărcat pe cpu
   Total parametri: 127,798,534
   Parametri antrenabili: 127,798,534


In [19]:
def integrated_gradients(
    model,
    tokenizer,
    text,
    target_label=None,
    max_length=128,
    steps=50
):
    """
    Integrated Gradients w.r.t. input embeddings.
    Baseline = [CLS] + [PAD]...[PAD]
    Returns tokens + attribution score per token.
    """
    device = next(model.parameters()).device
    model.eval()

    enc = tokenizer(
        text,
        return_tensors="pt",
        truncation=True,
        padding="max_length",
        max_length=max_length,
        add_special_tokens=True
    )
    input_ids = enc["input_ids"].to(device)
    attention_mask = enc["attention_mask"].to(device)

    # Predict label if not provided
    with torch.no_grad():
        input_embeds = model.roberta.embeddings.word_embeddings(input_ids)
        logits = model(inputs_embeds=input_embeds, attention_mask=attention_mask).logits
        probs = F.softmax(logits, dim=-1)
        pred = probs.argmax(dim=-1).item()

    if target_label is None:
        target_label = pred

    # Build baseline ids: [CLS] + PADs
    baseline_ids = torch.full_like(input_ids, tokenizer.pad_token_id)
    baseline_ids[:, 0] = tokenizer.cls_token_id

    # Embeddings
    input_embeds = model.roberta.embeddings(input_ids=input_ids).detach()
    baseline_embeds = model.roberta.embeddings(input_ids=baseline_ids).detach()

    # Accumulate gradients along path
    total_grads = torch.zeros_like(input_embeds)

    for i in range(1, steps + 1):
        alpha = i / steps
        scaled = baseline_embeds + alpha * (input_embeds - baseline_embeds)
        scaled.requires_grad_(True)

        logits_i = model(inputs_embeds=scaled, attention_mask=attention_mask).logits
        target_logit = logits_i[:, target_label].sum()

        model.zero_grad(set_to_none=True)
        if scaled.grad is not None:
            scaled.grad.zero_()
        target_logit.backward()

        total_grads += scaled.grad.detach()

    avg_grads = total_grads / steps
    ig = (input_embeds - baseline_embeds) * avg_grads  # (B, L, H)

    # Token scores: sum over hidden dim
    token_scores = ig.sum(dim=-1).squeeze(0)  # (L,)
    token_scores = token_scores * attention_mask.squeeze(0)  # mask PADs

    tokens = tokenizer.convert_ids_to_tokens(input_ids.squeeze(0).tolist())

    return {
        "text": text,
        "pred_label": pred,
        "target_label": target_label,
        "probs": probs.squeeze(0).detach().cpu(),
        "tokens": tokens,
        "scores": token_scores.detach().cpu(),
    }


In [20]:
def show_top_tokens(result, top_k=15):
    pairs = [(t, float(s)) for t, s in zip(result["tokens"], result["scores"])]
    # remove special tokens for readability
    pairs = [(t, s) for t, s in pairs if t not in ["<s>", "</s>", "<pad>"]]
    pairs_sorted = sorted(pairs, key=lambda x: abs(x[1]), reverse=True)

    print("Pred:", result["pred_label"], "Target:", result["target_label"])
    for tok, sc in pairs_sorted[:top_k]:
        print(f"{tok:>12}  {sc:+.4f}")


In [21]:
tokenizer = RobertaTokenizerFast.from_pretrained(BASELINE_CONFIGURATION["model_name"])

text = "I can't believe you did that. I'm so angry right now."
res = integrated_gradients(
    model=model,
    tokenizer=tokenizer,
    text=text,
    max_length=BASELINE_CONFIGURATION["max_length"],
    steps=50
)
show_top_tokens(res)


Pred: 5 Target: 5
          'm  -0.0077
           .  -0.0064
      Ġangry  -0.0053
         Ġso  -0.0050
       Ġthat  +0.0044
          ĠI  -0.0043
          't  -0.0027
      Ġright  -0.0027
        Ġnow  +0.0025
        Ġcan  -0.0023
           I  +0.0023
        Ġdid  -0.0014
        Ġyou  -0.0003
           .  +0.0001
    Ġbelieve  -0.0000


In [22]:
res_pred = integrated_gradients(model, tokenizer, text)
res_anger = integrated_gradients(
    model, tokenizer, text,
    target_label=label2id["anger"]
)

In [23]:
show_top_tokens(res_pred)
print("\nAttributions for target label 'anger':")
show_top_tokens(res_anger)

Pred: 5 Target: 5
          'm  -0.0077
           .  -0.0064
      Ġangry  -0.0053
         Ġso  -0.0050
       Ġthat  +0.0044
          ĠI  -0.0043
          't  -0.0027
      Ġright  -0.0027
        Ġnow  +0.0025
        Ġcan  -0.0023
           I  +0.0023
        Ġdid  -0.0014
        Ġyou  -0.0003
           .  +0.0001
    Ġbelieve  -0.0000

Attributions for target label 'anger':
Pred: 5 Target: 0
           .  -0.0070
         Ġso  -0.0067
        Ġdid  +0.0036
      Ġright  -0.0026
          ĠI  -0.0025
      Ġangry  -0.0021
           .  -0.0019
          't  -0.0017
       Ġthat  +0.0015
        Ġcan  +0.0014
        Ġyou  -0.0014
           I  -0.0013
    Ġbelieve  -0.0008
          'm  -0.0003
        Ġnow  -0.0000


In [24]:
# check if classifier looks like random init (very small logits / unstable)
text = "I can't believe you did that. I'm so angry right now."
enc = tokenizer(text, return_tensors="pt", truncation=True, padding=True).to(device)
with torch.no_grad():
    logits = model(**enc).logits.squeeze(0)
    probs = torch.softmax(logits, dim=-1)
print("logits:", logits.cpu().numpy())
print("probs:", probs.cpu().numpy())

logits: [-0.04050211 -0.06943162  0.1640822  -0.10848603 -0.07312965  0.22783612]
probs: [0.15604365 0.15159406 0.1914679  0.14578776 0.1510345  0.20407224]
