In [None]:
import pandas as pd
import numpy as np
import sys
import yaml
import itertools
import matplotlib.pyplot as plt
import plotly.graph_objects as go
import shap

from sklearn.metrics import f1_score
from sklearn.model_selection import KFold
from xgboost import XGBClassifier
from mnist import MNIST

sys.path.insert(1, '/home/guilherme-resende/Desktop/mono2/utils')
import qif

In [None]:
# Random seed constant for the notebook.
RANDOM_SEED = 1

# Hyper-parameter grid explored by the cross-validated search (powers of two).
MAX_DEPTHS = [2 ** exp for exp in range(1, 5)]      # 2, 4, 8, 16
N_ESTIMATORS = [2 ** exp for exp in range(4, 9)]    # 16, 32, 64, 128, 256

#### Load Data

In [None]:
# Key of the dataset to analyse; used to look up the CSV path in datasets.yaml
# and to name the output file written at the end of the notebook.
ds_name = "mnist"

In [None]:
# Resolve the dataset location from the YAML registry. `safe_load` replaces the
# bare `yaml.load(...)` call, which is deprecated without an explicit Loader
# (and rejected by PyYAML >= 6); the context manager closes the file handle.
with open("datasets.yaml") as registry_file:
    datasets = yaml.safe_load(registry_file)

df = pd.read_csv(datasets[ds_name]["path"])

# The "set" column marks each row as "train" or "test". Split on it, drop the
# marker column and renumber the rows of each partition from 0.
# Bracket access is used so the column can never be shadowed by a DataFrame attribute.
df_train = df.loc[df["set"] == "train"].drop("set", axis=1).reset_index(drop=True)
df_test = df.loc[df["set"] == "test"].drop("set", axis=1).reset_index(drop=True)

# Release the combined frame; only the two partitions are used afterwards.
del df

#### Create Input

In [None]:
# Separate the label column ("targets") from the feature columns in each split.
X, Y = df_train.drop(columns="targets"), df_train["targets"]
X_test, Y_test = df_test.drop(columns="targets"), df_test["targets"]

#### Select the Best Parameter Combination

In [None]:
%%script False
%%time

# Cross-validated grid search over every (max_depth, n_estimators) pair.
# Each row appended to `data` is [max_depth, n_estimators, fold, weighted F1].
param_grid = list(itertools.product(MAX_DEPTHS, N_ESTIMATORS))

# The splitter does not shuffle, so a single instance can serve every parameter
# combination and yields the same contiguous folds each time.
# NOTE(review): if the training rows are ordered by class, unshuffled folds will
# be skewed — confirm the row order of the CSV.
kf = KFold(n_splits=5)

data = []
for i, (max_depth, n_estimators) in enumerate(param_grid, start=1):
    for fold, (train_idx, valid_idx) in enumerate(kf.split(X)):
        model = XGBClassifier(
            max_depth=max_depth,
            n_estimators=n_estimators,
            random_state=RANDOM_SEED,
        )
        # kf.split yields positional indices, so rows are selected with .iloc
        # on both the features and the labels.
        model.fit(X.iloc[train_idx].values, Y.iloc[train_idx])

        # Only hard predictions are needed for the F1 score.
        preds = model.predict(X.iloc[valid_idx].values)
        f1 = f1_score(Y.iloc[valid_idx], preds, average="weighted")

        data.append([max_depth, n_estimators, fold, f1])

    print(f"{(i*100) // len(param_grid)}% Complete.")

df_results = pd.DataFrame(data, columns=["max_depth", "n_estimators", "fold", "f1_score"])

In [None]:
%%script False

# Collapse the per-fold scores into one mean F1 per (max_depth, n_estimators) pair.
df_results = (
    df_results.groupby(["max_depth", "n_estimators"])["f1_score"]
    .mean()
    .rename("mean_f1_score")
    .reset_index()
)

In [None]:
%%script False

# Heatmap of the mean F1-score: max_depth on x, n_estimators on y.
heatmap = go.Heatmap(
    x=df_results.max_depth,
    y=df_results.n_estimators,
    z=df_results.mean_f1_score,
    colorbar=dict(title="Mean F1-Score"),
)

fig = go.Figure(data=heatmap)
fig.update_layout(
    title="Mean F1-Score for Each Parameter Combination",
    xaxis_title="Maximal Depth",
    yaxis_title="Number of Estimators",
)

# Treat both axes as categorical so the grid values are evenly spaced.
for set_axis_type in (fig.update_xaxes, fig.update_yaxes):
    set_axis_type(type='category')

fig.show()

#### Training Process

In [None]:
# Final classifier, fitted on the whole training split.
# NOTE(review): max_depth=8 / n_estimators=128 is presumably the combination
# picked from the grid-search heatmap — confirm.
# The notebook's RANDOM_SEED is passed so the fit uses an explicit seed.
model = XGBClassifier(max_depth=8, n_estimators=128, random_state=RANDOM_SEED)
model.fit(X.values, Y)

#### Prediction Process

In [None]:
# Score the held-out split: class probabilities per row and the hard predictions.
test_features = X_test.values
preds_proba = model.predict_proba(test_features)
preds = model.predict(test_features)

#### Select a Given Class to Analyze

In [None]:
# For every test row, store the probability the model assigned to that row's
# TRUE class in a new "preds_proba" column.
# The probability column is looked up by the class's position in
# `model.classes_`, not by its label value, and rows are selected with a boolean
# mask rather than index labels. This stays correct even if the labels are not
# exactly 0..K-1 or the frame's index is not a clean 0..N-1 range.
true_class_proba = np.full(len(df_test), np.nan)
for class_pos, target in enumerate(model.classes_):
    is_target = (df_test.targets == target).to_numpy()
    true_class_proba[is_target] = preds_proba[is_target, class_pos]

# Rows whose label is not among model.classes_ keep NaN.
df_test["preds_proba"] = true_class_proba

In [None]:
# Preview the first rows of the test frame, including the new "preds_proba" column.
df_test.head(3)

#### Calculate SHAP

In [None]:
%%time

# Mean SHAP value of every feature, computed separately for each true class.
# The explainer depends only on the fitted model, so it is built once instead
# of once per class.
explainer = shap.TreeExplainer(model)

data = []
for class_pos, target in enumerate(model.classes_):
    df_target = df_test[df_test.targets == target]
    X_target = df_target.drop(["targets", "preds_proba"], axis=1)

    shap_values = explainer.shap_values(X_target.values)

    # Keep only the contributions towards the class being analysed, selected by
    # its position in model.classes_ rather than by its label value.
    if isinstance(shap_values, list):
        # List layout: one (n_samples, n_features) array per class.
        class_shap = np.asarray(shap_values[class_pos])
    else:
        # NOTE(review): newer shap releases return a single array for
        # multi-class models, assumed here to be
        # (n_samples, n_features, n_classes) — confirm against the installed version.
        class_shap = np.asarray(shap_values)[:, :, class_pos]

    # Average over the samples of this class -> one value per feature.
    data.append(class_shap.mean(axis=0))

# One row per class, one column per feature, plus the class label.
df_shap = pd.DataFrame(data)
df_shap["digit"] = model.classes_

In [None]:
# Display the per-class mean SHAP table (one row per class).
df_shap

#### Visualize SHAP

In [None]:
# Keep the SHAP row for digit 5 and drop the label so only feature columns remain.
shap_values = df_shap.loc[df_shap["digit"] == 5].drop(columns="digit")

In [None]:
# Lay the per-feature values out as a 28x28 image and render it.
shap_image = np.asarray(shap_values).reshape(28, 28)
plt.imshow(shap_image)
plt.show()

#### Calculate QIF

In [None]:
%%time

# QIF (Bayes leakage) between every input feature and the true-class
# probability, computed separately for each class.
# The feature list and class count do not change between iterations, so they
# are looked up once.
feature_names = X_test.columns
n_classes = len(model.classes_)

data = []
for i, target in enumerate(model.classes_, start=1):
    df_target = df_test[df_test.targets == target]
    bayes_leakage = qif.BayesLeakage(df_target)

    # Only the first element returned by compute_flows is kept.
    # NOTE(review): presumably that element is the leakage value — confirm
    # against the project's qif module.
    qif_values = [
        bayes_leakage.compute_flows(x=feature, y='preds_proba')[0]
        for feature in feature_names
    ]
    data.append(qif_values)

    # Integer percentage with a '%' sign, matching the grid-search progress message.
    print(f"{(i * 100) // n_classes}% Complete.")

# One row per class, one column per feature, plus the class label.
df_qif = pd.DataFrame(data)
df_qif["digit"] = model.classes_

#### Visualize QIF

In [None]:
# Keep the QIF row for digit 5 and drop the label so only feature columns remain.
qif_values = df_qif.loc[df_qif["digit"] == 5].drop(columns="digit")

In [None]:
# Lay the per-feature QIF values out as a 28x28 image and render it.
# plt.show() mirrors the SHAP visualisation cell and keeps the AxesImage repr
# out of the cell output.
plt.imshow(np.array(qif_values).reshape((28, 28)))
plt.show()

#### Save Coefficients

In [None]:
# Stack the SHAP and QIF tables, tagging each row with the method that produced it.
# `.assign` adds the "method" column to copies, so df_shap / df_qif keep their
# original columns. Re-running the visualisation cells after this one still
# sees exactly the feature columns plus "digit".
df = pd.concat([
    df_shap.assign(method="SHAP"),
    df_qif.assign(method="QIF"),
])

# Output file name is derived from the dataset key; the row index is not written.
df.to_csv(f"../data/results/{ds_name}_coeficients.csv", index=False)