In [None]:
import sys
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix
from raiutils.exceptions import UserConfigValidationException

In [None]:
preprocessor_dir = 'modules'
sys.path.append(preprocessor_dir)

In [None]:
from counterfactual_explainers.apply_dice import apply_dice

In [None]:
preprocessor_dir = 'modules/preprocessors'
sys.path.append(preprocessor_dir)

In [None]:
seed = 420000
np.random.seed(seed)

In [None]:
def evaluate_counterfactual(ce: pd.DataFrame, reference: pd.DataFrame, labeler, decisive_columns,
                            criteria='both') -> bool:
    """
    method to evaluate a counterfactual explanation if the decision-making mechanism is known

    :param ce: counterfactual instance
    :param reference: original instance
    :param labeler: function to obtain the ground truth of the counterfactual instance
    :param decisive_columns: decisive columns for the ground truth decision
    :param criteria: 'both' criteria or criteria 'one' or 'two'
    :return: boolean if counterfactual explanation is true
    """

    # evaluate counterfactual explanations -- two criteria:
    # 1) if I would label this instance, would be belong to the desired class?
    criteria_1 = False
    if int(labeler(ce.drop(target, axis=1), seed)[0]) == int(ce.loc[0, target]):
        criteria_1 = True

    # 2) are only the correct columns modified?
    diff = np.asarray(ce) - np.asarray(reference)
    diff_dict = {}
    for c, col in enumerate(list(ce.columns)):
        diff_dict[col] = diff[:, c]
    diff_df = pd.DataFrame.from_dict(diff_dict).drop(target, axis=1)

    criteria_2 = [False] * len(list(diff_df.columns))
    for c, col in enumerate(list(diff_df.columns)):
        if (diff_df.loc[0, col] != 0 and col in decisive_columns) or diff_df.loc[0, col] == 0:
            criteria_2[c] = True
    criteria_2 = all(criteria_2)

    if criteria == 'one':
        return criteria_1
    elif criteria == 'two':
        return criteria_2
    elif criteria == 'both':
        return criteria_1 and criteria_2
    else:
        raise ValueError('unknown criteria value')


### German Credit Risk

In [None]:
from preprocess_german_credit_data import preprocess_german_credit_data
from preprocess_german_credit_data import german_credit_label_modifier

In [None]:
german_credit_dir = 'data/german_credit_data.csv'

In [None]:
target = 'risk'
decisive_columns = ['housing', 'job', 'age', 'savingaccounts', 'checkingaccount']

In [None]:
df_train, df_test, label_transformers, metric_transformers = preprocess_german_credit_data(german_credit_dir, 0.3, True, seed)
df_train.head()

In [None]:
rf = RandomForestClassifier(class_weight='balanced', random_state=seed)
rf.fit(np.asarray(df_train.drop(columns=[target])), np.asarray(df_train[target]))

In [None]:
y_pred = rf.predict(df_test.drop(target, axis=1))
tn, fp, fn, tp = confusion_matrix(np.asarray(df_test[target]), y_pred).ravel()
fp_rate = fp / (fp + tn)
fn_rate = fn / (fn + tp)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / len(y_pred)

print('fp rate:', fp_rate)
print('fn rate:', fn_rate)
print('precision:', precision)
print('recall:', recall)
print('accuracy:', accuracy)

In [None]:
%%capture

df_eval_ces = df_test.copy()

df_eval_ces['preds'] = y_pred
df_eval_ces = df_eval_ces[df_eval_ces[target] == df_eval_ces['preds']]

if len(df_eval_ces) > 100:
    select_index = np.random.choice(list(range(0, len(df_eval_ces))), 100, replace=False)
    df_eval_ces = df_eval_ces.iloc[select_index]

corr_ces = [False] * len(df_eval_ces)
for n in range(len(df_eval_ces)):

    try:
        ce = apply_dice(rf, df_eval_ces.iloc[[n]].drop([target, 'preds'], axis=1), df_train,
                        list(df_eval_ces.drop([target, 'preds'], axis=1).columns), target,
                        number_cfs=1, seed=seed)
    except (TimeoutError, UserConfigValidationException):
        continue

    # evaluate counterfactual explanation
    corr_ces[n] = evaluate_counterfactual(ce, df_eval_ces.iloc[[n]].drop('preds', axis=1),
                                          german_credit_label_modifier, 
                                          decisive_columns)

In [None]:
df_eval_ces['corr_ces'] = corr_ces

try:
    corr_ces_pos = len(df_eval_ces[(df_eval_ces[target] == 1) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 1])
except ZeroDivisionError:
    corr_ces_pos = 0
try:
    corr_ces_neg = len(df_eval_ces[(df_eval_ces[target] == 0) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 0])
except ZeroDivisionError:
    corr_ces_neg = 0

print('correct ces positive class:', corr_ces_pos, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 1]))+str(')'))
print('correct ces negative class:', corr_ces_neg, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 0]))+str(')'))

### Adult

In [None]:
from preprocess_adult_data import preprocess_adult_data
from preprocess_adult_data import adult_label_modifier

In [None]:
adult_dir = 'data/adult.csv'

In [None]:
target = 'income'
decisive_columns = ['workclass', 'educationalnum', 'hoursperweek']

In [None]:
df_train, df_test, label_transformers, metric_transformers = preprocess_adult_data(adult_dir, 0.3, True, seed)
df_train.head()

In [None]:
rf = RandomForestClassifier(class_weight='balanced', random_state=seed)
rf.fit(np.asarray(df_train.drop(columns=[target])), np.asarray(df_train[target]))

In [None]:
y_pred = rf.predict(df_test.drop(target, axis=1))
tn, fp, fn, tp = confusion_matrix(np.asarray(df_test[target]), y_pred).ravel()
fp_rate = fp / (fp + tn)
fn_rate = fn / (fn + tp)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / len(y_pred)

print('fp rate:', fp_rate)
print('fn rate:', fn_rate)
print('precision:', precision)
print('recall:', recall)
print('accuracy:', accuracy)

In [None]:
%%capture

df_eval_ces = df_test.copy()

df_eval_ces['preds'] = y_pred
df_eval_ces = df_eval_ces[df_eval_ces[target] == df_eval_ces['preds']]

if len(df_eval_ces) > 100:
    select_index = np.random.choice(list(range(0, len(df_eval_ces))), 100, replace=False)
    df_eval_ces = df_eval_ces.iloc[select_index]

corr_ces = [False] * len(df_eval_ces)
for n in range(len(df_eval_ces)):

    try:
        ce = apply_dice(rf, df_eval_ces.iloc[[n]].drop([target, 'preds'], axis=1), df_train,
                        list(df_eval_ces.drop([target, 'preds'], axis=1).columns), target,
                        number_cfs=1, seed=seed)
    except (TimeoutError, UserConfigValidationException):
        continue

    # evaluate counterfactual explanation
    corr_ces[n] = evaluate_counterfactual(ce, df_eval_ces.iloc[[n]].drop('preds', axis=1),
                                          adult_label_modifier, 
                                          decisive_columns)

In [None]:
df_eval_ces['corr_ces'] = corr_ces

try:
    corr_ces_pos = len(df_eval_ces[(df_eval_ces[target] == 1) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 1])
except ZeroDivisionError:
    corr_ces_pos = 0
try:
    corr_ces_neg = len(df_eval_ces[(df_eval_ces[target] == 0) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 0])
except ZeroDivisionError:
    corr_ces_neg = 0

print('correct ces positive class:', corr_ces_pos, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 1]))+str(')'))
print('correct ces negative class:', corr_ces_neg, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 0]))+str(')'))

### Diabetes

In [None]:
from preprocess_diabetes_data import preprocess_diabetes_data
from preprocess_diabetes_data import diabetes_label_modifier

In [None]:
diabetes_dir = 'data/diabetes.csv'

In [None]:
target = 'diabetes'
decisive_columns = ['bloodpressure', 'bmi', 'glucose']

In [None]:
df_train, df_test, label_transformers, metric_transformers = preprocess_diabetes_data(diabetes_dir, 0.3, True, seed)
df_train.head()

In [None]:
rf = RandomForestClassifier(class_weight='balanced', random_state=seed)
rf.fit(np.asarray(df_train.drop(columns=[target])), np.asarray(df_train[target]))

In [None]:
y_pred = rf.predict(df_test.drop(target, axis=1))
tn, fp, fn, tp = confusion_matrix(np.asarray(df_test[target]), y_pred).ravel()
fp_rate = fp / (fp + tn)
fn_rate = fn / (fn + tp)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / len(y_pred)

print('fp rate:', fp_rate)
print('fn rate:', fn_rate)
print('precision:', precision)
print('recall:', recall)
print('accuracy:', accuracy)

In [None]:
%%capture

df_eval_ces = df_test.copy()

df_eval_ces['preds'] = y_pred
df_eval_ces = df_eval_ces[df_eval_ces[target] == df_eval_ces['preds']]

if len(df_eval_ces) > 100:
    select_index = np.random.choice(list(range(0, len(df_eval_ces))), 100, replace=False)
    df_eval_ces = df_eval_ces.iloc[select_index]

corr_ces = [False] * len(df_eval_ces)
for n in range(len(df_eval_ces)):

    try:
        ce = apply_dice(rf, df_eval_ces.iloc[[n]].drop([target, 'preds'], axis=1), df_train,
                        list(df_eval_ces.drop([target, 'preds'], axis=1).columns), target,
                        number_cfs=1, seed=seed)
    except (TimeoutError, UserConfigValidationException):
        continue

    # evaluate counterfactual explanation
    corr_ces[n] = evaluate_counterfactual(ce, df_eval_ces.iloc[[n]].drop('preds', axis=1),
                                          diabetes_label_modifier, 
                                          decisive_columns)

In [None]:
df_eval_ces['corr_ces'] = corr_ces

try:
    corr_ces_pos = len(df_eval_ces[(df_eval_ces[target] == 1) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 1])
except ZeroDivisionError:
    corr_ces_pos = 0
try:
    corr_ces_neg = len(df_eval_ces[(df_eval_ces[target] == 0) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 0])
except ZeroDivisionError:
    corr_ces_neg = 0

print('correct ces positive class:', corr_ces_pos, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 1]))+str(')'))
print('correct ces negative class:', corr_ces_neg, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 0]))+str(')'))

### Heart

In [None]:
from preprocess_heart_data import preprocess_heart_data
from preprocess_heart_data import heart_label_modifier

In [None]:
heart_dir = 'data/heart.csv'

In [None]:
target = 'disease'
decisive_columns = ['fbs', 'restecg', 'thal', 'chol', 'thalach']

In [None]:
df_train, df_test, label_transformers, metric_transformers = preprocess_heart_data(heart_dir, 0.3, True, seed)
df_train.head()

In [None]:
rf = RandomForestClassifier(class_weight='balanced', random_state=seed)
rf.fit(np.asarray(df_train.drop(columns=[target])), np.asarray(df_train[target]))

In [None]:
y_pred = rf.predict(df_test.drop(target, axis=1))
tn, fp, fn, tp = confusion_matrix(np.asarray(df_test[target]), y_pred).ravel()
fp_rate = fp / (fp + tn)
fn_rate = fn / (fn + tp)
precision = tp / (tp + fp)
recall = tp / (tp + fn)
accuracy = (tp + tn) / len(y_pred)

print('fp rate:', fp_rate)
print('fn rate:', fn_rate)
print('precision:', precision)
print('recall:', recall)
print('accuracy:', accuracy)

In [None]:
%%capture

df_eval_ces = df_test.copy()

df_eval_ces['preds'] = y_pred
df_eval_ces = df_eval_ces[df_eval_ces[target] == df_eval_ces['preds']]

if len(df_eval_ces) > 100:
    select_index = np.random.choice(list(range(0, len(df_eval_ces))), 100, replace=False)
    df_eval_ces = df_eval_ces.iloc[select_index]

corr_ces = [False] * len(df_eval_ces)
for n in range(len(df_eval_ces)):

    try:
        ce = apply_dice(rf, df_eval_ces.iloc[[n]].drop([target, 'preds'], axis=1), df_train,
                        list(df_eval_ces.drop([target, 'preds'], axis=1).columns), target,
                        number_cfs=1, seed=seed)
    except (TimeoutError, UserConfigValidationException):
        continue

    # evaluate counterfactual explanation
    corr_ces[n] = evaluate_counterfactual(ce, df_eval_ces.iloc[[n]].drop('preds', axis=1),
                                          heart_label_modifier, 
                                          decisive_columns)

In [None]:
df_eval_ces['corr_ces'] = corr_ces

try:
    corr_ces_pos = len(df_eval_ces[(df_eval_ces[target] == 1) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 1])
except ZeroDivisionError:
    corr_ces_pos = 0
try:
    corr_ces_neg = len(df_eval_ces[(df_eval_ces[target] == 0) & (df_eval_ces['corr_ces'] == True)]) \
        / len(df_eval_ces[df_eval_ces[target] == 0])
except ZeroDivisionError:
    corr_ces_neg = 0

print('correct ces positive class:', corr_ces_pos, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 1]))+str(')'))
print('correct ces negative class:', corr_ces_neg, '(n='+str(len(df_eval_ces[df_eval_ces[target] == 0]))+str(')'))