# x to przykladowy dataset, Exp to przykladowy explainer

In [1]:
import tensorflow as tf
from tensorflow import keras
import sklearn.neighbors
import numpy as np
import pandas as pd
# pd.set_option("display.max_rows", None)

In [2]:
from data import GermanData, AdultData

german_data = GermanData('data/datasets/input_german.csv', 'data/datasets/labels_german.csv')
adult_data = AdultData('data/datasets/adult.csv')
german_model = keras.models.load_model('models/model_german')
adult_model = keras.models.load_model('models/model_adult')

In [3]:
german_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 2)                 124       
                                                                 
Total params: 124
Trainable params: 124
Non-trainable params: 0
_________________________________________________________________


In [4]:
adult_model.summary()

Model: "sequential_10"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense_20 (Dense)            (None, 1)                 38        
                                                                 
 dense_21 (Dense)            (None, 2)                 4         
                                                                 
Total params: 42
Trainable params: 42
Non-trainable params: 0
_________________________________________________________________


In [5]:
def alpha_metric(explainer, x: pd.DataFrame, dataset: pd.DataFrame) -> float:
    x_columns = x.columns
    cf_x = explainer.generate(x)
    if cf_x is None:
        return float("nan")
    x = x.to_numpy()
    dataset = dataset.to_numpy()
    knn = sklearn.neighbors.KNeighborsClassifier(2)
    knn.fit(dataset, np.ones_like(dataset))
    _, ids = knn.kneighbors(x)
    neighboring_original = dataset[np.squeeze(ids)[1]]
    neighboring_original = pd.DataFrame(neighboring_original, x_columns).T
    cf_neighbor = explainer.generate(neighboring_original)
    if cf_neighbor is None:
        return float("nan")
    return np.linalg.norm(cf_x.to_numpy() - cf_neighbor.to_numpy()) / cf_x.to_numpy().size

In [6]:
from tqdm import trange

class ModelWrapper:
    def __init__(self, model):
        self.model = model

    def generate(self, x: pd.DataFrame):
        x = x.iloc[0]
        x = self.model.generate(x)
        return x

In [7]:
# to jest beta metric
def beta_metric(explainer, x: pd.DataFrame) -> float:
    cf = explainer.generate(x)
    if cf is None:
        return float("nan")
    cf = np.squeeze(cf.to_numpy())
    x = np.squeeze(x.to_numpy())
    return np.mean(~np.isclose(x, cf, atol=0.05))

In [8]:
def gamma_metric(explainer, x: pd.DataFrame, model) -> float:
    cf = explainer.generate(x)
    if cf is None:
        return float("nan")
    label = model.predict(cf)
    original_label = model.predict(x)
    return not np.isclose(label, original_label)

In [9]:
# w pracy to jest δ - delta
def delta_metric(explainer, x: pd.DataFrame) -> float:
    cf = explainer.generate(x)
    if cf is None:
        return float("nan")
    cf = np.squeeze(cf.to_numpy())
    x = np.squeeze(x.to_numpy())
    return np.linalg.norm(x - cf) / x.size

In [10]:
def is_able_to_generate_cf(explainer, x: pd.DataFrame):
    cf = explainer.generate(x)
    return cf is not None

In [11]:
from tqdm import trange

print("Adult constraints:")
for c in adult_data.constraints:
    print(c)
print("\nGerman constraints:")
for c in german_data.constraints:
    print(c)

def filter_nans(func):
    def wrapper(*args, **kwargs):
        return [
            x for x in func(*args, **kwargs) if not np.isnan(x)
        ]
    return wrapper

@filter_nans
def generate_alphas(explainer, dataset, max_examples=200):
        return [
        alpha_metric(ModelWrapper(explainer), dataset.iloc[i].to_frame().T, dataset)
        for i in trange(min(dataset.shape[0], max_examples))
    ]

@filter_nans
def generate_betas(explainer, dataset, max_examples=200):
    return [
        beta_metric(ModelWrapper(explainer), dataset.iloc[i].to_frame().T)
        for i in trange(min(dataset.shape[0], max_examples))
    ]

@filter_nans
def generate_gammas(explainer, dataset, original_model, max_examples=200):
    return [
        gamma_metric(ModelWrapper(explainer), dataset.iloc[i].to_frame().T, original_model)
        for i in trange(min(dataset.shape[0], max_examples))
    ]

@filter_nans
def generate_deltas(explainer, dataset, max_examples=200):
    return [
        delta_metric(ModelWrapper(explainer), dataset.iloc[i].to_frame().T)
        for i in trange(min(dataset.shape[0], max_examples))
    ]


def generate_is_able_to_generate_cf(explainer, dataset, max_samples=1_000):
    return [
        is_able_to_generate_cf(ModelWrapper(explainer), dataset.iloc[i].to_frame().T)
        for i in trange(min(dataset.shape[0], max_samples))
    ]

Adult constraints:
OneHot(name='workclass', start_column=2, end_column=8)
OneHot(name='martial.status', start_column=9, end_column=15)
OneHot(name='occupation', start_column=16, end_column=29)
OneHot(name='race', start_column=30, end_column=34)
OneHot(name='sex', start_column=35, end_column=36)

German constraints:
OneHot(name='account_status', start_column=7, end_column=10)
OneHot(name='credit_history', start_column=11, end_column=15)
OneHot(name='purpose', start_column=16, end_column=25)
OneHot(name='savings', start_column=26, end_column=30)
OneHot(name='sex_status', start_column=31, end_column=34)
OneHot(name='debtors', start_column=35, end_column=37)
OneHot(name='property', start_column=38, end_column=41)
OneHot(name='other_installment_plans', start_column=42, end_column=44)
OneHot(name='housing', start_column=45, end_column=47)
OneHot(name='job', start_column=48, end_column=51)
OneHot(name='phone', start_column=52, end_column=53)
OneHot(name='foreign', start_column=54, end_column=

In [12]:
class ArgmaxModelWrapper:
    def __init__(self, model):
        self.model = model
    def predict(self, x):
        return np.argmax(self.model.predict(x), axis=-1)
german_model_wrapped = ArgmaxModelWrapper(german_model)
adult_model_wrapped = ArgmaxModelWrapper(adult_model)
german_model_predictions = german_model_wrapped.predict(german_data.X_train)
adult_model_predictions = adult_model_wrapped.predict(adult_data.X_train)
adult_model_predictions = np.round(adult_model_predictions)

In [None]:
from cfec.explainers import Fimap

options = {'s_epochs':500, 'g_epochs':500}

fimap_no_constraints_adult = Fimap(l1=0.01, l2=0.1)
fimap_no_constraints_adult.fit(adult_data.X_train, adult_model_predictions, **options)

In [None]:
fimap_with_constraints_adult = Fimap(l1=0.01, l2=0.1, constraints=adult_data.constraints)
fimap_with_constraints_adult.fit(adult_data.X_train, adult_model_predictions, **options)

In [None]:
fimap_no_constraints_german = Fimap(l1=0.01, l2=0.1)
fimap_no_constraints_german.fit(german_data.X_train, german_model_predictions, **options)

In [None]:
fimap_with_constraints_german = Fimap(l1=0.01, l2=0.1, constraints=german_data.constraints)
fimap_with_constraints_german.fit(german_data.X_train, german_model_predictions, **options)

In [None]:
from cfec.explainers import Cadex
cadex_no_constraints_adult = Cadex(adult_model)
cadex_with_constraints_adult = Cadex(adult_model, constraints=adult_data.constraints)
cadex_no_constraints_german = Cadex(german_model)
cadex_with_constraints_german = Cadex(german_model, constraints=german_data.constraints)

In [13]:
def generate_results(model, dataset, max_examples=200, original_model=None):
    return [
        generate_alphas(model, dataset, max_examples=max_examples),
        generate_betas(model, dataset, max_examples=max_examples),
        generate_gammas(model, dataset, original_model=original_model, max_examples=max_examples),
        generate_deltas(model, dataset, max_examples=max_examples ),
    ]

adult_x_test = adult_data.X_test.sample(frac=1)
german_x_test = german_data.X_test.sample(frac=1)

In [None]:
fimap_no_constraints_adult_results = generate_results(fimap_no_constraints_adult, adult_x_test, original_model=adult_model_wrapped)

In [None]:
fimap_with_constraints_adult_results = generate_results(fimap_with_constraints_adult, adult_x_test, original_model=adult_model_wrapped)

In [None]:
fimap_no_constraints_german_results = generate_results(fimap_no_constraints_german, german_x_test, original_model=german_model_wrapped)

In [None]:
fimap_with_constraints_german_results = generate_results(fimap_with_constraints_german, german_x_test, original_model=german_model_wrapped)

In [None]:
cadex_no_constraints_adult_results = generate_results(cadex_no_constraints_adult, adult_data.X_test, original_model=adult_model_wrapped)

In [None]:
cadex_with_constraints_adult_results = generate_results(cadex_with_constraints_adult, adult_data.X_test, original_model=adult_model_wrapped)

In [None]:
cadex_no_constraints_german_results = generate_results(cadex_no_constraints_german, german_data.X_test, original_model=german_model_wrapped)

In [None]:
cadex_with_constraints_german_results = generate_results(cadex_with_constraints_german, german_data.X_test, original_model=german_model_wrapped)

In [None]:
adult_data.X_train.columns, len(adult_data.X_train.columns), len(adult_data.constraints)

In [None]:
for i in range(5):
    x = adult_x_test.iloc[i]
    cf1 = fimap_with_constraints_adult.generate(x)
    cf2 = fimap_no_constraints_adult.generate(x)
    print(np.linalg.norm(x.values - cf1.values))
    print(np.linalg.norm(x.values - cf2.values))
    print()

In [None]:
#fimap_no_constraints_adult_results[1] = generate_betas(fimap_no_constraints_adult, adult_x_test, 100)
#fimap_with_constraints_adult_results[1] = generate_betas(fimap_with_constraints_adult, adult_x_test, 100)
#fimap_no_constraints_german_results[1] = generate_betas(fimap_no_constraints_german, german_x_test, 100)
#fimap_with_constraints_german_results[1] = generate_betas(fimap_with_constraints_german, german_x_test, 100)

In [None]:
format_str = '{:.5f}'
print(" ".join(format_str.format(y) for y in [np.mean(x) for x in fimap_no_constraints_adult_results]))
print(" ".join(format_str.format(y) for y in [np.mean(x) for x in cadex_no_constraints_adult_results]))

print(" ".join(format_str.format(y) for y in [np.mean(x) for x in fimap_with_constraints_adult_results]))
print(" ".join(format_str.format(y) for y in [np.mean(x) for x in cadex_with_constraints_adult_results]))


print(" ".join(format_str.format(y) for y in [np.mean(x) for x in fimap_no_constraints_german_results]))
print(" ".join(format_str.format(y) for y in [np.mean(x) for x in cadex_no_constraints_german_results]))

print(" ".join(format_str.format(y) for y in [np.mean(x) for x in fimap_with_constraints_german_results]))
print(" ".join(format_str.format(y) for y in [np.mean(x) for x in cadex_with_constraints_german_results]))

In [None]:
adult_sampled = pd.concat([adult_data.X_train, adult_data.X_test]).sample(frac=1)
german_sampled = pd.concat([german_data.X_train, german_data.X_test]).sample(frac=1)
is_able_to_generate_cf_no_constraints_adult_results = generate_is_able_to_generate_cf(cadex_no_constraints_adult, adult_sampled)
is_able_to_generate_cf_with_constraints_adult_results = generate_is_able_to_generate_cf(cadex_with_constraints_adult, adult_sampled)
is_able_to_generate_cf_no_constraints_german_results = generate_is_able_to_generate_cf(cadex_no_constraints_german, german_sampled)
is_able_to_generate_cf_with_constraints_german_results = generate_is_able_to_generate_cf(cadex_with_constraints_german, german_sampled)

In [None]:
print(int(100 * np.mean(is_able_to_generate_cf_no_constraints_adult_results)))
print(int(100 * np.mean(is_able_to_generate_cf_with_constraints_adult_results)))
print(int(100 * np.mean(is_able_to_generate_cf_no_constraints_german_results)))
print(int(100 * np.mean(is_able_to_generate_cf_with_constraints_german_results)))

In [14]:
from cfec.constraints import OneHot, ValueMonotonicity, Freeze

german_one_hot_constraints = [OneHot("account_status", 7, 10), OneHot("credit_history", 11, 15),
                            OneHot("purpose", 16, 25), OneHot("savings", 26, 30), OneHot("sex_status", 31, 34),
                            OneHot("debtors", 35, 37), OneHot("property", 38, 41),
                            OneHot("other_installment_plans", 42, 44), OneHot("housing", 45, 47), OneHot("job", 48, 51),
                            OneHot("phone", 52, 53), OneHot("foreign", 54, 55), OneHot("employment", 56, 60)]

german_additional_constraints = [
        #Freeze(['credit']), 
        ValueMonotonicity(['age'], "increasing")
    ]

adult_one_hot_constraints = [
            OneHot('workclass', 2, 8),
            OneHot('martial.status', 9, 15),
            OneHot('occupation', 16, 29),
            OneHot('race', 30, 34),
            OneHot('sex', 35, 36),
        ]

#['race', 'sex', 'native.country']
adult_additional_constraints = [
    Freeze(columns='race'),
    Freeze(columns='sex'),
    Freeze(columns='native.country'),
    ValueMonotonicity(['age'], "increasing")
]

In [16]:
from itertools import chain, combinations
from functools import partial
from cfec.explainers import Cadex, Fimap

def powerset(iterable):
    "powerset([1,2,3]) --> () (1,) (2,) (3,) (1,2) (1,3) (2,3) (1,2,3)"
    s = list(iterable)
    return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))


with open('metryki_wyniki.txt', 'w+') as output_file:
    myprint = partial(print, file=output_file)
    for dataset, original_model, (constraints, additional_constraints) in zip((adult_data, german_data), (adult_model_wrapped, german_model_wrapped), 
            (
                (adult_one_hot_constraints, adult_additional_constraints),
                (german_one_hot_constraints, german_additional_constraints),
            )
        ):
        for additional in powerset(additional_constraints):
            all_constraints = [*constraints, *additional]
            cadex = Cadex(original_model.model, constraints=all_constraints)
            results = generate_results(cadex, dataset.X_test, original_model=original_model)
            myprint(all_constraints)
            myprint(int(100 * np.mean(results)))

100%|██████████| 200/200 [01:15<00:00,  2.64it/s]
100%|██████████| 200/200 [00:29<00:00,  6.69it/s]
100%|██████████| 200/200 [00:48<00:00,  4.10it/s]
100%|██████████| 200/200 [00:26<00:00,  7.59it/s]


UnsupportedOperation: not writable