In [None]:
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from xgboost.sklearn import XGBClassifier
from sklearn.metrics import accuracy_score, f1_score
import matplotlib.pyplot as plt
from sklearn.neural_network import MLPClassifier
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from matplotlib.ticker import FuncFormatter

def add_scaler(clf, all_col_indices, cat_col_indices = None):
    """Wrap ``clf`` in a Pipeline that preprocesses the features first.

    Without categorical columns the pipeline is ``StandardScaler -> clf``
    (no imputation is performed on that path). With categorical columns, a
    ColumnTransformer mean-imputes and standardizes the numeric columns and
    one-hot encodes the categorical ones with ``handle_unknown='ignore'``.

    Parameters
    ----------
    clf : estimator
        Placed as the final pipeline step, named ``'clf'``.
    all_col_indices : iterable
        Column selectors (hashable) for every input column.
    cat_col_indices : iterable, optional
        Subset of ``all_col_indices`` to treat as categorical. ``None`` or
        an empty iterable means there are no categorical columns.

    Returns
    -------
    Pipeline
        Two steps, named ``'scaler'`` and ``'clf'``, so grid-search keys such
        as ``'clf__<param>'`` address the wrapped estimator.
    """
    # A None default avoids sharing one mutable list across calls.
    cat_cols = [] if cat_col_indices is None else list(cat_col_indices)

    if not cat_cols:
        steps = [
            ('scaler', StandardScaler()),
            ('clf', clf)
        ]
        return Pipeline(steps)

    # Keep the caller's column order (and drop duplicates) instead of relying
    # on set iteration order, which is not stable for string selectors.
    cat_set = set(cat_cols)
    numeric_features = list(dict.fromkeys(
        c for c in all_col_indices if c not in cat_set))

    numeric_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),
        ('scaler', StandardScaler())])

    categorical_transformer = OneHotEncoder(handle_unknown='ignore')

    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features),
            ('cat', categorical_transformer, cat_cols)])

    # The preprocessing step keeps the name 'scaler' on both paths.
    steps = [
        ('scaler', preprocessor),
        ('clf', clf)
    ]
    return Pipeline(steps)

# Seed NumPy's global RNG so the np.random draws made later in the notebook are repeatable.
np.random.seed(42)

# Experiment configuration read by the cells below.
metric = 'accuracy' # decision metric passed to get_metric: 'accuracy' or 'F1'
sens_attr = 'male' # column of df used as the group attribute: 'white' or 'male'
bbox = 'nn' # black-box model family to fit: one of ['xgb', 'nn', 'lr']

In [None]:
class Parameter:
    """A named quantity with an optional point value and an optional sweep range."""

    def __init__(self, name, val = None, range = None):
        # `range` shadows the builtin only inside this method; the keyword is
        # part of the constructor's interface, so it keeps its name.
        self.name, self.val, self.range = name, val, range

In [None]:
# Probability that the user's decision is correct, indexed as p_correct[bb][expl]:
#   bb   = 0 when the black box (BB) is incorrect, 1 when it is correct;
#   expl = 0 for no explanation, 1 for a poor explanation, 2 for a good one.
# The name of each Parameter spells out the conditional probability it holds.
p_correct = [[Parameter('P(User = Correct | BB = Incorrect, Explanation = None)', 0.6967821782), 
              Parameter('P(User = Correct | BB = Incorrect, Explanation = Poor)', 0.68), 
              Parameter('P(User = Correct | BB = Incorrect, Explanation = Good)', 0.6497395833)
             ],
             [Parameter('P(User = Correct | BB = Correct, Explanation = None)', 0.9250353607), 
              Parameter('P(User = Correct | BB = Correct, Explanation = Poor)', 0.9), 
              Parameter('P(User = Correct | BB = Correct, Explanation = Good)', 0.9280753968)
              ]
             ]

In [None]:
# Load the dataset; the path is relative to the notebook's working directory.
df = pd.read_csv('../lib/datasets/lsac.csv')

# Feature columns given to the black-box model.
train_cols = ['age',
        'decile1',
        'decile3',
        'fam_inc',
        'lsat',
        'ugpa',
        'cluster',
        'fulltime']

# Subset of train_cols that is one-hot encoded instead of scaled.
categorical_columns = ['cluster', 'fulltime']

# Binary indicator column: 1 where race1 == 0, otherwise 0.
df['white'] = (df['race1'] == 0).astype(int)

# Split features, label and group attribute together so their rows stay aligned,
# then peel the label and the group attribute off each half.
split_cols = train_cols + ['pass_bar', sens_attr]
X_train, X_test = train_test_split(df[split_cols], test_size = 0.5, random_state = 42)
y_train, y_test = X_train['pass_bar'], X_test['pass_bar']
g_train, g_test = X_train[sens_attr], X_test[sens_attr]
X_train = X_train.drop(columns = ['pass_bar', sens_attr])
X_test = X_test.drop(columns = ['pass_bar', sens_attr])

# Per-group share of the data and share of positive (1.0) labels.
for g in df[sens_attr].unique():
    in_group = df[sens_attr] == g
    share = in_group.sum()/len(df)
    # .get avoids a KeyError when a group has no 1.0 label (or g is NaN and matches no rows).
    prevalence = df.loc[in_group, 'pass_bar'].value_counts(normalize = True).get(1.0, 0.0)
    print(f'{sens_attr} == {g}: percent = {share}, target prevalence = {prevalence} ')

In [None]:
def ohe_to_df(enc, col, name):
    """Apply a fitted one-hot encoder to one column and return a labelled DataFrame.

    Parameters
    ----------
    enc : fitted encoder exposing ``transform`` and ``categories_``.
    col : pandas Series holding the column to encode.
    name : prefix for the output column names.

    Returns
    -------
    DataFrame indexed like ``col`` with one column per entry of
    ``enc.categories_[0]``, named ``'<name>==<category>'``.
    """
    encoded = enc.transform(col.values.reshape(-1, 1))
    # Encoders may return a SciPy sparse matrix; densify it so the DataFrame
    # constructor can lay it out under the per-category column names.
    if hasattr(encoded, 'toarray'):
        encoded = encoded.toarray()
    return pd.DataFrame(encoded,
        columns = [f'{name}=={i}' for i in enc.categories_[0]],
        index = col.index)

# Replace missing categorical values with the sentinel -1 in both halves of the split.
for frame in (X_train, X_test):
    for col in categorical_columns:
        frame[col] = frame[col].fillna(-1)

In [None]:
# Positional indices of the categorical / all columns of X_train, as passed to add_scaler.
cat_col_indices = [list(X_train.columns).index(i) for i in categorical_columns]
all_col_indices = list(range(X_train.shape[1]))

# Pick the estimator and its hyper-parameter grid for the configured black box.
if bbox == 'xgb':
    # NOTE(review): the original had `assert NotImplementedError` here, which always
    # passes because the class object is truthy. It has been dropped as a no-op; if the
    # intent was to disable this branch, replace it with `raise NotImplementedError`.
    # Unlike the other branches, this estimator is not wrapped by add_scaler.
    estimator = XGBClassifier()
    param_grid = {'max_depth': list(range(6))}
elif bbox == 'nn':
    estimator = add_scaler(MLPClassifier(),
        cat_col_indices = cat_col_indices,
        all_col_indices = all_col_indices)
    param_grid = {'clf__hidden_layer_sizes': [(n_hidden, ) for n_hidden in [50, 100, 200]]}
elif bbox == 'lr':
    estimator = add_scaler(LogisticRegression(solver = 'liblinear'),
        cat_col_indices = cat_col_indices,
        all_col_indices = all_col_indices)
    param_grid = {'clf__C': 10.**np.linspace(-5, 1, 25)}
else:
    raise NotImplementedError(f'Unsupported bbox: {bbox!r}')

# 5-fold cross-validated grid search scored by ROC AUC, fitted on the training half.
model = GridSearchCV(estimator, param_grid = param_grid, cv = 5, scoring = 'roc_auc').fit(X_train, y_train)

In [None]:
# Black-box predictions on the held-out half, then accuracy within each group value.
y_pred = model.predict(X_test)
for g in df[sens_attr].unique():
    mask = g_test == g
    group_accuracy = accuracy_score(y_test[mask], y_pred[mask])
    print(f'% correct | {sens_attr} == {g}: {group_accuracy}')

In [None]:
# Centre value for the probability that an explanation is good.
fidelity_mean = Parameter('Mean P(Explanation = Good)', 0.85)
# Offset swept by the simulation: for each value d in `range`, group 1 is given
# fidelity_mean.val + d and group 0 is given fidelity_mean.val - d.
# NOTE(review): that puts the two groups 2*d apart, while this parameter and the plot's
# x-axis call d the "Gap" — confirm which definition is intended.
fidelity_diff = Parameter('P(Explanation = Good) Gap', 0.15, range = np.linspace(0, 0.15, 30))

In [None]:
def get_metric(metric_name, user_correct, y):
    """Score simulated user decisions.

    Parameters
    ----------
    metric_name : 'accuracy' or 'F1'.
    user_correct : array of 0/1 flags, 1 where the user's decision is correct.
    y : true labels aligned with ``user_correct``; only read for 'F1'.

    Returns
    -------
    The fraction of correct decisions for 'accuracy', or the F1 score of the
    implied user decisions against ``y`` for 'F1'.

    Raises
    ------
    ValueError
        If ``metric_name`` is not one of the supported names.
    """
    if metric_name == 'accuracy':
        return user_correct.sum()/len(user_correct)
    if metric_name == 'F1':
        # Rebuild the user's decision: the true label where the user was right,
        # otherwise the opposite label (True/1 exactly when y is not 1.0).
        y_pred = np.where(user_correct == 1, y, ~(y == 1.0))
        return f1_score(y, y_pred)
    # Previously an unknown name fell through and returned None, which only
    # surfaced later when the results were aggregated.
    raise ValueError(f"Unknown metric_name {metric_name!r}; expected 'accuracy' or 'F1'")

In [None]:
# Monte-Carlo simulation of user decision quality as the per-group probability of a
# good explanation is pushed apart. All draws use NumPy's global RNG, so the results
# depend on the order of the np.random calls below.
n_iters = 20  # repetitions per swept value
N = len(y_pred)  # number of test-set predictions

# results[f_diff][key] collects one metric value per repetition, for
# key in {'overall', 1, 0, 'gap'}; no_expl_results has the same keys for the
# no-explanation baseline.
results = {}
no_expl_results = {
      'overall': [],
      1: [],
      0: [],
      'gap': []
  }
for f_diff in fidelity_diff.range:
  results[f_diff] = {
      'overall': [],
      1: [],
      0: [],
      'gap': []
  }
  # Probability of a good explanation for each group value.
  fidelities = {
      1: fidelity_mean.val + f_diff, # group 1 has higher fidelity than group 0
      0: fidelity_mean.val - f_diff
  }
  for n in range(n_iters):
    # Whether the black-box prediction matches the label (does not vary across repetitions).
    correct = y_pred == y_test
    # One uniform draw per test row decides whether its explanation is good.
    expl_random = np.random.random(size = (N,))
    good_expl = np.zeros(shape = (N,))
    # Only rows whose group value is 1 or 0 are assigned; any other row keeps good_expl == 0.
    for g in fidelities:
      good_expl[g_test == g] = expl_random[g_test == g] <= fidelities[g]
    user_correct = np.zeros(shape = (N,))
    # For every (explanation quality, black-box correctness) cell, draw whether the user is
    # correct; expl + 1 selects the 'Poor' (1) or 'Good' (2) entry of p_correct[bb].
    for expl in [0, 1]:
      for bb in [0, 1]:
        mask = (good_expl == expl) & (correct == bb)
        user_correct[mask] = np.random.random(size = (mask.sum())) <= p_correct[bb][expl + 1].val 
    results[f_diff]['overall'].append(get_metric(metric, user_correct, y_test))
    for g in fidelities:
      mask = g_test == g
      results[f_diff][g].append(get_metric(metric, user_correct[mask], y_test[mask]))
    # Gap is the group-1 metric minus the group-0 metric for this repetition.
    results[f_diff]['gap'].append(results[f_diff][1][-1] - results[f_diff][0][-1])

    # The no-explanation baseline does not depend on f_diff, so it is sampled only while
    # processing the last swept value, which yields n_iters baseline repetitions in total.
    if f_diff == fidelity_diff.range[-1]:
      user_correct = np.zeros(shape = (N,)) # no explanation
      for bb in [0, 1]:
        mask = (correct == bb)
        user_correct[mask] = np.random.random(size = (mask.sum())) <= p_correct[bb][0].val 
      no_expl_results['overall'].append(get_metric(metric, user_correct, y_test))
      for g in fidelities:
        mask = g_test == g
        no_expl_results[g].append(get_metric(metric, user_correct[mask], y_test[mask]))
      no_expl_results['gap'].append(no_expl_results[1][-1] - no_expl_results[0][-1])  

In [None]:
# Reshape the nested results: one row per (f_diff, result key) pair, and one
# column per entry of that pair's list of metric values.
stacked = pd.DataFrame.from_dict(results, orient="index").stack()
res_df = pd.DataFrame(stacked.values.tolist(), index=stacked.index)

In [None]:
# Row-wise mean and 1.96 * standard-error half-width across the repetition columns.
row_std = res_df.std(axis = 1)
means = res_df.mean(axis = 1).to_frame()
cis = (1.96*row_std/np.sqrt(n_iters)).to_frame()

In [None]:
def get_cis(series):
    """Return ``(mean, lower, upper)`` for ``series``.

    The bounds are the mean minus/plus 1.96 times the standard error, i.e.
    ``series.std()`` divided by the square root of the number of entries.
    """
    center = series.mean()
    n_obs = len(series)
    half_width = 1.96*series.std()/np.sqrt(n_obs)
    lower, upper = center - half_width, center + half_width
    return center, lower, upper

In [None]:
# Legend text for the two values of the configured sensitive attribute. The labels used
# to be hard-coded to 'Male'/'Female', which mislabels the curves when sens_attr != 'male'.
group_labels = {
    'male': {1: 'Male', 0: 'Female'},
    'white': {1: 'White', 0: 'Non-White'},
}.get(sens_attr, {1: f'{sens_attr} == 1', 0: f'{sens_attr} == 0'})

fig, ax = plt.subplots()

# One mean curve with a shaded confidence band per group value.
for group, color in ((1, 'C0'), (0, 'C1')):
    rows = pd.IndexSlice[:, group]
    center = means.loc[rows, :][0]
    half_width = cis.loc[rows, :][0]
    ax.plot(fidelity_diff.range, center, '-', label = group_labels[group], color = color)
    ax.fill_between(fidelity_diff.range, center - half_width, center + half_width,
                    alpha = 0.1, color = color)

ax.set_xlabel('Explanation Fidelity Gap')
if metric == 'accuracy':
    ax.set_ylabel('Decision Accuracy')
elif metric == 'F1':
    ax.set_ylabel('Decision F1 Score')
# Show the x ticks as percentages.
ax.xaxis.set_major_formatter(FuncFormatter(lambda x, _: '{:.0%}'.format(x)))
ax.set_xlim([0, max(fidelity_diff.range)])
# NOTE(review): fixed y-window; presumably tuned for a single configuration — other
# metric/bbox/sens_attr choices may fall outside it. Confirm or derive from the data.
ax.set_ylim([0.9, 0.92])

ax.legend()
plt.show()
# Name the file after the configured black box rather than a hard-coded 'nn'.
fig.savefig(f'expl_gap_sim_{bbox}.pdf', dpi = 300, bbox_inches = 'tight')