# Appendice Codice — Pipeline BRFSS 2015
_Notebook generato automaticamente dal documento Word; include sezioni markdown e blocchi di codice eseguibili._

# Pre-processing

Caricamento del dataset

In [None]:
from google.colab import files
uploaded = files.upload()
import pandas as pd

# Caricamento del dataset dal file CSV

In [None]:
df = pd.read_csv("heart_disease_health_indicators_BRFSS2015.csv")

# Visualizzazione della forma del dataset

In [None]:
print("Dimensioni del dataset:", df.shape)

# Visualizzazione dei tipi di dati

In [None]:
print("Tipi di variabili:\n", df.dtypes)

# Visualizzazione dei primi 5 record

In [None]:
print("Prime 5 righe del dataset:\n", df.head())

# Verifica della presenza di valori mancanti

In [None]:
print("Valori nulli per colonna:\n", df.isnull().sum())

# Mostra le prime righe

In [None]:
df.head()

## Separazione X / y

In [None]:
# Separazione tra features (X) e target (y)
X = df.drop(columns=['HeartDiseaseorAttack'])
y = df['HeartDiseaseorAttack']

# Controlli veloci

In [None]:
print("Shape X (features):", X.shape)
print("Shape y (target):", y.shape)
print("\nPrime colonne di X:", list(X.columns[:10]))
print("\nDistribuzione target (conteggi):")
print(y.value_counts())
print("\nDistribuzione target (%):")
print((y.value_counts(normalize=True)*100).round(2))

## Train–Test split

In [None]:
from sklearn.model_selection import train_test_split
# Suddivisione in training e test set (80%-20%)
# Stratified 80/20 hold-out split with a fixed seed.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)
print("Shape X_train:", X_train.shape)
print("Shape X_test:", X_test.shape)
print("Distribuzione target nel training set:")
print(y_train.value_counts(normalize=True) * 100)

## Standardizzazione del BMI

In [None]:
from sklearn.preprocessing import StandardScaler

# Creazione copia per non modificare direttamente il dataset

In [None]:
X_train_scaled = X_train.copy()
X_test_scaled = X_test.copy()
scaler = StandardScaler()

# Adattiamo lo scaler sul train e trasformiamo train e test

In [None]:
X_train_scaled['BMI'] = scaler.fit_transform(X_train[['BMI']])
X_test_scaled['BMI'] = scaler.transform(X_test[['BMI']])
print("BMI normalizzato - media (train):", round(X_train_scaled['BMI'].mean(), 4))
print("BMI normalizzato - deviazione standard (train):", round(X_train_scaled['BMI'].std(), 4))

## Encoding variabili categoriche

In [None]:
# Columns treated as ordinal. This cell only checks that they are present in
# the scaled training frame; no re-encoding is performed here.
ordinal_cols = ['GenHlth', 'Education', 'Income', 'Age']
missing_ordinal = [c for c in ordinal_cols if c not in X_train_scaled.columns]
if missing_ordinal:
    print("\n[WARN] Colonne ordinali non trovate (verifica header):", missing_ordinal)

## Verifica varianza (feature poco informative)

In [None]:
from sklearn.feature_selection import VarianceThreshold

# Rimozione di feature costanti

In [None]:
selector = VarianceThreshold(threshold=0.0)  # elimina solo variabili con varianza nulla
selector.fit(X_train_scaled)

# Maschera booleana delle feature mantenute

In [None]:
constant_mask = selector.get_support()

# Identificazione delle feature eliminate

In [None]:
dropped_features = list(X_train_scaled.columns[~constant_mask])
print("Feature eliminate (costanti):", dropped_features)

## Oversampling (SMOTE)

# Installazione imbalanced-learn se non già presente

In [None]:
!pip install -U imbalanced-learn

_[Nota: intestazione ripetuta più volte dalla conversione automatica; le celle di codice SMOTE che presumibilmente la seguivano non compaiono nel testo estratto.]_

## Salvataggio artefatti

In [None]:
import joblib
import pandas as pd

# Salvataggio dataset di training e test

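_[Nota: qui la conversione ha lasciato solo intestazioni «Oversampling (SMOTE)» duplicate; la cella che costruisce `train_df` non è presente nel testo estratto.]_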

In [None]:
# Write the training and test tables to CSV for the modelling notebooks.
# NOTE(review): train_df is not defined in any visible pre-processing cell —
# presumably it was built by the SMOTE cell lost in conversion; confirm.
train_df.to_csv("/content/train_preprocessed.csv", index=False)
# Rebuild the test table with the training column order and append the target.
test_df = pd.DataFrame(X_test_scaled, columns=X_train_scaled.columns)
test_df['target'] = y_test
test_df.to_csv("/content/test_preprocessed.csv", index=False)

# Salvataggio dello scaler

In [None]:
joblib.dump(scaler, "/content/scaler.pkl")

# Salvataggio della lista delle feature

In [None]:
joblib.dump(list(X_train_scaled.columns), "/content/features_list.pkl")
print("[INFO] Artefatti salvati con successo:")
print("- train_preprocessed.csv")
print("- test_preprocessed.csv")
print("- scaler.pkl")
print("- features_list.pkl")

## Download artefatti

In [None]:
from google.colab import files
files.download("/content/train_preprocessed.csv")
files.download("/content/test_preprocessed.csv")
files.download("/content/scaler.pkl")
files.download("/content/features_list.pkl")

# Modulo Predittivo — Random Forest

## Setup ambiente (librerie + cartelle)

In [None]:
!pip install xgboost joblib pyarrow fastparquet --quiet
import os, json, joblib, pathlib, datetime, numpy as np, pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import StratifiedKFold, RandomizedSearchCV, cross_validate
from sklearn.metrics import (
    roc_auc_score, roc_curve, precision_recall_curve,
    classification_report, confusion_matrix, auc,
    precision_score, recall_score, f1_score,
)
from sklearn.ensemble import RandomForestClassifier
from xgboost import XGBClassifier
np.random.seed(42)
# Dove salva output e figure (in Colab locale)
RUN_ID   = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
BASE_OUT = pathlib.Path(f"./artifacts/{RUN_ID}")
(BASE_OUT / "random_forest").mkdir(parents=True, exist_ok=True)
(BASE_OUT / "xgboost").mkdir(parents=True, exist_ok=True)
print("Run folder:", BASE_OUT)

## Caricamento artefatti

In [None]:
from google.colab import files
uploaded = files.upload()  # seleziona qui i 4 file dal tuo PC
# Riconoscere i file caricati in modo robusto (anche se hanno suffissi diversi)
def pick(name_contains, exts=(".csv", ".pkl", ".joblib")):
    """Return the first uploaded filename that contains ``name_contains``.

    Args:
        name_contains: Substring that must appear in the filename.
        exts: Accepted filename suffixes (a tuple, a list or a single string).

    Returns:
        The matching key of the module-level ``uploaded`` mapping.

    Raises:
        ValueError: If no uploaded filename matches.
    """
    # str.endswith only accepts a str or a tuple, so normalise other iterables.
    suffixes = (exts,) if isinstance(exts, str) else tuple(exts)
    for k in uploaded.keys():
        if name_contains in k and k.endswith(suffixes):
            return k
    raise ValueError(f"File con '{name_contains}' non trovato tra: {list(uploaded.keys())}")
train_path  = pick("train_preprocessed", exts=(".csv",))
test_path   = pick("test_preprocessed",  exts=(".csv",))
scaler_path = pick("scaler",             exts=(".pkl",".joblib"))
feat_path   = pick("features",           exts=(".pkl",".joblib"))
print("Train CSV: ", train_path)
print("Test  CSV: ", test_path)
print("Scaler:    ", scaler_path)
print("Features:  ", feat_path)
import pandas as pd, joblib, pickle, numpy as np

# 1) Leggere  CSV

In [None]:
train_df = pd.read_csv(train_path)
test_df  = pd.read_csv(test_path)

# 2) Caricare scaler

In [None]:
try:
    scaler = joblib.load(scaler_path)
except Exception:
    # Fall back to the plain pickle reader if joblib cannot parse the file.
    # Exception (not a bare except) lets KeyboardInterrupt propagate.
    with open(scaler_path, "rb") as f:
        scaler = pickle.load(f)

# 3) Caricare lista feature

In [None]:
try:
    feature_names = joblib.load(feat_path)
except Exception:
    # Fall back to the plain pickle reader if joblib cannot parse the file.
    # Exception (not a bare except) lets KeyboardInterrupt propagate.
    with open(feat_path, "rb") as f:
        feature_names = pickle.load(f)
print("Train shape:", train_df.shape, "Test shape:", test_df.shape)
print("Prime colonne:", train_df.columns.tolist()[:8])
print("N. feature dichiarate:", len(feature_names))

## Separazione X / y

# individuare target

In [None]:
possible_y = ["target", "HeartDiseaseorAttack", "HeartDisease"]
y_col = next((c for c in possible_y if c in train_df.columns), None)
assert y_col is not None, f"Colonna target non trovata. Disponibili: {train_df.columns.tolist()}"
print("Target column:", y_col)

# normalizzare {2:0} se necessario

In [None]:
def normalize_y(s):
    """Return the target as an integer NumPy array with label 2 mapped to 0.

    Args:
        s: Sequence or Series of labels that can be cast to ``int``.

    Returns:
        ``numpy.ndarray`` of integers in which every 2 has been replaced by 0.
    """
    s = pd.Series(s).astype(int)
    return s.replace({2: 0}).values
y_train = normalize_y(train_df[y_col])
y_test  = normalize_y(test_df[y_col])

# usare solo le feature presenti sia nella lista che nel dataframe

In [None]:
feature_set = [c for c in feature_names if c in train_df.columns]
X_train = train_df[feature_set].values
X_test  = test_df[feature_set].values
print("X_train:", X_train.shape, "X_test:", X_test.shape)
print("Positivi train (%):", round(100*np.mean(y_train), 2), " — Positivi test (%):", round(100*np.mean(y_test), 2))

## Validazione incrociata e funzioni di valutazione

In [None]:
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
def evaluate_on_test(model_name, y_true, y_proba, out_dir):
    """Evaluate positive-class scores at threshold 0.5 and save the outputs.

    Writes ``roc_curve.png``, ``pr_curve.png``, ``metrics.json``,
    ``confusion_matrix.csv`` and ``classification_report.csv`` into ``out_dir``.

    Args:
        model_name: Label used in the plot titles.
        y_true: Binary ground-truth labels (0/1).
        y_proba: Predicted scores for the positive class.
        out_dir: ``pathlib.Path`` of the output folder (created if missing).

    Returns:
        dict with threshold, accuracy, roc_auc, precision, recall, f1, pr_auc.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # Accept lists/Series as well as arrays for the element-wise operations.
    y_true = np.asarray(y_true)
    y_proba = np.asarray(y_proba)

    # Hard predictions at the fixed 0.5 threshold.
    y_pred = (y_proba >= 0.5).astype(int)

    # Headline metrics.
    metrics = {
        "threshold": 0.5,
        "accuracy": float((y_pred == y_true).mean()),
        "roc_auc": float(roc_auc_score(y_true, y_proba)),
        "precision": float(precision_score(y_true, y_pred, zero_division=0)),
        "recall": float(recall_score(y_true, y_pred, zero_division=0)),
        "f1": float(f1_score(y_true, y_pred, zero_division=0)),
    }

    # Per-class report and confusion matrix. labels=[0, 1] pins the row/column
    # order so it always matches the hard-coded Actual_*/Pred_* labels below.
    report_dict = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    # ROC figure.
    fpr, tpr, _ = roc_curve(y_true, y_proba)
    roc_auc_val = auc(fpr, tpr)
    plt.figure()
    plt.plot(fpr, tpr, label=f"AUC = {roc_auc_val:.3f}")
    plt.plot([0, 1], [0, 1], '--', linewidth=1)
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(f"ROC - {model_name}")
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(out_dir / "roc_curve.png", dpi=150)
    plt.close()

    # Precision-recall figure; the area is the trapezoidal integral of the curve.
    prec, rec, _ = precision_recall_curve(y_true, y_proba)
    pr_auc_val = auc(rec, prec)
    plt.figure()
    plt.plot(rec, prec, label=f"PR AUC = {pr_auc_val:.3f}")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title(f"Precision-Recall - {model_name}")
    plt.legend(loc="lower left")
    plt.tight_layout()
    plt.savefig(out_dir / "pr_curve.png", dpi=150)
    plt.close()

    # Add the PR AUC to the saved metrics.
    metrics["pr_auc"] = float(pr_auc_val)

    # JSON with metrics, full report and confusion matrix.
    with open(out_dir / "metrics.json", "w") as f:
        json.dump(
            {
                **metrics,
                "classification_report": report_dict,
                "confusion_matrix": cm.tolist(),
            },
            f,
            indent=2,
        )

    # CSV copies of the confusion matrix and the report.
    pd.DataFrame(cm, index=["Actual_0", "Actual_1"], columns=["Pred_0", "Pred_1"]) \
        .to_csv(out_dir / "confusion_matrix.csv")
    pd.DataFrame(report_dict).to_csv(out_dir / "classification_report.csv")
    return metrics

## Random Forest — ricerca, training e valutazione

In [None]:
rf = RandomForestClassifier(random_state=42, n_jobs=-1)

# Hyper-parameter space sampled by the randomized search.
rf_param_dist = {
    "n_estimators":       np.arange(200, 601, 50),
    "max_depth":          [None] + list(range(4, 21, 2)),
    "min_samples_split":  [2, 5, 10, 20],
    "min_samples_leaf":   [1, 2, 4, 8],
    "max_features":       ["sqrt", None, 0.5],
}

# 25 sampled configurations, scored by F1 on the stratified folds in `cv`.
rf_search = RandomizedSearchCV(
    estimator=rf,
    param_distributions=rf_param_dist,
    n_iter=25,
    scoring="f1",
    cv=cv,
    n_jobs=-1,
    random_state=42,
    verbose=1,
)
rf_search.fit(X_train, y_train)
rf_best = rf_search.best_estimator_
print("RF best params:", rf_search.best_params_)
# CV informativa (multi-metrica)
scoring = {"roc_auc":"roc_auc", "precision":"precision", "recall":"recall", "f1":"f1"}
rf_cv = cross_validate(rf_best, X_train, y_train, cv=cv, scoring=scoring, n_jobs=-1, return_train_score=False)
print({m: (rf_cv["test_"+m].mean(), rf_cv["test_"+m].std()) for m in scoring})

# Fit finale + test

In [None]:
rf_best.fit(X_train, y_train)
rf_proba_test = rf_best.predict_proba(X_test)[:,1]

# Salvataggi

In [None]:
joblib.dump(rf_best, BASE_OUT / "random_forest" / "random_forest.joblib")
rf_metrics = evaluate_on_test("random_forest", y_test, rf_proba_test, BASE_OUT / "random_forest")
# Mean and standard deviation of each cross-validated metric, as plain floats.
cv_summary = {
    m: {"mean": float(rf_cv["test_" + m].mean()), "std": float(rf_cv["test_" + m].std())}
    for m in scoring
}
with open(BASE_OUT / "random_forest" / "cv_summary.json", "w") as f:
    json.dump(cv_summary, f, indent=2)
print("RF test metrics:", rf_metrics)

### Patch salvataggi best_params_

# PATCH salvataggi "robusti" e consistenti

In [None]:
import json, joblib, numpy as np, pandas as pd, matplotlib.pyplot as plt
from pathlib import Path
from sklearn.metrics import classification_report, confusion_matrix, roc_auc_score, roc_curve, precision_recall_curve, auc
out_dir = Path("rf_results")
out_dir.mkdir(exist_ok=True)

# best_params_ in tipi Python "puri"

In [None]:
def to_py(obj):
    """Convert NumPy scalars and arrays to plain Python values for JSON.

    Args:
        obj: Any value; non-NumPy values are returned unchanged.

    Returns:
        ``int``, ``float`` or ``bool`` for NumPy scalars, a list for
        ``numpy.ndarray``, otherwise ``obj`` itself.
    """
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    return obj
best_params_py = {k: to_py(v) for k, v in rf_search.best_params_.items()}
# Salvataggio modello + iperparametri (usa rf_best, non best_rf)
joblib.dump(rf_best, out_dir / "random_forest_model.pkl")
# Persist the JSON-safe copy of the best hyper-parameters.
with open(out_dir / "best_params.json", "w") as f:
    json.dump(best_params_py, f, indent=2)
# Ricalcolare pred, cm, report e curve sul test (riuso y_test / rf_proba_test)
y_prob = rf_proba_test
y_pred = (y_prob >= 0.5).astype(int)
cm = confusion_matrix(y_test, y_pred)
roc_auc = float(roc_auc_score(y_test, y_prob))
report_dict = classification_report(y_test, y_pred, output_dict=True, zero_division=0)

# ROC

In [None]:
fpr, tpr, _ = roc_curve(y_test, y_prob)
plt.figure(figsize=(6,5))
plt.plot(fpr, tpr, label=f"AUC = {auc(fpr,tpr):.3f}")
plt.plot([0,1],[0,1],'--',linewidth=1)
plt.xlabel("False Positive Rate"); plt.ylabel("True Positive Rate"); plt.title("ROC - Random Forest"); plt.legend()
plt.tight_layout(); plt.savefig(out_dir / "roc_curve.png", dpi=150); plt.close()

# PR

In [None]:
prec, rec, _ = precision_recall_curve(y_test, y_prob)
plt.figure(figsize=(6,5))
plt.plot(rec, prec, label=f"PR AUC = {auc(rec,prec):.3f}")
plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title("Precision-Recall - Random Forest"); plt.legend()
plt.tight_layout(); plt.savefig(out_dir / "pr_curve.png", dpi=150); plt.close()

#  Salvataggi JSON/CSV

In [None]:
# Test metrics at threshold 0.5, plus the full report and confusion matrix.
patch_metrics = {
    "threshold": 0.5,
    "accuracy": float((y_pred == y_test).mean()),
    "roc_auc": roc_auc,
    "precision": float(precision_score(y_test, y_pred, zero_division=0)),
    "recall": float(recall_score(y_test, y_pred, zero_division=0)),
    "f1": float(f1_score(y_test, y_pred, zero_division=0)),
    "classification_report": report_dict,
    "confusion_matrix": cm.tolist(),
}
with open(out_dir / "metrics.json", "w") as f:
    json.dump(patch_metrics, f, indent=2)
pd.DataFrame(cm, index=["Actual_0","Actual_1"], columns=["Pred_0","Pred_1"]).to_csv(out_dir / "confusion_matrix.csv")
pd.DataFrame(report_dict).to_csv(out_dir / "classification_report.csv")
print(" Salvataggi completati in:", out_dir)

### Migliorare il recall senza leakage

## Scelta della soglia su validation (split del test set)

In [None]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score, f1_score, confusion_matrix, classification_report, roc_curve, precision_recall_curve, auc
import numpy as np, json
from pathlib import Path
import matplotlib.pyplot as plt

# Cartella risultati RF già usata

In [None]:
OUT = Path("rf_results")
# 1A) Split del test in validation e test finale (stessa distribuzione, niente leakage)
X_val, X_test_final, y_val, y_test_final = train_test_split(
X_test, y_test, test_size=0.5, stratify=y_test, random_state=42
)

# 1B) Probabilità sul validation e scelta soglia

In [None]:
y_val_prob = rf_best.predict_proba(X_val)[:,1]
def choose_threshold(y_true, y_score, metric="recall", min_precision=None, max_fpr=None):
    """Grid-search a decision threshold on held-out scores.

    Scans 181 evenly spaced thresholds in [0.05, 0.95], skips those that
    violate the optional constraints, and keeps the first threshold that
    reaches the best value of the chosen metric.

    Args:
        y_true: Binary labels (0/1).
        y_score: Positive-class scores, same length as ``y_true``.
        metric: "recall" maximises recall; any other value maximises F1.
        min_precision: If given, thresholds with lower precision are skipped.
        max_fpr: If given, thresholds with a higher false-positive rate are
            skipped.

    Returns:
        ``(best_threshold, best_value)``; ``(0.5, -1)`` when no threshold
        satisfies the constraints.
    """
    # Accept lists/Series as well as arrays for the element-wise comparisons.
    y_true = np.asarray(y_true)
    y_score = np.asarray(y_score)

    thresholds = np.linspace(0.05, 0.95, 181)
    best_t, best_val = 0.5, -1
    for t in thresholds:
        y_pred = (y_score >= t).astype(int)
        tp = ((y_true == 1) & (y_pred == 1)).sum()
        fp = ((y_true == 0) & (y_pred == 1)).sum()
        fn = ((y_true == 1) & (y_pred == 0)).sum()
        tn = ((y_true == 0) & (y_pred == 0)).sum()
        prec = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        rec = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0.0

        # Optional constraints: drop thresholds that violate them.
        if min_precision is not None and prec < min_precision:
            continue
        if max_fpr is not None and fpr > max_fpr:
            continue

        # Objective: recall, or F1 for any other `metric` value.
        if metric == "recall":
            val = rec
        else:
            val = 2 * prec * rec / (prec + rec) if (prec + rec) > 0 else 0.0

        # Strict '>' keeps the lowest threshold among ties.
        if val > best_val:
            best_val, best_t = val, float(t)
    return best_t, best_val

# Esempio: massimizza Recall mantenendo Precision almeno 0.25

In [None]:
t_opt, best_recall = choose_threshold(y_val, y_val_prob, metric="recall", min_precision=0.25)
print(f"Soglia scelta su validation: {t_opt:.3f}  |  Recall(validation)={best_recall:.3f}")
# Valutazione sul test finale (quello "vero")
y_test_final_prob = rf_best.predict_proba(X_test_final)[:,1]
y_test_final_pred_default = (y_test_final_prob >= 0.5).astype(int)
y_test_final_pred_opt     = (y_test_final_prob >= t_opt).astype(int)
def summarize(y_true, y_pred, y_score):
    """Summarise one set of hard predictions and scores.

    Args:
        y_true: Binary ground-truth labels (0/1).
        y_pred: Hard predictions at the threshold under evaluation.
        y_score: Positive-class scores, used for the ROC AUC only.

    Returns:
        dict with precision, recall, f1, the confusion matrix as nested lists
        ("cm") and roc_auc.
    """
    pr = precision_score(y_true, y_pred, zero_division=0)
    rc = recall_score(y_true, y_pred, zero_division=0)
    f1 = f1_score(y_true, y_pred, zero_division=0)
    # labels=[0, 1] keeps the matrix 2x2 even if a class is absent.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    fpr, tpr, _ = roc_curve(y_true, y_score)
    return {
        "precision": float(pr),
        "recall":    float(rc),
        "f1":        float(f1),
        "cm":        cm.tolist(),
        "roc_auc":   float(auc(fpr, tpr)),
    }

In [None]:
metrics_default = summarize(y_test_final, y_test_final_pred_default, y_test_final_prob)
metrics_opt     = summarize(y_test_final, y_test_final_pred_opt,     y_test_final_prob)
print("== Test finale (soglia 0.5)  ==>", metrics_default)
print("== Test finale (soglia opt.)  ==>", metrics_opt)

# Salva tutto

In [None]:
# Chosen threshold, the rule used to pick it, and both final-test summaries.
threshold_report = {
    "validation_threshold": t_opt,
    "validation_constraint": "maximize recall with precision >= 0.25",
    "test_final_metrics_default": metrics_default,
    "test_final_metrics_opt": metrics_opt,
}
with open(OUT / "threshold_selection.json", "w") as f:
    json.dump(threshold_report, f, indent=2)

# Grafico Precision-Recall con punto soglia

In [None]:
prec, rec, thr = precision_recall_curve(y_test_final, y_test_final_prob)
pr_auc = auc(rec, prec)
plt.figure(figsize=(6,5))
plt.plot(rec, prec, label=f"PR curve (AUC={pr_auc:.3f})")

# punto corrispondente a t_opt

In [None]:
# (troviamo il punto più vicino a t_opt tra le soglie restituite)
thr_full = np.r_[thr, 1.0]  # allinea dimensioni a rec/prec
idx = (np.abs(thr_full - t_opt)).argmin()
plt.scatter(rec[idx], prec[idx], s=60, marker="o", label=f"Soglia ottimale ~{t_opt:.2f}")
plt.xlabel("Recall"); plt.ylabel("Precision"); plt.title("Precision-Recall (test finale)")
plt.legend()
plt.tight_layout(); plt.savefig(OUT / "precision_recall_threshold.png", dpi=150); plt.close()
print(" File salvati:", OUT / "threshold_selection.json", "e PR curve aggiornata")

### Importanza delle feature

In [None]:
import os, json, joblib, numpy as np, pandas as pd, matplotlib.pyplot as plt
from pathlib import Path
OUT = Path("rf_results")
OUT.mkdir(exist_ok=True)

# Recupero importanze dal modello addestrato

In [None]:
importances = rf_best.feature_importances_

# Resolve the feature names, in priority order; a candidate is accepted only
# if its length matches the number of importances.
feat_names = None

# 1) feature_set: the columns actually used to build X_train for the model.
if 'feature_set' in globals() and isinstance(feature_set, (list, tuple)) and len(feature_set) == len(importances):
    feat_names = list(feature_set)
# 2) The list loaded from the artefacts (features_list.pkl).
elif 'feature_names' in globals() and isinstance(feature_names, (list, tuple)) and len(feature_names) == len(importances):
    feat_names = list(feature_names)
# 3) X_train's columns, if it is still a DataFrame.
elif 'X_train' in globals():
    if isinstance(X_train, pd.DataFrame) and len(X_train.columns) == len(importances):
        feat_names = list(X_train.columns)

# 4) Infer from train_df/test_df by dropping any known target column.
if feat_names is None:
    for df_name in ['train_df', 'test_df']:
        if df_name not in globals():
            continue
        _df = globals()[df_name]
        if not isinstance(_df, pd.DataFrame):
            continue
        candidates_target = [c for c in ["target", "HeartDiseaseorAttack", "HeartDisease"] if c in _df.columns]
        candidate_feats = [c for c in _df.columns if c not in candidates_target]
        if len(candidate_feats) == len(importances):
            feat_names = candidate_feats
            break

# 5) Last resort: generic names, with a warning.
if feat_names is None:
    print("  Non sono riuscito a ricavare i nomi reali delle feature; uso nomi generici.")
    feat_names = [f"feat_{i}" for i in range(len(importances))]

# Costruzione tabella importanze e salvataggio

In [None]:
fi = pd.DataFrame({"feature": feat_names, "importance": importances}) \
.sort_values("importance", ascending=False)
fi_path = OUT / "feature_importances.csv"
fi.to_csv(fi_path, index=False)
# Grafico TOP-K (barre orizzontali, solo matplotlib per massima compatibilità)
TOPK = 15 if len(fi) >= 15 else len(fi)
plt.figure(figsize=(8, 0.4*TOPK + 2))
top = fi.head(TOPK)[::-1]  # reverse per barh (dal basso verso alto)
plt.barh(top["feature"], top["importance"])
plt.xlabel("Importance")
plt.title(f"Random Forest - Top {TOPK} Feature Importances")
plt.tight_layout()
plot_path = OUT / "feature_importances_top.png"
plt.savefig(plot_path, dpi=150)
plt.close()
print(" Salvati:")
print(" -", fi_path)
print(" -", plot_path)

### Esportazione risultati RF (ZIP)

In [None]:
import shutil
import pathlib
from google.colab import files

# Percorso alla cartella della Random Forest

In [None]:
rf_out_dir = BASE_OUT / "random_forest"

# Percorso zip temporaneo

In [None]:
zip_path = pathlib.Path("/content/random_forest_results.zip")

# Crea il file zip con tutti i risultati

In [None]:
shutil.make_archive(str(zip_path).replace(".zip", ""), 'zip', rf_out_dir)

# Scarica il file sul PC

In [None]:
files.download(zip_path)
# The closing quote was a typographic '”', which is a SyntaxError in Python.
print(f"File zip creato e pronto per il download: {zip_path}")

# Modulo Predittivo — XGBoost

In [None]:
# ==========================================================
#  XGBOOST - PIPELINE COMPLETA (RICERCA, FIT, VALUTAZIONE, SALVATAGGI)
# ==========================================================

# !pip install xgboost joblib --quiet

In [None]:
import os, json, joblib, pickle, datetime, pathlib, gc
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pathlib import Path
from google.colab import files
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score,
    roc_curve, precision_recall_curve, confusion_matrix, ConfusionMatrixDisplay,
    classification_report,
)
from sklearn.model_selection import RandomizedSearchCV, StratifiedKFold
from xgboost import XGBClassifier

# ----------------------------------------------------------
#  Setup esecuzione e cartelle
# ----------------------------------------------------------

In [None]:
np.random.seed(42)
RUN_ID   = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
BASE_OUT = Path(f"./artifacts/{RUN_ID}")
XGB_OUT  = BASE_OUT / "xgboost"
XGB_OUT.mkdir(parents=True, exist_ok=True)
print(" Run folder:", BASE_OUT)

# ----------------------------------------------------------
# 1) Caricamento artefatti (upload dal PC) + pick robusto
#    Attesi: train_preprocessed.csv, test_preprocessed.csv,

In [None]:
#            features_list.pkl  (scaler.pkl opzionale)

# ----------------------------------------------------------

In [None]:
print(" Seleziona i file artefatto (train/test CSV + features_list.pkl; scaler.pkl opzionale)...")
uploaded = files.upload()
def pick(name_contains, exts=(".csv", ".pkl", ".joblib")):
    """Return the first uploaded filename that contains ``name_contains``.

    Args:
        name_contains: Substring that must appear in the filename.
        exts: Accepted filename suffixes (a tuple, a list or a single string).

    Returns:
        The matching key of the module-level ``uploaded`` mapping.

    Raises:
        ValueError: If no uploaded filename matches.
    """
    # str.endswith only accepts a str or a tuple, so normalise other iterables.
    suffixes = (exts,) if isinstance(exts, str) else tuple(exts)
    for k in uploaded.keys():
        if name_contains in k and k.endswith(suffixes):
            return k
    raise ValueError(f"File con '{name_contains}' non trovato. Presenti: {list(uploaded.keys())}")
train_path  = pick("train_preprocessed", exts=(".csv",))
test_path   = pick("test_preprocessed",  exts=(".csv",))
feat_path   = pick("features",           exts=(".pkl",".joblib"))

# scaler è opzionale per XGBoost (tree-based), lo carichiamo solo se c'è

In [None]:
# First uploaded file that looks like a scaler, or None when absent.
scaler_path = next(
    (k for k in uploaded.keys() if ("scaler" in k) and k.endswith((".pkl", ".joblib"))),
    None,
)

In [None]:
print(" Train CSV:", train_path)
print(" Test  CSV:", test_path)
print(" Features :", feat_path)
print(" Scaler  :", scaler_path if scaler_path else "non usato per XGBoost")

# ----------------------------------------------------------
#  Lettura CSV + feature list; costruzione X/y con controlli
# ----------------------------------------------------------

In [None]:
train_df = pd.read_csv(train_path)
test_df  = pd.read_csv(test_path)

# Carica lista feature

In [None]:
try:
    feature_names = joblib.load(feat_path)
except Exception:
    # Fall back to the plain pickle reader if joblib cannot parse the file.
    # Exception (not a bare except) lets KeyboardInterrupt propagate.
    with open(feat_path, "rb") as f:
        feature_names = pickle.load(f)
print("Train shape:", train_df.shape, " | Test shape:", test_df.shape)
print("Prime colonne train:", train_df.columns.tolist()[:8])
print("N. feature dichiarate:", len(feature_names))

# Individua colonna target in modo robusto

In [None]:
possible_y = ["target", "HeartDiseaseorAttack", "HeartDisease"]
y_col = next((c for c in possible_y if c in train_df.columns), None)
assert y_col is not None, f"Colonna target non trovata. Disponibili: {train_df.columns.tolist()}"
print(" Target column:", y_col)

# Normalizza target {2->0} se presente questa codifica

In [None]:
def normalize_y(s):
    """Return the target as an integer NumPy array with label 2 mapped to 0.

    Args:
        s: Sequence or Series of labels that can be cast to ``int``.

    Returns:
        ``numpy.ndarray`` of integers in which every 2 has been replaced by 0.
    """
    s = pd.Series(s).astype(int)
    return s.replace({2: 0}).values
y_train = normalize_y(train_df[y_col])
y_test  = normalize_y(test_df[y_col])

# Seleziona solo le feature attese e presenti nei DF

In [None]:
feature_set = [c for c in feature_names if c in train_df.columns]
assert len(feature_set) > 0, "Nessuna feature valida trovata nel train_df!"
X_train = train_df[feature_set].astype(np.float32).values
X_test  = test_df[feature_set].astype(np.float32).values
print(" X_train:", X_train.shape, " | X_test:", X_test.shape)
print(" Positivi train (%):", round(100*np.mean(y_train), 2),
" — Positivi test (%):", round(100*np.mean(y_test), 2))
gc.collect()

# ----------------------------------------------------------

In [None]:
#  Funzioni di valutazione + salvataggi (metriche, curve, CSV)

# ----------------------------------------------------------

In [None]:
def evaluate_and_save(model_name, y_true, y_prob, out_dir: Path, threshold=0.5):
    """Compute test metrics, then save JSON/CSV files and ROC/PR/CM plots.

    Writes ``roc_curve.png``, ``pr_curve.png``, ``confusion_matrix.png``,
    ``metrics.json``, ``confusion_matrix.csv`` and
    ``classification_report.csv`` into ``out_dir``.

    Args:
        model_name: Label used in the plot titles.
        y_true: Binary ground-truth labels (0/1).
        y_prob: Predicted scores for the positive class.
        out_dir: Output folder (created if missing).
        threshold: Score at or above which a sample is predicted positive.

    Returns:
        dict with threshold, accuracy, roc_auc, precision, recall, f1, pr_auc.
    """
    out_dir.mkdir(parents=True, exist_ok=True)

    # Accept lists/Series as well as arrays for the element-wise comparison.
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob)
    y_pred = (y_prob >= threshold).astype(int)

    # Headline metrics; pr_auc here is the average-precision score.
    metrics = {
        "threshold": float(threshold),
        "accuracy": float(accuracy_score(y_true, y_pred)),
        "roc_auc": float(roc_auc_score(y_true, y_prob)),
        "precision": float(precision_score(y_true, y_pred, zero_division=0)),
        "recall": float(recall_score(y_true, y_pred, zero_division=0)),
        "f1": float(f1_score(y_true, y_pred, zero_division=0)),
        "pr_auc": float(average_precision_score(y_true, y_prob)),
    }

    # Per-class report and confusion matrix. labels=[0, 1] pins the row/column
    # order so it always matches the hard-coded labels used below.
    report_dict = classification_report(y_true, y_pred, output_dict=True, zero_division=0)
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

    # ROC figure.
    fpr, tpr, _ = roc_curve(y_true, y_prob)
    plt.figure()
    plt.plot(fpr, tpr, label=f"AUC = {metrics['roc_auc']:.3f}")
    plt.plot([0, 1], [0, 1], '--', linewidth=1)
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title(f"ROC Curve - {model_name}")
    plt.legend(loc="lower right")
    plt.tight_layout()
    plt.savefig(out_dir / "roc_curve.png", dpi=150)
    plt.close()

    # Precision-recall figure.
    prec, rec, _ = precision_recall_curve(y_true, y_prob)
    plt.figure()
    plt.plot(rec, prec, label=f"PR AUC = {metrics['pr_auc']:.3f}")
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.title(f"Precision-Recall Curve - {model_name}")
    plt.legend(loc="lower left")
    plt.tight_layout()
    plt.savefig(out_dir / "pr_curve.png", dpi=150)
    plt.close()

    # Confusion-matrix figure.
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=["No Disease", "Disease"])
    disp.plot(cmap="Blues")
    plt.title(f"Confusion Matrix - {model_name} (thr={threshold})")
    plt.tight_layout()
    plt.savefig(out_dir / "confusion_matrix.png", dpi=150)
    plt.close()

    # JSON with metrics, full report and confusion matrix.
    with open(out_dir / "metrics.json", "w") as f:
        json.dump(
            {
                **metrics,
                "classification_report": report_dict,
                "confusion_matrix": cm.tolist(),
            },
            f,
            indent=2,
        )

    # CSV copies of the confusion matrix and the report.
    pd.DataFrame(cm, index=["Actual_0", "Actual_1"], columns=["Pred_0", "Pred_1"]) \
        .to_csv(out_dir / "confusion_matrix.csv")
    pd.DataFrame(report_dict).to_csv(out_dir / "classification_report.csv")
    return metrics

# ----------------------------------------------------------
# 4) Spazio iperparametrico + RandomizedSearchCV

In [None]:
#  (scoring=roc_auc per confronto con RF / robustezza globale)

# ----------------------------------------------------------

In [None]:
# Hyper-parameter space sampled by the randomized search.
param_dist = {
    "n_estimators":      [200, 300, 400],
    "max_depth":         [4, 6, 8],
    "learning_rate":     [0.05, 0.1],
    "subsample":         [0.8, 1.0],
    "colsample_bytree":  [0.8, 1.0],
    "gamma":             [0, 0.1],
    "min_child_weight":  [1, 2],
    "reg_alpha":         [0, 0.1],
    "reg_lambda":        [1, 2],
}

In [None]:
xgb_base = XGBClassifier(
    objective="binary:logistic",
    eval_metric="logloss",   # set on the constructor rather than passed to fit(...)
    n_jobs=-1,
    random_state=42,
    tree_method="hist",
)
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
print(" Avvio RandomizedSearchCV per XGBoost...")

# 25 sampled configurations, scored by ROC AUC on the stratified folds.
xgb_search = RandomizedSearchCV(
    estimator=xgb_base,
    param_distributions=param_dist,
    n_iter=25,
    scoring="roc_auc",
    cv=cv,
    n_jobs=-1,
    random_state=42,
    verbose=1,
)
xgb_search.fit(X_train, y_train)
best_params_raw = xgb_search.best_params_
print(" Migliori iperparametri (CV):", best_params_raw)
# converti eventuali tipi numpy in tipi Python puri (per JSON)
def to_py(o):
    """Convert NumPy scalars and arrays to plain Python values for JSON.

    Args:
        o: Any value; non-NumPy values are returned unchanged.

    Returns:
        ``int``, ``float`` or ``bool`` for NumPy scalars, a list for
        ``numpy.ndarray``, otherwise ``o`` itself.
    """
    if isinstance(o, np.integer):
        return int(o)
    if isinstance(o, np.floating):
        return float(o)
    if isinstance(o, np.bool_):
        return bool(o)
    if isinstance(o, np.ndarray):
        return o.tolist()
    return o
best_params = {k: to_py(v) for k, v in best_params_raw.items()}
# Persist the JSON-safe copy of the best hyper-parameters.
with open(XGB_OUT / "best_params.json", "w") as f:
    json.dump(best_params, f, indent=2)

# ----------------------------------------------------------
#  Fit finale con i migliori iperparametri
# ----------------------------------------------------------

In [None]:
# Fresh estimator with the tuned hyper-parameters and the same fixed settings
# as the base model used in the search.
xgb_final = XGBClassifier(
    **best_params,
    objective="binary:logistic",
    eval_metric="logloss",
    n_jobs=-1,
    random_state=42,
    tree_method="hist",
)
print(" Fit finale su tutto il training set...")
xgb_final.fit(X_train, y_train)

# ----------------------------------------------------------
# Valutazione su test (soglia standard 0.5) + salvataggi
# ----------------------------------------------------------

In [None]:
print(" Valutazione su test set (thr=0.5)...")
y_prob = xgb_final.predict_proba(X_test)[:,1]
xgb_metrics = evaluate_and_save("XGBoost", y_test, y_prob, XGB_OUT, threshold=0.5)
# Feature importances as reported by the fitted XGBoost estimator (their
# meaning depends on the estimator's importance_type, not on Gini impurity).
# Best-effort step: a failure is reported without stopping the cell.
try:
    importances = xgb_final.feature_importances_
    fi = (
        pd.DataFrame({"feature": feature_set, "importance": importances})
        .sort_values("importance", ascending=False)
    )
    fi.to_csv(XGB_OUT / "feature_importances.csv", index=False)

    # Horizontal bar chart of the top features, largest at the top.
    TOPK = min(15, len(fi))
    plt.figure(figsize=(8, 0.45*TOPK + 2))
    top = fi.head(TOPK)[::-1]
    plt.barh(top["feature"], top["importance"])
    plt.xlabel("Importance")
    plt.title(f"XGBoost - Top {TOPK} Feature Importances")
    plt.tight_layout()
    plt.savefig(XGB_OUT / "feature_importances_top.png", dpi=150)
    plt.close()
except Exception as e:
    print(" Impossibile salvare importanze feature:", e)

# Salva modello

In [None]:
joblib.dump(xgb_final, XGB_OUT / "xgb_model.pkl")
print("\n Metriche TEST XGBoost:")
# Floats are printed with six decimals, everything else as-is.
for k, v in xgb_metrics.items():
    shown = f"{v:.6f}" if isinstance(v, float) else v
    print(f" - {k}: {shown}")
print(f"\n File generati in: {XGB_OUT.resolve()}")

# ----------------------------------------------------------
#  ZIP dei risultati per download locale
# ----------------------------------------------------------

In [None]:
import shutil
zip_path = Path("/content/xgboost_results.zip")
shutil.make_archive(str(zip_path).replace(".zip",""), "zip", XGB_OUT)
files.download(zip_path)
print(f" ZIP pronto per il download: {zip_path}")

## XGBoost — scelta soglia robusta

In [None]:
#  XGBoost: scelta soglia robusta + salvataggi ===
import json, pickle, pathlib
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.metrics import (

precision_score, recall_score, f1_score, roc_curve, auc,
precision_recall_curve, confusion_matrix

In [None]:
)
# Artifact locations (update the run folder name if a different run is used).
ARTIFACTS_DIR = pathlib.Path("/content/artifacts/20250821-191003/xgboost")
MODEL_PATH    = ARTIFACTS_DIR / "xgb_model.pkl"
OUT_DIR       = ARTIFACTS_DIR / "thresholding"
# Create the output folder for the thresholding results if it is missing.
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Carica modello + dati pre-elaborati

In [None]:
# Load the fitted model. The training section writes xgb_model.pkl with
# joblib.dump, so it is read back with joblib.load instead of plain pickle.
# NOTE: only load model files you produced yourself; unpickling an untrusted
# file can execute arbitrary code.
import joblib

xgb_model = joblib.load(MODEL_PATH)

# Pre-processed splits exported by the earlier pipeline steps.
test_df = pd.read_csv("/content/test_preprocessed.csv")
train_df = pd.read_csv("/content/train_preprocessed.csv")  # kept only for feature consistency

target_col = "target"
features = [c for c in test_df.columns if c != target_col]
X_test_full = test_df[features].values
y_test_full = test_df[target_col].astype(int).values

# Split the test data 50/50 (stratified): one half is used to choose the
# threshold, the other half to report the final metrics.
X_val, X_test_final, y_val, y_test_final = train_test_split(
    X_test_full, y_test_full, test_size=0.5, stratify=y_test_full, random_state=42
)

# Predizioni probabilistiche

In [None]:
# Second column of predict_proba for the validation half and the final-test half.
y_val_prob  = xgb_model.predict_proba(X_val)[:, 1]
y_test_prob = xgb_model.predict_proba(X_test_final)[:, 1]

# Scansione soglie robusta

In [None]:
def scan_thresholds(y_true, y_score, min_precision=0.25, metric="recall"):
    """Scan decision thresholds and pick the best one under a precision floor.

    The thresholds 0.01, 0.02, ..., 0.99 are evaluated (the extremes 0 and 1
    are skipped). Among the thresholds whose precision is at least
    ``min_precision``, the first one with the highest objective value wins.

    Args:
        y_true: binary ground-truth labels.
        y_score: positive-class scores; must support ``>=`` against a scalar
            and ``.astype`` (e.g. a NumPy array).
        min_precision: thresholds with a lower precision are recorded with
            ``valid=False`` and are never selected.
        metric: ``"recall"`` maximises recall; any other value maximises F1.

    Returns:
        ``(df, best_t, best_row)`` where ``df`` has one row per threshold
        (columns threshold/precision/recall/f1/valid), ``best_t`` is the
        chosen threshold and ``best_row`` holds its threshold, precision,
        recall and f1. When no threshold satisfies the precision floor, a
        warning is issued and the default threshold 0.5 with its metrics is
        returned, so ``best_row`` is never ``None``.
    """
    import warnings

    def _metrics_at(t):
        """Return (precision, recall, f1) for the cut-off ``t``."""
        y_pred = (y_score >= t).astype(int)
        # zero_division=0 avoids warnings/errors when nothing is predicted positive
        return (
            precision_score(y_true, y_pred, zero_division=0),
            recall_score(y_true, y_pred, zero_division=0),
            f1_score(y_true, y_pred, zero_division=0),
        )

    rows = []
    best_val, best_t, best_row = -1.0, 0.5, None
    for t in np.linspace(0.01, 0.99, 99):
        prec, rec, f1 = _metrics_at(t)
        valid = not (prec < min_precision)
        rows.append({"threshold": t, "precision": prec, "recall": rec, "f1": f1, "valid": valid})
        if not valid:
            continue

        # Objective; the strict ">" keeps the first (lowest) threshold on ties.
        val = rec if metric == "recall" else f1
        if val > best_val:
            best_val, best_t = val, float(t)
            best_row = {"threshold": best_t, "precision": prec, "recall": rec, "f1": f1}

    if best_row is None:
        # No threshold met the constraint: report the default threshold so
        # callers can still read best_row[...] instead of failing on None.
        warnings.warn(
            f"Nessuna soglia con precision >= {min_precision}; uso la soglia di default {best_t}."
        )
        prec, rec, f1 = _metrics_at(best_t)
        best_row = {"threshold": best_t, "precision": prec, "recall": rec, "f1": f1}

    df = pd.DataFrame(rows)
    return df, best_t, best_row
# Threshold selection on the validation half (maximise recall, precision >= 0.25)
scan_df, t_opt, best_row = scan_thresholds(y_val, y_val_prob, min_precision=0.25, metric="recall")

#  Valutazione sul test finale alle due soglie

In [None]:
def eval_at_threshold(y_true, y_score, t):
    """Compute classification metrics for the cut-off ``t``.

    Args:
        y_true: binary ground-truth labels (0/1).
        y_score: positive-class scores; values ``>= t`` are predicted as 1.
        t: decision threshold.

    Returns:
        A JSON-serialisable dict with ``threshold``, ``precision``,
        ``recall``, ``f1``, ``roc_auc`` (threshold independent) and
        ``confusion_matrix`` as a nested list.
    """
    y_pred = (y_score >= t).astype(int)
    # Fix the label order so the matrix is always 2x2, even when one class
    # does not occur in y_true/y_pred at this threshold.
    cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
    fpr, tpr, _ = roc_curve(y_true, y_score)
    return {
        "threshold": float(t),
        "precision": float(precision_score(y_true, y_pred, zero_division=0)),
        "recall": float(recall_score(y_true, y_pred, zero_division=0)),
        "f1": float(f1_score(y_true, y_pred, zero_division=0)),
        "roc_auc": float(auc(fpr, tpr)),
        "confusion_matrix": cm.tolist(),
    }

In [None]:
# Final-test metrics at the default threshold and at the one chosen on validation.
metrics_default = eval_at_threshold(y_test_final, y_test_prob, 0.5)
metrics_opt = eval_at_threshold(y_test_final, y_test_prob, t_opt)
if best_row is None:
    # No validation threshold satisfied the precision constraint, so there
    # are no validation metrics to show for the selected threshold.
    print(f"Soglia scelta su validation: {t_opt:.3f}  |  nessuna soglia soddisfa il vincolo di precision")
else:
    print(f"Soglia scelta su validation: {t_opt:.3f}  |  Recall(val)={best_row['recall']:.3f}  |  Precision(val)={best_row['precision']:.3f}")

#  Curve PR con marker soglia ottimale

In [None]:
# Precision-recall curve on the final-test half, with the chosen threshold marked.
prec, rec, thr = precision_recall_curve(y_test_final, y_test_prob)
pr_auc = auc(rec, prec)

# Pad the thresholds with 1.0 and locate the entry closest to t_opt; that
# position is then used to index prec/rec for the marker.
thr_full = np.append(thr, 1.0)
idx = np.argmin(np.abs(thr_full - t_opt))

plt.figure(figsize=(9, 6))
plt.plot(rec, prec, label=f"PR AUC = {pr_auc:.3f}")
plt.scatter(rec[idx], prec[idx], s=70, label=f"Soglia ottimale ≈ {t_opt:.2f}")
plt.xlabel("Recall")
plt.ylabel("Precision")
plt.title("Precision-Recall (XGBoost) - Test finale")
plt.legend()
plt.tight_layout()
plt.savefig(OUT_DIR / "precision_recall_threshold_xgb.png", dpi=150)
plt.close()

#  Salvataggi

In [None]:
# Persist the full threshold scan and a JSON summary of the selection.
scan_df.to_csv(OUT_DIR / "threshold_scan_xgb.csv", index=False)

selection_report = {
    "picked_threshold": t_opt,
    "validation_best_row": best_row,
    "test_metrics_default": metrics_default,
    "test_metrics_opt": metrics_opt,
    "pr_auc_test": float(pr_auc),
}
with open(OUT_DIR / "threshold_selection_xgb.json", "w") as fh:
    json.dump(selection_report, fh, indent=2)

print("== Test finale (thr=0.50) =>", metrics_default)
print("== Test finale (thr≈opt)  =>", metrics_opt)
print(" Salvato:", OUT_DIR)

# Modulo di Similarità

## Setup ambiente (librerie + cartelle)

# Install e import

In [None]:
!pip install joblib --quiet
import os, json, glob, pickle, datetime, pathlib, shutil
import numpy as np
import pandas as pd
from sklearn.neighbors import NearestNeighbors
from google.colab import files
import matplotlib.pyplot as plt
np.random.seed(42)

# Cartelle output “run-specific”

In [None]:
# One timestamp-named folder per run; similarity outputs go in a sub-folder.
RUN_ID = f"{datetime.datetime.now():%Y%m%d-%H%M%S}"
BASE_OUT = pathlib.Path("./artifacts") / RUN_ID
SIM_OUT = BASE_OUT / "similarity"
SIM_OUT.mkdir(parents=True, exist_ok=True)
print("Run folder:", BASE_OUT)

## Caricamento artefatti

## Oversampling (SMOTE)

File richiesti:

- train_preprocessed.csv (solo per completezza: non viene usato come corpus);
- test_preprocessed.csv (corpus di riferimento);
- features_list.pkl (ordine/insieme delle feature);
- scaler.pkl (StandardScaler del BMI).

# Seleziona dal  PC: train_preprocessed.csv, test_preprocessed.csv, scaler.pkl, features_list.pkl

In [None]:
uploaded = files.upload()
def pick(name_contains, exts=(".csv", ".pkl", ".joblib")):
    """Return the first uploaded file name matching a substring and extension.

    Looks through the keys of the notebook-level ``uploaded`` mapping.

    Args:
        name_contains: substring that must occur in the file name.
        exts: tuple of accepted file-name endings.

    Raises:
        ValueError: if no uploaded file matches.
    """
    candidates = (
        fname for fname in uploaded
        if name_contains in fname and fname.endswith(exts)
    )
    found = next(candidates, None)
    if found is None:
        raise ValueError(f"File con '{name_contains}' non trovato. Caricati: {list(uploaded.keys())}")
    return found
# Resolve each required artifact among the uploaded files.
train_path = pick("train_preprocessed", exts=(".csv",))
test_path = pick("test_preprocessed", exts=(".csv",))
scaler_path = pick("scaler", exts=(".pkl", ".joblib"))
feat_path = pick("features", exts=(".pkl", ".joblib"))

# Echo what was picked so a wrong match is easy to spot.
for label, chosen in (
    ("Train CSV: ", train_path),
    ("Test  CSV: ", test_path),
    ("Scaler:    ", scaler_path),
    ("Features:  ", feat_path),
):
    print(label, chosen)

## Caricamento artefatti

# Carica CSV

In [None]:
# Read the two uploaded pre-processed splits.
train_df = pd.read_csv(train_path)
test_df  = pd.read_csv(test_path)

# Carica scaler

In [None]:
# Load the BMI scaler: try joblib first; on any failure (including a missing
# joblib package) fall back to the plain pickle module.
try:
    import joblib
    scaler = joblib.load(scaler_path)
except Exception:
    with open(scaler_path, "rb") as fh:
        scaler = pickle.load(fh)

# Carica lista feature

In [None]:
# Load the saved feature list: joblib first, plain pickle on any failure.
# NOTE(review): `joblib` is only bound by the import inside the scaler cell's
# try block; if that import failed, the NameError raised here is caught below
# and the pickle fallback is used.
try:
    feature_names = joblib.load(feat_path)
except Exception:
    with open(feat_path, "rb") as fh:
        feature_names = pickle.load(fh)

# Target

In [None]:
# Find the target column: the first known name present in the test CSV wins.
possible_y = ["target", "HeartDiseaseorAttack", "HeartDisease"]
y_col = None
for candidate in possible_y:
    if candidate in test_df.columns:
        y_col = candidate
        break
if y_col is None:
    raise ValueError(f"Colonna target non trovata. Colonne disponibili: {test_df.columns.tolist()}")
def normalize_y(s):
    """Cast labels to int, map the value 2 to 0 and return a NumPy array.

    The 2 -> 0 mapping is a precaution taken by the original author; all
    other values are left unchanged.
    """
    labels = pd.Series(s).astype(int)
    cleaned = labels.replace(2, 0)
    return cleaned.values
# Reference labels and features taken from the test CSV.
y_ref = normalize_y(test_df[y_col])

# Keep the saved feature order, restricted to columns present in the CSV
# (features missing from the CSV are dropped silently).
feature_set = [name for name in feature_names if name in test_df.columns]
X_ref = test_df.loc[:, feature_set].copy()

print(f" Target individuata: {y_col}")
print(" X_ref shape:", X_ref.shape)
print(" Distribuzione target (test):", dict(pd.Series(y_ref).value_counts().sort_index()))

## Ricodifica variabili binarie 0/1

Alcune variabili binarie possono presentare valori diversi da 0 e 1, in linea con la codifica usata dalla fonte del dataset: per evitare il problema le ricodifichiamo tutte in 0/1.
# Dizionario binarie

In [None]:
# Columns treated as binary flags; only those present in a DataFrame are recoded.
# NOTE(review): in the public BRFSS 2015 heart-disease dataset `Diabetes` is
# reportedly coded 0/1/2 (none / pre-diabetes / diabetes). If that coding is
# still present here, recoding with "== 1" maps diabetic patients to 0 —
# confirm against the pre-processed CSV.
BINARY_CANDIDATES = [

"HighBP","HighChol","CholCheck","Smoker","Stroke","Diabetes",
"PhysActivity","Fruits","Veggies","HvyAlcoholConsump",
"AnyHealthcare","NoDocbcCost","DiffWalk","Sex"
]

In [None]:
def recode_binary(df, cols):
    """Recode the given columns to 0/1 in place and return the same DataFrame.

    Columns listed in ``cols`` but absent from ``df`` are ignored. A value
    becomes 1 when it equals 1 and 0 otherwise. ``df`` itself is modified.
    """
    present = [name for name in cols if name in df.columns]
    if present:
        df.loc[:, present] = (df[present] == 1).astype(int)
    return df
# Recode the binary columns of the reference set; the print lists at most the
# first ten recoded columns.
X_ref = recode_binary(X_ref, BINARY_CANDIDATES)
print("Ricodifica binarie → 0/1 completata su:", [c for c in BINARY_CANDIDATES if c in X_ref.columns][:10], "...")

## Normalizzazione per distanza

Quando vogliamo calcolare quanto due pazienti sono “simili”, usiamo una distanza matematica (nel nostro caso la distanza euclidea).
Problema: se le variabili non sono tutte sulla stessa scala, una sola variabile può “dominare” la distanza e rendere i risultati distorti.

Esempio molto intuitivo:

- la variabile BMI (Indice di Massa Corporea) può assumere valori da 15 a 50;
- la variabile Fumatore vale solo 0 oppure 1.

Se non facciamo nulla, il BMI contribuisce al calcolo della distanza molto più del fatto che una persona sia fumatrice o meno. Risultato: i vicini verrebbero scelti quasi solo sulla base del BMI, trascurando gli altri fattori di rischio.

Per evitare questo, normalizziamo tutte le variabili così che abbiano un peso comparabile:

**Variabili binarie (0/1)**
Già perfette: non serve toccarle, perché “0” e “1” hanno lo stesso impatto di qualsiasi altra variabile normalizzata.

**BMI**
Qui applichiamo lo StandardScaler salvato nel pre-processing: trasformiamo i valori in “z-score”, cioè quanto ciascun BMI si discosta dalla media in termini di deviazioni standard. La chiamata `scaler.transform(df[["BMI"]])` restituisce un array annidato (float32), mentre la colonna BMI era float64: per questo, al momento dell’assegnazione, viene eseguito il cast esplicito a float64.
➝ Così un BMI di 30 non viene considerato “grande” solo perché il numero è alto, ma perché è più alto della media della popolazione.

**Variabili ordinali (scale finite)**
Alcune variabili hanno scale numeriche diverse:

- GenHlth da 1 a 5;
- Age da 1 a 13;
- Income da 1 a 8;
- MentHlth da 0 a 30, ecc.

Le dividiamo per il loro massimo teorico, così diventano tutte comprese tra 0 e 1.
➝ In questo modo “età=13” diventa 1.0 e “età=6” diventa circa 0.46 (6/13).
Così l’età pesa come qualsiasi altro fattore in scala ridotta.
# Scale teoriche per ordinali

In [None]:
# Theoretical maximum of each ordinal/count feature; values are divided by it
# when normalising for distance computations.
ORDINAL_SCALES = {

"GenHlth": 5,   # 1..5
"Age": 13,      # 1..13
"Education": 6, # 1..6
"Income": 8,    # 1..8
"MentHlth": 30, # 0..30
"PhysHlth": 30  # 0..30
}

In [None]:
def normalize_dataframe_for_distance(df, feature_names, scaler, ordinal_scales):
    """Return a float64 copy of ``df[feature_names]`` scaled for distances.

    * ``BMI`` (when present) is replaced by ``scaler.transform`` of the column.
    * Every column listed in ``ordinal_scales`` is divided by its maximum.
    * All other columns are only cast to float64.

    The input DataFrame is not modified.

    NOTE(review): if the input already contains a standardised BMI (the
    pre-processing section standardises BMI), the scaler is applied a second
    time here — confirm which form the CSV holds.

    Args:
        df: DataFrame containing at least ``feature_names``.
        feature_names: columns to keep, in the order they should appear.
        scaler: fitted object exposing ``transform`` for a one-column frame.
        ordinal_scales: mapping column name -> theoretical maximum.

    Returns:
        A new DataFrame with float64 columns in ``feature_names`` order.
    """
    # Cast first: writing scaled floats into integer columns in place via
    # .loc is deprecated in recent pandas versions.
    Xn = df[feature_names].astype("float64")

    # BMI -> z-score using the scaler saved during pre-processing.
    if "BMI" in Xn.columns:
        scaled_bmi = np.asarray(scaler.transform(Xn[["BMI"]]), dtype="float64")
        Xn["BMI"] = scaled_bmi.reshape(-1)

    # Ordinal features: divide by the theoretical maximum.
    for col, mx in ordinal_scales.items():
        if col in Xn.columns:
            Xn[col] = Xn[col] / float(mx)

    return Xn
# Normalised copy of the reference set, used to fit the KNN indices.
X_ref_norm = normalize_dataframe_for_distance(
    df=X_ref.copy(),
    feature_names=feature_set,
    scaler=scaler,
    ordinal_scales=ORDINAL_SCALES,
)
print(" X_ref normalizzato:", X_ref_norm.shape)

## Addestramento indici KNN

#  KNN globale

In [None]:
# Global index over the whole normalised reference set (at most 200 neighbours by default).
knn = NearestNeighbors(n_neighbors=min(200, len(X_ref_norm)), metric="euclidean")
knn.fit(X_ref_norm.values)

# KNN solo positivi

In [None]:
# Second index restricted to the positive (label 1) reference rows; it stays
# None when the reference set contains no positives.
mask_pos = (y_ref == 1)
X_pos_norm = X_ref_norm[mask_pos]
n_pos = X_pos_norm.shape[0]
if n_pos >= 1:
    knn_pos = NearestNeighbors(n_neighbors=min(200, n_pos), metric="euclidean")
    knn_pos.fit(X_pos_norm.values)
else:
    knn_pos = None

print(f" KNN globale pronto con n={knn.n_neighbors}")
print(f" KNN positivi: {'OK' if knn_pos is not None else 'NON disponibile (nessun positivo?)'}")

## Utility e funzioni di similarità

In [None]:
# Binary columns actually present in the feature set (used for the risk factors)
BINARY_COLS = [c for c in BINARY_CANDIDATES if c in feature_set]
def normalize_single_patient_for_distance(p_series, scaler, feature_names, ordinal_scales):
    """Normalise one patient exactly like the reference set.

    The values of ``p_series`` are taken positionally and labelled with
    ``feature_names``, so they must be in that order. ``BMI`` (when present)
    goes through ``scaler.transform``; columns in ``ordinal_scales`` are
    divided by their maximum.

    Args:
        p_series: pandas Series with one value per feature.
        scaler: fitted object exposing ``transform`` for a one-column frame.
        feature_names: column names, in the order of ``p_series`` values.
        ordinal_scales: mapping column name -> theoretical maximum.

    Returns:
        A one-row DataFrame with float64 columns.
    """
    # Cast first: writing scaled floats into integer columns in place via
    # .loc is deprecated in recent pandas versions.
    p = pd.DataFrame([p_series.values], columns=feature_names).astype("float64")

    if "BMI" in p.columns:
        scaled_bmi = np.asarray(scaler.transform(p[["BMI"]]), dtype="float64")
        p["BMI"] = scaled_bmi.reshape(-1)

    for col, mx in ordinal_scales.items():
        if col in p.columns:
            p[col] = p[col] / float(mx)

    return p
def analyze_similarity(paziente_series, k_total=50, k_pos_min=5, save_json=True, out_dir=SIM_OUT):
    """Summarise the neighbourhood of one patient in the reference set.

    Steps:
      1. Find the ``k_total`` nearest neighbours in ``X_ref_norm``.
      2. If fewer than ``k_pos_min`` of them are positive, add the nearest
         positives (from the positives-only index) that are not already
         among the neighbours.
      3. Compute the distance-weighted share of positives and, among the
         positive neighbours, the weighted frequency of each binary feature.

    NOTE(review): when the patient is itself a row of the reference set it
    comes back as its own neighbour at distance 0 and, with weights
    ``1 / (d + 1e-6)``, dominates the weighted share — confirm this is
    intended.

    Args:
        paziente_series: feature values of the patient in ``feature_set``
            order; its ``name`` is used in the JSON file name.
        k_total: number of global neighbours requested.
        k_pos_min: minimum number of positive neighbours to aim for.
        save_json: when true, write the result to
            ``out_dir / "similarity_patient_<name>.json"``.
        out_dir: output directory (pathlib path).

    Returns:
        dict with ``k_total``, ``k_pos_min``, ``neighbors_index`` (row
        positions in the reference set), ``perc_malati_vicini`` (percent)
        and ``fattori_rischio_top`` (at most ten binary features, by
        descending weighted frequency).
    """
    # Normalise the patient like the reference set.
    p_norm = normalize_single_patient_for_distance(
        p_series=paziente_series,
        scaler=scaler,
        feature_names=feature_set,
        ordinal_scales=ORDINAL_SCALES,
    ).values

    # Global neighbours (row positions into X_ref_norm / y_ref).
    d_all, i_all = knn.kneighbors(
        p_norm, n_neighbors=min(k_total, len(X_ref_norm)), return_distance=True
    )
    d_all, i_all = d_all[0], i_all[0]
    y_all = y_ref[i_all]

    # Guarantee at least k_pos_min positives when possible. The nearest
    # positives overlap with those already found by the global search, so
    # query k_pos_min of them and append only the ones not yet included;
    # asking just for the missing count would re-add existing neighbours.
    if (knn_pos is not None) and (y_all.sum() < k_pos_min):
        k_pos = min(int(k_pos_min), X_pos_norm.shape[0])
        d_pos, i_pos = knn_pos.kneighbors(p_norm, n_neighbors=k_pos, return_distance=True)
        i_pos_global = np.where(y_ref == 1)[0][i_pos[0]]
        is_new = ~np.isin(i_pos_global, i_all)
        i_all = np.r_[i_all, i_pos_global[is_new]]
        d_all = np.r_[d_all, d_pos[0][is_new]]
        y_all = y_ref[i_all]

    # Distance-weighted share of positives (weights = 1 / (d + 1e-6)).
    w = 1.0 / (d_all + 1e-6)
    perc_malati = float((w * y_all).sum() / w.sum() * 100.0)

    # Weighted frequency of each binary feature among the positive neighbours.
    fattori = {}
    pos_mask_local = (y_all == 1)
    if pos_mask_local.any():
        w_pos = w[pos_mask_local]
        idxpos = i_all[pos_mask_local]
        for col in BINARY_COLS:
            # idxpos holds row positions, so index positionally rather than by label.
            vals = X_ref[col].to_numpy()[idxpos].astype(float)
            fattori[col] = float((w_pos * vals).sum() / (w_pos.sum() + 1e-9))
    fattori_sorted = dict(sorted(fattori.items(), key=lambda kv: kv[1], reverse=True)[:10])

    result = {
        "k_total": int(min(k_total, len(X_ref_norm))),
        "k_pos_min": int(k_pos_min),
        "neighbors_index": i_all.tolist(),
        "perc_malati_vicini": perc_malati,   # percent
        "fattori_rischio_top": fattori_sorted,
    }

    if save_json:
        out_dir.mkdir(parents=True, exist_ok=True)
        pid = paziente_series.name if paziente_series.name is not None else "sample"
        out_path = out_dir / f"similarity_patient_{pid}.json"
        with open(out_path, "w") as f:
            json.dump(result, f, indent=2)
    return result

## Esempio: analisi di un paziente

In [None]:
# Pick one reference patient at random and analyse its neighbourhood.
patient_idx = int(np.random.choice(X_ref.index.values, 1)[0])
paziente = X_ref.loc[patient_idx]
print("Paziente test index:", patient_idx)
# The result is also written as JSON into SIM_OUT (save_json=True).
res = analyze_similarity(paziente_series=paziente, k_total=50, k_pos_min=5, save_json=True, out_dir=SIM_OUT)
print(f" % malattia tra i vicini (pesata): {res['perc_malati_vicini']:.2f}%")
print(" Fattori di rischio (top, 0..1):", res["fattori_rischio_top"])

## Analisi batch di 10 pazienti

In [None]:
import pandas as pd
import numpy as np
from pathlib import Path
def analyze_similarity_batch_random(df, n=10, k_total=200, k_pos_min=100, out_dir=SIM_OUT):
    """Run the similarity analysis for ``n`` randomly chosen patients.

    Patients are sampled without replacement from the index of ``df``; when
    ``n`` exceeds the number of rows, every row is analysed once. A summary
    CSV (``similarity_batch_summary.csv``) is written to ``out_dir`` with:

    - the patient's index in ``df``,
    - the weighted percentage of positives among the neighbours,
    - the three most frequent risk factors (``name=value``; empty strings
      when fewer are available).

    Args:
        df: DataFrame of patients (one row each).
        n: number of patients to sample.
        k_total: forwarded to ``analyze_similarity``.
        k_pos_min: forwarded to ``analyze_similarity``.
        out_dir: output directory (pathlib path) for the JSON files and the CSV.

    Returns:
        The summary DataFrame (one row per analysed patient).
    """
    # Sampling without replacement fails when more items are requested than
    # available, so cap the sample size at the number of rows.
    n = min(int(n), len(df))
    sample_idx = np.random.choice(df.index.values, size=n, replace=False)

    rows = []
    for i, idx in enumerate(sample_idx, 1):
        print(f"[{i}/{len(sample_idx)}] Analizzo paziente index={idx}")
        paz = df.loc[idx]
        res = analyze_similarity(paz, k_total=k_total, k_pos_min=k_pos_min, save_json=True, out_dir=out_dir)

        # Top-3 risk factors, padded with empty strings.
        tops = [f"{name}={share:.2f}" for name, share in list(res["fattori_rischio_top"].items())[:3]]
        tops += [""] * (3 - len(tops))

        rows.append({
            "patient_index": idx,   # index of the patient in df
            "perc_malati_vicini": res["perc_malati_vicini"],
            "top1": tops[0], "top2": tops[1], "top3": tops[2],
        })

    # Build the summary and save it as CSV.
    df_out = pd.DataFrame(rows)
    out_csv = out_dir / "similarity_batch_summary.csv"
    df_out.to_csv(out_csv, index=False)
    print(f"\n CSV riassuntivo salvato: {out_csv}")
    print(f"Report creato con {len(df_out)} pazienti (campionati casualmente):")
    print(df_out.to_string(index=False))  # print every row
    return df_out

# Esecuzione con n=10 pazienti casuali

In [None]:
# Analyse 10 random patients of the reference (test) set.
df_batch = analyze_similarity_batch_random(
    df=X_ref,        # reference DataFrame
    n=10,            # number of patients to analyse
    k_total=200,     # total neighbours
    k_pos_min=100,   # minimum positive neighbours aimed for
    out_dir=SIM_OUT,
)

### Download CSV (Similarity)

In [None]:
import glob
from pathlib import Path
import pandas as pd

# trova l'ultima cartella artifacts/.../similarity/

In [None]:
# Run folders are named by timestamp, so the last entry in sorted order is the newest.
sim_dirs = sorted(glob.glob("artifacts/*/similarity/"))
assert sim_dirs, "Nessuna cartella similarity trovata sotto artifacts/."
SIM_DIR = Path(sim_dirs[-1])
print("Cartella similarity:", SIM_DIR)

# path del CSV

In [None]:
# Summary CSV written by the batch analysis; stop early if it is missing.
csv_path = SIM_DIR / "similarity_batch_summary.csv"
assert csv_path.exists(), f"CSV non trovato: {csv_path}"

# mostra anteprima

In [None]:
# Preview the summary, then start the browser download (Colab only).
# NOTE(review): this rebinds the notebook-level name `df`, which holds the raw
# dataset at the top of the notebook — consider a dedicated name.
from google.colab import files

df = pd.read_csv(csv_path)
print("\nPrime 10 righe:")
print(df.head(10).to_string(index=False))

files.download(str(csv_path))
print("\n Download del CSV avviato:", csv_path)

### Download ZIP (Similarity)

In [None]:
import os, shutil, glob
from pathlib import Path

# 1) ultima cartella similarity

In [None]:
# Run folders are named by timestamp, so the last entry in sorted order is the newest.
sim_dirs = sorted(glob.glob("artifacts/*/similarity/"))
assert sim_dirs, "Nessuna cartella similarity trovata."
SIM_DIR = Path(sim_dirs[-1])

# 2) CSV riassuntivo

In [None]:
# Summary CSV written by the batch analysis; stop early if it is missing.
csv_path = SIM_DIR / "similarity_batch_summary.csv"
assert csv_path.exists(), f"CSV non trovato: {csv_path}"

# 3) leggi gli indici reali dal CSV

In [None]:
import pandas as pd
# Patient indices recorded in the summary; they identify the per-patient JSON files.
ids = pd.read_csv(csv_path)["patient_index"].astype(int).tolist()

# 4) copia i JSON elencati nel CSV + il CSV in una sottocartella

In [None]:
# Collect the JSON files of the patients listed in the CSV into one sub-folder.
dst_dir = SIM_DIR / "batch_json_10"
dst_dir.mkdir(exist_ok=True)
copiati = []
for pid in ids:
    src = SIM_DIR / f"similarity_patient_{pid}.json"
    if not src.exists():
        print(" Mancante:", src.name)
        continue
    shutil.copy2(src, dst_dir / src.name)
    copiati.append(src.name)

# copia anche il CSV

In [None]:
# Put the summary CSV next to the copied JSON files.
shutil.copy2(csv_path, dst_dir / csv_path.name)
print(f"File copiati ({len(copiati)} JSON + CSV):", len(copiati), "+ 1")

# 5) zippa e scarica

In [None]:
# Build a fresh ZIP of the batch folder and start the browser download (Colab only).
from google.colab import files

zip_path = SIM_DIR / "batch_json_10.zip"
if zip_path.exists():
    zip_path.unlink()  # drop an archive left over from a previous execution
# make_archive appends ".zip" itself, so pass the path without its suffix
# (with_suffix only touches the final extension, unlike a string replace).
shutil.make_archive(str(zip_path.with_suffix("")), "zip", dst_dir)
files.download(str(zip_path))
print(" ZIP scaricato:", zip_path)