In [None]:
import shap
import pandas as pd
from alibi.explainers import AnchorTabular
from alibi.confidence import TrustScore
from sklearn.inspection import partial_dependence
from sklearn.inspection import PartialDependenceDisplay
import time
import pickle

In [None]:
import warnings
warnings.filterwarnings("ignore")

In [None]:
model_id_index = 2

In [None]:
from sqlalchemy import MetaData, text
from sqlalchemy.ext.automap import automap_base
from sqlalchemy.orm import registry, Session

from util import get_engine

In [None]:
# Obtain the database engine from the project-local util module.
engine = get_engine()
# Reflect the existing database schema into a MetaData object.
metadata = MetaData()
metadata.reflect(bind=engine)
# Build automapped ORM classes on top of the reflected metadata; Base is later
# placed into the `context` dict that is handed to create_object.
Base = automap_base(metadata=metadata)
Base.prepare(autoload_with=engine)
# NOTE(review): mapper_registry is not referenced again in the visible cells — confirm it is still needed.
mapper_registry = registry()

In [None]:
# Tables to load. The order matters: it determines the position of each frame inside `dfs`.
table_names = ["datapoint_feature_value", "datapoint_class_label", "datapoint_filter", "datapoint", "datapoint_mappings", "feature", "model"]

# Read all tables through a single connection/transaction instead of opening
# (and committing) a fresh one for every table. The table names come from the
# hard-coded list above, so concatenating them into the statement is safe.
with engine.begin() as conn:
    dfs = [
        pd.read_sql_query(text("SELECT * FROM " + table), conn)
        for table in table_names
    ]

In [None]:
# Look the frames up by table name rather than by list position, so that
# reordering or extending `table_names` cannot silently bind the wrong table.
tables_by_name = dict(zip(table_names, dfs))

datapoint_df = tables_by_name["datapoint"]
datapoint_mappings_df = tables_by_name["datapoint_mappings"]
datapoint_feature_value_df = tables_by_name["datapoint_feature_value"]
feature_df = tables_by_name["feature"]
datapoint_class_label_df = tables_by_name["datapoint_class_label"]
datapoint_filter_df = tables_by_name["datapoint_filter"]
model_df = tables_by_name["model"]

In [None]:
# Attach to every datapoint its mapping row, which carries the train/test grouping.
datapoints_merged = datapoint_df.merge(
    datapoint_mappings_df,
    left_on='datapoint_mappings_id',
    right_on='id',
    suffixes=('_datapoint', '_mappings'),
)

# Reshape the feature values from one row per (datapoint, feature) pair
# to one row per datapoint with one column per feature id.
pivot_feature_values = datapoint_feature_value_df.pivot(
    index='datapoint_id', columns='feature_id', values='value'
).reset_index()

# Replace the feature ids in the header with the corresponding feature names.
pivot_feature_ids = pivot_feature_values.columns[1:]
pivot_feature_names = feature_df.set_index('id').loc[pivot_feature_ids]['name'].tolist()
pivot_feature_values.columns = ['datapoint_id'] + pivot_feature_names

# Add the class label of each datapoint.
class_labels = datapoint_class_label_df[['datapoint_id', 'label_categorical_id']]
final_df = pivot_feature_values.merge(class_labels, on='datapoint_id', how='left')
final_df = final_df.rename(columns={'label_categorical_id': 'class_label'})

# Add the filter value of each datapoint.
filter_values = datapoint_filter_df[['datapoint_id', 'value']]
final_df = final_df.merge(filter_values, on='datapoint_id', how='left', suffixes=('', '_filter'))
final_df = final_df.rename(columns={'value': 'filter_value'})

# Add the datetime and grouping information from datapoints_merged.
datapoint_info = datapoints_merged[['id_datapoint', 'datetime', 'grouping']]
final_df = final_df.merge(datapoint_info, left_on='datapoint_id', right_on='id_datapoint', how='left')


In [None]:
model_df["id"]

In [None]:
# 'label_categorical_id' was already renamed to 'class_label' when final_df was
# assembled, so a second rename would be a no-op; only preview the frame here.
final_df.head()

In [None]:

# Keep the identifying/meta columns first, followed by one column per feature name.
meta_columns = ['datapoint_id', 'datetime', 'grouping', 'filter_value', 'class_label']
final_columns = meta_columns + feature_df['name'].tolist()
final_df = final_df.loc[:, final_columns]


In [None]:
final_df.head()

In [None]:
model_df.head()

In [None]:
model_df["path_to_model"][model_id_index]

In [None]:
# Load the pickled estimator referenced by the selected row of model_df.
# The context manager guarantees that the file handle is closed again.
# NOTE: pickle.load can execute arbitrary code contained in the file — only
# load model files that come from a trusted source.
with open(model_df["path_to_model"][model_id_index], 'rb') as model_file:
    model = pickle.load(model_file)

# Feature names stored on the estimator; used later to select the model's input columns.
model_features_in = list(model.feature_names_in_)
model_features_in

In [None]:
model

In [None]:
classes_out = model.classes_

In [None]:
def label_transform(row):
    """Map a row's 1-based ``class_label`` (1..5) to a 0-based class index (0..4).

    Parameters
    ----------
    row : mapping-like
        Any object supporting ``row["class_label"]``, e.g. a DataFrame row.

    Returns
    -------
    int
        ``class_label - 1`` for the labels 1 through 5 and ``-1`` for every
        other value (including missing values such as ``None`` or NaN).
    """
    label = row["class_label"]
    # Deliberately no print here: the function is applied to every row via
    # DataFrame.apply, so a per-row print floods the cell output.
    if label in (1, 2, 3, 4, 5):
        return int(label) - 1
    return -1

In [None]:
# Filter each grouping once, then derive the feature matrix and the labels from it.
train_rows = final_df[final_df['grouping'] == "train"]
test_rows = final_df[final_df['grouping'] == "test"]

# Feature matrices restricted to the columns the model was fitted on.
X_train, X_test = train_rows[model_features_in], test_rows[model_features_in]

# Raw class labels and their label_transform output, per split.
y_train = train_rows["class_label"]
y_train_trans = train_rows.apply(label_transform, axis=1)
y_test = test_rows["class_label"]
y_test_trans = test_rows.apply(label_transform, axis=1)


In [None]:
y_pred = model.predict(X_train.to_numpy()); y_pred

In [None]:
y_pred

In [None]:
my_df = pd.DataFrame({'pred': y_pred, 'label': y_train}, columns=['pred', 'label'])

In [None]:
my_df.head()

In [None]:
import pandas as pd
from sklearn.metrics import balanced_accuracy_score

def confusion_matrix(df: pd.DataFrame, col1: str, col2: str):
    """Print the balanced accuracy of two label columns and return their cross-count table.

    ``col1`` is handed to ``balanced_accuracy_score`` as its first argument and
    ``col2`` as its second. In the returned table the values of ``col1`` form
    the rows and the values of ``col2`` form the columns; combinations that
    never occur are filled with 0.

    Example
    -------
    >>> confusion_matrix(test_df, 'actual_label', 'predicted_label')
    """
    score = balanced_accuracy_score(df[col1], df[col2])
    print(score)

    # Count every (col1, col2) pair, then spread the col2 values out into columns.
    pair_counts = df.groupby([col1, col2]).size()
    return pair_counts.unstack(fill_value=0)

In [None]:
# Ground truth first, prediction second: the helper forwards its first column to
# balanced_accuracy_score as y_true, and its docstring documents the
# (actual_label, predicted_label) order. Rows of the table are the true labels.
confusion_matrix(my_df, "label", "pred")

In [None]:
X_train.head()

In [None]:
features = X_train.columns.tolist()

In [None]:
y_train_trans

## SHAP

In [None]:
explainer = shap.TreeExplainer(model)

In [None]:
classes = model.classes_.tolist()

In [None]:
shap_values = explainer.shap_values(X_test)

In [None]:
shap_values.shape

In [None]:
len(shap_values)

In [None]:
explanation_object = shap.Explanation(shap_values)

In [None]:
explanation_object.values[120]

In [None]:
shap.summary_plot(shap_values, X_test, plot_type="bar", class_names=[0,1,2,3,4], feature_names = X_test.columns)

## Anchors

In [None]:
def predict_fn(x):
    """Return the model's class probabilities for the given instances."""
    return model.predict_proba(x)


feature_names = X_train.columns.to_list()
explainer = AnchorTabular(predict_fn, feature_names)

# Percentiles passed to fit() as disc_perc: the multiples of `multiplier`
# below 100, i.e. (25, 50, 75) for multiplier = 25.
multiplier = 25
entries = int((100 / multiplier) - 1)
disc_perc = tuple((step + 1) * multiplier for step in range(entries))
explainer.fit(X_train.to_numpy(), disc_perc=disc_perc)

In [None]:
feature_names

In [None]:
X_test_np = X_test.to_numpy()
test_length = X_test_np.shape[0]

In [None]:
def time_convert(sec):
    """Format a duration given in seconds as ``"Time Lapsed = H:M:S"``.

    Hours and minutes are truncated to integers; the seconds part is inserted
    as-is, so a float input keeps its fractional seconds.
    """
    total_minutes, seconds = divmod(sec, 60)
    hours, minutes = divmod(total_minutes, 60)
    return "Time Lapsed = {0}:{1}:{2}".format(int(hours), int(minutes), seconds)

In [None]:
X_test_np = X_test.to_numpy()
start_time = time.time()

# Explain every test instance and collect the explanations. The loop can run
# for a long time, so progress is reported on a single, self-overwriting line.
anchors = []
# Count from 1 so that the message shows the number of *completed* steps
# (counting from 0 reported "0 of N" after the first explanation was done).
for i, dp in enumerate(X_test_np, start=1):
    explanation = explainer.explain(dp, threshold=0.95)
    anchors.append(explanation)
    time_lapsed = time.time() - start_time
    print("{} von {} Schritten abgeschlossen. Zeit: {}. Precision: {}".format(i, len(X_test_np), time_convert(time_lapsed), explanation["data"]["precision"]), end="\r", flush=True)

In [None]:
anchors[1]["data"]

## Trustscores

In [None]:
classes = model.classes_-1; classes

In [None]:
y_train_norm = y_train.to_numpy()-1
y_test_norm = y_test.to_numpy() - 1

In [None]:
X_train.to_numpy()

In [None]:
# Fit the trust-score model on the training features with the shifted labels
# (y_train_norm is y_train - 1).
# NOTE(review): the class count is hard-coded to 5 — confirm it matches the
# number of classes of the loaded model.
ts = TrustScore()
ts.fit(X_train.to_numpy(), y_train_norm, classes=5)


In [None]:
# Later cells store scores[0][i] as the "score" and scores[1][i] + 1 as the "neighbor" of test point i.
# NOTE(review): alibi documents the second argument of TrustScore.score as the model's
# predicted classes (or class probabilities); the ground-truth test labels are passed
# here — confirm this is intended.
# NOTE(review): fit() received a NumPy array while score() receives the DataFrame — confirm
# that alibi handles both the same way.
scores = ts.score(X_test, y_test_norm, k=4)

In [None]:
import numpy as np

In [None]:
print(len(datapoint_df))

In [None]:
# Accumulate over all scored test points:
#   dist_sum - absolute gap between the transformed true label and the class in scores[1]
#   conf_sum - the values in scores[0]
dist_sum = 0
conf_sum = 0
for true_label, trust, neighbor in zip(y_test_trans, scores[0], scores[1]):
    dist_sum += abs(true_label - neighbor)
    conf_sum += trust

# Report both quantities as averages over the scored points.
n_scored = len(scores[1])
print(dist_sum / n_scored)
print(conf_sum / n_scored)

## Partial Dependence

#### Prepare a model wrapper that outputs numerical class values instead of categorical ones (e.g. 0 instead of "low")

In [None]:
from sklearn.base import BaseEstimator, ClassifierMixin

class CategoricalToNumericalClassifier(BaseEstimator, ClassifierMixin):
    """Wrap an already-trained classifier so that it reports numeric class codes.

    Parameters
    ----------
    base_model : fitted classifier
        Must provide ``predict``; ``predict_proba`` is optional.
    class_mapping : dict
        Maps each label produced by ``base_model`` to its numeric code.

    Attributes
    ----------
    classes_ : list
        The numeric codes, in the insertion order of ``class_mapping``.
    reverse_mapping : dict
        Numeric code -> original label (inverse of ``class_mapping``).
    """

    def __init__(self, base_model, class_mapping):
        self.base_model = base_model
        self.class_mapping = class_mapping
        self.classes_ = list(class_mapping.values())
        # Reverse mapping for decoding
        self.reverse_mapping = {v: k for k, v in class_mapping.items()}

    def fit(self, X, y=None):
        """Mark the wrapper as fitted; the wrapped model is already trained, so nothing is learned."""
        self.is_fitted_ = True
        return self

    def predict(self, X):
        """Predict with the base model and return the numeric codes as a list.

        Predictions that are not a key of ``class_mapping`` are reported as -1.
        """
        predictions = self.base_model.predict(X)
        return [self.class_mapping.get(pred, -1) for pred in predictions]

    def _proba_column_order(self):
        """Return the base-model probability column that belongs to each entry of ``classes_``.

        Returns ``None`` when no safe reordering exists: the base model exposes
        no ``classes_``, a mapped label is unknown to the base model, or the
        mapping does not cover every base-model class exactly once.
        """
        base_classes = getattr(self.base_model, 'classes_', None)
        if base_classes is None:
            return None
        if hasattr(base_classes, 'tolist'):
            base_labels = base_classes.tolist()
        else:
            base_labels = list(base_classes)
        try:
            order = [base_labels.index(self.reverse_mapping[code]) for code in self.classes_]
        except ValueError:
            return None
        # Only a full permutation keeps every probability column exactly once.
        if sorted(order) != list(range(len(base_labels))):
            return None
        return order

    def predict_proba(self, X):
        """Return class probabilities with column ``j`` belonging to ``classes_[j]``.

        The base model orders its probability columns by its own ``classes_``,
        which need not match the order of the numeric codes (for example when
        string labels are ordered alphabetically). The columns are therefore
        reordered whenever the mapping allows it; otherwise the base model's
        output is returned unchanged.

        Raises
        ------
        NotImplementedError
            If the base model has no ``predict_proba``.
        """
        if not hasattr(self.base_model, 'predict_proba'):
            raise NotImplementedError("This model does not support predict_proba.")

        proba = self.base_model.predict_proba(X)
        order = self._proba_column_order()
        if order is None or order == list(range(len(order))):
            return proba

        import numpy as np  # local import keeps this cell self-contained
        return np.asarray(proba)[:, order]

    def inverse_transform(self, y):
        """Convert numeric codes back to the original labels (KeyError for unknown codes)."""
        return [self.reverse_mapping[pred] for pred in y]

In [None]:
# Label -> numeric code used by the wrapper.
# NOTE(review): other cells treat model.classes_ as numeric (e.g. `model.classes_-1`). With
# numeric predictions none of these string keys match and the wrapper's predict() falls back
# to -1 — confirm the mapping fits the model selected by model_id_index.
class_mapping = {'low': 0, 'low-med': 1, 'medium': 2, 'med-high': 3, 'high': 4}
model_wrapper = CategoricalToNumericalClassifier(base_model=model, class_mapping=class_mapping)

# fit() trains nothing because the wrapped model is already trained; the call only marks the wrapper as fitted.
model_wrapper.fit(X_train, y_train)

In [None]:
model_wrapper.classes_

In [None]:
# Print, for every feature and every class, the averaged partial dependence
# and the grid of feature values it was evaluated on.
for index, feature in enumerate(X_test.columns):
    pdp = partial_dependence(model_wrapper, X_test, [index], kind="average")
    # scikit-learn 1.3 renamed the "values" entry to "grid_values" and later
    # releases removed the old key, so prefer the new name when it is present.
    grid_key = "grid_values" if "grid_values" in pdp else "values"
    grid_values = pdp[grid_key][0].tolist()
    for cl_index, cl in enumerate(model_wrapper.classes_):
        print(
            {
                "feature": feature,
                "class": int(cl),
                "average": pdp["average"][cl_index].tolist()
            }
        )
        print(
            {
                "feature": feature,
                "class": int(cl),
                "values": grid_values
            }
        )

In [None]:
PartialDependenceDisplay.from_estimator(model_wrapper, X_train, [X_train.columns[4]], target=0)

## Save to Database

In [None]:
shap_values.shape

In [None]:
from util import create_object

In [None]:
# State handed to create_object: the automapped base and one ORM session bound to the engine.
context = {
    "Base": Base,
    "session": Session(bind=engine),
}

In [None]:
datapoint_df[datapoint_df["datapoint_mappings_id"] == 2]

In [None]:
# TODO setup correct model reference
feature_ids = list(feature_df["id"])

# Take the datapoint ids from exactly the rows that make up X_test, so that
# row j of shap_values (computed on X_test) belongs to datapoint_ids[j]
# regardless of the order in which the database returned the datapoint table.
datapoint_ids = final_df.loc[X_test.index, "datapoint_id"].tolist()

# Loop-invariant lookups, resolved once instead of inside the innermost loop.
model_id = int(model_df["id"][model_id_index])
# Feature name -> database id. This also avoids calling int() on a one-element
# Series, which pandas 2.x deprecates.
feature_id_by_name = feature_df.set_index("name")["id"]

for i, feature in enumerate(model_features_in):
    feature_id = int(feature_id_by_name[feature])
    for j, datapoint_id in enumerate(datapoint_ids):
        for k, _ in enumerate(classes_out):
            create_object(context, "shap", with_commit=False,
                        model_id = model_id,
                        feature_id = feature_id,
                        datapoint_id = int(datapoint_id),
                        # Label ids are written as the 1-based position of the class.
                        label_id = int(k+1),
                        # assumes shap_values is indexed [datapoint][feature][class] — TODO confirm
                        # for the installed shap version
                        value = float(shap_values[j][i][k])
                      )
        # Commit in batches of 5000 datapoints to keep the pending session small.
        if j % 5000 == 0 and j != 0:
            print("yo")
            context["session"].commit()

# Final commit for everything that was not part of a full batch.
context["session"].commit()

In [None]:

# Database id of the analysed model. It is read from model_df because the
# unpickled estimator object is not expected to carry an `id` attribute.
anchor_model_id = int(model_df["id"][model_id_index])

for i, datapoint_id in enumerate(datapoint_ids):
    create_object(context, "anchors", with_commit=False,
                    id = i,
                    model_id = anchor_model_id,
                    datapoint_id = int(datapoint_id),
                    # TODO: add Anchor performance values based on anchor data structure
                    precision = None,
                    coverage = None
                )
    # One rule row per feature for this anchor.
    for feature_id in feature_ids:
        create_object(context, "anchor_rules", with_commit=False,
                        anchor_id = i,
                        feature_id = feature_id,
                        # TODO: add boundary values based on anchor data structure
                        lower_bound = None,
                        upper_bound = None
                    )
    # Commit in batches of 5000 anchors.
    if (i % 5000 == 0 and i != 0):
        context["session"].commit()
# Final commit in case something was not committed yet
context["session"].commit()


In [None]:
# Take the datapoint ids from exactly the rows that make up X_test, so that
# scores[...][i] (computed on X_test) belongs to datapoint_ids[i] regardless of
# the order in which the database returned the datapoint table.
datapoint_ids = final_df.loc[X_test.index, "datapoint_id"].tolist()
trustscore_model_id = int(model_df["id"][model_id_index])

for i, datapoint_id in enumerate(datapoint_ids):
    create_object(context, "trustscores", with_commit=False,
                    model_id = trustscore_model_id,
                    datapoint_id = int(datapoint_id),
                    # scores[1] holds 0-based class indices; +1 shifts them back to the 1-based labels.
                    neighbor = int(scores[1][i]+1),
                    score = float(scores[0][i])
    )
    # Commit in batches of 5000 datapoints.
    if (i % 5000 == 0 and i != 0):
        print("yo")
        context["session"].commit()
# Final commit in case something was not committed yet
context["session"].commit()


In [None]:

# Database id of the analysed model. It is read from model_df because the
# unpickled estimator object is not expected to carry an `id` attribute.
pdp_model_id = int(model_df["id"][model_id_index])

for i, feature_id in enumerate(feature_ids):
    for j, _ in enumerate(classes_out):
        # Cantor pairing of (i, j): a distinct id for every feature/class pair.
        # Floor division keeps the id an integer; the product of two
        # consecutive integers is always even, so nothing is lost.
        unique_id = ((i+j) * (i+j+1) // 2) + j
        create_object(context, "partial_dependence", with_commit=False,
                        id = unique_id,
                        model_id = pdp_model_id,
                        feature_id = feature_id,
                        # 1-based class position, the same convention the SHAP rows use for label_id.
                        label_id = int(j+1)
                      )
        create_object(context, "partial_dependence_average", with_commit=False,
                    partial_dependence_id = unique_id,
                    # TODO: add PDP values based on PDP data structure
                    feature_value = None,
                    pd_value = None,
                    index = None
                )

context["session"].commit()