# Reproduce the papers results on the German Credit Dataset

### 1. Init & Load Data

In [1]:
from anchor import anchor_tabular
import numpy as np
import pickle
import pandas as pd
import random

np.random.seed(1)
random.seed(1)


def load_ds(name):
    assert name in ["german", "adult"]

    # Load train set
    title = "scripts/datasets/train_set_"+name+"_strat.p"
    train = open(title,"rb")
    train_set: pd.DataFrame = pickle.load(train)

    title = "scripts/datasets/train_label_"+name+"_strat.p"
    train_l = open(title,"rb")
    train_label: pd.DataFrame = pickle.load(train_l)

    # Load Test set
    title = "scripts/datasets/test_set_" + name + "_strat.p"
    test = open(title, "rb")
    test_set: pd.DataFrame = pickle.load(test)

    title = "scripts/datasets/test_label_" + name + "_strat.p"
    test_l = open(title, "rb")
    test_label: pd.DataFrame = pickle.load(test_l)

    # Reset indices
    train_set = train_set.reset_index(drop=True)
    train_label = train_label.reset_index(drop=True)
    test_set = test_set.reset_index(drop=True)
    test_label = test_label.reset_index(drop=True)

    # Convert object columns to int
    train_set = train_set.astype(int)
    test_set = test_set.astype(int)

    return train_set, train_label, test_set, test_label

train_set, train_label, test_set, test_label = load_ds("german")

print("Length train set:", len(train_set))
print("Length test set:", len(test_set))

Length train set: 700
Length test set: 300


### 2. Load Models

In [2]:
def load_model(model: str, ds: str):
    assert model in ["catboost", "lg", "xgb"]
    assert ds in ["adult", "german"]

    path = f"scripts/results/trained_{model}_{ds}.p"
    path_opened = open(path,"rb")

    if model == "lg":
        from sklearn.linear_model import LogisticRegression
        model: LogisticRegression = pickle.load(path_opened)
    elif model == "xgb":
        from xgboost import XGBClassifier
        model: XGBClassifier = pickle.load(path_opened)
    elif model == "catboost":
        from catboost import CatBoostClassifier
        model: CatBoostClassifier = pickle.load(path_opened)

    if model is None:
        raise Exception()
    return model

def load_explainer(train_set, train_label):
    return anchor_tabular.AnchorTabularExplainer(
        class_names=train_label.unique(),
        feature_names=train_set.columns,
        train_data=train_set.values
    )

lg = load_model("lg", "german")
xgb = load_model("xgb", "german")
cat = load_model("catboost", "german")

explainer: anchor_tabular.AnchorTabularExplainer = load_explainer(train_set, train_label)

def xgb_fn(X):
    return xgb.predict(pd.DataFrame(X, columns=test_set.columns))

### 3. Evaluate Models

In [3]:
from sklearn.metrics import accuracy_score

# Predict
lg_preds  = lg.predict(test_set)
xgb_preds = xgb_fn(test_set)
cat_preds = cat.predict(test_set)

# Accuracy
lg_acc  = accuracy_score(test_label, lg_preds)
xgb_acc = accuracy_score(test_label, xgb_preds)
cat_acc = accuracy_score(test_label, cat_preds)

print("XGBoost Accuracy:", xgb_acc)
print("LogReg Accuracy: ", lg_acc)
print("CatBoost Accuracy:", cat_acc)

XGBoost Accuracy: 0.9233333333333333
LogReg Accuracy:  0.7166666666666667
CatBoost Accuracy: 0.92


### 4a. Calculate Fidelity

In [None]:
import re
from tqdm import tqdm

def conv_rule_to_pandas_query(rule):
    """
    Converts Anchor rules (list of strings) into a valid pandas .query() expression.
    Supports compound range rules like '3.00 < occupation <= 5.00'.
    """
    query_parts = []
    
    for cond in rule:
        # Match compound ranges like '3.00 < occupation <= 5.00'
        match = re.match(r"([\d\.]+)\s*<\s*(.+)\s*<=\s*([\d\.]+)", cond)
        if match:
            low, col, high = match.groups()
            col = col.strip()
            expr = f"({float(low)} < `{col}` <= {float(high)})"
            query_parts.append(expr)
            continue
        
        # Match simple binary comparisons
        match = re.match(r"(.+?)\s*(<=|>=|<|>|==)\s*([\d\.]+)", cond)
        if match:
            col, op, val = match.groups()
            col = col.strip()
            expr = f"(`{col}` {op} {float(val)})"
            query_parts.append(expr)
            continue

        raise ValueError(f"Unsupported condition format: {cond}")
    
    return " & ".join(query_parts)

def calculate_fidelity(model_fn, explainer, dataset, tresh=0.95, num_samples=100):
    fidelity_scores = []
    sampled_count = 0

    random_indices = np.random.permutation(len(dataset))  # Randomly shuffle indices

    while len(fidelity_scores) < num_samples:  # Continue until we have num_samples valid instances
        # Get a new instance and its true label
        i = random_indices[sampled_count]
        x = dataset.iloc[[i]]
        y_true = model_fn(x).item()  # model prediction for the instance
        sampled_count += 1

        # Explain the instance with Anchor
        exp = explainer.explain_instance(x.values, classifier_fn=model_fn, threshold=tresh)
        rule = exp.names()  # e.g., ["age > 30", "capital-gain <= 0"]

        if not rule:
            continue  # Skip if no rule found

        # Convert the Anchor rule to a pandas query expression
        query_str = conv_rule_to_pandas_query(rule)

        # Check how many of the covered instances match the model's original prediction
        covered = dataset.query(query_str)
        pred_match = model_fn(covered) == y_true
        fidelity = pred_match.mean() if len(covered) > 0 else 0

        fidelity_scores.append(fidelity)

    return np.mean(fidelity_scores)


thresh = 0.4 # default: 0.95
num_samples = 15

rounds = 5

fidelity_lg, fidelity_xgb, fidelity_cat = 0,0,0
for _ in tqdm(range(rounds), desc="Calculating fidelity"):
    fid_lg_current = calculate_fidelity(lg.predict, explainer, test_set, tresh=thresh, num_samples=num_samples)
    fid_xgb_current = calculate_fidelity(xgb_fn, explainer, test_set, tresh=thresh, num_samples=num_samples)
    fid_cat_current = calculate_fidelity(cat.predict, explainer, test_set, tresh=thresh, num_samples=num_samples)

    fidelity_lg += fid_lg_current / float(rounds)
    fidelity_xgb += fid_xgb_current / float(rounds)
    fidelity_cat += fid_cat_current / float(rounds)

print("\nFidelity results:")
print(f"LG:\t{fidelity_lg:.3f}")
print(f"XGB:\t{fidelity_xgb:.3f}")
print(f"CAT:\t{fidelity_cat:.3f}")

### 5. Calculate Stability

In [14]:
from tqdm import tqdm
from copy import deepcopy
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import MinMaxScaler

def jaccard_distance(exp_x: set, exp_x_neigh: set, debug):
    """Distance between two explanations."""
    union = exp_x | exp_x_neigh
    if not union:
        return 0.0  # If both are empty, treat as identical
    jacc = 1 - len(exp_x & exp_x_neigh) / len(union)
    if debug:
        print("\tJacc:\t", jacc)
    return jacc

def hamming_distance(exp_x: set, exp_x_neigh: set, debug=False):
    """Distance between two explanations using Hamming distance."""
    # Calculate the symmetric difference between the two sets
    diff = exp_x ^ exp_x_neigh  # ^ is symmetric difference
    # Normalize the distance by the maximum possible number of differences (size of the union)
    max_diff = max(len(exp_x), len(exp_x_neigh))
    if max_diff == 0:
        return 0.0  # If both are empty, treat as identical
    
    hamming = len(diff) / max_diff
    
    if debug:
        print("\tHamming:\t", hamming)
    
    return hamming

def eucl_distance(x1: np.ndarray, x2: np.ndarray, debug):
    """Euclidean distance between inputs."""
    eucl = np.linalg.norm(x1 - x2)
    if debug:
        print("\tEucl:\t", eucl)
    return eucl

def norm_eucl_distance(x1: np.ndarray, x2: np.ndarray, scaler: MinMaxScaler, debug):
    """Euclidean distance between inputs, first normalizing features."""
    
    # Normalize the input vectors (x1 and x2)
    x1_normalized = scaler.transform([x1])
    x2_normalized = scaler.transform([x2])

    # Calculate the Euclidean distance between the normalized vectors
    eucl_dist = np.linalg.norm(x1_normalized - x2_normalized)

    if debug:
        print("\tEuc_norm:\t", eucl_dist)
        
    return eucl_dist

def calc_lipschitz(exp_x, exp_xp, x, xp, debug, sim, scaler=None):
    if sim == "jaccard":
        exp_dist = jaccard_distance(exp_x, exp_xp, debug)
    else:
        exp_dist = hamming_distance(exp_x, exp_xp, debug)

    if scaler is not None:
        input_dist = norm_eucl_distance(x, xp, scaler, debug)
    else:
        input_dist = eucl_distance(x, xp, debug)

    lip = exp_dist/input_dist
    if debug:
        print("\tLip:\t", lip)
    return lip

def generate_perturbation_neighborhood(x: pd.Series, dataset: pd.DataFrame, num_samples=10, max_perturbation=2):
    """Generates a synthetic neighborhood around x by applying small perturbations."""
    neighborhood = []

    for _ in range(num_samples):
        x_prime = deepcopy(x)

        for col in dataset.columns:
            # Apply a small perturbation, limiting to max_perturbation in magnitude
            perturbation = np.random.randint(-max_perturbation, max_perturbation + 1)
            new_value = x[col] + perturbation

            # Ensure the perturbed value is non-negative (not below 0)
            x_prime[col] = max(0, new_value)

        neighborhood.append(x_prime.values)

    return np.array(neighborhood)

def sample_neighborhood(x: np.ndarray, nn: NearestNeighbors, dataset: pd.DataFrame, k=10):
    _, indices = nn.kneighbors(x, n_neighbors=k)
    neigh_idcs = indices[0][1:]
    neigh_vals = dataset.iloc[neigh_idcs].values
    return neigh_vals


def calculate_stability(model, explainer, dataset, k_neighbors=10, thresh=0.95, num_samples=1, debug=False, neigh="gen", sim="jaccard", norm=False):
    assert neigh in ["gen","sampled"]
    assert sim in ["jaccard","hamming"]

    stability_scores = []
    if neigh == "sampled":
        nn = NearestNeighbors(n_neighbors=k_neighbors + 1).fit(dataset.values)

    if norm:
        scaler = MinMaxScaler()
        scaler.fit_transform(dataset.values)

    for i in tqdm(range(num_samples), desc="Calculating stability..."):
        x = dataset.iloc[i]
        x_val = x.values.reshape(1, -1)
        if debug:
            print("Row:\t", list(x.values))

        try:
            anchor_x = explainer.explain_instance(x_val[0], classifier_fn=model.predict, threshold=thresh)
            rule_x = set(anchor_x.names())
            if not rule_x:
                if debug:
                    print(f"No rule on index {i}. Skipping instance...")
                continue  # Skip if no rule was found
            if debug:
                print("Exp:\t", anchor_x.names())
        except:
            if debug:
                print(f"Anchor failed on index {i}. Skipping instance...")
            continue  # Skip instances where anchor fails

        # Find neighbors (excluding self)
        if neigh == "gen":
            neigh_vals = generate_perturbation_neighborhood(x, dataset, num_samples=k_neighbors)
        else:
            neigh_vals = sample_neighborhood(x_val, nn, dataset)

        lipschitz_vals = []
        for j, x_prime in enumerate(neigh_vals):
            if debug:
                print(f"\n\tNN{j}:\t", list(x_prime))
            #try:
            anchor_xp = explainer.explain_instance(x_prime.reshape(1, -1), classifier_fn=model.predict, threshold=thresh)
            if debug:
                print("\tExp:\t", anchor_xp.names())
            
            rule_xp = set(anchor_xp.names())

            lip = calc_lipschitz(
                exp_x=rule_x,
                exp_xp=rule_xp,
                x=x.values,
                xp=x_prime,
                debug=debug,
                sim=sim,
                scaler=scaler if norm else None
            )
            lipschitz_vals.append(lip)
            #except:
                #print(f"Error occured in neighbor explanation. Skipping instance.")
                #continue  # If explanation fails, skip

        if lipschitz_vals:
            max_lip = np.max(lipschitz_vals)
            if debug:
                print("\nMax Lipstein:\t", max_lip)
            stability_scores.append(max_lip)

    return np.mean(stability_scores) if stability_scores else 0.0

num_samples = 5
debug = False
neigh = "sampled" # "sampled" | "gen"
sim = "hamming" # "hamming" | "jaccard"
norm = True

stabilities = []
for model in [lg, xgb, cat]:
    stability = calculate_stability(
        xgb, explainer, test_set, 
        num_samples=num_samples,
        debug=debug,
        neigh=neigh,
        sim=sim,
        norm=norm
    )
    stabilities.append(stability)

print("LG Stability:", stabilities[0])
print("XGB Stability:", stabilities[1])
print("CAT Stability:", stabilities[2])

Calculating stability...:   0%|          | 0/5 [00:00<?, ?it/s]

Calculating stability...: 100%|██████████| 5/5 [00:30<00:00,  6.03s/it]
Calculating stability...: 100%|██████████| 5/5 [00:29<00:00,  5.96s/it]
Calculating stability...: 100%|██████████| 5/5 [00:29<00:00,  5.97s/it]

LG Stability: 1.0710900227428695
XGB Stability: 1.125003788507453
CAT Stability: 1.1514746079361342





## Brute Force Paper Params

In [None]:
from scipy.spatial import distance
from sklearn.neighbors import NearestNeighbors
from sklearn.preprocessing import (
    StandardScaler,
    Normalizer,
    MinMaxScaler,
    RobustScaler,
    QuantileTransformer,
)


preprocessing = {
    #"raw": lambda x: x,
    "l1": Normalizer(norm='l1'),
    #"l2": Normalizer(norm='l2'),
    #"zscore": StandardScaler(),
    #"minmax": MinMaxScaler(),
    #"robust": RobustScaler(),
    #"quantile_normal": QuantileTransformer(output_distribution='normal'),
    #"quantile_uniform": QuantileTransformer(output_distribution='uniform'),
}
distance_metrics = {
    #"euclidean": lambda x1, x2: np.linalg.norm(x1 - x2, ord=2),
    "manhattan": lambda x1, x2: np.linalg.norm(x1 - x2, ord=1),
    #"cosine": lambda x1, x2: distance.cosine(x1, x2),
}
expl_differences = {
    "jaccard": lambda exp1, exp2: jaccard_distance(exp1, exp2),
    #"hamming": lambda exp1, exp2: hamming_distance(exp1, exp2),
}
model_fns = {
    "lg": lg.predict,
    "xgb": xgb_fn,
    "cat": cat.predict,
}
paper = {
    "means": {
        "lg": 101.07,
        "xgb": 121.40,
        "cat": 123.79,
    },
    "std": {
        "lg": 62.75,
        "xgb": 98.43,
        "cat": 76.86,
    }
}


num_samples = 10
k_neighbors = 10
thresh = 0.95


def jaccard_distance(exp_x: set, exp_x_neigh: set, debug=False):
    """Distance between two explanations."""
    union = exp_x | exp_x_neigh
    if not union:
        return 0.0  # If both are empty, treat as identical
    jacc = 1 - len(exp_x & exp_x_neigh) / len(union)
    if debug:
        print("\tJacc:\t", jacc)
    return jacc

def hamming_distance(exp_x: set, exp_x_neigh: set, debug=False):
    """Distance between two explanations using Hamming distance."""
    # Calculate the symmetric difference between the two sets
    diff = exp_x ^ exp_x_neigh  # ^ is symmetric difference
    # Normalize the distance by the maximum possible number of differences (size of the union)
    max_diff = max(len(exp_x), len(exp_x_neigh))
    if max_diff == 0:
        return 0.0  # If both are empty, treat as identical
    
    hamming = len(diff) / max_diff
    
    if debug:
        print("\tHamming:\t", hamming)
    
    return hamming

nn = NearestNeighbors(n_neighbors=k_neighbors + 1).fit(test_set.values)
def neighbors_of(x: np.ndarray):
    _, indices = nn.kneighbors(x, n_neighbors=k_neighbors)
    neigh_idcs = indices[0][1:]
    neigh_vals = test_set.iloc[neigh_idcs].values
    return neigh_vals




sample_indices = np.random.choice(len(test_set), size=num_samples, replace=False)
stability_results = []

for pre_name, pre in preprocessing.items():
    print(f"== Preprocessing: {pre_name} ==")

    # Fit-transform
    if callable(pre):  # for "raw"
        X_prep = pre(test_set.values)
    else:
        X_prep = pre.fit_transform(test_set.values)

    for dist_name, dist_fn in distance_metrics.items():
        print(f"  -- Distance: {dist_name}")

        nn = NearestNeighbors(n_neighbors=k_neighbors + 1, metric="euclidean")
        nn.fit(X_prep)

        for model_name, model_fn in model_fns.items():
            print(f"    >> Model: {model_name}")

            for expl_name, expl_diff_fn in expl_differences.items():
                print(f"      || Exp-diff: {expl_name}")

                L_x = []
                for i in sample_indices:
                    x = test_set.values[i]
                    x_prep = X_prep[i].reshape(1, -1)
                    _, indices = nn.kneighbors(x_prep)
                    neigh_idcs = indices[0][1:]
                    
                    x_anchor = explainer.explain_instance(
                        x, classifier_fn=model_fn, threshold=thresh
                    )
                    rule_x = set(x_anchor.names())

                    lips = []
                    for j in neigh_idcs:
                        x_prime = test_set.values[j]
                        x_prime_prep = X_prep[j]

                        x_prime_anchor = explainer.explain_instance(
                            x_prime, classifier_fn=model_fn, threshold=thresh
                        )
                        rule_xp = set(x_prime_anchor.names())

                        # Explanation distance
                        expl_diff = expl_diff_fn(rule_x, rule_xp)

                        # Input distance
                        input_dist = dist_fn(X_prep[i], x_prime_prep)
                        if input_dist == 0:
                            print("Input distance == 0, skipping...")
                            continue  # skip duplicates or same point

                        lip = expl_diff / input_dist
                        lips.append(lip)
                    
                    L_x.append(max(lips))

                if L_x:
                    stability_results.append({
                        "preprocessing": pre_name,
                        "distance": dist_name,
                        "model": model_name,
                        "lipschitz_vals": L_x,
                        "liptschitz_mean": np.mean(L_x),
                    })
                print("      Lip:", np.mean(L_x))
                print("      Err:", abs(np.mean(L_x) - paper["means"][model_name]))

== Preprocessing: l1 ==
  -- Distance: euclidean
    >> Model: lg
      || Exp-diff: jaccard
      Lip: 396.56337559385486
      Err: 295.49337559385486
      || Exp-diff: hamming
      Lip: 621.249287074608
      Err: 520.179287074608
  -- Distance: manhattan
    >> Model: lg
      || Exp-diff: jaccard
      Lip: 130.98610743360933
      Err: 29.91610743360934
      || Exp-diff: hamming
      Lip: 197.32956478084284
      Err: 96.25956478084285
  -- Distance: cosine
    >> Model: lg
      || Exp-diff: jaccard
      Lip: 528298.719241725
      Err: 528197.6492417251
      || Exp-diff: hamming
      Lip: 845995.4596325671
      Err: 845894.3896325672
== Preprocessing: l2 ==
  -- Distance: euclidean
    >> Model: lg
      || Exp-diff: jaccard
      Lip: 437.88487328755707
      Err: 336.8148732875571
      || Exp-diff: hamming
      Lip: 721.0975801336272
      Err: 620.0275801336272
  -- Distance: manhattan
    >> Model: lg
      || Exp-diff: jaccard
      Lip: 146.94376655081436
      

KeyboardInterrupt: 