In [None]:
%cd ~/Documents/cvd-predictor/
from sklearnex import patch_sklearn
patch_sklearn()
import polars as pl
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from xgboost import XGBClassifier
from lightgbm import LGBMClassifier
from CVD.utils import get_metrics
from sklearn.model_selection import cross_val_score
import optuna
from xgboost import XGBClassifier
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.metrics import f1_score
import pickle

In [None]:
# Read the cleaned dataset from the intermediate parquet file into a polars frame.
df: pl.DataFrame = pl.read_parquet(
    source="data/intermediate/heart_cdc_2023_cleaned.parquet",
)

In [None]:
# Separate the feature columns from the "CVD" target column.
# NOTE: `X` is left unscaled; only the train/test splits below are standardized.
X: pl.DataFrame = df.drop(["CVD"])
y: pl.Series = df["CVD"]

# Hold out 20% of the rows for testing, stratified on the target so both
# splits keep the same class ratio. The split happens BEFORE scaling.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Fit the scaler on the training split only. Fitting it on the full dataset
# would let the held-out rows influence the means/variances used to
# transform the training data (train/test leakage).
feature_names: list[str] = X.columns
scaler = StandardScaler()
X_train = pl.DataFrame(
    scaler.fit_transform(X_train), schema=feature_names, orient="row"
)
# The test split is transformed with the statistics learned from the training split.
X_test = pl.DataFrame(
    scaler.transform(X_test), schema=feature_names, orient="row"
)

In [None]:
def objective(trial):
    """Optuna objective: mean 5-fold cross-validated F1 of an XGBoost classifier.

    Reads the module-level ``X_train`` / ``y_train`` split.

    Args:
        trial: Optuna trial used to sample one hyperparameter configuration.

    Returns:
        Mean of the per-fold ``"f1"`` scores for the sampled configuration.
    """
    # `suggest_float(..., log=True)` replaces the deprecated `suggest_loguniform`
    # and plain `suggest_float` replaces the deprecated `suggest_uniform`;
    # parameter names and ranges are unchanged.
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 100, 1000),
        "max_depth": trial.suggest_int("max_depth", 3, 15),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        "subsample": trial.suggest_float("subsample", 0.5, 1.0),
        "colsample_bytree": trial.suggest_float("colsample_bytree", 0.5, 1.0),
        "gamma": trial.suggest_float("gamma", 1e-8, 1.0, log=True),
        "min_child_weight": trial.suggest_int("min_child_weight", 1, 10),
        "scale_pos_weight": trial.suggest_float("scale_pos_weight", 1e-2, 10, log=True),  # For handling imbalance
        "random_state": 42,
        "eval_metric": "logloss",
        # `use_label_encoder` is no longer passed: it is deprecated in XGBoost
        # and only triggers warnings on every fit.
    }

    # Same shuffled, stratified folds for every trial so scores are comparable.
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    model = XGBClassifier(**params)
    scores = cross_val_score(model, X_train, y_train, cv=skf, scoring="f1", n_jobs=-1)
    return scores.mean()

# Create the Optuna study and run the XGBoost search.
# The sampler is seeded so that the sequence of suggested hyperparameters is
# the same on every run; without a seed each run explores a different path.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=100)

# Report the best cross-validated score and the hyperparameters that produced it.
best_trial = study.best_trial
print("Best trial:")
print(f"  F1 Score: {best_trial.value}")
print("  Params:")
for key, value in best_trial.params.items():
    print(f"    {key}: {value}")

In [None]:
# Refit XGBoost on the whole training split using the best hyperparameters
# from the study. `use_label_encoder` is no longer passed: it is deprecated in
# XGBoost and only triggers warnings.
best_xgb = XGBClassifier(**best_trial.params, random_state=42, eval_metric="logloss")
best_xgb.fit(X_train, y_train)

# Probability of the positive class, thresholded at 0.5 to obtain hard labels.
y_pred_prob = best_xgb.predict_proba(X_test)[:, 1]
y_pred_binary = (y_pred_prob >= 0.5).astype(int)

test_f1 = f1_score(y_test, y_pred_binary, average="weighted")
print(f"Test F1 Score: {test_f1}")

# The study optimizes scoring="f1" (F1 of the positive class), which is not
# comparable to the weighted average above, so the matching figure is
# reported as well.
test_f1_positive = f1_score(y_test, y_pred_binary)
print(f"Test F1 Score (positive class): {test_f1_positive}")

In [None]:
# Start the per-model metrics collection with the XGBoost test-set entry.
results: list[dict] = [get_metrics(y_test, y_pred_binary, "XGBoost")]

In [None]:
# Persist the fitted XGBoost model. The context manager guarantees the file
# handle is flushed and closed even if pickling fails.
with open("models/xgb_cvd.pkl", "wb") as model_file:
    pickle.dump(best_xgb, model_file)

# Reload the model to check that the saved artifact round-trips.
# NOTE: pickle.load can execute arbitrary code; only load files this project wrote.
with open("models/xgb_cvd.pkl", "rb") as model_file:
    xgb = pickle.load(model_file)

# Re-score the reloaded model on the test split using the same 0.5 threshold.
y_pred_prob = xgb.predict_proba(X_test)[:, 1]
pl.DataFrame(get_metrics(y_test, (y_pred_prob >= 0.5).astype(int), "XGBoost"))

In [None]:
def objective(trial):
    """Optuna objective: mean 5-fold cross-validated F1 of a LightGBM classifier.

    Reads the module-level ``X_train`` / ``y_train`` split.

    Args:
        trial: Optuna trial used to sample one hyperparameter configuration.

    Returns:
        Mean of the per-fold ``"f1"`` scores for the sampled configuration.
    """
    # `suggest_float(..., log=True)` replaces the deprecated `suggest_loguniform`
    # and plain `suggest_float` replaces the deprecated `suggest_uniform`.
    params = {
        "n_estimators": trial.suggest_int("n_estimators", 100, 1000),
        "max_depth": trial.suggest_int("max_depth", 3, 15),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
        # LightGBM uses bagging_fraction (equivalent to subsample); it only takes
        # effect when bagging_freq is > 0, so the frequency is tuned alongside it.
        # Suggesting it (rather than hard-coding it) keeps it in `trial.params`,
        # so a model rebuilt from the best trial uses the same bagging setup.
        "bagging_fraction": trial.suggest_float("bagging_fraction", 0.5, 1.0),
        "bagging_freq": trial.suggest_int("bagging_freq", 1, 7),
        "feature_fraction": trial.suggest_float("feature_fraction", 0.5, 1.0),
        # min_split_gain is LightGBM's equivalent to gamma in XGBoost
        "min_split_gain": trial.suggest_float("min_split_gain", 1e-8, 1.0, log=True),
        # min_child_samples corresponds to the minimum number of samples in a leaf
        "min_child_samples": trial.suggest_int("min_child_samples", 1, 10),
        "scale_pos_weight": trial.suggest_float("scale_pos_weight", 1e-2, 10, log=True),  # For handling imbalance
        "random_state": 42,
        # `eval_metric` is not passed: it is an argument of `fit`, not a
        # LightGBM model parameter, and no evaluation set is used here.
    }

    # Same shuffled, stratified folds for every trial so scores are comparable.
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    model = LGBMClassifier(**params)
    scores = cross_val_score(model, X_train, y_train, cv=skf, scoring="f1", n_jobs=-1)
    return scores.mean()

# Create an Optuna study and optimize the LightGBM objective.
# The sampler is seeded so that the sequence of suggested hyperparameters is
# the same on every run; without a seed each run explores a different path.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=42),
)
study.optimize(objective, n_trials=100)

# Print the best trial details: its cross-validated score and hyperparameters.
best_trial = study.best_trial
print("Best trial:")
print(f"  F1 Score: {best_trial.value}")
print("  Params:")
for key, value in best_trial.params.items():
    print(f"    {key}: {value}")

In [None]:
# Refit LightGBM on the whole training split using the best hyperparameters
# from the study. `eval_metric` is not passed to the constructor: it is an
# argument of `fit`, not a LightGBM model parameter.
best_lgb = LGBMClassifier(**best_trial.params, random_state=42)
best_lgb.fit(X_train, y_train)

# Probability of the positive class, thresholded at 0.5 to obtain hard labels.
y_pred_prob: np.ndarray = best_lgb.predict_proba(X_test)[:, 1]
y_pred_binary: np.ndarray = (y_pred_prob >= 0.5).astype(int)
test_f1: float = f1_score(y_test, y_pred_binary, average="weighted")
print(f"Test F1 Score: {test_f1}")

# The study optimizes scoring="f1" (F1 of the positive class), which is not
# comparable to the weighted average above, so the matching figure is
# reported as well.
test_f1_positive: float = f1_score(y_test, y_pred_binary)
print(f"Test F1 Score (positive class): {test_f1_positive}")

In [None]:
# Add the LightGBM test-set metrics to the shared `results` collection.
results.extend([get_metrics(y_test, y_pred_binary, "LightGBM")])

In [None]:
# Persist the fitted LightGBM model. The context manager guarantees the file
# handle is flushed and closed even if pickling fails.
with open("models/lightgbm_cvd.pkl", "wb") as model_file:
    pickle.dump(best_lgb, model_file)

# Reload the model to check that the saved artifact round-trips.
# NOTE: pickle.load can execute arbitrary code; only load files this project wrote.
with open("models/lightgbm_cvd.pkl", "rb") as model_file:
    lgb: LGBMClassifier = pickle.load(model_file)

# Re-score the reloaded model on the test split using the same 0.5 threshold.
y_pred_prob: np.ndarray = lgb.predict_proba(X_test)[:, 1]
y_pred_binary: np.ndarray = (y_pred_prob >= 0.5).astype(int)
pl.DataFrame(get_metrics(y_test, y_pred_binary, "LightGBM"))

In [None]:
# Collect the per-model metrics into one comparison table.
# A dedicated name is used instead of rebinding `df`, which holds the loaded
# dataset: overwriting it would break any re-run of the cells that read it.
results_df = pl.DataFrame(results)
results_df