In [None]:
import torch
import numpy as np
from scipy import stats
import pandas as pd
import seaborn as sns
from openxai.model import LoadModel
from openxai.dataloader import return_loaders
import matplotlib.pyplot as plt

In [None]:
from src.genetic import GeneticAlgorithm
from src.explainer import Explainer

In [None]:
data_name = 'pima' # gaussian, heloc, pima
model_name = 'ann'

In [None]:
loader_train, loader_test = return_loaders(data_name=data_name, download=True, batch_size=128)
# X = loader_test.dataset.data # gaussian, heloc
X = loader_train.dataset.data # pima as test is too small to estimate cd

In [None]:
model = LoadModel(data_name=data_name, ml_model=model_name, pretrained=True)

In [None]:
def partial_dependence(f, xs, X, s):
    X_raw = X.copy()
    X_raw[:, s] = xs
    return f.predict_proba(X_raw)[:, 1].mean()

def conditional_dependence(f, xs, X, s):
    X_raw = X.copy()
    X_s = X_raw[:, s]
    epsilon = X_s.ptp() / 18
    X_cond = X_raw[(X_s > xs - epsilon) & (X_s < xs + epsilon), :]
    if X_cond.shape[0] == 0:
        return partial_dependence(f, xs, X, s)
    else:
        X_cond[:, s] = xs
        return f.predict_proba(X_cond)[:, 1].mean()

In [None]:
i = 0
X_s = X[:, i]
print(partial_dependence(model, np.quantile(X_s, 0.25), X, i))
print(conditional_dependence(model, np.quantile(X_s, 0.25), X, i))

In [None]:
def distance_dtv_nd(X, Y, method="histogram", nbins=None):
    if method == "histogram":
        if nbins is None:
            nbins = int(np.power(X.shape[0] / 10, 1 / X.shape[1]))
            if nbins < 5:
                nbins = 5
        XY = np.concatenate((X, Y))
        _, edges = np.histogramdd(XY, density=True, bins=nbins)
        X_density = np.histogramdd(X, density=False, bins=edges)[0] / X.shape[0]
        Y_density = np.histogramdd(Y, density=False, bins=edges)[0] / Y.shape[0]
    elif method == "kernel":
        X_kernel = stats.gaussian_kde(X.T)
        Y_kernel = stats.gaussian_kde(Y.T)
        XY = np.concatenate((X.T, Y.T), axis=1)
        X_density = X_kernel.pdf(XY) / X_kernel.pdf(XY).sum()
        Y_density = Y_kernel.pdf(XY) / Y_kernel.pdf(XY).sum()
    return np.sum(np.abs(X_density - Y_density)) / 2

In [None]:
temp = {}
for s in range(X.shape[1]):
    temp[s] = []
    X_s = X[:, i]
    for xs in np.linspace(X_s.min(), X_s.max(), 18):
        temp[s] += [partial_dependence(model, xs, X, s)]

In [None]:
temp_pd = pd.DataFrame(temp)
feature_importance_rank = temp_pd.var(axis=0).argsort().values

In [None]:
s = feature_importance_rank[len(feature_importance_rank)//2]
xs = X[:, s].mean()
corr_X = np.corrcoef(X.T)
np.fill_diagonal(corr_X, 0)
K = 4
# features_to_change = corr_X[:, s].argsort()[-K:]
features_to_change = feature_importance_rank[-np.array(range(1, K+1))]
exp = Explainer(model, xs, X, s, "pd")
alg = GeneticAlgorithm(exp, np.setdiff1d(list(range(X.shape[1])), features_to_change), stop_iter=100)
alg.attack(max_iter=100, random_state=123)

In [None]:
np.random.seed(123)

bounds = pd.DataFrame({'s': [], 'xs': [], 'dfe': [], 'label': []})

for s_name in ['most important', 'median important', 'least important']:
    features_to_change = feature_importance_rank[-np.array(range(1, K+1))]
    if s_name == "most important":
        s = feature_importance_rank[-1]
        features_to_change = feature_importance_rank[-np.array(range(2, K+2))]
    elif s_name == "median important":
        s = feature_importance_rank[len(feature_importance_rank)//2]
    elif s_name == "least important":
        s = feature_importance_rank[0]
    X_s = X[:, s]                        

    for q in [0.2, 0.5, 0.8]:
        xs = np.quantile(X_s, q)
        pd1 = partial_dependence(model, xs, X, s)
        cd1 = conditional_dependence(model, xs, X, s)
        X2 = X.copy()
        ###
        temp = X.copy()
        temp[:, s] = xs
        ###
        B_xs = model.predict_proba(temp)[:, 1].max()
        print(f'----- s = {s_name} | x_s = q_{q} | B_xs = {B_xs} | pd_s = {pd1} | cd_s = {cd1} -----')

        bounds = pd.concat([bounds, pd.DataFrame({
            's': [s_name]*2, 
            'xs': ['q_'+str(q)]*2, 
            'dfe': [np.max([np.abs(B_xs - pd1), pd1]), np.max([np.abs(B_xs - cd1), cd1])], 
            'label': ['marginal', 'conditional']
        })])
bounds = bounds.assign(xs=bounds["xs"].str.replace("_", ""))

In [None]:
bounds.to_csv(f'results/{model_name}_{data_name}_remark2.csv', index=False)

In [None]:
np.random.seed(123)
B = model.predict_proba(X)[:, 1].max()

result = pd.DataFrame({'s': [], 'xs': [], 'dtv': [], 'dfe': [], 'label': []})

ITER = 200
STOP_ITER = 30
POP_COUNT = 100
TOP_SURVIVORS = 5

corr_X = np.corrcoef(X.T)
np.fill_diagonal(corr_X, 0)
K = 2

for s_name in ['most important', 'median important', 'least important']:
    features_to_change = feature_importance_rank[-np.array(range(1, K+1))]
    if s_name == "most important":
        s = feature_importance_rank[-1]
        features_to_change = feature_importance_rank[-np.array(range(2, K+2))]
    elif s_name == "median important":
        s = feature_importance_rank[len(feature_importance_rank)//2]
    elif s_name == "least important":
        s = feature_importance_rank[0]
    X_s = X[:, s]                        
    epsilon = X_s.ptp() / 18
    # features_to_change = corr_X[:, s].argsort()[-K:]
    for q in [0.2, 0.5, 0.8]:
        xs = np.quantile(X_s, q)
        pd1 = partial_dependence(model, xs, X, s)
        cd1 = conditional_dependence(model, xs, X, s)
        X2 = X.copy()
        ###
        temp = X.copy()
        temp[:, s] = xs
        ###
        print(f'----- s = {s_name} | x_s = q_{q} | B_xs = {model.predict_proba(temp)[:, 1].max()} -----')
        for perturbation in ['random', 'adv']:
            if perturbation == "adv":
                for i, sigma in enumerate([0.01, 0.05, 0.1, 0.25, 0.33]): 
                    TARGET_PD = 0
                    TARGET_CD = 0
                    for seed in range(5):
                        exp_pd = Explainer(model, xs, X, s, "pd")
                        alg_pd = GeneticAlgorithm(exp_pd, np.setdiff1d(list(range(X.shape[1])), features_to_change), 
                                                  std_ratio=sigma, pop_count=POP_COUNT, top_survivor=TOP_SURVIVORS, stop_iter=STOP_ITER)
                        alg_pd.attack(max_iter=ITER, random_state=seed, cache_data=True, target=TARGET_PD)
                        X2 = alg_pd.get_best_data()
                        pd2 = partial_dependence(model, xs, X2, s)
                        d_pd = np.abs(pd1 - pd2)
                        dtv_pd = distance_dtv_nd(X[:, features_to_change], X2[:, features_to_change])

                        exp_cd = Explainer(model, xs, X, s, "cd")
                        alg_cd = GeneticAlgorithm(exp_cd, np.setdiff1d(list(range(X.shape[1])), features_to_change), 
                                                  std_ratio=sigma, pop_count=POP_COUNT, top_survivor=TOP_SURVIVORS, stop_iter=STOP_ITER)
                        alg_cd.attack(max_iter=ITER, random_state=seed, cache_data=True, target=TARGET_CD)
                        X2 = alg_cd.get_best_data()
                        cd2 = conditional_dependence(model, xs, X2, s)
                        d_cd = np.abs(cd1 - cd2)
                        X_cond = X[(X_s > xs - epsilon) & (X_s < xs + epsilon), :]
                        X2_cond = X2[(X2[:, s] > xs - epsilon) & (X2[:, s] < xs + epsilon), :]
                        dtv_cd = distance_dtv_nd(X_cond[:, features_to_change], X2_cond[:, features_to_change])

                        if seed == 0: # check if alternative target is better
                            alg_pd = GeneticAlgorithm(exp_pd, np.setdiff1d(list(range(X.shape[1])), features_to_change), 
                                                      std_ratio=sigma, pop_count=POP_COUNT, top_survivor=TOP_SURVIVORS, stop_iter=STOP_ITER)
                            alg_pd.attack(max_iter=ITER, random_state=seed, cache_data=True, target=1 - TARGET_PD)
                            X2 = alg_pd.get_best_data()
                            pd2 = partial_dependence(model, xs, X2, s)
                            d_pd2 = np.abs(pd1 - pd2) 
                            if d_pd < d_pd2: # if better: update 
                                d_pd = d_pd2
                                dtv_pd = distance_dtv_nd(X[:, features_to_change], X2[:, features_to_change])
                                TARGET_PD = 1 - TARGET_PD

                            alg_cd = GeneticAlgorithm(exp_cd, np.setdiff1d(list(range(X.shape[1])), features_to_change), 
                                                      std_ratio=sigma, pop_count=POP_COUNT, top_survivor=TOP_SURVIVORS, stop_iter=STOP_ITER)
                            alg_cd.attack(max_iter=ITER, random_state=seed, cache_data=True, target=1 - TARGET_CD)
                            X2 = alg_cd.get_best_data()
                            cd2 = conditional_dependence(model, xs, X2, s)
                            d_cd2 = np.abs(cd1 - cd2)
                            if d_cd < d_cd2: # if better: update 
                                d_cd = d_cd2
                                X_cond = X[(X_s > xs - epsilon) & (X_s < xs + epsilon), :]
                                X2_cond = X2[(X2[:, s] > xs - epsilon) & (X2[:, s] < xs + epsilon), :]
                                dtv_cd = distance_dtv_nd(X_cond[:, features_to_change], X2_cond[:, features_to_change])
                                TARGET_CD = 1 - TARGET_CD

                        result = pd.concat([result, pd.DataFrame({
                            's': [s_name]*2, 
                            'xs': ['q_'+str(q)]*2, 
                            'dtv': [dtv_pd, dtv_cd],
                            'dfe': [d_pd, d_cd], 
                            'label': ['adversarial:marginal', 'adversarial:conditional']
                        })])

            if perturbation == "random":
                for i, sigma in enumerate([0.01, 0.05, 0.1, 0.12, 0.25]): 
                    for seed in range(5):
                        np.random.seed(seed)
                        X2[:, features_to_change] += np.random.normal(0, sigma, size=(X2.shape[0], K))
                        pd2 = partial_dependence(model, xs, X2, s)
                        cd2 = conditional_dependence(model, xs, X2, s)
                        dtv_pd = distance_dtv_nd(X[:, features_to_change], X2[:, features_to_change])

                        X_cond = X[(X_s > xs - epsilon) & (X_s < xs + epsilon), :]
                        X2_cond = X2[(X2[:, s] > xs - epsilon) & (X2[:, s] < xs + epsilon), :]
                        dtv_cd = distance_dtv_nd(X_cond[:, features_to_change], X2_cond[:, features_to_change])

                        result = pd.concat([result, pd.DataFrame({
                            's': [s_name]*4, 
                            'xs': ['q_'+str(q)]*4, 
                            'dtv': [dtv_pd, dtv_cd, dtv_pd, dtv_cd],
                            'dfe': [2*B*dtv_pd, 2*B*dtv_cd, np.abs(pd1 - pd2), np.abs(cd1 - cd2)], 
                            'label': ['theoretical:marginal', 'theoretical:conditional', 'random:marginal', 'random:conditional']
                        })])

In [None]:
df = result.copy()

In [None]:
df.to_csv(f'results/{model_name}_{data_name}_k2.csv', index=False)

In [None]:
df = df.assign(xs=df["xs"].str.replace("_", ""))

In [None]:
g = sns.relplot(
    data=df.loc[(df.dfe < B+.01) & (df.dtv < 0.7), :], 
    x="dtv", y="dfe",
    col="xs", row="s", 
    hue="label", style="label",
    kind="line", linewidth=2,
    height=1.8,
    aspect=1.1,
    palette=["#4285F4", "#DB4437", "#4285F4", "#DB4437", "#4285F4", "#DB4437"], # ["black", "#0F9D58", "#F4B400", "#4285F4", "#DB4437"]
    dashes=[(1, 1), (1, 1), (2, 2), (2, 2), "", ""]
)
g._legend.set_title("Input perturbation")
g.set_axis_labels("$d_{\mathrm{TV}}(p_{\mathbf{X}}, p'_{\mathbf{X}})$", 
                  "$| g_s(\mathbf{x}_s; f, p_{\mathbf{X}}) - g_s(\mathbf{x}_s; f, p'_{\mathbf{X}}) |$")
for i, ax in enumerate(g.axes):
    if i == 1:
        ax[0].set_ylabel("")
g.fig.subplots_adjust(top=0.9)
sns.move_legend(g, "upper left", bbox_to_anchor=(.15, 0.65), frameon=True, ncol=3) # (.45, .38)
plt.tight_layout()
# plt.savefig("../figures/exp1_heloc.pdf", bbox_inches='tight')

--------------------

In [None]:
df2 = pd.read_csv(f'results/{model_name}_{data_name}_k2.csv') # gaussian, heloc, pima
df2 = df2.assign(xs=df2["xs"].str.replace("_", ""))

bounds = pd.read_csv(f'results/{model_name}_{data_name}_remark2.csv')

In [None]:
for i, row in bounds.iterrows():
    ix = (df2['s'] == row['s']) &\
        (df2['xs'] == row['xs']) &\
        (df2['label'].str.split(":", expand=True).iloc[:, 0] == "theoretical") &\
        (df2['label'].str.split(":", expand=True).iloc[:, 1] == row['label']) &\
        (df2['dfe'] > row['dfe'])
    df2.loc[ix, "dfe"] = row['dfe']
    

In [None]:
g = sns.relplot(
    data=df2,
    x="dtv", y="dfe",
    col="xs", row="s", 
    hue="label", style="label",
    kind="line", linewidth=2,
    height=1.8,
    aspect=1.1,
    palette=["#4285F4", "#DB4437", "#4285F4", "#DB4437", "#4285F4", "#DB4437"], # ["black", "#0F9D58", "#F4B400", "#4285F4", "#DB4437"]
    dashes=[(1, 1), (1, 1), (2, 2), (2, 2), "", ""]
)
g._legend.set_title("Input perturbation")
g.set_axis_labels("$d_{\mathrm{TV}}(p_{\mathbf{X}}, p'_{\mathbf{X}})$", 
                  "$| g_s(\mathbf{x}_s; f, p_{\mathbf{X}}) - g_s(\mathbf{x}_s; f, p'_{\mathbf{X}}) |$")
for i, ax in enumerate(g.axes):
    if i == 1:
        ax[0].set_ylabel("")
# g.fig.subplots_adjust(top=1.5)
sns.move_legend(g, "upper left", bbox_to_anchor=(.15, 1.13), frameon=True, ncol=3) # (.45, .38)
plt.tight_layout()
# plt.savefig("../figures/exp1_pima_tighter.pdf", bbox_inches='tight')