## Imports

In [None]:
import numpy as np
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import seaborn as sns
import matplotlib.colors as mcolors
from pandas.plotting import scatter_matrix
from scipy.stats import spearmanr
from scipy.cluster import hierarchy
from collections import defaultdict

from sklearn.preprocessing import StandardScaler
from sklearn.cluster import KMeans
from sklearn.feature_selection import mutual_info_classif, mutual_info_regression
from sklearn.model_selection import train_test_split, cross_val_score, StratifiedKFold, RepeatedStratifiedKFold
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier, AdaBoostClassifier, GradientBoostingClassifier, ExtraTreesClassifier
from sklearn.neighbors import KNeighborsClassifier
from sklearn.svm import SVC
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from lightgbm import LGBMClassifier
from catboost import CatBoostClassifier
from xgboost import XGBClassifier
from sklearn.metrics import confusion_matrix, f1_score
from sklearn.feature_selection import RFECV

from statsmodels.stats.outliers_influence import variance_inflation_factor

import optuna
import statsmodels.api as sm
from boruta import BorutaPy

import custom_map

In [None]:
import importlib

importlib.reload(custom_map)

## Data and feature engineering

In [None]:
data = pd.read_csv("healthcare-dataset-stroke-data.csv")
data.info()

In [None]:
target = "stroke"

categorical_features = data.select_dtypes(['object']).columns.tolist()
numerical_features = data.select_dtypes(['float64', 'int64']).columns.drop('id')
categorical_features.append('stroke')

print(categorical_features, '\n', numerical_features)

data.head()

In [None]:
data[numerical_features].hist(bins=30, figsize=(15, 10), color="teal", edgecolor='black')
plt.suptitle("Histogramy zmiennych numerycznych", fontsize=16)
plt.show()

In [None]:
bmi_median = data['bmi'].median()
bmi_mean = data['bmi'].mean()
print(bmi_median, bmi_mean)

data['bmi'] = data['bmi'].fillna(bmi_median)

In [None]:
nrows, ncols = 2, 3
fig, axes = plt.subplots(nrows, ncols, figsize=(16, 9))
axes = axes.flat

for ax, col in zip(axes, categorical_features):
    sizes = data[col].value_counts()
    labels = sizes.index.astype(str)

    ax.pie(sizes, labels=labels, autopct="%1.1f%%", startangle=90)
    ax.set_title(col)
    ax.axis("equal")

plt.tight_layout()
plt.show()

In [None]:
categorical_features.remove('stroke')
data = pd.get_dummies(data, columns=categorical_features, drop_first=True, dtype=float)
data = data.drop('id', axis=1)

In [None]:
correlation_data = data[numerical_features].corr()

n_features = data[numerical_features].shape[1]
n = data[numerical_features].size/data[numerical_features].shape[1]
custom_map.cmap_pearson(n_features, n, 0.1)
print(custom_map.cmap_pearson(n_features, n, 0.1))

plt.figure(figsize=(16, 16))
sns.heatmap(correlation_data, annot=True, fmt=".2f", cmap="bajon", vmin=-1, vmax=1) #tab20b
plt.tight_layout()
plt.show()

In [None]:
correlation_data = data.corr()

n_features = data[numerical_features].shape[1]
n = data[numerical_features].size/data[numerical_features].shape[1]
custom_map.cmap_pearson(n_features, n, 0.1)
print(custom_map.cmap_pearson(n_features, n, 0.1))

plt.figure(figsize=(16, 16))
sns.heatmap(correlation_data, annot=True, fmt=".2f", cmap="bajon", vmin=-1, vmax=1) #tab20b
plt.tight_layout()
plt.show()

In [None]:
X = data.drop(columns=[target])
y = data[target]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size = 0.2, stratify=y, shuffle=True)

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scaler = StandardScaler()

X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

## Feature selection

In [None]:
rf = RandomForestClassifier(
    n_estimators=500,
    n_jobs=-1,
    class_weight="balanced",
)

boruta = BorutaPy(
    estimator=rf,
    n_estimators='auto',   # lub liczba
    max_iter=100,
    alpha=0.05
)

boruta.fit(X_train, y_train)

selected_mask = boruta.support_
selected_features = np.where(selected_mask)[0]
print("Wybrane cechy (indeksy):", selected_features)
print("Ranking:", boruta.ranking_)

In [None]:
selected_indices = np.where(boruta.support_)[0]
boruta_features = X_train.columns[selected_indices].tolist()

boruta_features

In [None]:
corr_features = data.corr()[target].sort_values(ascending=False)
corr_features = corr_features[corr_features > custom_map.cmap_pearson(n_features, n, 0.1)['r_crit']].index.tolist()

corr_features.remove('stroke')
corr_features

In [None]:
data[corr_features].head()

In [None]:
X_chosen = data[corr_features].copy()
X_scaled = scaler.fit_transform(X_chosen)

kmeans = KMeans(
    n_clusters=3,
    random_state=42,
    n_init=20
)

clusters = kmeans.fit_predict(X_scaled)
X_chosen["cluster"] = clusters

X_chosen["cluster"].describe()

In [None]:
y_train.info()

In [None]:
sample_idx = np.random.choice(len(X_train), size=int(X_train.size/20), replace=False)

mi = mutual_info_regression(X_train.iloc[sample_idx], y_train.iloc[sample_idx])
mi_df = pd.DataFrame({"Feature": X_train.columns, "Mutual Information": mi})

plt.figure(figsize=(20, 20))
plt.barh(X_train.columns, mi)
plt.grid(False)
plt.xticks(rotation=90)
plt.title('Mutual Information')
plt.show()
mi_df



In [None]:
names = X_train.columns
# 1. Compute Spearman correlation and distance matrix
# Assuming X is your dataframe of explanatory variables
corr = spearmanr(X).correlation
# Ensure the matrix is symmetric (sometimes float errors occur)
corr = (corr + corr.T) / 2
np.fill_diagonal(corr, 1)

# Convert correlation to a distance matrix
dist_matrix = 1 - np.abs(corr)
dist_linkage = hierarchy.ward(hierarchy.distance.squareform(dist_matrix))

# 2. Visualize the Dendrogram
fig, ax = plt.subplots(figsize=(10, 6))
dendro = hierarchy.dendrogram(
    dist_linkage, labels=names, ax=ax, leaf_rotation=90
)
ax.set_title("Hierarchical Clustering Dendrogram (Feature Redundancy)")
plt.tight_layout()
plt.grid(False)
plt.show()

# 3. Select Features
# Threshold '1' is common for 1 - abs(corr), but you can adjust based on the plot
cluster_ids = hierarchy.fcluster(dist_linkage, t=1, criterion='distance')
cluster_id_to_feature_ids = defaultdict(list)

for idx, cluster_id in enumerate(cluster_ids):
    cluster_id_to_feature_ids[cluster_id].append(idx)

# Keep only the first feature from each cluster
selected_idx = [v[0] for v in cluster_id_to_feature_ids.values()]
mi_features = names[selected_idx]
X_reduced = X_train.iloc[:, selected_idx]


print(f"Original features: {X_train.shape[1]}")
print(f"Reduced features: {len(mi_features)}")
print(f"Selected: {mi_features}")

In [None]:
def calculate_vif(df: pd.DataFrame):
    vif_data = pd.DataFrame()
    vif_data["feature"] = df.columns

    vif_data["VIF"] = [variance_inflation_factor(df.values, i) for i in range(len(df.columns))]
    print(vif_data)

In [None]:
calculate_vif(X_train.drop(columns=["age", "bmi"]))

In [None]:
vif_features = X_train.drop(columns=["age", "bmi"]).columns.tolist()
vif_features

In [None]:
model = LogisticRegression(
    solver="lbfgs",
    max_iter=10000,
    class_weight="balanced",
    random_state=42
)

rfecv = RFECV(
    estimator=model,
    step=1,
    cv=StratifiedKFold(5),
    scoring="roc_auc",
    min_features_to_select=3
)

rfecv.fit(X_train_scaled, y_train)

rfe_features = X_train.columns[rfecv.support_].tolist()
ranking = pd.DataFrame({
    "feature": X_train.columns,
    "rank": rfecv.ranking_
}).sort_values("rank")

print("Liczba wybranych cech:", len(rfe_features))
print("Wybrane cechy:")
print(rfe_features)

In [None]:
plt.figure(figsize=(8, 5))
plt.plot(
    range(rfecv.min_features_to_select, len(rfecv.cv_results_["mean_test_score"]) + rfecv.min_features_to_select),
    rfecv.cv_results_["mean_test_score"]
)
plt.xlabel("Number of features")
plt.ylabel("CV ROC AUC")
plt.title("RFECV performance")
plt.grid(True)
plt.show()


## Model training

In [None]:
## hard coding because of stochasticity

all_features = ["age", "hypertension", "heart_disease", "avg_glucose_level",
                "bmi", "gender_Male", "gender_Other", "ever_married_Yes",
                "work_type_Never_worked", "work_type_Private", "work_type_Self-employed",
                "work_type_children", "Residence_type_Urban",
                "smoking_status_formerly smoked", "smoking_status_never smoked",
                "smoking_status_smokes"]

boruta_features = ["age", "avg_glucose_level", "bmi"]

corr_features = ["age", "heart_disease", "avg_glucose_level", "hypertension",
                 "ever_married_Yes", "smoking_status_formerly smoked",
                 "work_type_Self-employed"]

mi_features = ["age", "hypertension", "gender_Other",
               "work_type_Private", "smoking_status_formerly smoked"]

rfe_features = ["age", "hypertension", "heart_disease", "avg_glucose_level",
                "bmi", "work_type_Never_worked", "work_type_children",
                "Residence_type_Urban", "smoking_status_never smoked",
                "smoking_status_smokes"]

FEATURE_SETS = {
    "all": all_features,
    "boruta": boruta_features,
    "correlation": corr_features,
    "mi": mi_features,
    "rfe": rfe_features
}

In [None]:
X_train_all = X_train[all_features] # all features
X_train_boruta = X_train[boruta_features].copy() # boruta selection
X_train_corr = X_train[corr_features].copy() # cmap correlation
X_train_mi = X_train[mi_features].copy() # correlation, mi & clustering
X_train_rfe = X_train[rfe_features].copy() # rfecv

data = {
    "Method": [
        "All features",
        "Boruta",
        "Corelation",
        "Mutual Information",
        "RFE"
    ],
    "Feature quantity": [
        len(all_features),
        len(boruta_features),
        len(corr_features),
        len(mi_features),
        len(rfe_features)
    ],
    "Feature name": [
        ", ".join(all_features),
        ", ".join(boruta_features),
        ", ".join(corr_features),
        ", ".join(mi_features),
        ", ".join(rfe_features)
    ]
}

pd.set_option("display.max_colwidth", None)
pd.set_option("display.width", None)
pd.set_option("display.max_columns", None)
df = pd.DataFrame(data)
df

In [None]:
storage_url = "sqlite:///optuna_studies.db"
cv = StratifiedKFold(5, shuffle=True, random_state=42)

MODELS = [
    "logreg", "knn", "svm", "gnb", "dt",
    "rf", "ada", "gb", "extra",
    "lgbm", "xgb", "cat"
]

In [None]:
def sanitize_params(model, params):
    valid = model.get_params().keys()
    return {k: v for k, v in params.items() if k in valid}


In [None]:
def get_model_base(name, pos_weight=None):

    if name == "logreg":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(
                solver="saga",
                penalty="l2",
                C=1.0,
                class_weight="balanced",
                max_iter=5000,
                random_state=42
            ))
        ])

    if name == "svm":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", SVC(
                kernel="rbf",
                C=1.0,
                gamma="scale",
                probability=True,
                class_weight="balanced",
                random_state=42
            ))
        ])

    if name == "knn":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", KNeighborsClassifier(
                n_neighbors=15,
                weights="distance"
            ))
        ])

    if name == "gnb":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", GaussianNB())
        ])

    if name == "dt":
        return DecisionTreeClassifier(
            max_depth=6,
            min_samples_leaf=20,
            class_weight="balanced",
            random_state=42
        )

    if name == "rf":
        return RandomForestClassifier(
            n_estimators=500,
            max_depth=None,
            min_samples_leaf=5,
            class_weight="balanced",
            n_jobs=-1,
            random_state=42
        )

    if name == "extra":
        return ExtraTreesClassifier(
            n_estimators=500,
            min_samples_leaf=5,
            class_weight="balanced",
            n_jobs=-1,
            random_state=42
        )

    if name == "ada":
        return AdaBoostClassifier(
            n_estimators=300,
            learning_rate=0.05,
            random_state=42
        )

    if name == "lgbm":
        return LGBMClassifier(
            n_estimators=500,
            learning_rate=0.05,
            num_leaves=31,
            class_weight="balanced",
            random_state=42
        )

    if name == "xgb":
        return XGBClassifier(
            n_estimators=500,
            learning_rate=0.05,
            max_depth=6,
            subsample=0.8,
            colsample_bytree=0.8,
            scale_pos_weight=pos_weight,
            eval_metric="logloss",
            tree_method="hist",
            random_state=42
        )

    if name == "cat":
        return CatBoostClassifier(
            iterations=500,
            depth=6,
            learning_rate=0.05,
            loss_function="Logloss",
            eval_metric="F1",
            auto_class_weights="Balanced",
            verbose=False,
            random_seed=42
        )

    raise ValueError(name)


In [None]:
def get_model_search(trial, name):

    if name == "logreg":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(
                C=trial.suggest_float("clf__C", 1e-3, 10, log=True),
                l1_ratio=trial.suggest_float("clf__l1_ratio", 0.0, 1.0),
                solver="saga",
                class_weight="balanced",
                max_iter=5000,
                random_state=42
            ))
        ])

    if name == "knn":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", KNeighborsClassifier(
                n_neighbors=trial.suggest_int("clf__n_neighbors", 3, 25),
                weights=trial.suggest_categorical("clf__weights", ["uniform", "distance"])
            ))
        ])

    if name == "svm":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", SVC(
                C=trial.suggest_float("clf__C", 1e-2, 10, log=True),
                kernel="rbf",
                probability=True,
                class_weight="balanced",
                random_state=42
            ))
        ])

    if name == "gnb":
        return Pipeline([
            ("scaler", StandardScaler()),
            ("clf", GaussianNB(
                var_smoothing=trial.suggest_float("clf__var_smoothing", 1e-12, 1e-8, log=True)
            ))
        ])

    if name == "dt":
        return DecisionTreeClassifier(
            max_depth=trial.suggest_int("max_depth", 2, 15),
            min_samples_split=trial.suggest_int("min_samples_split", 2, 20),
            class_weight="balanced",
            random_state=42
        )

    if name == "rf":
        return RandomForestClassifier(
            n_estimators=trial.suggest_int("n_estimators", 200, 600),
            max_depth=trial.suggest_int("max_depth", 3, 15),
            class_weight="balanced",
            n_jobs=-1,
            random_state=42
        )

    if name == "extra":
        return ExtraTreesClassifier(
            n_estimators=trial.suggest_int("n_estimators", 200, 600),
            max_depth=trial.suggest_int("max_depth", 3, 15),
            class_weight="balanced",
            n_jobs=-1,
            random_state=42
        )

    if name == "ada":
        return AdaBoostClassifier(
            n_estimators=trial.suggest_int("n_estimators", 100, 400),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 1.0),
            random_state=42
        )

    if name == "gb":
        return GradientBoostingClassifier(
            n_estimators=trial.suggest_int("n_estimators", 100, 400),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 0.2),
            max_depth=trial.suggest_int("max_depth", 2, 5),
            random_state=42
        )

    if name == "lgbm":
        return LGBMClassifier(
            n_estimators=trial.suggest_int("n_estimators", 200, 600),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 0.2),
            num_leaves=trial.suggest_int("num_leaves", 16, 64),
            class_weight="balanced",
            random_state=42
        )

    if name == "xgb":
        return XGBClassifier(
            n_estimators=trial.suggest_int("n_estimators", 200, 600),
            max_depth=trial.suggest_int("max_depth", 3, 10),
            learning_rate=trial.suggest_float("learning_rate", 0.01, 0.2),
            subsample=trial.suggest_float("subsample", 0.6, 1.0),
            colsample_bytree=trial.suggest_float("colsample_bytree", 0.6, 1.0),
            eval_metric="logloss",
            tree_method="hist",
            random_state=42
        )

    raise ValueError(name)


In [None]:
def objective_catboost(trial, X, y):

    params = {
        "iterations": trial.suggest_int("iterations", 200, 600),
        "depth": trial.suggest_int("depth", 4, 10),
        "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.2),
        "loss_function": "Logloss",
        "auto_class_weights": "Balanced",
        "verbose": False,
        "random_seed": 42
    }

    f1s = []

    for tr, va in cv.split(X, y):
        model = CatBoostClassifier(**params)
        model.fit(X.iloc[tr], y.iloc[tr])
        preds = model.predict(X.iloc[va])
        f1s.append(f1_score(y.iloc[va], preds))

    return np.mean(f1s)


In [None]:
def get_model_final(name, params):

    model = get_model_base(name)

    clean_params = sanitize_params(model, params)

    model.set_params(**clean_params)

    return model


In [None]:
def predict_f1(model, X, y, threshold=0.5):
    proba = model.predict_proba(X)[:, 1]
    y_pred = (proba >= threshold).astype(int)
    return f1_score(y, y_pred)

In [None]:
def find_best_f1_threshold(model, X, y, thresholds=np.linspace(0.01, 0.5, 50)):
    proba = model.predict_proba(X)[:, 1]

    best_f1 = 0.0
    best_thr = 0.5

    for t in thresholds:
        y_pred = (proba >= t).astype(int)
        f1 = f1_score(y, y_pred)

        if f1 > best_f1:
            best_f1 = f1
            best_thr = t

    return best_thr, best_f1


In [None]:
results = pd.DataFrame(
    index=MODELS,
    columns=FEATURE_SETS.keys(),
    dtype=float
)

for feature_name, feature_list in FEATURE_SETS.items():

    X_train_sel = X_train[feature_list]
    X_test_sel = X_test[feature_list]

    for model_name in MODELS:

        study_name = f"{model_name}_{feature_name}_prob"

        with open(f"models/{study_name}.pkl", "rb") as f:
            model = pickle.load(f)

        # ðŸ”¥ TU â€“ dobÃ³r progu NA TRAIN
        best_thr, _ = find_best_f1_threshold(
            model,
            X_train_sel,
            y_train
        )

        # ðŸ”¥ TU â€“ F1 NA TEST tym progiem
        proba_test = model.predict_proba(X_test_sel)[:, 1]
        y_pred_test = (proba_test >= best_thr).astype(int)

        f1 = f1_score(y_test, y_pred_test)
        results.loc[model_name, feature_name] = f1

        print(
            f"{model_name}_{feature_name} | "
            f"thr={best_thr:.2f} | "
            f"F1_test={f1:.4f}"
        )


In [None]:
results = results.sort_values(
    by=results.columns.tolist(),
    ascending=False
)

results

In [None]:
with open("models/rf_all_prob.pkl", "rb") as f:
    model = pickle.load(f)

y_pred = model.predict(X_test)
f1 = f1_score(y_test, y_pred)

print(f"F1 score (rf_all): {f1:.4f}")
