In [None]:
import os
import sys
import pickle
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import (
    confusion_matrix,
    roc_auc_score,
    f1_score,
    precision_score,
    recall_score,
    accuracy_score,
    fbeta_score,
)
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import VotingClassifier

# import own modules
sys.path.append("..")  # Adds higher directory to python modules path.
from scripts import features as ft
from scripts import preprocessing as pp
from scripts import evaluate_models as em

# Plot styling: the "pitayasmoothie-light" stylesheet is fetched from GitHub at run time.
# A dark variant is available in the same repository ("pitayasmoothie-dark.mplstyle").
STYLE_URL = "https://github.com/dhaitz/matplotlib-stylesheets/raw/master/pitayasmoothie-light.mplstyle"
try:
    plt.style.use(STYLE_URL)
except OSError as err:
    # Styling is cosmetic only: without network access (or if the URL moves) keep the
    # current matplotlib style instead of aborting the whole notebook in its first cell.
    print(f" -> could not load plot style ({err}); keeping the current matplotlib style")

In [None]:
# path to the cached feature table (csv file)
path_df = os.path.join("..", "data", "df_deep_sam.csv")

# get features from the cache - or recalculate them (and refresh the cache) when the
# file is missing or `recalculate_df` is set to True
recalculate_df = False
if os.path.isfile(path_df) and not recalculate_df:
    df = pd.read_csv(path_df)
else:
    df = ft.get_features()
    df.to_csv(path_df, index=False)

# set id as index
df = df.set_index("id", drop=True)

# drop first batch of useless variables
df = df.drop(columns=["img", "sp_idx"])
df = df.drop(
    columns=[col for col in df.columns if "_obj" in col]
)  # drop 'object' columns

# processing: keep only rows whose "sp_fix_duration_ms_total" is at most 5000
df = df[df["sp_fix_duration_ms_total"] <= 5000]

# find numerical and categorical columns (at this point the target "asd" is still included)
num_cols = df.columns[df.dtypes != "object"]
cat_cols = df.columns[df.dtypes == "object"]

# print info
print(f" -> dataframe has {df.shape[0]} instances and {df.shape[1]} columns")
print(f" -> there are {len(num_cols)} numerical columns")
print(f" -> there are {len(cat_cols)} categoricals columns")

# prepare features and target
# X is built as a new frame: `X = df` followed by `X.pop("asd")` would only alias
# `df` and silently remove the target column from `df` as well.
X = df.drop(columns=["asd"])
y = df["asd"]

# define numerical columns once more, now on the feature matrix only
num_cols = X.columns[X.dtypes != "object"]

# train-test-split
X_train, X_test, y_train, y_test = pp.split(X, y)

# print info
print(f"train-set has '{len(y_train)}' samples & '{X.shape[1]}' features")
print(f"test-set has '{len(y_test)}' samples - out of '{df.shape[0]}'")
print(f"  ~ {len(y_test) / df.shape[0] * 100:.2f}% of full dataset")

----
# functions


In [None]:
def vary_threshold(clf, name, thresholds, calib: bool = True):
    """Sweep decision thresholds on the training set and plot the resulting metrics.

    The positive-class probability predicted for the notebook-level ``X_train`` is
    binarised with every threshold (``proba > threshold`` -> 1) and compared with
    ``y_train``. Three figures are drawn:

    1. a grid with one confusion matrix per threshold,
    2. one bar chart per metric (accuracy, precision, recall, F1, F2, ROC AUC),
    3. a single line plot of all metrics over the thresholds.

    Parameters
    ----------
    clf : estimator with ``predict_proba``
        Has to be fitted already when ``calib`` is False.
    name : str
        Title of the confusion-matrix figure.
    thresholds : iterable of float
        Probability cut-offs to evaluate; any number of thresholds is supported.
    calib : bool, default True
        If True, ``clf`` is wrapped in an isotonic ``CalibratedClassifierCV`` that is
        fitted on ``X_train`` / ``y_train`` before the sweep.

    Returns
    -------
    estimator
        The classifier whose probabilities were used: the fitted calibration wrapper
        when ``calib`` is True, otherwise ``clf`` itself.
    """
    thresholds = list(thresholds)

    if calib:
        clf = CalibratedClassifierCV(clf, method="isotonic")
        clf.fit(X_train, y_train)

    # probability of the positive class (second column of predict_proba)
    proba_pos = clf.predict_proba(X_train)[:, 1]

    # ROC AUC is computed from the scores, not from the thresholded labels, so it is
    # identical for every threshold - compute it once instead of once per iteration.
    auc = roc_auc_score(y_true=y_train, y_score=proba_pos)

    # ------------------------------------------------------------
    # figure 1: confusion matrices; the grid grows with the number of thresholds
    # (7 columns; 35 thresholds give the 5 x 7 grid on a 20 x 13 inch canvas)
    ncols = 7
    nrows = max(1, -(-len(thresholds) // ncols))  # ceiling division
    cm_cmap = sns.light_palette("seagreen", as_cmap=True)
    fig_cm, axes = plt.subplots(
        nrows=nrows, ncols=ncols, figsize=(20, 2.6 * nrows), squeeze=False
    )
    fig_cm.suptitle(name)
    axes = axes.flatten()

    records = []
    for iax, thresh in zip(axes, thresholds):
        y_pred = (proba_pos > thresh) * 1

        records.append(
            {
                "thresh": thresh,
                "acc": accuracy_score(y_true=y_train, y_pred=y_pred),
                "prec": precision_score(y_true=y_train, y_pred=y_pred),
                "reca": recall_score(y_true=y_train, y_pred=y_pred),
                "f1": f1_score(y_true=y_train, y_pred=y_pred),
                "f2": fbeta_score(y_true=y_train, y_pred=y_pred, beta=2),
                "roc_auc": auc,
            }
        )

        sns.heatmap(
            confusion_matrix(y_train, y_pred),
            annot=True,
            cmap=cm_cmap,
            fmt="g",
            cbar=False,
            ax=iax,
        )
        iax.set_title(f"thresh. = {round(thresh,2)}")
        iax.set_xlabel("Predicted")
        iax.set_ylabel("Actual")
        iax.set_aspect("equal", "box")

    # hide grid cells that did not receive a confusion matrix
    for spare_ax in axes[len(thresholds):]:
        spare_ax.set_visible(False)

    fig_cm.tight_layout()

    # ------------------------------------------------------------
    # figure 2: one bar chart per metric
    metric_cols = ["acc", "prec", "reca", "f1", "f2", "roc_auc"]
    metrics = pd.DataFrame(records, columns=["thresh", *metric_cols])

    fig_bar, bar_axes = plt.subplots(nrows=len(metric_cols), ncols=1, figsize=(12, 30))
    for bar_ax, metric in zip(bar_axes, metric_cols):
        sns.barplot(data=metrics, x="thresh", y=metric, ax=bar_ax)
        bar_ax.set_title(metric)
        bar_ax.set_xlabel("threshold")
        bar_ax.set_ylabel(metric)
        # zoom the y-axis onto the observed range so small differences stay visible
        bar_ax.set_ylim(metrics[metric].min() - 0.01, metrics[metric].max() + 0.01)
    fig_bar.tight_layout()

    # ------------------------------------------------------------
    # figure 3: all metrics over the thresholds in a single line plot
    _, line_ax = plt.subplots()
    sns.lineplot(data=metrics.set_index("thresh", drop=True), ax=line_ax)

    return clf

In [None]:
def new_threshold(mdl, new_threshold=0.5):
    """Report train and test performance of ``mdl`` at a custom decision threshold.

    For the notebook-level ``X_train`` and ``X_test`` the second column of
    ``mdl.predict_proba`` is compared with ``new_threshold`` (strictly greater -> 1,
    otherwise 0). The resulting labels and the full probability arrays are passed to
    ``em.report`` together with ``y_train`` / ``y_test``.
    """

    def _probabilities_and_labels(features):
        # full predict_proba output plus the thresholded 0/1 labels
        proba = mdl.predict_proba(features)
        labels = (proba[:, 1] > new_threshold) * 1
        return proba, labels

    train_proba, train_pred = _probabilities_and_labels(X_train)
    test_proba, test_pred = _probabilities_and_labels(X_test)

    # evaluate model
    em.report(
        y_train=y_train,
        y_train_pred=train_pred,
        y_train_proba=train_proba,
        y_test=y_test,
        y_test_pred=test_pred,
        y_test_proba=test_proba,
    )

----
# models


In [None]:
# paths of the pickled models, in the order: RF, XGB, SVC
models = [
    "../models/RF_final/RF_final.pickle",
    "../models/xgb_final/xgb_final_new.pickle",
    "../models/SVC_final/best_svc_resnet.pickle",
]

# NOTE: pickle.load can execute arbitrary code contained in the file - only load
# model files that come from a trusted source.
loaded_models = []
for model_path in models:
    with open(model_path, "rb") as f:
        loaded_models.append(pickle.load(f))
rf, xgb, svc = loaded_models

# candidate decision thresholds: 0.16, 0.18, ..., 0.84
thresholds = [i / 100 for i in range(16, 86, 2)]

print(thresholds)
print(len(thresholds))

----
# RF


In [None]:
# Calibrate the RF model and sweep the thresholds on the training set; the fitted
# calibration wrapper is kept for the evaluation below. `thresholds` is passed by
# keyword, like in the XGB / SVC / voting calls.
rf_calib = vary_threshold(rf, "RF", thresholds=thresholds, calib=True)

In [None]:
# Report the calibrated RF at a 0.38 cut-off, then the RF as loaded from disk at 0.5.
new_threshold(rf_calib, new_threshold=0.38)
new_threshold(rf, new_threshold=0.5)

----
# XGB


In [None]:
# Calibrate the XGB model and sweep the thresholds on the training set; keep the
# fitted calibration wrapper that vary_threshold returns.
xgb_calib = vary_threshold(xgb, "xgb", thresholds=thresholds, calib=True)

In [None]:
# Report the calibrated XGB at a 0.42 cut-off, then the XGB as loaded from disk at 0.5.
new_threshold(xgb_calib, new_threshold=0.42)
new_threshold(xgb, new_threshold=0.5)

----
# SVC

In [None]:
# Calibrate the SVC model and sweep the thresholds on the training set; keep the
# fitted calibration wrapper that vary_threshold returns.
svc_calib = vary_threshold(svc, "SVC", thresholds=thresholds, calib=True)

In [None]:
# Report the calibrated SVC at a 0.36 cut-off, then the SVC as loaded from disk at 0.5.
new_threshold(svc_calib, new_threshold=0.36)
new_threshold(svc, new_threshold=0.5)

----

# voting

In [None]:
# Soft-voting ensemble over the three calibrated models.
calibrated_members = [
    ("calibrated_xgb", xgb_calib),
    ("calibrated_rf", rf_calib),
    ("calibrated_svc", svc_calib),
]
vtg_calib = VotingClassifier(estimators=calibrated_members, voting="soft")

# Train the ensemble on the training data.
# NOTE(review): per the scikit-learn docs, fit() trains clones of the listed
# estimators, so the calibrated models are fitted again here - confirm this cost is intended.
vtg_calib.fit(X_train, y_train)

In [None]:
# Sweep the thresholds for the voting ensemble; calib=False because the ensemble is
# already fitted and is used as-is (no additional calibration wrapper).
vary_threshold(vtg_calib, "voting_c", thresholds=thresholds, calib=False)

In [None]:
# Report the voting ensemble at a 0.4 cut-off and at the default 0.5 for comparison.
new_threshold(vtg_calib, new_threshold=0.4)
new_threshold(vtg_calib, new_threshold=0.5)

----

# save calibrated models

In [None]:
# Persist the three calibrated models and the voting ensemble via em.save_model,
# all with folder="calibrated".
em.save_model(m=rf_calib, file="RF_calib.pickle", folder="calibrated")
em.save_model(m=xgb_calib, file="XGB_calib.pickle", folder="calibrated")
em.save_model(m=svc_calib, file="SVC_calib.pickle", folder="calibrated")
em.save_model(m=vtg_calib, file="VTG_calib.pickle", folder="calibrated")