# EMit Emotion Detection Task

In [None]:
# Authenticate with Weights & Biases: read the API key stored in Colab
# Secrets (secret name: WANDB_KEY), expose it via the environment, then log in.
import os
import wandb
from google.colab import userdata

os.environ.update(WANDB_API_KEY=userdata.get('WANDB_KEY'))
wandb.login()



In [None]:
# Install the dependencies (run only the first time)
!pip install -q datasets emoji iterative-stratification evaluate tokenizers


In [None]:
# Import principali e informazioni sull'ambiente
import os, platform, re
import numpy as np
import pandas as pd
import torch
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import f1_score, classification_report
from sklearn.multiclass import OneVsRestClassifier
from iterstrat.ml_stratifiers import MultilabelStratifiedShuffleSplit
from transformers import (AutoTokenizer, AutoModelForSequenceClassification,
                          TrainingArguments, Trainer)
import datasets, emoji, evaluate, tokenizers, transformers

# Report interpreter and library versions, plus CUDA availability
cuda_available = torch.cuda.is_available()
env_report = (
    f"Python     : {platform.python_version()}",
    f"PyTorch    : {torch.__version__} (CUDA disponibile: {cuda_available})",
    f"Transformers: {transformers.__version__}",
    f"Datasets   : {datasets.__version__}",
)
for line in env_report:
    print(line)

In [None]:
# Load the CSV files: two training sets (A and B) and the test set
DATA_DIR = ""  # <--- change this if the files live in another folder

train_a, train_b, test_in = (
    pd.read_csv(os.path.join(DATA_DIR, csv_name))
    for csv_name in ("emit_train_A.csv", "emit_train_B.csv", "emit_test.csv")
)

# Preview the first rows of training set A
display(train_a.head())

In [None]:
# Label statistics: per-label column sums over training set A, largest first
LABELS = [
    'Anger', 'Anticipation', 'Disgust', 'Fear', 'Joy',
    'Love', 'Neutral', 'Sadness', 'Surprise', 'Trust',
]

label_totals = train_a[LABELS].sum()
stats = label_totals.sort_values(ascending=False)

# Bar chart of the totals, followed by the raw numbers
ax = stats.plot.bar(title='Distribuzione etichette')
ax.set_ylabel('Numero di esempi positivi')
print(stats)

In [None]:
# Text pre-processing: links, mentions, hashtags and emoji
URL_TOKEN     = "<URL>"
USER_TOKEN    = "<USER>"
HASHTAG_TOKEN = "<HASHTAG>"

# Patterns compiled once; clean() applies them in this order
_URL_RE     = re.compile(r'https?://\S+')
_MENTION_RE = re.compile(r'@\w+')
_HASHTAG_RE = re.compile(r'#(\w+)')


def clean(text: str) -> str:
    """Normalize a raw text string.

    Links become ``URL_TOKEN``, ``@mentions`` become ``USER_TOKEN``, and each
    ``#hashtag`` becomes ``HASHTAG_TOKEN`` followed by the hashtag word. Emoji
    are then passed through ``emoji.demojize`` with ``language='it'`` and
    the result is stripped of surrounding whitespace.
    """
    rules = (
        (_URL_RE, URL_TOKEN),                       # link
        (_MENTION_RE, USER_TOKEN),                  # mention
        (_HASHTAG_RE, HASHTAG_TOKEN + r' \1'),      # hashtag -> token + word
    )
    for pattern, replacement in rules:
        text = pattern.sub(replacement, text)
    return emoji.demojize(text, language='it').strip()


# Add the cleaned text as a new column and show it next to the original
train_a['text_clean'] = train_a['text'].apply(clean)
train_a[['text', 'text_clean']].head()

In [None]:
# 90/10 split of training set A, stratified with MultilabelStratifiedShuffleSplit
X = train_a['text_clean'].values
Y = train_a[LABELS].values

msss = MultilabelStratifiedShuffleSplit(n_splits=1, test_size=0.10, random_state=42)
# n_splits=1: take the first pair of index arrays produced by the splitter
train_idx, val_idx = next(iter(msss.split(X, Y)))

# Materialize both partitions with a fresh 0..n-1 index
train_df, val_df = (
    train_a.iloc[rows].reset_index(drop=True) for rows in (train_idx, val_idx)
)

n_train, n_val = len(train_df), len(val_df)
print(f"Dimensione train: {n_train} – validation: {n_val}")

In [None]:
# Baseline: TF-IDF uni/bi-grams + Logistic Regression (One-vs-Rest)
vectorizer = TfidfVectorizer(max_features=5_000, ngram_range=(1, 2))

# Fit the vectorizer on the training split only; validation is just transformed
X_train = vectorizer.fit_transform(train_df['text_clean'])
y_train = train_df[LABELS].values
X_val   = vectorizer.transform(val_df['text_clean'])
y_val   = val_df[LABELS].values

base_estimator = LogisticRegression(max_iter=1_000, random_state=42)
clf = OneVsRestClassifier(base_estimator)
clf.fit(X_train, y_train)

# Score the validation predictions: macro-F1 first, then the per-label report
y_pred   = clf.predict(X_val)
macro_f1 = f1_score(y_val, y_pred, average='macro')
print(f"Baseline TF‑IDF + LR macro‑F1: {macro_f1:.4f}")

print(classification_report(y_val, y_pred, target_names=LABELS, zero_division=0))

In [None]:
# Data preparation for fine-tuning UmBERTo
MODEL_NAME = 'Musixmatch/umberto-commoncrawl-cased-v1'  # can be changed
DEVICE      = 'cuda' if torch.cuda.is_available() else 'cpu'
NUM_LABELS  = len(LABELS)

# Class weights for the weighted BCE loss: (1 - p) / p, where p is the
# fraction of positive rows for each label in the training split.
freqs        = train_df[LABELS].mean().values  # positive frequency per class
# Floor p at "one positive example": a label with no positives would otherwise
# divide by zero and produce an infinite weight (and a NaN loss). Labels with
# at least one positive are left unchanged by this floor.
safe_freqs   = np.maximum(freqs, 1.0 / max(len(train_df), 1))
# Explicit float32: without it the tensor inherits NumPy's float64 dtype.
pos_weights  = torch.tensor(
    (1 - safe_freqs) / safe_freqs, dtype=torch.float32, device=DEVICE
)

# Add a "labels" column holding each row's label vector as a list of floats
for df in (train_df, val_df):
    df['labels'] = df[LABELS].astype(float).values.tolist()

# Tokenization settings
MAX_LEN   = 128
TOKENIZER = AutoTokenizer.from_pretrained(MODEL_NAME)

def tokenize_fn(batch):
    """Tokenize a batch's ``text_clean`` entries and carry its ``labels`` along.

    Every sequence is truncated and padded to exactly ``MAX_LEN`` tokens.
    Returns the tokenizer output with an added ``labels`` entry.
    """
    encoded = TOKENIZER(
        batch['text_clean'],
        max_length=MAX_LEN,
        truncation=True,
        padding='max_length',
    )
    encoded['labels'] = batch['labels']
    return encoded

# Convert both splits to HuggingFace Datasets, tokenize them, and expose the
# model inputs in "torch" format
drop_cols  = ['text', 'text_clean', *LABELS]
torch_cols = ['input_ids', 'attention_mask', 'labels']

dtrain = datasets.Dataset.from_pandas(train_df)
dtrain = dtrain.map(tokenize_fn, batched=True, remove_columns=drop_cols)
dtrain.set_format('torch', columns=torch_cols)

dval = datasets.Dataset.from_pandas(val_df)
dval = dval.map(tokenize_fn, batched=True, remove_columns=drop_cols)
dval.set_format('torch', columns=torch_cols)

# Model: pretrained checkpoint with a multi-label classification setup,
# moved to the selected device
model = AutoModelForSequenceClassification.from_pretrained(
    MODEL_NAME,
    problem_type='multi_label_classification',
    num_labels=NUM_LABELS,
)
model = model.to(DEVICE)

# Label <-> id mappings stored on the model config
model.config.label2id = {name: idx for idx, name in enumerate(LABELS)}
model.config.id2label = dict(enumerate(LABELS))

# Metric function (macro-F1)
from torch.nn.functional import sigmoid


def compute_metrics(eval_pred):
    """Return macro-F1 of the predictions at a fixed 0.5 probability threshold.

    ``eval_pred`` unpacks into ``(logits, labels)``. Logits go through a
    sigmoid and are binarized at 0.5; labels are cast to int before scoring.
    The result is a dict with the single key ``eval_macro_f1``.
    """
    logits, labels = eval_pred
    hard_preds = sigmoid(torch.tensor(logits)).ge(0.5).int().numpy()
    score = f1_score(labels.astype(int), hard_preds, average='macro', zero_division=0)
    return {"eval_macro_f1": float(score)}

# Custom Trainer with class-weighted BCE
class WeightedTrainer(Trainer):
    """Trainer whose loss is BCE-with-logits weighted by the global ``pos_weights``."""

    def compute_loss(self, model, inputs, return_outputs=False, **kwargs):
        """Compute the weighted multi-label loss for one batch.

        Args:
            model: the model to run; its output must expose ``.logits``.
            inputs: batch dict; ``labels`` is removed from it before the
                forward pass and used as the loss target.
            return_outputs: when true, return ``(loss, outputs)``.
            **kwargs: extra arguments passed by the caller; ignored here.

        Returns:
            The loss, or ``(loss, outputs)`` if ``return_outputs`` is true.
        """
        # optional: drop num_items_in_batch if it is present among the inputs
        inputs.pop("num_items_in_batch", None)

        labels = inputs.pop("labels")
        outputs = model(**inputs)
        logits  = outputs.logits
        # Align the module-level weights with the logits' device and use
        # float32 (matching labels.float()), whatever dtype/device the
        # weights were created with.
        class_weights = pos_weights.to(device=logits.device, dtype=torch.float32)
        loss    = torch.nn.BCEWithLogitsLoss(pos_weight=class_weights)(
            logits, labels.float()
        )
        return (loss, outputs) if return_outputs else loss


# Training arguments, grouped by purpose
training_config = {
    # output / run identity
    'output_dir': 'ckpt/umberto',
    'run_name': 'umberto_full',
    # NOTE(review): the effect of report_to=None depends on the installed
    # transformers version - confirm which loggers end up enabled.
    'report_to': None,
    # batch sizes
    'per_device_train_batch_size': 16,
    'per_device_eval_batch_size': 32,
    # optimization schedule
    'learning_rate': 2e-5,
    'num_train_epochs': 10,
    'lr_scheduler_type': 'linear',
    'warmup_steps': 500,
    # logging / evaluation / checkpointing
    'logging_steps': 50,
    'eval_strategy': 'epoch',
    'save_strategy': 'epoch',
    'load_best_model_at_end': True,
    'metric_for_best_model': 'eval_macro_f1',
}
args = TrainingArguments(**training_config)

# Trainer wired to the weighted loss, both splits and the macro-F1 metric
trainer = WeightedTrainer(
    model=model,
    args=args,
    train_dataset=dtrain,
    eval_dataset=dval,
    compute_metrics=compute_metrics,
)

In [None]:
# Start fine-tuning, then print the best metric recorded in the trainer state
trainer.train()
best_macro_f1 = trainer.state.best_metric
print(f"Migliore eval_macro_f1: {best_macro_f1:.4f}")

In [None]:
# Per-class threshold optimization and final evaluation.
# NOTE(review): thresholds are chosen and scored on the same validation split,
# so the final macro-F1 below is an optimistic estimate.

# Predictions on the validation set
eval_preds = trainer.predict(dval)
val_logits = eval_preds.predictions
val_labels = val_df[LABELS].values.astype(int)

# Logits -> probabilities
val_probs = sigmoid(torch.tensor(val_logits)).numpy()

# Search the best threshold for each class over a fixed grid (0.10 ... 0.90)
threshold_grid = np.linspace(0.1, 0.9, 81)
best_thresholds = []
for class_true, class_probs in zip(val_labels.T, val_probs.T):
    best_f1, best_t = 0.0, 0.5  # fall back to 0.5 if no threshold scores above 0
    for t in threshold_grid:
        # zero_division=0 keeps the score at 0 (without warnings) when a
        # threshold yields no positive predictions, matching the final scoring.
        f1 = f1_score(class_true, (class_probs >= t).astype(int), zero_division=0)
        if f1 > best_f1:
            best_f1, best_t = f1, t
    # Store plain Python floats so the printed mapping stays readable
    best_thresholds.append(float(best_t))
print("Soglie ottimali per classe:", dict(zip(LABELS, best_thresholds)))

# Macro-F1 with the optimized thresholds
preds_opt = (val_probs >= np.array(best_thresholds)).astype(int)
final_macro = f1_score(val_labels, preds_opt, average='macro', zero_division=0)
print(f"Macro‑F1 finale ottimizzata: {final_macro:.4f}")

In [None]:
# ⚙️ Colab → GitHub sync (pushes the changes made to emit_setup.ipynb)
from google.colab import userdata
import os, shutil

# 1. Read the GitHub token from Colab Secrets
token = userdata.get('GITHUB_TOKEN')  # per the original note: asks for secure input only the first time
# NOTE(review): the token is embedded in the clone URL, so git presumably
# stores it in the clone's remote configuration - confirm, and consider a
# credential helper instead.
repo_url = f"https://{token}@github.com/Sergio-ddf/emit-llm.git"

# 2. Clone the repo only if it does not exist yet, then move into it
if not os.path.exists("/content/emit-llm"):
    !git clone $repo_url
%cd /content/emit-llm

# 3. Configure the Git identity (required for committing)
!git config user.email "ddfsergio9@google.com"
!git config user.name  "Sergio-ddf"

# 4. Copy the current notebook into the repo folder
notebook_name = "emit_setup.ipynb"
src = f"/content/{notebook_name}"
dst = f"/content/emit-llm/{notebook_name}"
if os.path.exists(src):
    shutil.copyfile(src, dst)
    print(f"✅ Copiato {notebook_name} in repo locale.")
else:
    print("❌ Notebook non trovato. Salva prima il notebook in Colab.")

# 5. Commit + push
# NOTE(review): the pull runs after the notebook has been copied over the
# working tree; verify it cannot clash with that uncommitted copy (pulling
# before step 4 may be safer).
!git pull origin main
!git add emit_setup.ipynb
# The `|| echo` fallback prints a message when the commit command fails
!git commit -m "update: modifiche da Colab salvate automaticamente" || echo "⏩ Nessuna modifica da committare"
!git push origin main
