<a href="https://colab.research.google.com/github/Yanikko/progetto-AI-2/blob/main/notebooks%2003_models_HARTH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# HARTH — Models & Evaluation (03_models.ipynb)

Comparison of a **Logistic Regression (L2) baseline** against **LDA (shrinkage)**, with validation done properly **by subject**.

Recipe (simple and exam-ready):
1. Final **train/test** split with `GroupShuffleSplit` (test = 20% of subjects).
2. On the training set only: robust performance estimate with **GroupKFold (k=5)**.
3. Final fit on the whole training set, evaluated exactly once on the test set.
4. Metrics: **macro-F1**, **balanced accuracy**, **confusion matrix**.
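
The four steps above can be tried without the parquet files. The following is a minimal sketch on synthetic data, not the notebook's actual pipeline: the names `X_toy`, `y_toy`, `groups_toy` and the choice of 22 subjects with 100 windows each are assumptions made only for illustration.

```python
import numpy as np
from sklearn.datasets import make_classification
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import balanced_accuracy_score, f1_score
from sklearn.model_selection import GroupKFold, GroupShuffleSplit, cross_validate
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler

# Synthetic stand-in for the HARTH windows (assumption: 22 subjects, 3 classes).
X_toy, y_toy = make_classification(
    n_samples=2200, n_features=10, n_informative=6, n_classes=3, random_state=0
)
groups_toy = np.repeat(np.arange(22), 100)  # 100 windows per hypothetical subject

# 1) Hold out 20% of the subjects as the final test set.
gss = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)
tr, te = next(gss.split(X_toy, y_toy, groups=groups_toy))

# 2) Subject-wise 5-fold CV on the training portion only.
model = make_pipeline(StandardScaler(), LogisticRegression(max_iter=2000))
cv_res = cross_validate(
    model, X_toy[tr], y_toy[tr], groups=groups_toy[tr],
    cv=GroupKFold(n_splits=5), scoring="f1_macro",
)
cv_mean, cv_std = cv_res["test_score"].mean(), cv_res["test_score"].std()

# 3) Refit on all training subjects, then score the test subjects once.
model.fit(X_toy[tr], y_toy[tr])
y_hat = model.predict(X_toy[te])

# 4) Metrics that weight every class equally.
test_macro_f1 = f1_score(y_toy[te], y_hat, average="macro")
test_bal_acc = balanced_accuracy_score(y_toy[te], y_hat)
print(f"CV macro-F1: {cv_mean:.3f} +/- {cv_std:.3f}")
print(f"Test macro-F1: {test_macro_f1:.3f}, balanced acc: {test_bal_acc:.3f}")
```

The scores themselves mean nothing here, since the data is random; the point is only the order of operations and which rows each step is allowed to see.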

Prerequisite:
- run `02_preprocessing.ipynb` first; it saves:
  - `data/processed/X_features.parquet`
  - `data/processed/y_labels.parquet`
  - `data/processed/groups_subject.parquet`


In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path

pd.set_option("display.max_columns", 80)
pd.set_option("display.width", 140)


## 1) Loading the preprocessed dataset

In [2]:
DATA_DIR = Path("data/processed")
X_path = DATA_DIR / "X_features.parquet"
y_path = DATA_DIR / "y_labels.parquet"
g_path = DATA_DIR / "groups_subject.parquet"

if not (X_path.exists() and y_path.exists() and g_path.exists()):
    raise FileNotFoundError(
        "Preprocessed files not found. Run 02_preprocessing.ipynb first to generate the parquet files in data/processed/."
    )

X = pd.read_parquet(X_path)
y = pd.read_parquet(y_path).iloc[:, 0]      # 'label' column
groups = pd.read_parquet(g_path).iloc[:, 0] # 'subject' column

print("X:", X.shape, "y:", y.shape, "groups:", groups.shape)
print("N subjects:", groups.nunique(), "N classes:", y.nunique())
display(X.head())


FileNotFoundError: Preprocessed files not found. Run 02_preprocessing.ipynb first to generate the parquet files in data/processed/.

### Quick checks: class and subject distribution

In [None]:
class_counts = y.value_counts()
subj_counts = groups.value_counts().sort_index()

display(class_counts.head(15).to_frame("n_windows"))
display(subj_counts.to_frame("n_windows").head(30))

plt.figure()
class_counts.head(20).plot(kind="bar")
plt.title("Class distribution (windows)")
plt.xlabel("Label")
plt.ylabel("N windows")
plt.tight_layout()
plt.show()

plt.figure()
subj_counts.plot(kind="bar")
plt.title("Windows per subject")
plt.xlabel("Subject")
plt.ylabel("N windows")
plt.tight_layout()
plt.show()


## 2) Train/test split by subject (GroupShuffleSplit)

In [None]:
from sklearn.model_selection import GroupShuffleSplit

RANDOM_STATE = 42
TEST_SIZE = 0.20  # ~4-5 of the 22 subjects

gss = GroupShuffleSplit(n_splits=1, test_size=TEST_SIZE, random_state=RANDOM_STATE)
train_idx, test_idx = next(gss.split(X, y, groups=groups))

X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y.iloc[train_idx], y.iloc[test_idx]
g_train, g_test = groups.iloc[train_idx], groups.iloc[test_idx]

print("Train:", X_train.shape, "Test:", X_test.shape)
print("Train subjects:", sorted(g_train.unique()))
print("Test subjects:", sorted(g_test.unique()))
print("N train subjects:", g_train.nunique(), "N test subjects:", g_test.nunique())
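
With `GroupShuffleSplit`, `test_size` is a fraction of the subjects, not of the rows, so the share of windows that ends up in the test set depends on how many windows the selected subjects contributed. A minimal sketch of that effect, assuming hypothetical subjects with unequal window counts (`windows_per_subject`, `groups_demo` and `X_demo` are invented for the example):

```python
import numpy as np
from sklearn.model_selection import GroupShuffleSplit

# Hypothetical data: 22 subjects with very different numbers of windows.
rng = np.random.default_rng(0)
windows_per_subject = rng.integers(50, 500, size=22)
groups_demo = np.repeat(np.arange(22), windows_per_subject)
X_demo = np.zeros((groups_demo.size, 1))  # features are irrelevant for the split

gss_demo = GroupShuffleSplit(n_splits=1, test_size=0.20, random_state=42)
tr_idx, te_idx = next(gss_demo.split(X_demo, groups=groups_demo))

train_subjects = set(groups_demo[tr_idx].tolist())
test_subjects = set(groups_demo[te_idx].tolist())

# No subject may appear on both sides, otherwise the split leaks.
assert train_subjects.isdisjoint(test_subjects)

# test_size applies to subjects, so the row fraction can drift away from 20%.
print("test subjects:", len(test_subjects), "of", len(train_subjects | test_subjects))
print("fraction of rows in test:", round(te_idx.size / groups_demo.size, 3))
```

The same disjointness assertion can be applied to `g_train` and `g_test` from the cell above as a cheap guard against leakage.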


## 3) Models: LR-L2 baseline vs LDA (shrinkage)

In [None]:
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis

# Logistic Regression (baseline): L2 penalty + class_weight to handle class imbalance
lr_l2 = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(
        penalty="l2",
        C=1.0,
        solver="lbfgs",
        # multi_class is omitted: "auto" was the default and the argument is deprecated in recent scikit-learn
        class_weight="balanced",
        max_iter=2000
    ))
])

# LDA with automatic shrinkage: more robust with correlated features
lda_shrink = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LinearDiscriminantAnalysis(
        solver="lsqr",
        shrinkage="auto"
    ))
])

models = {
    "LR (L2, class_weight=balanced)": lr_l2,
    "LDA (shrinkage=auto)": lda_shrink
}

list(models.keys())
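
To see why shrinkage helps with correlated features, it can be useful to look at the Ledoit-Wolf estimator on its own, which is what scikit-learn documents as the method behind `shrinkage="auto"`. The sketch below is illustrative only: the data (`X_corr`, built from three hypothetical latent signals) is invented, and LDA applies the idea to its class covariance estimate rather than to the raw feature matrix as done here.

```python
import numpy as np
from sklearn.covariance import LedoitWolf, empirical_covariance

# Hypothetical data: 20 strongly correlated features driven by 3 latent signals.
rng = np.random.default_rng(0)
latent = rng.normal(size=(60, 3))
mixing = rng.normal(size=(3, 20))
X_corr = latent @ mixing + 0.05 * rng.normal(size=(60, 20))

emp_cov = empirical_covariance(X_corr)   # plain maximum-likelihood estimate
lw = LedoitWolf().fit(X_corr)            # shrinks emp_cov toward a scaled identity

print("shrinkage coefficient:", round(float(lw.shrinkage_), 4))
print("condition number, empirical:", f"{np.linalg.cond(emp_cov):.1f}")
print("condition number, shrunk:   ", f"{np.linalg.cond(lw.covariance_):.1f}")
```

A lower condition number means the covariance matrix is easier to invert stably, which is the practical benefit for LDA when features are nearly collinear.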


## 4) Cross-validation on the training set (GroupKFold)

In [None]:
from sklearn.model_selection import GroupKFold, cross_validate

cv = GroupKFold(n_splits=5)

scoring = {
    "macro_f1": "f1_macro",
    "balanced_acc": "balanced_accuracy"
}

cv_rows = []
for name, pipe in models.items():
    res = cross_validate(
        pipe,
        X_train, y_train,
        groups=g_train,
        cv=cv,
        scoring=scoring,
        n_jobs=-1,
        return_train_score=False
    )
    cv_rows.append({
        "model": name,
        "cv_macro_f1_mean": float(np.mean(res["test_macro_f1"])),
        "cv_macro_f1_std": float(np.std(res["test_macro_f1"])),
        "cv_bal_acc_mean": float(np.mean(res["test_balanced_acc"])),
        "cv_bal_acc_std": float(np.std(res["test_balanced_acc"])),
    })

cv_df = pd.DataFrame(cv_rows).sort_values("cv_macro_f1_mean", ascending=False)
display(cv_df)
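
The cross-validation above relies on `GroupKFold` keeping each subject entirely on one side of every fold. A minimal sketch to inspect the fold composition, assuming a hypothetical training set of 17 subjects with 40 windows each (`groups_tr` and `X_tr` are placeholders, not the notebook's `g_train` and `X_train`):

```python
import numpy as np
from sklearn.model_selection import GroupKFold

# Hypothetical training set: 17 subjects, 40 windows each.
groups_tr = np.repeat(np.arange(17), 40)
X_tr = np.zeros((groups_tr.size, 1))

times_validated = {int(s): 0 for s in np.unique(groups_tr)}
splitter = GroupKFold(n_splits=5)
for fold, (fit_idx, val_idx) in enumerate(splitter.split(X_tr, groups=groups_tr)):
    fit_subj = set(groups_tr[fit_idx].tolist())
    val_subj = set(groups_tr[val_idx].tolist())
    assert fit_subj.isdisjoint(val_subj)  # a subject is never on both sides
    for s in val_subj:
        times_validated[s] += 1
    print(f"fold {fold}: validation subjects = {sorted(val_subj)}")

# Every subject is used for validation exactly once across the 5 folds.
assert all(count == 1 for count in times_validated.values())
```

With only 3 or 4 subjects per validation fold, the per-fold scores can vary a lot, which is why the table reports the standard deviation next to the mean.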


## 5) Final fit on train and evaluation on the test set

In [None]:
from sklearn.metrics import f1_score, balanced_accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

test_rows = []
preds = {}

for name, pipe in models.items():
    pipe.fit(X_train, y_train)
    y_pred = pipe.predict(X_test)
    preds[name] = y_pred

    test_rows.append({
        "model": name,
        "test_macro_f1": float(f1_score(y_test, y_pred, average="macro")),
        "test_balanced_acc": float(balanced_accuracy_score(y_test, y_pred)),
    })

test_df = pd.DataFrame(test_rows).sort_values("test_macro_f1", ascending=False)
display(test_df)


### Detailed report and confusion matrix

In [None]:
best_model_name = test_df.iloc[0]["model"]
print("Best model on test (by macro-F1):", best_model_name)

y_pred_best = preds[best_model_name]
print("\nClassification report:")
print(classification_report(y_test, y_pred_best, digits=3))

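# Pass the labels explicitly so the plot axes show the real class codes, not 0..n-1
labels = np.union1d(y_test, y_pred_best)
cm = confusion_matrix(y_test, y_pred_best, labels=labels)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=labels)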

plt.figure(figsize=(8, 8))
disp.plot(values_format="d", ax=plt.gca(), colorbar=False)
plt.title(f"Confusion Matrix — {best_model_name}")
plt.tight_layout()
plt.show()
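
With imbalanced classes, raw counts in the confusion matrix are dominated by the frequent labels. Row-normalizing puts per-class recall on the diagonal, which is easier to compare across classes. A small sketch on invented toy labels (`y_true_demo`, `y_pred_demo` and the class codes 1, 2, 3 are assumptions for illustration):

```python
import numpy as np
from sklearn.metrics import confusion_matrix

# Toy labels (hypothetical): class 3 is rare compared with classes 1 and 2.
y_true_demo = np.array([1] * 50 + [2] * 40 + [3] * 10)
y_pred_demo = y_true_demo.copy()
y_pred_demo[:5] = 2      # 5 windows of class 1 predicted as class 2
y_pred_demo[90:96] = 1   # 6 windows of class 3 predicted as class 1

labels_demo = np.array([1, 2, 3])
cm_counts = confusion_matrix(y_true_demo, y_pred_demo, labels=labels_demo)
cm_recall = confusion_matrix(
    y_true_demo, y_pred_demo, labels=labels_demo, normalize="true"
)

print(cm_counts)
print(np.round(cm_recall, 2))  # each row sums to 1; the diagonal is per-class recall
```

In this toy case the rare class has a recall of 0.4 even though only 6 windows are misclassified, something the raw counts make easy to overlook.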


## 6) Methodological notes (text ready for the report)

- The **per-subject** split prevents leakage (windows from the same person never appear in both train and test) and measures generalization to unseen people.
- The test set is used **only once**, for the final result.
- GroupKFold on the training set gives a more stable estimate (mean ± standard deviation across folds).
- The chosen metrics (macro-F1, balanced accuracy) are suited to imbalanced classes because every class counts equally.
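
The last point can be checked with a small worked example. The labels below are invented (90 windows of one class, 10 of another) and the "model" simply predicts the majority class every time:

```python
import numpy as np
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score

# Toy case (hypothetical): 90 windows of class 0, 10 of class 1.
y_true_imb = np.array([0] * 90 + [1] * 10)
y_pred_imb = np.zeros_like(y_true_imb)  # a model that always predicts class 0

acc = accuracy_score(y_true_imb, y_pred_imb)
bal_acc = balanced_accuracy_score(y_true_imb, y_pred_imb)
# zero_division=0 scores the never-predicted class as F1 = 0 without a warning
macro_f1 = f1_score(y_true_imb, y_pred_imb, average="macro", zero_division=0)

print(f"accuracy={acc:.3f}  balanced_accuracy={bal_acc:.3f}  macro_f1={macro_f1:.3f}")
```

Plain accuracy comes out at 0.900, while balanced accuracy is 0.500 and macro-F1 about 0.474, so only the last two reveal that the minority class is never recognized.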
