<a href="https://colab.research.google.com/github/Sergio-ddf/emit-llm/blob/main/emit_setup.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# EMit Emotion Detection Task

# 1. Setup e import


In [None]:
# Weights & Biases API key, read from the Colab secrets store
import os
import wandb
from google.colab import userdata

# Export the key through the environment before logging in.
os.environ.update(WANDB_API_KEY=userdata.get('WANDB_KEY'))
wandb.login()

In [None]:
# Install the third-party dependencies (quiet mode) via the IPython shell escape
!pip install -q datasets emoji iterative-stratification evaluate tokenizers


In [None]:
# Main imports and environment information
import os
import re

import numpy as np
import pandas as pd
import torch
import emoji

from iterstrat.ml_stratifiers import MultilabelStratifiedShuffleSplit
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, classification_report
from sklearn.multiclass import OneVsRestClassifier

from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer
)
from torch.nn import BCEWithLogitsLoss
from torch.nn.functional import sigmoid

# Report the installed PyTorch version and whether a CUDA device is visible
print(f"PyTorch    : {torch.__version__} (CUDA disponibile: {torch.cuda.is_available()})")

# 2. Configurazione e percorsi


In [None]:
# Folder that contains the EMit CSV files (empty string = current directory)
DATA_DIR = ""

# Hugging Face checkpoint that is fine-tuned further below
MODEL_NAME = "Musixmatch/umberto-commoncrawl-cased-v1"

# Use the GPU when one is available, otherwise fall back to the CPU
DEVICE = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

# Emotion label columns; the order fixes the index of each label
LABELS = [
    'Anger',
    'Anticipation',
    'Disgust',
    'Fear',
    'Joy',
    'Love',
    'Neutral',
    'Sadness',
    'Surprise',
    'Trust',
]
NUM_LABELS = len(LABELS)


# 3. Caricamento dati e statistiche


In [None]:
# Load the training split and the in-domain test split from DATA_DIR
train_path = os.path.join(DATA_DIR, "emit_train_A.csv")
test_path  = os.path.join(DATA_DIR, "emit_test.csv")
train_a = pd.read_csv(train_path)
test_in = pd.read_csv(test_path)

print("Train A:", train_a.shape, "Test in-domain:", test_in.shape)
display(train_a.head())

# Label distribution: number of positive rows per label, most frequent first
label_totals = train_a[LABELS].sum()
counts = label_totals.sort_values(ascending=False)
display(counts.to_frame("etichette"))


# 4. Pulizia testo


In [None]:
# Placeholder tokens substituted for URLs, user mentions and hashtag markers
URL, USER, TAG = "<URL>", "<USER>", "<HASHTAG>"

def clean(text: str) -> str:
    """Normalize a raw post for modelling.

    Steps, applied in order:
      1. replace http/https URLs with the ``URL`` placeholder;
      2. replace ``@mentions`` with the ``USER`` placeholder;
      3. replace the ``#`` of a hashtag with the ``TAG`` placeholder followed
         by a space, keeping the hashtag word itself;
      4. convert emoji to their Italian textual aliases;
      5. strip leading/trailing whitespace.

    Args:
        text: the raw text of one post.

    Returns:
        The normalized text.
    """
    t = re.sub(r'https?://\S+', URL, text)
    t = re.sub(r'@\w+', USER, t)
    t = re.sub(r'#(\w+)', TAG + r' \1', t)
    # Demojize the already-normalized string `t`. Passing the raw `text` here
    # would throw away the URL/mention/hashtag substitutions made above.
    t = emoji.demojize(t, language='it')
    return t.strip()

train_a['text_clean'] = train_a['text'].map(clean)
train_a[['text', 'text_clean']].head()



# 5. Split stratificato 90/10


In [None]:
# Inputs for the splitter: cleaned texts and the multi-hot label matrix
X = train_a['text_clean'].values
Y = train_a[LABELS].values

# Multi-label stratified shuffle split with a 10% hold-out; only the first
# generated split is used.
msss = MultilabelStratifiedShuffleSplit(test_size=0.1, random_state=42)
train_idx, val_idx = next(msss.split(X, Y))

# Materialize both partitions with a fresh 0..n-1 index
train_df, val_df = (
    train_a.iloc[rows].reset_index(drop=True)
    for rows in (train_idx, val_idx)
)

n_train, n_val = train_df.shape[0], val_df.shape[0]
print("Train/Val:", n_train, "/", n_val)


# 6. Baseline TF-IDF + Logistic Regression


In [None]:
# Gold label matrices for the two partitions
y_tr = train_df[LABELS].values
y_va = val_df[LABELS].values

# Unigram + bigram TF-IDF features; the vocabulary is fitted on train only
vec = TfidfVectorizer(ngram_range=(1,2), max_features=5000)
X_tr = vec.fit_transform(train_df['text_clean'])
X_va = vec.transform(val_df['text_clean'])

# One binary logistic regression per label
base_estimator = LogisticRegression(max_iter=1000, random_state=42)
clf = OneVsRestClassifier(base_estimator)
clf.fit(X_tr, y_tr)
y_pr = clf.predict(X_va)

# Validation scores of the baseline
baseline_macro_f1 = f1_score(y_va, y_pr, average='macro')
print("TF-IDF+LR macro-F1:", baseline_macro_f1)
print(classification_report(y_va, y_pr, target_names=LABELS, zero_division=0))


# 7. Preparazione HF Dataset


In [None]:
# Add a single 'labels' column holding the per-row label vector as floats
for df in (train_df, val_df):
    df['labels'] = df[LABELS].astype(float).values.tolist()

MAX_LEN   = 128
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_fn(batch):
    """Tokenize a batch of cleaned texts and carry the label vectors along.

    Every sequence is truncated/padded to MAX_LEN tokens.
    """
    enc = tokenizer(
        batch['text_clean'],
        max_length=MAX_LEN,
        truncation=True,
        padding='max_length',
    )
    enc['labels'] = batch['labels']
    return enc

# Columns removed after tokenization, and columns exposed as torch tensors
drop_cols  = ['text', 'text_clean', *LABELS]
torch_cols = ['input_ids', 'attention_mask', 'labels']

def _to_torch_dataset(frame):
    """Convert a DataFrame into a tokenized, torch-formatted HF Dataset."""
    ds = Dataset.from_pandas(frame)
    ds = ds.map(tokenize_fn, batched=True, remove_columns=drop_cols)
    return ds.with_format('torch', columns=torch_cols)

dtrain = _to_torch_dataset(train_df)
dval   = _to_torch_dataset(val_df)


# 8. Fine-tuning UmBERTo con BCE pesata


In [None]:
# Per-label positive weights for BCE: (1 - freq) / freq, i.e. the ratio of
# negative to positive training rows for each label. A label with no positive
# rows (freq == 0) would produce an infinite weight.
freqs       = train_df[LABELS].mean().values
# Explicit float32 so the weights match the dtype of labels.float() in the
# loss instead of inheriting float64 from the numpy array.
pos_weights = torch.tensor((1 - freqs) / freqs, dtype=torch.float32, device=DEVICE)

class WeightedTrainer(Trainer):
    """Trainer whose loss is BCE-with-logits weighted by `pos_weights`."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted multi-label loss for one batch.

        Args:
            model: the model being trained/evaluated.
            inputs: batch dict; its "labels" entry is removed before the
                forward pass so the model does not compute its own loss.
            return_outputs: when True, also return the model output.
            **kwargs: extra arguments passed by Trainer; accepted and ignored.

        Returns:
            The loss tensor, or ``(loss, outputs)`` when `return_outputs` is True.
        """
        inputs.pop("num_items_in_batch", None)
        labels  = inputs.pop("labels")
        outputs = model(**inputs)
        logits  = outputs.logits
        loss_fn = BCEWithLogitsLoss(pos_weight=pos_weights.to(logits.device))
        loss    = loss_fn(logits, labels.float())
        # Return the full model output, not the bare logits tensor: Trainer's
        # prediction step takes `outputs[1:]` from a non-dict output, which on
        # a raw (batch, num_labels) tensor would drop the first row of every
        # batch and misalign predictions with labels.
        return (loss, outputs) if return_outputs else loss

# Load the pretrained encoder with a multi-label classification head
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    problem_type='multi_label_classification',
    num_labels=NUM_LABELS,
)
model = model.to(DEVICE)

# Store the index <-> label-name mappings in the model config
model.config.id2label = dict(enumerate(LABELS))
model.config.label2id = {name: idx for idx, name in enumerate(LABELS)}

# Hyper-parameters for the fine-tuning run, grouped by concern
training_config = {
    # where checkpoints are written
    'output_dir': 'ckpt/umberto',
    # batch sizes
    'per_device_train_batch_size': 16,
    'per_device_eval_batch_size': 32,
    # optimization schedule
    'learning_rate': 2e-5,
    'num_train_epochs': 10,
    'lr_scheduler_type': 'linear',
    'warmup_steps': 500,
    # logging / evaluation / checkpointing cadence
    'logging_steps': 50,
    'eval_strategy': 'epoch',
    'save_strategy': 'epoch',
    # reload the checkpoint with the best validation macro-F1 after training
    'load_best_model_at_end': True,
    'metric_for_best_model': 'eval_macro_f1',
    # NOTE(review): depending on the transformers version, None may select all
    # installed logging integrations rather than disabling them -- confirm
    # which behavior is intended.
    'report_to': None,
}
args = TrainingArguments(**training_config)

def compute_metrics(eval_pred):
    """Return the validation macro-F1 at a fixed 0.5 probability threshold.

    `eval_pred` unpacks into (logits, labels); logits are passed through a
    sigmoid and binarized before scoring.
    """
    logits, labels = eval_pred
    above_half = sigmoid(torch.tensor(logits)) >= 0.5
    preds = above_half.int().numpy()
    macro_f1 = f1_score(labels, preds, average='macro', zero_division=0)
    return {"eval_macro_f1": float(macro_f1)}

# Wire the model, arguments, datasets and metric into the custom trainer
trainer_kwargs = dict(
    model=model,
    args=args,
    train_dataset=dtrain,
    eval_dataset=dval,
    compute_metrics=compute_metrics,
)
trainer = WeightedTrainer(**trainer_kwargs)


In [None]:

# Run the fine-tuning loop configured in `args`
trainer.train()


# 9. Ottimizzazione soglie e valutazione finale


In [None]:
# Validation probabilities (sigmoid of the predicted logits) and gold labels
preds_out  = trainer.predict(dval)
val_probs  = sigmoid(torch.tensor(preds_out.predictions)).numpy()
val_labels = val_df[LABELS].values.astype(int)

# Per-label threshold search over 81 evenly spaced candidates in [0.1, 0.9].
# For each label keep the first candidate reaching the highest F1; if no
# candidate scores above 0, fall back to 0.5.
candidates = np.linspace(0.1, 0.9, 81)
best_t = []
for i in range(NUM_LABELS):
    gold = val_labels[:,i]
    scores = [f1_score(gold, (val_probs[:,i]>=t).astype(int)) for t in candidates]
    top = int(np.argmax(scores))  # argmax returns the first maximum
    best_t.append(candidates[top] if scores[top] > 0 else 0.5)

print(dict(zip(LABELS, best_t)))

# Macro-F1 with the tuned thresholds (scored on the same validation split
# that was used to pick them)
preds_opt = (val_probs >= np.array(best_t)).astype(int)
print("Final optimized macro-F1:", f1_score(val_labels, preds_opt, average='macro', zero_division=0))
