In [1]:
import scipy.stats
import numpy as np 
from scipy.spatial import distance
from sklearn import tree
from sklearn import preprocessing
import sklearn.datasets
import sklearn.ensemble
from sklearn.model_selection import train_test_split

In [2]:
def lars_hypothesis_testing(X, y, alpha, n, k, stabilise = True):
    global beta_path, sum_abs_coeff

    #X = train.data
    #y = train.target
    m = len(X[0])
    n = len(X)

    active_set = set()
    cur_pred = np.zeros((n,), dtype=np.float32)
    residual = y - cur_pred
    cur_corr = X.transpose().dot(residual)
    j = np.argmax(np.abs(cur_corr), 0)
    #print('j', j)
    active_set.add(j)
    #print(active_set)
    
    if stabilise: 
      hypothesis_test_exit_code, new_n = hypothesis_testing(X, active_set, cur_corr, residual, alpha, n)
      if hypothesis_test_exit_code == 0:
          return -1, new_n
    
    beta = np.zeros((m,), dtype=np.float32)
    sign = np.zeros((m,), dtype=np.int32)
    sign[j] = 1

    beta_path = np.zeros((m, m), dtype=np.float32)
    for it in range(k-1): #Modification - Only want top k predictors (maybe should bw k-1); # still need to implement lasso.
        residual = y - cur_pred
        mse = np.sqrt(np.sum(residual * residual))
        #print('mse', mse)

        pred_from_beta = X.dot(beta)

        cur_corr = X.transpose().dot(residual)

        X_a = X[:, list(active_set)]
        X_a *= sign[list(active_set)]
        G_a = X_a.transpose().dot(X_a)
        G_a_inv = np.linalg.inv(G_a)
        G_a_inv_red_cols = np.sum(G_a_inv, 1)
        A_a = 1 / np.sqrt(np.sum(G_a_inv_red_cols))
        omega = A_a * G_a_inv_red_cols
        equiangular = X_a.dot(omega)  # .reshape(n)

        cos_angle = X.transpose().dot(equiangular)
        gamma = None
        largest_abs_correlation = np.abs(cur_corr).max()
        #print('largest_abs_correlation', largest_abs_correlation)
        if it < m - 1:
            next_j = None
            next_sign = 0
            for j in range(m):
                if j in active_set:
                    continue
                v0 = (largest_abs_correlation - cur_corr[j]) / (A_a - cos_angle[j]).item()
                v1 = (largest_abs_correlation + cur_corr[j]) / (A_a + cos_angle[j]).item()
                if v0 > 0 and (gamma is None or v0 < gamma):
                    next_j = j
                    gamma = v0
                    next_sign = 1
                if v1 > 0 and (gamma is None or v1 < gamma):
                    gamma = v1
                    next_j = j
                    next_sign = -1
        else:
            gamma = largest_abs_correlation / A_a

        sa = X_a
        sb = equiangular * gamma
        sx = np.linalg.lstsq(sa, sb)
        for i, j in enumerate(active_set):
            beta[j] += sx[0][i] * sign[j]

        #print('next j', next_j, 'next sign', next_sign, 'gamma', gamma, 'new max correlation: %s' % (
        #    largest_abs_correlation - gamma * A_a))

        cur_pred = X.dot(beta)
        active_set.add(next_j)
        
        if stabilise: 
            hypothesis_test_exit_code, new_n = hypothesis_testing(X, active_set, cur_corr, residual, alpha, n)
            if hypothesis_test_exit_code == 0:
          	    return -1, new_n
        
        sign[next_j] = next_sign

        beta_path[it, :] = beta
        #print('beta', beta)
        
    return 1, active_set

#features = s_lime(f, X, x, n_0, alpha, k, distance_measure, stabilise=True)

def hypothesis_testing(X, active_set, current_correlations, current_residual, alpha, n): 
    """
    Exit codes: 
    1: Hypothesis testing passed
    0: Hypothesis testing failed 
    -1: Not enough inactive predictors to do hypothesis testing
    """
    
    #First test if the are at least two predictors in the inactive set: 
    if len(current_correlations) - len(active_set) < 2: 
        return -1, 0 
    
    #Select top two predictors most correlated with the current residual from remaining covariates
    x = current_correlations
    x_new = np.array([range(len(x)), x]).T
    act_set = np.array(list(active_set))
    inactive_corr = np.delete(x_new, act_set, axis=0)
    inactive_indexes = inactive_corr[:,0]
    inactive_corr_values = inactive_corr[:,1]
    top_predictors = inactive_indexes[np.argsort(inactive_corr_values)[-2:]]
    
    #Current Residual * top predictors
    c_1 = current_residual * X[:,int(top_predictors[0])]
    c_2 = current_residual * X[:,int(top_predictors[1])]
    
    #Calcualte c_hats 
    c_hat_1 = (1/n)*np.sum(c_1)
    c_hat_2 = (1/n)*np.sum(c_2)
    
    #Calculate covariances 
    [[sigma_11,sigma_12],[sigma_21,sigma_22]] = np.cov(c_2,c_1)
    
    #Calculate Z_alpha
    Z_alpha = scipy.stats.norm.ppf(q = 1-alpha)
    
    #Calculate t 
    if c_hat_1 > c_hat_2:
      t = c_hat_1 - c_hat_2 -Z_alpha*(np.sqrt((2*(sigma_11+sigma_22-sigma_12-sigma_21))/n))
    else: 

      t = c_hat_2 - c_hat_1 -Z_alpha*(np.sqrt((2*(sigma_11+sigma_22-sigma_12-sigma_21))/n))


    #Hypothesis Testing
    if t >= 0: #pass test
        return 1, 0
    else: #Fail test     
        #Calculate new n 
        Z_pn = np.sqrt(n)*((c_hat_1- c_hat_2)/(np.sqrt(2*(sigma_11+sigma_22-sigma_12-sigma_21))))
        new_n = n*((Z_alpha/Z_pn)**2)
        return 0, new_n
        

In [3]:
def s_lime(f, X, x, n_0, alpha, k, distance_measure, n_max,stabilise=True): 
    
    D, y = prepare_synthetic_data_set(f,X, x,n_0, distance_measure)
    n = n_0 
    iteration = 0 
    
    while True: 
        lars_exit_code, value =  lars_hypothesis_testing(D, y, alpha, n, k, stabilise)
        #print("First iteration: "+str(iteration)+": "+str(lars_exit_code))
        iteration += 1
        if lars_exit_code == 1:
          return value
        else: 
          n = value
          if n <= n_max: 
              D, y = prepare_synthetic_data_set(f,X, x, int(n), distance_measure)
          else: 
              n = n_max
              nr_n_max[test_instance, k_final-1] += 1
              D, y = prepare_synthetic_data_set(f,X, x, int(n), distance_measure)
              lars_exit_code, value =  lars_hypothesis_testing(D, y, alpha, n, k, stabilise=False)
              return value



def prepare_synthetic_data_set(f,X,x,n,distance_measure): 
    
    print(n)
    D = generate_points(X, x,n)
    y = f.predict(D)
    D = preprocessing.scale(D)
    w = calculate_weights(D, x, distance_measure)
    D, y = scale_data(D,y,w)
    

    return D, y

    
def generate_points(X, x, n):
    """
    Generate n data points in the vicinity of x
    returns D the generated data set shape (n, m) where m is the number of features
    """
    m = X.shape[1]
    D = np.zeros((n,m))
    for i in range(m):

      #Calculate Variance of feature
      feature_var = np.var(X[:,i])

      D[:,i] = np.random.normal(loc=x[i], scale=feature_var/5, size=n )

    return D

def distance_measure(x1,x2): 
    return distance.euclidean(x1,x2)

def calculate_weights(D, x, distance_measure): 
    """
    Calculates the weight vector w, which is the distance between each data point and x 
    returns w the weight vector shape (n)
    """
    w = np.zeros(len(D))
    for i in range(len(D)): 
        w[i] = distance_measure(x, D[i,])
    return w

def scale_data(D, y, w): 
    """nn
    Scale all data and y  by sqrt(w)
    returns X, y shapes (n,m), (n)
    """
    y = np.sqrt(w)*y
    D = (np.sqrt(w)*D.T).T

    return D,y


In [4]:
def jaccard_similarity(s1, s2):
    return float(len(s1.intersection(s2)) / len(s1.union(s2)))

def calculate_ave_jaccard(sets):

  sum = 0
  count = 0
  for i in range(len(sets)):
    for j in range(i+1,len(sets)): 
       sum += jaccard_similarity(sets[i], sets[j])
       count += 1

  return sum/count


## Experiments

In [12]:
#Single Experiment


dataset = sklearn.datasets.load_wine()
X = dataset.data
y = dataset.target

clf = sklearn.ensemble.RandomForestClassifier(n_estimators=500)
clf = clf.fit(X[1:,:], y[1:])

n_max = 10000
nr_n_max = np.zeros((50,5))
f = clf 
x = X[0,:]
n_0 = 1000
alpha = 0.1

k = 1
distance_measure = distance_measure

features = s_lime(f, X, x, n_0, alpha, k, distance_measure, n_max, stabilise=True)

1000
2692


In [8]:
#Experiments outlined in presentation

dataset = sklearn.datasets.load_wine()
X = dataset.data
y = dataset.target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=50, random_state=42)

clf = sklearn.ensemble.RandomForestClassifier(n_estimators=500)
clf = clf.fit(X_train, y_train)

In [10]:
global nr_n_max, test_instance, k_final
nr_n_max = np.zeros((50,5))

n_max = 10000
f = clf 
n_0 = 1000
alpha = 0.1
distance_measure = distance_measure

ave_jaccard_index_lime = np.zeros((50,5))

for test_instance in range(len(X_test)):
    
    print(test_instance)

    all_k = np.zeros(5)

    for i in range(5): 
  
      k_final = i+1
      all_k[i]= k_final
      all_sets = [set() for _ in range(20)]

      for j in range(20): 
        all_sets[j] = s_lime(f, X, x, n_0, alpha, k_final, distance_measure, n_max, stabilise=True)

      ave_jaccard_index_lime[test_instance,i] = calculate_ave_jaccard(all_sets)



0
1000
10000
1000
10000
1000
10000
1000
10000
1000
10000
1000
10000


KeyboardInterrupt: 