### Loading in some Libs

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from sklearn.svm import SVC
from sklearn.linear_model import LogisticRegression
from scipy.optimize import minimize
from scipy.spatial.distance import cdist, pdist
from scipy import stats
from sklearn.neighbors import DistanceMetric
from tslearn.datasets import UCR_UEA_datasets
from tslearn.neighbors import KNeighborsTimeSeries
from sklearn.neighbors import NearestNeighbors
from sklearn.metrics import accuracy_score

from scipy.interpolate import interp1d

import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Activation, Conv1D, GlobalAveragePooling1D, BatchNormalization, Conv2D
from tensorflow.keras.layers import GlobalAveragePooling1D
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.backend import function

from tensorflow.keras.models import load_model

from counterfactual_utils import ucr_data_loader, label_encoder

print(tf.__version__)

TF version:  2.5.0
Eager execution enabled:  False
2.5.0


In [2]:
np.random.seed(0)
#setting a random seed

### Loading data and classifier example

In [None]:
# X_train, y_train, X_test, y_test = ucr_data_loader(str('Lightning2'))
# y_train, y_test = label_encoder(y_train, y_test)
#
# model =load_model('Lightning2_best_model_myrun.hdf5')
# y_pred = model.predict(X_test)
# y_pred = np.argmax(y_pred, axis=1)

### Next we want to be able to specify what our counterfactual target is. This should be different to the predicted label of the base classifier. Also lets specify the loss function

In [3]:
def target_(label):
    if label == 0:
        counter = 1
    elif label == 1:
        counter = 0
    return counter


def dist_mad(query, cf):
    manhat = np.abs(query-cf)
    mad = stats.median_absolute_deviation(X_train)
    return np.sum((manhat/mad).flatten())

def loss_function_mad(x_dash):
    target = target_(example_label)
    L = lamda*(model.predict(x_dash.reshape(1,-1,1))[0][target] - 1)**2 + \
    dist_mad(x_dash.reshape(1,-1,1), query)
    return L

In [5]:
#checking target works & mad distance works
#y_pred[0], target_(y_pred[0]), dist_mad(X_test[0], X_train[3])

### Wachter Counterfactuals

In [4]:
def Wachter_Counterfactual(instance, lambda_init):

    min_edit_cf = []
    
    global lamda
    global dist_mad
    global loss_function_mad
    global example_label
    global query

    
    pred_threshold = 0.5

    # initial conditions
    lamda = lambda_init
    x0 = X_test[instance].reshape(1,-1,1) # initial guess for cf
    query = X_test[instance].reshape(1,-1,1)

    example_label = y_pred[instance]

    res = minimize(loss_function_mad, x0.reshape(1,-1), method='nelder-mead', options={'maxiter':10, 'xatol': 50, 'adaptive': True})
    cf = res.x.reshape(1,-1,1)

    target = target_(y_pred[instance])
    prob_target = model.predict(cf)[0][target]


    i=0
    while prob_target < pred_threshold:


        lamda = lambda_init*(1+0.5)**i
        x0 = cf
        res = minimize(loss_function_mad, x0.reshape(1,-1), method='nelder-mead', options={'maxiter':10, 'xatol': 50, 'adaptive': True})
        cf = res.x.reshape(1,-1,1)
        prob_target = model.predict(cf)[0][target]
        i += 1
        if i == 500:
            print('Error condition not met after',i,'iterations')
            break

    min_edit_cf.append(cf[0])

    
    return min_edit_cf

In [8]:
#for dataset in ['coffee', 'ecg200', 'gunpoint', 'chinatown']:
    
#    X_train, y_train, X_test, y_test = ucr_data_loader(str(dataset))
#    y_train, y_test = label_encoder(y_train, y_test)
    
#    model =load_model(str(dataset)+'_best_model.hdf5')
#    y_pred = model.predict(X_test)
#    y_pred = np.argmax(y_pred, axis=1)
    
    
#    counterfactual_set = []
    
#    for instance in range(len(X_test)):
#        counterfactual_set.append(Wachter_Counterfactual(instance,lambda_init=0.1)[0])
        
#    np.array(counterfactual_set)
#    np.save(str(dataset) + '_wachter_cf', np.array(counterfactual_set))
    


In [5]:
for dataset in ['Lightning2']:

   X_train, y_train, X_test, y_test = ucr_data_loader(str(dataset))
   y_train, y_test = label_encoder(y_train, y_test)

   model =load_model(str(dataset)+'_best_model.hdf5')
   y_pred = model.predict(X_test)
   y_pred = np.argmax(y_pred, axis=1)


   counterfactual_set = []

   for instance in range(len(X_test)):
       print(f"Processing instance {instance+1}/{len(X_test)}...")
       counterfactual_set.append(Wachter_Counterfactual(instance,lambda_init=0.1)[0])

   np.array(counterfactual_set)
   np.save(str(dataset) + '_wachter_cf', np.array(counterfactual_set))





Processing instance 1/61...



To preserve the existing default behavior, use
`scipy.stats.median_abs_deviation(..., scale=1/1.4826)`.
The value 1.4826 is not numerically precise for scaling
with a normal distribution. For a numerically precise value, use
`scipy.stats.median_abs_deviation(..., scale='normal')`.

  mad = stats.median_absolute_deviation(X_train)


Processing instance 2/61...
Processing instance 3/61...
Processing instance 4/61...
Processing instance 5/61...
Processing instance 6/61...
Processing instance 7/61...
Processing instance 8/61...
Processing instance 9/61...
Processing instance 10/61...
Processing instance 11/61...
Processing instance 12/61...
Processing instance 13/61...
Processing instance 14/61...
Processing instance 15/61...
Processing instance 16/61...
Processing instance 17/61...
Processing instance 18/61...
Processing instance 19/61...
Processing instance 20/61...
Processing instance 21/61...
Processing instance 22/61...
Processing instance 23/61...
Processing instance 24/61...
Processing instance 25/61...
Processing instance 26/61...
Processing instance 27/61...
Processing instance 28/61...
Processing instance 29/61...
Processing instance 30/61...
Processing instance 31/61...
Processing instance 32/61...
Processing instance 33/61...
Processing instance 34/61...
Processing instance 35/61...
Processing instance 36