# Sentiment Analysis: Confronto Modelli su Dataset Bilanciato

Questo notebook estende l'analisi precedente integrando:
1.  **Data Engineering**: Aggregazione delle classi (`Serious`, `Light`) e ribilanciamento (Undersampling).
2.  **Modelli**: 
    - **Naive Bayes (Baseline)**: TF-IDF + MultinomialNB
    - **Custom LSTM**: Con GloVe embeddings
    - **XLM-RoBERTa**: Fine-tuning con LoRA
3.  **Confronto**: Metriche e grafici comparativi.

In [6]:
!pip install transformers datasets wordcloud gensim seaborn torch scikit-learn pandas matplotlib accelerate peft




[notice] A new release of pip is available: 25.2 -> 25.3
[notice] To update, run: python.exe -m pip install --upgrade pip


In [7]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import re
import torch
import torch.nn as nn
import torch.optim as optim
from collections import Counter
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import make_pipeline
from sklearn.metrics import accuracy_score, classification_report, f1_score, confusion_matrix
from sklearn.utils import resample
from torch.utils.data import DataLoader, Dataset
import gensim.downloader as api
from transformers import AutoTokenizer, AutoModelForSequenceClassification, Trainer, TrainingArguments
from datasets import Dataset as HFDataset
from peft import PeftModel, get_peft_model, LoraConfig, TaskType

# Configurazione Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")

Using device: cuda


## 1. Preparazione Dati: Mapping e Bilanciamento

In [None]:
# Caricamento Dataset
df = pd.read_csv('../data/processed/mental.csv')
df = df.rename(columns={'statement': 'review_text', 'status': 'sentiment'}).dropna()

# 1. Mapping Classi (Aggregazione)
def map_labels(label):
    label = str(label).strip()
    if label in ['Bipolar', 'Personality disorder', 'suicidal']:
        return 'Serious'
    elif label in ['Anxiety', 'Stress']:
        return 'Light'
    else:
        return label

df['label_str'] = df['sentiment'].apply(map_labels)

# 2. Bilanciamento (Undersampling a 7000)
def balance_classes(df, target_col, max_samples=8000):
    balanced_dfs = []
    for label in df[target_col].unique():
        df_class = df[df[target_col] == label]
        if len(df_class) > max_samples:
            df_class = resample(df_class, replace=False, n_samples=max_samples, random_state=42)
        balanced_dfs.append(df_class)
    return pd.concat(balanced_dfs).reset_index(drop=True)

df_balanced = balance_classes(df, 'label_str', max_samples=8000)

# Mappatura ID numerici
labels_order = sorted(df_balanced['label_str'].unique())
label2id = {l: i for i, l in enumerate(labels_order)}
id2label = {i: l for l, i in label2id.items()}
df_balanced['label'] = df_balanced['label_str'].map(label2id)

# Visualizzazione Distribuzione
print(f"Classi finali: {labels_order}")
plt.figure(figsize=(8, 4))
sns.countplot(x='label_str', data=df_balanced, palette='viridis', order=labels_order)
plt.title('Distribuzione Classi (Bilanciata)')
plt.show()

# Split Train/Val/Test
train_val, test_df = train_test_split(df_balanced, test_size=0.1, random_state=42, stratify=df_balanced['label'])
train_df, val_df = train_test_split(train_val, test_size=0.1111, random_state=42, stratify=train_val['label'])

print(f"Train: {len(train_df)} | Val: {len(val_df)} | Test: {len(test_df)}")

AttributeError: 'DataFrame' object has no attribute 'shuffle'

## 2. Baseline: Naive Bayes

In [None]:
print("Training Naive Bayes...")
model_nb = make_pipeline(
    TfidfVectorizer(max_features=10000, stop_words='english'),
    MultinomialNB()
 )
model_nb.fit(train_df['review_text'], train_df['label'])

# Valutazione
nb_preds = model_nb.predict(test_df['review_text'])
nb_acc = accuracy_score(test_df['label'], nb_preds)
nb_f1 = f1_score(test_df['label'], nb_preds, average='weighted')
print(f"Naive Bayes -> Accuracy: {nb_acc:.4f} | F1: {nb_f1:.4f}")

Training Naive Bayes...
Naive Bayes -> Accuracy: 0.6638 | F1: 0.6725


## 3. Custom LSTM

In [None]:
# Preprocessing per LSTM
def clean_text(text):
    text = str(text).lower()
    text = re.sub(r'http\S+|www\.\S+', '', text)
    text = re.sub(r'[^a-zA-Z\s]', '', text)
    return text.strip()

all_text = " ".join(train_df['review_text'].apply(clean_text))
vocab_counts = Counter(all_text.split())
vocab = {w: i+1 for i, (w, c) in enumerate(vocab_counts.items()) if c >= 2}
vocab_size = len(vocab) + 1

def encode(text, max_len=60):
    tokens = clean_text(text).split()
    vec = [vocab.get(t, 0) for t in tokens]
    if len(vec) < max_len: vec += [0] * (max_len - len(vec))
    else: vec = vec[:max_len]
    return vec

class LSTMDataset(Dataset):
    def __init__(self, df):
        self.X = torch.tensor([encode(t) for t in df['review_text']], dtype=torch.long)
        self.y = torch.tensor(df['label'].values, dtype=torch.long)
    def __len__(self): return len(self.y)
    def __getitem__(self, i): return self.X[i], self.y[i]

train_loader = DataLoader(LSTMDataset(train_df), batch_size=64, shuffle=True)
test_loader = DataLoader(LSTMDataset(test_df), batch_size=64)

# GloVe Embeddings
emb_dim = 100
emb_matrix = np.random.normal(scale=0.6, size=(vocab_size, emb_dim))
try:
    print("Caricamento GloVe...")
    glove = api.load("glove-twitter-100")
    found = 0
    for w, i in vocab.items():
        if w in glove: 
            emb_matrix[i] = glove[w]
            found += 1
    print(f"Trovate {found}/{vocab_size} parole in GloVe.")
except: print("GloVe non disponibile, uso random.")

# Modello LSTM
class LSTMNet(nn.Module):
    def __init__(self):
        super().__init__()
        self.emb = nn.Embedding(vocab_size, emb_dim, padding_idx=0)
        self.emb.weight.data.copy_(torch.from_numpy(emb_matrix))
        self.emb.weight.requires_grad = False # Freeze GloVe
        self.lstm = nn.LSTM(emb_dim, 64, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(128, len(labels_order))
        self.drop = nn.Dropout(0.5)
    def forward(self, x):
        x = self.emb(x)
        _, (h, _) = self.lstm(x)
        return self.fc(self.drop(torch.cat((h[-2], h[-1]), dim=1)))

model_lstm = LSTMNet().to(device)
opt = optim.Adam(model_lstm.parameters(), lr=1e-3)
crit = nn.CrossEntropyLoss()

# Training
for ep in range(5):
    model_lstm.train()
    for X, y in train_loader:
        X, y = X.to(device), y.to(device)
        opt.zero_grad()
        loss = crit(model_lstm(X), y)
        loss.backward()
        opt.step()
    print(f"LSTM Epoch {ep+1} complete")

# Valutazione
model_lstm.eval()
lstm_preds, lstm_true = [], []
with torch.no_grad():
    for X, y in test_loader:
        X = X.to(device)
        preds = model_lstm(X).argmax(1).cpu().numpy()
        lstm_preds.extend(preds)
        lstm_true.extend(y.numpy())

lstm_acc = accuracy_score(lstm_true, lstm_preds)
lstm_f1 = f1_score(lstm_true, lstm_preds, average='weighted')
print(f"LSTM -> Accuracy: {lstm_acc:.4f} | F1: {lstm_f1:.4f}")

Caricamento GloVe...
GloVe non disponibile, uso random.


RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions.


## 4. XLM-RoBERTa (LoRA)

In [None]:
model_name = "xlm-roberta-base"
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize(batch):
    return tokenizer(batch['text'], padding="max_length", truncation=True, max_length=128)

train_ds = HFDataset.from_dict({"text": train_df['review_text'].tolist(), "label": train_df['label'].tolist()}).map(tokenize, batched=True)
val_ds = HFDataset.from_dict({"text": val_df['review_text'].tolist(), "label": val_df['label'].tolist()}).map(tokenize, batched=True)
test_ds = HFDataset.from_dict({"text": test_df['review_text'].tolist(), "label": test_df['label'].tolist()}).map(tokenize, batched=True)

base_model = AutoModelForSequenceClassification.from_pretrained(
    model_name, num_labels=len(labels_order), id2label=id2label, label2id=label2id
).to(device)

peft_config = LoraConfig(task_type=TaskType.SEQ_CLS, r=16, lora_alpha=32, lora_dropout=0.1, target_modules=["query", "value"])
model_bert = get_peft_model(base_model, peft_config)

def compute_metrics(p):
    preds = np.argmax(p.predictions, axis=1)
    return {"accuracy": accuracy_score(p.label_ids, preds), "f1": f1_score(p.label_ids, preds, average='weighted')}

args = TrainingArguments(
    output_dir="../models/bert_lora_v2", 
    num_train_epochs=2, 
    per_device_train_batch_size=16, 
    logging_steps=50, 
    evaluation_strategy="epoch", 
    save_strategy="epoch",
    load_best_model_at_end=True
)

trainer = Trainer(model=model_bert, args=args, train_dataset=train_ds, eval_dataset=val_ds, compute_metrics=compute_metrics)
trainer.train()

bert_res = trainer.predict(test_ds)
bert_acc = bert_res.metrics['test_accuracy']
bert_f1 = bert_res.metrics['test_f1']
print(f"BERT -> Accuracy: {bert_acc:.4f} | F1: {bert_f1:.4f}")

## 5. Confronto Finale

In [None]:
results = pd.DataFrame({
    'Model': ['Naive Bayes', 'LSTM (GloVe)', 'XLM-RoBERTa (LoRA)'],
    'Accuracy': [nb_acc, lstm_acc, bert_acc],
    'F1-Score': [nb_f1, lstm_f1, bert_f1]
})

print(results)

results.set_index('Model').plot(kind='bar', figsize=(10, 6), ylim=(0, 1), rot=0)
plt.title('Confronto Prestazioni Modelli (5 Classi Bilanciate)')
plt.ylabel('Score')
plt.legend(loc='lower right')
plt.grid(axis='y', linestyle='--', alpha=0.7)
plt.show()