In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt

import torch.optim as optim
from sklearn.model_selection import train_test_split
import torch
import torch.nn as nn

from sklearn.preprocessing import LabelEncoder

from models.mlp import BlackBoxModel

from models.rbf import RBFNet
from models.svm import LinearSVM
from explainers.model import Model
from lightgbm import LGBMClassifier
from utils.datasets import dataset_loader

pd.set_option('display.max_columns', None)

%reload_ext autoreload
%autoreload 2


In [None]:
def bold(string):
    """Return ``string`` wrapped in the ANSI escape codes that switch bold on and off."""
    bold_on, reset = "\033[1m", "\033[0m"
    return "".join((bold_on, string, reset))

In [None]:
# Dataset key passed to `dataset_loader`; `name` is reused later for printing and for
# the ordinal-feature switch in the GLOBE_CE cell.
name = 'compas'
# No features are dropped in this run (the trailing comment is a leftover alternative).
dropped_features = []#UCIDatasets().continuous_features[dataset]
dataset = dataset_loader(name, dropped_features=dropped_features, n_bins=None)

In [None]:
# Display the loaded frame; the same `dataset.data` is used later to build the AReS input.
dataset.data

In [None]:
# Unnormalised, unshuffled split. NOTE: the `mean`/`std` returned here are reassigned
# from `X_train` in the training cell further down.
X_train, y_train, X_test, y_test, mean, std = dataset.get_split(normalise=False, shuffle=False,
                                                                     return_mean_std=True)
# Share of positive labels in the training split, as a percentage rounded to 2 decimals.
prop1s = round(np.average(y_train)*100, 2)
print(bold("Proportion of 1s in Training Data:") + " {}%".format(prop1s))

In [None]:
# Wrap the split arrays in DataFrames. Column names are every entry of
# `dataset.features` except the last (presumably the target column — TODO confirm).
# `X` is a separate frame built from the training data; later cells read `X.columns`.
X = pd.DataFrame(X_train)
X.columns = dataset.features[:-1]
X_train = pd.DataFrame(X_train)
X_train.columns = dataset.features[:-1]
X_test = pd.DataFrame(X_test)
X_test.columns = dataset.features[:-1]
y_train = pd.DataFrame(y_train)
y_test = pd.DataFrame(y_test)
print(bold("Dataset:") + " {}\n".format(name.replace('_', ' ').title()))
# Last expression of the cell: renders the training frame.
X

In [None]:
# Outcome column name: dropped from `dataset.data` for AReS and used as DiCE's `outcome_name`.
target_name = 'Status'

In [None]:
seed = 43

np.random.seed(seed)  # for reproducibility

# Column-wise statistics of the (still unnormalised) training frame.
std = X_train.std()
mean = X_train.mean()

# Standardise the continuous columns. Both splits are scaled with the TRAINING
# mean/std: scaling the test split with its own statistics (as was done before)
# puts train and test on different scales and leaks test-set information.
# `mean`/`std` were computed before any column is overwritten, so they hold the
# raw training statistics for every column.
for col in ['Priors_Count', 'Time_Served']:
    X_train[col] = (X_train[col] - mean[col]) / std[col]
    X_test[col] = (X_test[col] - mean[col]) / std[col]

# From here on the splits are plain numpy arrays; later cells rebuild DataFrames from them.
X_train, X_test, y_train, y_test = X_train.values, X_test.values, y_train.values, y_test.values

X_train_tensor = torch.FloatTensor(X_train)
y_train_tensor = torch.FloatTensor(y_train).view(-1, 1)
X_test_tensor = torch.FloatTensor(X_test)
y_test_tensor = torch.FloatTensor(y_test).view(-1, 1)

# Initialize the model
model_raw = LGBMClassifier(n_estimators=100, random_state=seed, verbose=-1)

# Train the model
model_raw.fit(X_train, y_train.ravel())
# Project wrapper around the fitted classifier; used by the DISCOVER solver below.
model = Model(model=model_raw, backend="lightgbm", data=None)

# Evaluate on test set
y_pred = model_raw.predict(X_test)
accuracy = (y_pred == y_test.ravel()).mean()
accuracy


In [None]:
# Number of test rows to explain.
sample_num = 50
# `X_test` is a numpy array at this point (converted in the training cell); rebuild a frame.
X_test = pd.DataFrame(X_test, columns=X.columns)

# Re-seed so the sampled rows do not depend on how much randomness was consumed above.
np.random.seed(seed)
indice = pd.Index(np.random.choice(len(X_test), size=sample_num, replace=False))

df_explain = X_test.loc[indice]

# y_target = torch.distributions.beta.Beta(0.1, 0.9).sample((sample_num,))
y_test = pd.Series(y_test.reshape(-1))
y_true = y_test.loc[indice]

# Output of the wrapped model on the sampled rows.
y = model(torch.FloatTensor(df_explain.values))

In [None]:
def postprocessing(counterfactual_X):
    """Binarise the non-continuous columns of a counterfactual frame.

    Every column is thresholded at 0.5 and mapped to 0/1, except
    ``Priors_Count`` and ``Time_Served``, which are passed through unchanged.

    Parameters
    ----------
    counterfactual_X : pandas.DataFrame
        Counterfactual feature frame; must contain ``Priors_Count`` and
        ``Time_Served`` (a ``KeyError`` is raised otherwise).

    Returns
    -------
    pandas.DataFrame
        A new frame with the same columns and index; the input is not modified.
    """
    continuous_cols = ['Priors_Count', 'Time_Served']

    # astype(int) rather than replace({False: 0, True: 1}): replacing values in a
    # boolean frame relies on pandas' deprecated silent downcasting and can leave
    # object-dtype columns behind, which then leak into `.values` for the model.
    binarised = (counterfactual_X > 0.5).astype(int)

    # Restore the continuous columns with their original values.
    for col in continuous_cols:
        binarised[col] = counterfactual_X[col]

    return binarised

## GLOBE_CE

In [None]:
from explainers.globe_ce import GLOBE_CE

In [None]:
# Passed unchanged to both AReS and GLOBE_CE below.
normalise = None

# AReS initiated to determine bin widths for costs
from explainers.ares import AReS

# Numeric float32 copy of the full dataset without the target column;
# values that cannot be parsed as numbers become NaN and are then filled with 0.
X_for_ares = (
    dataset.data.drop(columns=[target_name])
    .apply(pd.to_numeric, errors="coerce")
    .fillna(0)
    .astype(np.float32)
)

ares = AReS(model=model_raw, dataset=dataset, X=X_for_ares, n_bins=10, normalise=normalise)  # Use raw model for AReS
# Only the bin widths are kept from this instance; they are handed to GLOBE_CE.
bin_widths = ares.bin_widths


In [None]:
# example of ordinal features usage
# (empty for this notebook, since `name` is 'compas')
ordinal_features = ['Present-Employment'] if name == 'german_credit' else []
# initialise GLOBE_CE
# Explains the sampled rows `df_explain` with the raw LightGBM model and the AReS bin widths.
globe_ce = GLOBE_CE(model=model_raw, dataset=dataset, X=df_explain, affected_subgroup=None,
                    dropped_features=dropped_features, ordinal_features=ordinal_features, delta_init='zeros',
                    normalise=normalise, bin_widths=bin_widths, monotonicity=None, p=1)

In [None]:
# Sample candidate translation directions (random scheme, fixed seed 0).
globe_ce.sample(n_sample=sample_num, magnitude=2, sparsity_power=1,  
                idxs=None, n_features=5, disable_tqdm=False,  
                plot=True, seed=0, scheme='random', dropped_features=dropped_features)
delta = globe_ce.best_delta  # pick best delta
# Results are read from `globe_ce.deltas_div` in the following cells.
globe_ce.select_n_deltas(n_div=3)

In [None]:
# One bar chart per selected GLOBE_CE direction, showing per-feature cost.
n_directions = globe_ce.deltas_div.shape[0]
# squeeze=False keeps the axes array 2-D, so indexing also works with a single direction.
fig, ax = plt.subplots(nrows=1, ncols=n_directions, dpi=150, squeeze=False)
ax = ax[0]
fig.set_figwidth(12)
fig.set_figheight(3)
plt.subplots_adjust(wspace=0.3)
# Colour cycle is loop-invariant; one colour per feature (group).
cycle = plt.rcParams['axes.prop_cycle'].by_key()['color']
for i in range(n_directions):
    delta_cost = globe_ce.deltas_div[i] * globe_ce.feature_costs_vector
    # j: running column index into delta_cost; k: running feature (colour) index.
    j, k = 0, 0
    for feature in globe_ce.features_tree:
        if not globe_ce.features_tree[feature]:
            # Feature with no listed values: a single hatched bar.
            ax[i].bar(range(j, j+1), delta_cost[j], hatch='/',
                        linewidth=1, edgecolor='black', color=cycle[k%len(cycle)])
            j += 1
            k += 1
        else:
            # Feature with listed values: one bar per value, all in the same colour.
            feature_values = globe_ce.features_tree[feature]
            n_f = len(feature_values)
            # Draw on this direction's own axes (this used to be hard-coded to ax[2]).
            ax[i].bar(range(j, j+n_f), delta_cost[j:j+n_f], color=cycle[k%len(cycle)])
            j += n_f
            k += 1
    ax[i].set_title(f'Direction {i+1}')
    ax[i].set_xlabel('Feature Index')
    ax[i].set_ylabel('Cost')
plt.show()

In [None]:
# For each selected direction, scale it and record the per-row minimum cost (and its index).
n_div = globe_ce.deltas_div.shape[0]
min_costs = np.zeros((n_div, globe_ce.x_aff.shape[0]))
min_costs_idxs = np.zeros((n_div, globe_ce.x_aff.shape[0]))
for i in range(n_div): 
    cor_s, cos_s, k_s = globe_ce.scale(globe_ce.deltas_div[i], disable_tqdm=False, vector=True) 
    min_costs[i], min_costs_idxs[i] = globe_ce.min_scalar_costs(cos_s, return_idxs=True, inf=True) 
# Collapse over directions: cheapest cost per row across all directions.
min_costs = min_costs.min(axis=0)

In [None]:
# Apply the best translation to the affected rows; categorical entries are rounded
# by GLOBE_CE when the explainer reports categorical features.
ces = globe_ce.round_categorical(globe_ce.x_aff+globe_ce.best_delta) if globe_ce.n_categorical else globe_ce.x_aff+globe_ce.best_delta
counterfactual_X_global_ce = pd.DataFrame(ces, columns=X_test.columns)
counterfactual_X_global_ce = postprocessing(counterfactual_X_global_ce)
counterfactual_y_global_ce = model_raw.predict(counterfactual_X_global_ce.values)

In [None]:
# Coverage = fraction of counterfactuals the model predicts as class 1.
print('Coverage (Globe CE):', counterfactual_y_global_ce.sum()/len(counterfactual_y_global_ce))

In [None]:
# Factual rows as selected by GLOBE_CE (`x_aff`); every other method below is run
# and evaluated against this same frame.
factual_X = pd.DataFrame(globe_ce.x_aff, columns=df_explain.columns)
factual_y = model_raw.predict(factual_X.values)

In [None]:
# Target outcome: 1 for every factual row.
y_target = torch.ones(factual_X.shape[0])

In [None]:
# Per-feature costs from GLOBE_CE; reused in the cost evaluation section for all methods.
costs_vector = globe_ce.feature_costs_vector

## AReS

In [None]:
# Second AReS instance, this time built on the factual rows and used below to
# generate the AReS counterfactuals (the earlier instance only supplied bin widths).
from explainers.ares import AReS
ares = AReS(model=model_raw, dataset=dataset, X=factual_X, n_bins=10, normalise=normalise)  

In [None]:
# AReS pipeline: itemsets -> ground set -> evaluate -> select -> optimise.
ares.generate_itemsets(apriori_threshold=0.2, max_width=None, # defaults to e2-1
                       affected_subgroup=None, save_copy=False)

ares.generate_groundset(max_width=None, RL_reduction=True,
                        then_generation=None, save_copy=False)
lams = [1, 0]  # can play around with these lambda values
# NOTE(review): r=194 and s=194 are hard-coded; their meaning is defined by AReS — TODO document.
ares.evaluate_groundset(lams=lams, r=194, save_mode=1,
                        disable_tqdm=False, plot_accuracy=True)
ares.select_groundset(s=194)
ares.optimise_groundset(lams=lams, factor=1, print_updates=False,
                        print_terms=False)

In [None]:
# Take the first counterfactual matrix of the optimised rule set and binarise it
# the same way as the other methods.
counterfactual_X_ares = pd.DataFrame(ares.R.cfx_matrix[0], columns=X_test.columns)
counterfactual_X_ares = postprocessing(counterfactual_X_ares)
counterfactual_y_ares = model_raw.predict(counterfactual_X_ares.values)

In [None]:
# Coverage = fraction of counterfactuals the model predicts as class 1.
print('Coverage (AReS):', counterfactual_y_ares.sum()/len(counterfactual_y_ares))

## Diverse Counterfactual Explanation (DiCE)

In [None]:
# NOTE(review): the next cell passes the literal 'sklearn' to dice_ml.Model and does not read this variable.
backend = 'sklearn'  # Use sklearn backend for tree models

In [None]:
import dice_ml

m = dice_ml.Model(model=model_raw, backend='sklearn')  # Use sklearn backend for tree models

# DiCE's data object needs the outcome column alongside the features.
factual_X_ext = factual_X.copy()
factual_X_ext[target_name] = factual_y

# Every column except the two listed race indicators may be varied; the same list
# is also declared to DiCE as the continuous features.
dice_features = factual_X.columns.drop(['Race = Asian', 'Race = Other']).to_list()

d = dice_ml.Data(dataframe=factual_X_ext, continuous_features=dice_features, outcome_name = target_name)

dice_explainer = dice_ml.Dice(d, m, method="random")  # Use random method for tree models

In [None]:
# One counterfactual per factual row, asking for the opposite of the predicted class.
dice_results = dice_explainer.generate_counterfactuals(query_instances=factual_X, features_to_vary=dice_features, desired_class="opposite", total_CFs=1)

In [None]:
# Collect the per-query counterfactual frames from the DiCE result
dice_df_list = []
for cf in dice_results.cf_examples_list:
    # Keep the final counterfactual frame of this query instance
    cf_df = cf.final_cfs_df
    dice_df_list.append(cf_df)

# Stack into one frame, drop the outcome column, then binarise like the other methods.
counterfactual_X_dice = pd.concat(dice_df_list).reset_index(drop=True).drop(target_name, axis=1)
counterfactual_X_dice = postprocessing(counterfactual_X_dice)

In [None]:
# Model predictions on the DiCE counterfactuals.
counterfactual_y_dice = model_raw.predict(counterfactual_X_dice.values)

In [None]:
# Coverage = fraction of counterfactuals the model predicts as class 1.
print('Coverage (DiCE):', counterfactual_y_dice.sum()/len(counterfactual_y_dice))

In [None]:
print("="*80)
print("DICE: Computing OT Distance between CF predictions and target")
print("="*80)
y_target_tensor = y_target

# Class-1 probabilities of the DiCE counterfactuals.
dice_y_prob = model_raw.predict_proba(counterfactual_X_dice.values)[:, 1]
dice_y_prob_tensor = torch.FloatTensor(dice_y_prob)


from explainers.distances import WassersteinDivergence
wd = WassersteinDivergence()
# `ot_dist_dice_y` is passed to the DISCOVER run below as `target_ot_y`.
ot_dist_dice_y, _ = wd.distance(
    y_s=dice_y_prob_tensor,
    y_t=y_target_tensor,
    delta=0.1
)

print(f"DICE Y Probability OT Distance to Target: {ot_dist_dice_y:.6f}")

## DCE not available for non-differentiable models

In [None]:
# from explainers.dce import DistributionalCounterfactualExplainer

# delta = 1e-5
# alpha = 0.05
# N = 10

# explain_columns = df_explain.columns

# explainer = DistributionalCounterfactualExplainer(
#     model=model, 
#     df_X=factual_X, 
#     explain_columns=explain_columns,
#     y_target=y_target, 
#     lr=0.1, 
#     n_proj=N,
#     delta=delta,
#     costs_vector=None)

In [None]:
# import os 
# import pickle 
# dump_data_path = './data/baseline/'
# with open(os.path.join(dump_data_path, f"explainer_{model.name}_{name}.pkl"), 'rb') as file:
#     explainer = pickle.load(file)

In [None]:
# explainer.optimize(U_1=0.01, U_2=0.2, l=0.7, r=0.85, max_iter=100, tau=1e3)

In [None]:
# import os 
# import pickle 
# dump_data_path = './data/baseline/'
# with open(os.path.join(dump_data_path, f"explainer_{model.name}_{name}.pkl"), "wb") as file:
#     pickle.dump(explainer, file)

In [None]:
# X_train = pd.DataFrame(X_train, columns=X.columns)

In [None]:
# counterfactual_X_dce = pd.DataFrame(explainer.best_X.detach().numpy(), columns=df_explain.columns)
# counterfactual_X_dce = postprocessing(counterfactual_X_dce)

# dtype_dict = X_train.dtypes.apply(lambda x: x.name).to_dict()
# for k, v in dtype_dict.items():
#     if k in counterfactual_X_dce.columns:
#         if v[:3] == 'int':
#             counterfactual_X_dce[k] = counterfactual_X_dce[k].round().astype(v)
#         else:
#             counterfactual_X_dce[k] = counterfactual_X_dce[k].astype(v)

# counterfactual_y_prob_dce = pd.DataFrame(explainer.y.detach().numpy(),columns=[target_name], index=counterfactual_X_dce.index)
# counterfactual_y_dce = np.int64((counterfactual_y_prob_dce.values > 0.5).reshape(-1))

In [None]:
# print('Coverage (DCE):', counterfactual_y_dce.sum()/len(counterfactual_y_dce))

## DISCOVER (Distributional Counterfactual Solver)

In [None]:
from explainers.DCESolver import DCESolver
from explainers.cone_sampling.monte_carlo import MonteCarloStrategy
# from explainers.data import DataLoader
from data_loader.compas import CompasData
from explainers.model import Model
# DISCOVER reuses the notebook-wide seed so its run is reproducible alongside the baselines.
discover_seed = seed
print(f"DISCOVER seed: {discover_seed}")

In [None]:

class BaselineDataWrapper:
    """Minimal data container handed to ``DCESolver`` for the baseline comparison.

    Stores a copy of the factual frame, the explain frame as an array, the labels,
    the column groupings, and per-column mean/std of the continuous columns
    (a zero std is replaced by 1.0).
    """

    def __init__(self, df_factual, df_explain, y_test, continuous_cols, categorical_cols):
        self.df = df_factual.copy()
        self.X_train = df_explain.values
        self.y_train = y_test

        self.explain_columns = df_explain.columns.tolist()
        self.continuous_columns = continuous_cols
        self.categorical_columns = categorical_cols

        continuous_frame = df_factual[continuous_cols]
        self.mean = continuous_frame.mean().to_dict()
        # Constant columns have std 0; store 1.0 instead so the value is never zero.
        self.std = {
            col: (1.0 if spread == 0 else spread)
            for col, spread in continuous_frame.std().to_dict().items()
        }

    def get_X_init(self):
        """Return the stored explain array as a float32 tensor."""
        return torch.from_numpy(self.X_train).float()

    def get_y_target(self):
        """Return a tensor of ones with one entry per stored row."""
        return torch.ones(len(self.X_train))


# Column grouping for DISCOVER: two continuous columns, everything else categorical.
continuous_features_discover = ['Priors_Count', 'Time_Served']
categorical_features_discover = [col for col in df_explain.columns if col not in continuous_features_discover]

# The factual frame serves as both the reference data and the rows to explain;
# the labels are the model's own predictions on those rows.
discover_data = BaselineDataWrapper(
    df_factual=factual_X,
    df_explain=factual_X, 
    y_test=factual_y,
    continuous_cols=continuous_features_discover,
    categorical_cols=categorical_features_discover
)

print(f"DISCOVER data wrapper created: {len(factual_X)} samples")
print(f"Continuous features: {len(continuous_features_discover)}")
print(f"Categorical features: {len(categorical_features_discover)}")

In [None]:
# The solver uses the project `Model` wrapper, not the raw LightGBM classifier.
discover_solver = DCESolver(model=model, data=discover_data)

discover_strategy = MonteCarloStrategy(
    explainer=discover_solver,
    random_state=discover_seed,
    cone_angle=3.14159/4,  # pi / 4 (pi approximated as 3.14159)
    use_cone_sampling_categorical=True,
    use_cone_sampling_continuous=True,
    categorical_step=1.2,
    continuous_step=0.1,
    temperature=2.0,
    h = 2
)

print("DISCOVER solver and strategy initialized")

In [None]:
# Run DISCOVER optimization
# Targets class 1 for every factual row. `target_ot_y` is the DiCE OT distance
# computed earlier, so the DiCE section must run before this cell.
counterfactual_X_discover_df = discover_solver.explain(
    df_factual=factual_X,
    explain_columns=factual_X.columns.tolist(),
    categorical_columns=categorical_features_discover,
    continuous_columns=continuous_features_discover,
    y_target=torch.ones(len(factual_X)),
    strategy=discover_strategy,
    X_init=False,  
    n_proj=10,
    delta=1e-5,
    costs_vector=None,
    U_1=0.8, 
    U_2=0.6,   
    alpha=0.05,
    l=0.2,     
    r=1,   
    kappa=0.05,
    max_iter=40,
    num_trials=10,
    bootstrap=True,
    callback=False,
    top_k=1,
    save_results=False, 
    use_global_ranges=False,
    target_ot_y=ot_dist_dice_y
)

print(f"\nDISCOVER optimization completed")
print(f"Best Q found: {discover_solver.best_Q:.6f}")
print(f"Best iteration: {discover_solver.best_iter}")

In [None]:
# Binarise the DISCOVER output like the other methods, then score it with the raw model.
counterfactual_X_discover = postprocessing(counterfactual_X_discover_df)
counterfactual_y_discover = model_raw.predict(counterfactual_X_discover.values)
print(f"DISCOVER counterfactuals: {len(counterfactual_X_discover)} samples")
print(f"Coverage (DISCOVER): {counterfactual_y_discover.sum()/len(counterfactual_y_discover):.4f}")

## Distance Evaluation

In [None]:
from explainers.distances import SlicedWassersteinDivergence, WassersteinDivergence
from scipy.stats import gaussian_kde, entropy
from numpy.linalg import LinAlgError

def compute_distance(X_s, X_t):
    """Distance between two samples, returned as a Python scalar.

    DataFrames and numpy arrays are converted to float tensors first. A 1-D
    source is compared with ``WassersteinDivergence``; anything else with
    ``SlicedWassersteinDivergence`` using 5000 projections. Both are called
    with ``delta=0.1`` and only the first returned value is kept.
    """
    def _as_tensor(sample):
        # Exact type checks: subclasses and tensors pass through untouched.
        if type(sample) == pd.DataFrame:
            return torch.FloatTensor(sample.values)
        if type(sample) == np.ndarray:
            return torch.FloatTensor(sample)
        return sample

    source = _as_tensor(X_s)
    target = _as_tensor(X_t)

    if source.ndim == 1:
        divergence = WassersteinDivergence()
    else:
        divergence = SlicedWassersteinDivergence(
                dim=source.shape[1], n_proj=5000
        )
    distance, _ = divergence.distance(source, target, delta=0.1)
    return distance.item()


def compute_kl_divergence(X_s, X_t):
    """Sum over features of the KL divergence between KDE-estimated densities.

    For each column, Gaussian KDEs of both samples are evaluated on a 1000-point
    grid spanning the joint range, and ``scipy.stats.entropy`` is applied to the
    two density vectors. A column whose KDE raises ``LinAlgError`` contributes
    ``inf``.
    """
    def _feature_kl(col_s, col_t):
        try:
            density_s = gaussian_kde(col_s)
            density_t = gaussian_kde(col_t)

            # Shared evaluation grid over the joint range of both samples
            lower = min(col_s.min(), col_t.min())
            upper = max(col_s.max(), col_t.max())
            grid = np.linspace(lower, upper, 1000)

            return entropy(density_s(grid), density_t(grid))
        except LinAlgError:
            # Singular covariance (e.g. a constant column): treat as infinitely far
            return np.inf

    per_feature = [
        _feature_kl(X_s[:, idx], X_t[:, idx]) for idx in range(X_s.shape[1])
    ]

    # Aggregate by summation (not averaging)
    return np.sum(per_feature)

def gaussian_kernel(x, y, sigma=1.0):
    """Gaussian kernel value exp(-||x - y||^2 / (2 * sigma^2)) for two vectors."""
    squared_distance = np.linalg.norm(x - y) ** 2
    bandwidth_term = 2 * sigma ** 2
    return np.exp(-squared_distance / bandwidth_term)

def mmd(X_s, X_t, kernel=gaussian_kernel):
    """Maximum Mean Discrepancy estimate between samples ``X_s`` and ``X_t``.

    Computed as mean k(s, s') + mean k(t, t') - 2 * mean k(s, t), where every
    mean runs over all ordered pairs (including a point paired with itself).
    ``kernel`` is called once per pair.
    """
    n_s = X_s.shape[0]
    n_t = X_t.shape[0]

    def _pairwise_kernel_sum(left, n_left, right, n_right):
        # Sum of kernel values over every (left row, right row) pair
        return np.sum([
            kernel(left[a], right[b])
            for a in range(n_left)
            for b in range(n_right)
        ])

    within_source = _pairwise_kernel_sum(X_s, n_s, X_s, n_s)
    within_target = _pairwise_kernel_sum(X_t, n_t, X_t, n_t)
    across = _pairwise_kernel_sum(X_s, n_s, X_t, n_t)

    return within_source / (n_s ** 2) + within_target / (n_t ** 2) - 2 * across / (n_s * n_t)


In [None]:
# Coverage per method = fraction of counterfactuals predicted as class 1.
cov_ares = counterfactual_y_ares.sum()/len(counterfactual_y_ares)  
cov_global_ce = counterfactual_y_global_ce.sum()/len(counterfactual_y_global_ce)
cov_dice = counterfactual_y_dice.sum()/len(counterfactual_y_dice)
# cov_dce = counterfactual_y_dce.sum()/len(counterfactual_y_dce)  # DCE not supported 
cov_discover = counterfactual_y_discover.sum()/len(counterfactual_y_discover)

In [None]:
# Side-by-side coverage summary for all supported methods.
print('Coverage (AReS):', cov_ares) 
print('Coverage (Globe CE):', cov_global_ce)
print('Coverage (DiCE):', cov_dice)
# print('Coverage (DCE):', cov_dce)  # DCE not supported 
print('Coverage (DISCOVER):', cov_discover) 

In [None]:
# Feature-space distance between each method's counterfactuals and the factual rows.
# DiCE rows containing NaN are dropped before the comparison.
ot_dist_ares = compute_distance(X_s=counterfactual_X_ares, X_t=factual_X)  
ot_dist_global_ce = compute_distance(X_s=counterfactual_X_global_ce, X_t=factual_X)
# ot_dist_dce = compute_distance(X_s=counterfactual_X_dce, X_t=factual_X)  # DCE not supported
ot_dist_dice = compute_distance(X_s=counterfactual_X_dice.dropna(), X_t=factual_X)
ot_dist_discover = compute_distance(X_s=counterfactual_X_discover, X_t=factual_X) 

print('X Distance (AReS):', ot_dist_ares) 
print('X Distance (Globe CE):', ot_dist_global_ce)
print('X Distance (DiCE):', ot_dist_dice)
# print('X Distance (DCE):', ot_dist_dce)  # DCE not supported
print('X Distance (DISCOVER):', ot_dist_discover)

In [None]:
# ========== Y Distance Evaluation (Predicted Y Probability vs Target Y) ==========
print("="*80)
print("Y Distance Evaluation: Predicted Y Probability vs Target Y")
print("="*80)

# Target is the all-ones tensor defined earlier.
y_target_tensor = y_target


# Get predicted y PROBABILITIES (not binary 0/1) for each method's counterfactual results
y_prob_globe = model_raw.predict_proba(counterfactual_X_global_ce.values)[:, 1]
y_prob_ares = model_raw.predict_proba(counterfactual_X_ares.values)[:, 1]
y_prob_dice = model_raw.predict_proba(counterfactual_X_dice.values)[:, 1]
y_prob_discover = model_raw.predict_proba(counterfactual_X_discover.values)[:, 1]

# Convert to tensors
y_prob_globe_tensor = torch.FloatTensor(y_prob_globe)
y_prob_ares_tensor = torch.FloatTensor(y_prob_ares)
y_prob_dice_tensor = torch.FloatTensor(y_prob_dice)
y_prob_discover_tensor = torch.FloatTensor(y_prob_discover)

# Compute OT distance between predicted probabilities and target y (all 1s)
# (1-D inputs, so compute_distance takes its WassersteinDivergence branch)
ot_dist_y_globe = compute_distance(X_s=y_prob_globe_tensor, X_t=y_target_tensor)
ot_dist_y_ares = compute_distance(X_s=y_prob_ares_tensor, X_t=y_target_tensor)
ot_dist_y_dice = compute_distance(X_s=y_prob_dice_tensor, X_t=y_target_tensor)
ot_dist_y_discover = compute_distance(X_s=y_prob_discover_tensor, X_t=y_target_tensor)

print(f'Y Probability Distance (AReS vs Target): {ot_dist_y_ares:.6f}')
print(f'Y Probability Distance (GLOBE vs Target): {ot_dist_y_globe:.6f}')
print(f'Y Probability Distance (DiCE vs Target): {ot_dist_y_dice:.6f}')
print(f'Y Probability Distance (DISCOVER vs Target): {ot_dist_y_discover:.6f}')



In [None]:
# ========== Y (Risk) CDF Curve Visualization ==========
# NOTE(review): these two modules are already imported in the first cell; the re-import is redundant.
import matplotlib.pyplot as plt
import numpy as np

# Class-1 probabilities for the factual rows and for each method's counterfactuals.
y_factual = model_raw.predict_proba(factual_X.values)[:, 1]
y_ares_cf = model_raw.predict_proba(counterfactual_X_ares.values)[:, 1]
y_globe_cf = model_raw.predict_proba(counterfactual_X_global_ce.values)[:, 1]
y_dice_cf = model_raw.predict_proba(counterfactual_X_dice.values)[:, 1]
y_discover_cf = model_raw.predict_proba(counterfactual_X_discover.values)[:, 1]
y_target_vals = y_target

fig, ax = plt.subplots(figsize=(10, 7), dpi=120)

# Each curve is an empirical CDF: sorted values on x, evenly spaced levels in [0, 1] on y.
ax.plot(np.sort(y_factual), np.linspace(0, 1, len(y_factual)),
        label="Factual", color="gray",
        linewidth=2.5, alpha=0.7, linestyle=":")

ax.plot(np.sort(y_ares_cf), np.linspace(0, 1, len(y_ares_cf)),
        label="AReS", color="#FF6B6B",
        linewidth=2, alpha=0.85)

ax.plot(np.sort(y_globe_cf), np.linspace(0, 1, len(y_globe_cf)),
        label="GLOBE", color="#4ECDC4",
        linewidth=2, alpha=0.85)

ax.plot(np.sort(y_dice_cf), np.linspace(0, 1, len(y_dice_cf)),
        label=f"DiCE (mu={y_dice_cf.mean():.3f})", color="#95E1D3",
        linewidth=2, alpha=0.85)

ax.plot(np.sort(y_discover_cf), np.linspace(0, 1, len(y_discover_cf)),
        label="DISCOVER", color="#1E88E5",
        linewidth=2.5, alpha=0.9)

ax.plot(np.sort(y_target_vals), np.linspace(0, 1, len(y_target_vals)),
        label="Target", color="black",
        linestyle="--", linewidth=3, alpha=0.95)

# Reference line at probability 0.5.
ax.axvline(0.5, color="red", linestyle=":", linewidth=1.5, alpha=0.4)

ax.set_xlabel("Risk Probability (Y)", fontsize=13, fontweight="bold")
ax.set_ylabel("Cumulative Probability (Quantile)", fontsize=13, fontweight="bold")
ax.set_title("Risk (Y) CDF: Methods vs Target(Curves closer to Target have smaller OT distance)",
            fontsize=14, fontweight="bold", pad=15)

ax.legend(loc="lower right", fontsize=10, frameon=True, shadow=True)
ax.grid(True, alpha=0.3, linestyle="--", linewidth=0.5)
ax.set_xlim([0, 1.05])
ax.set_ylim([0, 1.05])

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Feature whose empirical CDF is compared across methods.
feature_name = 'Time_Served'

factual_feature = factual_X[feature_name].values
ares_cf_feature = counterfactual_X_ares[feature_name].values
globe_cf_feature = counterfactual_X_global_ce[feature_name].values
dice_cf_feature = counterfactual_X_dice[feature_name].values
discover_cf_feature = counterfactual_X_discover[feature_name].values

fig, ax = plt.subplots(figsize=(12, 8), dpi=120)

# (legend label, values, colour, marker) for each method, in drawing order.
method_curves = [
    ('ARES', ares_cf_feature, '#E74C3C', 'o'),
    ('GLobe', globe_cf_feature, '#3498DB', 's'),
    ('DiCE', dice_cf_feature, '#2ECC71', '^'),
    ('DISCOVER', discover_cf_feature, '#9B59B6', 'd'),
]
for curve_label, curve_values, curve_color, curve_marker in method_curves:
    # Empirical CDF: sorted values against evenly spaced levels in [0, 1].
    ax.plot(np.sort(curve_values), np.linspace(0, 1, len(curve_values)),
            label=curve_label, linewidth=2.5, alpha=0.9, color=curve_color, linestyle='-',
            marker=curve_marker, markersize=3, markevery=5)

# The factual reference is drawn last, dashed and without markers.
ax.plot(np.sort(factual_feature), np.linspace(0, 1, len(factual_feature)),
        label='Factual', linewidth=3, alpha=0.8, color='#2C3E50', linestyle='--')

ax.set_xlabel(f'{feature_name} Value', fontsize=13, fontweight='bold')
ax.set_ylabel('Cumulative Probability', fontsize=13, fontweight='bold')
ax.set_title(f'{feature_name} Distribution CDF Comparison',
            fontsize=14, fontweight='bold', pad=15)

ax.legend(loc='best', fontsize=12, framealpha=0.95, shadow=True)
ax.grid(True, alpha=0.3, linestyle='--', linewidth=0.8)

plt.tight_layout()
plt.show()


In [None]:
import matplotlib.pyplot as plt
import numpy as np
import seaborn as sns

# Feature whose estimated density is compared across methods.
feature_name = 'Time_Served'

factual_feature = factual_X[feature_name].values
ares_cf_feature = counterfactual_X_ares[feature_name].values
globe_cf_feature = counterfactual_X_global_ce[feature_name].values
dice_cf_feature = counterfactual_X_dice[feature_name].values
discover_cf_feature = counterfactual_X_discover[feature_name].values

fig, ax = plt.subplots(figsize=(12, 7), dpi=120)

# (legend label, values, colour) for each method, in drawing order.
method_densities = [
    ('ARES', ares_cf_feature, '#E74C3C'),
    ('GLobe', globe_cf_feature, '#3498DB'),
    ('DiCE', dice_cf_feature, '#2ECC71'),
    ('DISCOVER', discover_cf_feature, '#9B59B6'),
]
for density_label, density_values, density_color in method_densities:
    sns.kdeplot(density_values, ax=ax, label=density_label, linewidth=2.5,
                color=density_color, alpha=0.85)

# The factual reference is drawn last, dashed.
sns.kdeplot(factual_feature, ax=ax, label='Factual', linewidth=2.5,
            linestyle='--', color='#2C3E50', alpha=0.8)

ax.set_xlabel(f'{feature_name} Value', fontsize=13, fontweight='bold')
ax.set_ylabel('Density', fontsize=13, fontweight='bold')
ax.set_title(f'{feature_name} Distribution Comparison (Gaussian KDE)',
            fontsize=14, fontweight='bold', pad=15)

ax.legend(loc='best', fontsize=11, framealpha=0.95, shadow=True)
ax.grid(True, alpha=0.3, linestyle='--', linewidth=0.8)

plt.tight_layout()
plt.show()


In [None]:
# MMD between each method's counterfactuals and the factual rows
# (DiCE rows containing NaN are dropped first).
print('X MMD (AReS):', mmd(X_s=counterfactual_X_ares.values, X_t=factual_X.values))  
print('X MMD (Globe CE):', mmd(X_s=counterfactual_X_global_ce.values, X_t=factual_X.values))
print('X MMD (DiCE):', mmd(X_s=counterfactual_X_dice.dropna().values, X_t=factual_X.values))
# print('X MMD (DCE):', mmd(X_s=counterfactual_X_dce.values, X_t=factual_X.values))  # DCE not supported
print('X MMD (DISCOVER):', mmd(X_s=counterfactual_X_discover.values, X_t=factual_X.values)) 

In [None]:
# Summed per-feature KL divergence between counterfactuals and factual rows.
# A feature whose KDE is singular contributes inf, making the total inf.
print('X KL-Divergence (AReS):', 
      compute_kl_divergence(X_s=counterfactual_X_ares.values, X_t=factual_X.values))
print('X KL-Divergence (Globe CE):', 
      compute_kl_divergence(X_s=counterfactual_X_global_ce.values, X_t=factual_X.values))
print('X KL-Divergence (DiCE):', 
      compute_kl_divergence(X_s=counterfactual_X_dice.dropna().values, X_t=factual_X.values))
# print('X KL-Divergence (DCE):', 
#       compute_kl_divergence(X_s=counterfactual_X_dce.values, X_t=factual_X.values))
print('X KL-Divergence (DISCOVER):',
      compute_kl_divergence(X_s=counterfactual_X_discover.values, X_t=factual_X.values))

In [None]:
# Per-column relative change: sum |cf - factual| / sum |factual|, with 1e-7 added to
# the denominator to avoid division by zero. Each list holds one {column: value}
# dict per column.
# NOTE(review): these lists are not read again in the visible cells — confirm they are still needed.
ares_diff_pct = []  
globe_ce_diff_pct = []
dice_diff_pct = []
# dce_diff_pct = []  # DCE not supported
discover_diff_pct = []
for column in df_explain.columns:
    ares_pct = (counterfactual_X_ares[column] - factual_X[column]).abs().sum() / (1e-7 + factual_X[column].abs().sum())  
    globe_ce_pct = (counterfactual_X_global_ce[column] - factual_X[column]).abs().sum() / (1e-7 + factual_X[column].abs().sum())
    dice_pct = (counterfactual_X_dice[column] - factual_X[column]).abs().sum() / (1e-7 + factual_X[column].abs().sum())
    # dce_pct = (counterfactual_X_dce[column] - factual_X[column]).abs().sum() / (1e-7 + factual_X[column].abs().sum())  # DCE not supported
    discover_pct = (counterfactual_X_discover[column] - factual_X[column]).abs().sum() / (1e-7 + factual_X[column].abs().sum())

    ares_diff_pct.append({column: ares_pct}) 
    globe_ce_diff_pct.append({column: globe_ce_pct})
    dice_diff_pct.append({column: dice_pct})
    # dce_diff_pct.append({column: dce_pct})  # DCE not supported
    discover_diff_pct.append({column: discover_pct})

## Cost Evaluation

In [None]:
def compute_cost(delta, costs_vector):
    """Norm of ``delta`` after multiplying it by the diagonal matrix of ``costs_vector``.

    With a 2-D ``delta`` each column is scaled by its cost and the Frobenius norm
    of the result is returned (``np.linalg.norm`` default).
    """
    cost_weights = np.diag(costs_vector)
    weighted_delta = np.matmul(delta, cost_weights)
    return np.linalg.norm(weighted_delta)


def compute_absolute_difference(counterfactual_X, factual_X):
    """Mean absolute change per column, averaged over all columns except
    ``Priors_Count`` and ``Time_Served`` (NaN column means are ignored).
    """
    compared_columns = counterfactual_X.columns.drop(['Priors_Count', 'Time_Served'])
    per_column_change = [
        (counterfactual_X[name] - factual_X[name]).abs().mean()
        for name in compared_columns
    ]
    return np.nanmean(per_column_change)

def compute_statistic_difference(counterfactual_X, factual_X, metric, columns):
    """Average relative gap, in percent, of an aggregate statistic over ``columns``.

    For each column the statistic ``metric`` (anything ``Series.agg`` accepts) is
    computed on both frames and ``|cf - factual| / |factual| * 100`` is taken;
    NaN gaps are ignored in the final mean.
    """
    def _relative_gap(name):
        cf_stat = counterfactual_X[name].agg(metric)
        factual_stat = factual_X[name].agg(metric)
        return abs(cf_stat - factual_stat)/(abs(factual_stat)) * 100

    return np.nanmean([_relative_gap(name) for name in columns])


In [None]:
# Per-row perturbation (counterfactual minus factual) for each method. Rows with NaN
# are dropped for every method except Globe CE, whose delta is taken as-is.
ares_delta = (counterfactual_X_ares - factual_X).dropna().values  
globe_ce_delta = (counterfactual_X_global_ce - factual_X).values
# dce_delta = (counterfactual_X_dce - factual_X).dropna().values  # DCE not supported
dice_delta = (counterfactual_X_dice - factual_X).dropna().values
discover_delta = (counterfactual_X_discover - factual_X).dropna().values

In [None]:
# Cost of each method's perturbations, weighted by GLOBE_CE's feature costs.
print('Cost (AReS):', compute_cost(ares_delta, costs_vector))  
print('Cost (Globe CE):', compute_cost(globe_ce_delta, costs_vector))
print('Cost (DiCE):', compute_cost(dice_delta, costs_vector))
# print('Cost (DCE):', compute_cost(dce_delta, costs_vector))  # DCE not supported
print('Cost (DISCOVER):', compute_cost(discover_delta, costs_vector))

In [None]:
# Relative change (in percent) of the column mean between counterfactuals and factuals.
metric = 'mean'
columns = ['Time_Served']  # only this column is compared here
print(f'Difference {metric} (AReS):', compute_statistic_difference(counterfactual_X_ares, factual_X, metric,columns))
print(f'Difference {metric} (Globe CE):', compute_statistic_difference(counterfactual_X_global_ce, factual_X, metric,columns))
print(f'Difference {metric} (DiCE):', compute_statistic_difference(counterfactual_X_dice, factual_X, metric,columns))
# print(f'Difference {metric} (DCE):', compute_statistic_difference(counterfactual_X_dce, factual_X, metric,columns))  # DCE not supported
print(f'Difference {metric} (DISCOVER):', compute_statistic_difference(counterfactual_X_discover, factual_X, metric,columns))

In [None]:
# Relative change (in percent) of the column standard deviation between counterfactuals and factuals.
metric = 'std'
columns = ['Time_Served']  # only this column is compared here
print(f'Difference {metric} (AReS):', compute_statistic_difference(counterfactual_X_ares, factual_X, metric,columns))  
print(f'Difference {metric} (Globe CE):', compute_statistic_difference(counterfactual_X_global_ce, factual_X, metric,columns))
print(f'Difference {metric} (DiCE):', compute_statistic_difference(counterfactual_X_dice, factual_X, metric,columns))
# print(f'Difference {metric} (DCE):', compute_statistic_difference(counterfactual_X_dce, factual_X, metric,columns))  # DCE not supported
print(f'Difference {metric} (DISCOVER):', compute_statistic_difference(counterfactual_X_discover, factual_X, metric,columns))

## Diversity

In [None]:
def compute_average_pairwise_distance(counterfactual_X):
    """Mean Euclidean distance over all unordered pairs of rows.

    Returns 0 when the frame has fewer than two rows (no pairs to compare).
    """
    n_rows = len(counterfactual_X)
    pair_distances = [
        np.linalg.norm(counterfactual_X.iloc[first] - counterfactual_X.iloc[second])
        for first in range(n_rows)
        for second in range(first + 1, n_rows)
    ]

    if not pair_distances:
        return 0
    return sum(pair_distances) / len(pair_distances)


In [None]:
# Baseline: average pairwise distance among the factual rows themselves.
diversity_factual = compute_average_pairwise_distance(factual_X)

In [None]:
# Reference value for the per-method diversities printed below.
print('Diversity (Factual)', diversity_factual)

In [None]:
# Average pairwise distance among each method's counterfactuals
# (DiCE rows containing NaN are dropped first).
diversity_ares = compute_average_pairwise_distance(counterfactual_X_ares)  
diversity_global_ce = compute_average_pairwise_distance(counterfactual_X_global_ce)
diversity_dice = compute_average_pairwise_distance(counterfactual_X_dice.dropna())
# diversity_dce = compute_average_pairwise_distance(counterfactual_X_dce)  # DCE not supported
diversity_discover = compute_average_pairwise_distance(counterfactual_X_discover)

In [None]:
# Side-by-side diversity summary for all supported methods.
print('Diversity (AReS):', diversity_ares) 
print('Diversity (Globe CE):', diversity_global_ce)
print('Diversity (DiCE):', diversity_dice)
# print('Diversity (DCE):', diversity_dce)  # DCE not supported
print('Diversity (DISCOVER):', diversity_discover)

In [None]:
# Effective diversity = diversity / X-distance * coverage.
# NOTE(review): a method whose X-distance is exactly 0 would divide by zero here.
print('Effective Diversity (AReS):', diversity_ares/ot_dist_ares * cov_ares)  
print('Effective Diversity (Globe CE):', diversity_global_ce/ot_dist_global_ce * cov_global_ce)
print('Effective Diversity (DiCE):', diversity_dice/ot_dist_dice * cov_dice)
# print('Effective Diversity (DCE):', diversity_dce/ot_dist_dce * cov_dce)  # DCE not supported
print('Effective Diversity (DISCOVER):', diversity_discover/ot_dist_discover * cov_discover)