# Train an XNN model

This notebook shows how to implement using Keras (with TensorFlow backend) an Explainable Neural Network as described in [Explainable Neural Networks based on Additive Index Models](https://arxiv.org/pdf/1806.01933.pdf).

The architecture of the network is as follows:

![XNN Architecture](xnn_arch.png)
[Explainable Neural Networks based on Additive Index Models](https://arxiv.org/pdf/1806.01933.pdf)

And consists of three layers:

(i) The projection layer (first hidden layer) using linear activation function

(ii) Subnetworks, which learn a potentially nonlinear transformation of the input

(iii) Combination layer calculates a weighted sum the output of the ridge functions

# Import packages

In [None]:
import numpy as np
import pandas as pd
import shap
import subprocess
import sys
import pydot

import keras
from keras import backend
from keras.layers import Activation, Add, Dense, Dropout, Input, Lambda, Concatenate
from keras.models import Model
from keras.utils import plot_model

import plotly.plotly as py
import chart_studio.plotly as py
import plotly.tools as tls
import matplotlib.pyplot as plt
    
from timeit import default_timer as timer
import tensorflow as tf
from keras import backend as K


In [None]:
seed = 12345

np.random.seed(seed)

my_init = keras.initializers.RandomUniform(seed=seed)

output_label = "BLDS_run100"
out_dir = "output100/"

output_to_files = False

def projection_initializer(shape, dtype=None):
   
    inps = shape[0]
    subs = shape[1]
    if subs > pow(inps, 2) - 1:
        raise ValueError("Currently we support only up to 2^features - 1 number of subnetworks.")
    
    weights = []

    for i in range(subs):
        w = [0] * inps
        w[i] = 1
        weights.append(w)
    return weights



def alpha_beta(alpha, beta, X , R):
    """ Calculate the layerwise backpropagation function """
    
    positive_values = [item for item in X if item > 0]
        
    negative_values = [item for item in X if item < 0] 
        
    ans = np.array([0.0]*len(X))
        
    
    if len(positive_values) > 0:
           
        ans += alpha*np.array([item / float(sum(positive_values)) if item > 0 else 0 for item in X])

    if len(negative_values) > 0:
 
        ans += -beta * np.array([item / float(sum(negative_values)) if item < 0 else 0 for item in X]) 

    return ans*R


def deep_lift(X_bar, X , R):
    
    """ Deep lift backpropagation function"""   
    
    ans =  np.array(X) - np.array(X_bar)
    ans = ans / (sum(X) - sum(X_bar))     
    
    return ans*R

# XNN Class

In [None]:
class XNN:
    # define base model
    def __init__(self, features, ridge_functions=3, arch=[20,12], bg_samples=100, seed=None, is_categorical=False):
        self.seed = seed
        self.bg_samples = bg_samples
        self.is_categorical = is_categorical
        
        #
        # Prepare model architecture
        #
        # Input to the network, our observation containing all the features
        input = Input(shape=(features,), name='main_input')

        # Input to ridge function number i is the dot product of our original input vector times coefficients
        ridge_input = Dense(ridge_functions,
                            name="projection_layer",
                                activation='linear')(input)
        
        self.ridge_networks = []
        # Each subnetwork uses only 1 neuron from the projection layer as input so we need to split it
        ridge_inputs = Lambda( lambda x: tf.split(x, ridge_functions, 1), name='lambda_1' )(ridge_input)
        for i, ridge_input in enumerate(ridge_inputs):
            # Generate subnetwork i
            mlp = self._mlp(ridge_input, i, arch)
            self.ridge_networks.append(mlp)
                    
        added = Concatenate(name='concatenate_1')(self.ridge_networks)
        
        # Add the correct output layer for the problem
        if self.is_categorical:
            out = Dense(1, activation='sigmoid', input_shape= (ridge_functions, ), name='main_output')(added)
        else:
            out = Dense(1, activation='linear', input_shape= (ridge_functions, ), name='main_output')(added)
            
        self.model = Model(inputs=input, outputs=out)
        
        optimizer = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, decay=0.0, amsgrad=True)
        
        # Use the correct loss for the problem
        if self.is_categorical:
            self.model.compile(loss={'main_output': 'binary_crossentropy'}, optimizer=optimizer)
        else:
            self.model.compile(loss={'main_output': 'mean_squared_error'}, optimizer=optimizer)

        self.explainer = None
                
        
    def _mlp(self, input, idx, arch=[20,12], activation='relu'):
        if len(arch) < 1:
            return #raise exception
        
        # Hidden layers
        mlp = Dense(arch[0], activation=activation, name='mlp_{}_dense_0'.format(idx), kernel_initializer=my_init)(input)
        for i, layer in enumerate(arch[1:]):
            mlp = Dense(layer, activation=activation, name='mlp_{}_dense_{}'.format(idx, i+1), kernel_initializer=my_init)(mlp)
         

        # Output of the MLP
        mlp = Dense(1, 
                    activation='linear', 
                    name='mlp_{}_dense_last'.format(idx), 
                    kernel_regularizer=keras.regularizers.l1(1e-3),
                    kernel_initializer=my_init)(mlp)
        
        return mlp
    
    def print_architecture(self):
        self.model.summary()
    
    def fit(self, X, y, epochs=5, batch_size=128, validation_set=0.0, verbose=0):
        inputs = {'main_input': X}


        if validation_set == 0:
            self.model.fit(inputs, y, epochs=epochs, batch_size=batch_size, validation_split=validation_set, verbose=verbose)
        else:
            self.model.fit(inputs, y, epochs=epochs, batch_size=batch_size, validation_data=validation_set, verbose=verbose)
            
        #
        # Prepare the explainer
        # 
        np.random.seed(self.seed)
        if isinstance(X, pd.DataFrame):
            background = X.iloc[np.random.choice(X.shape[0], self.bg_samples, replace=False)]
        else:
            background = X[np.random.choice(X.shape[0], self.bg_samples, replace=False)]

        # Explain predictions of the model on the subset
        self.explainer = shap.DeepExplainer(self.model, background)
                    
        
    def predict(self, X, pred_contribs=False):
        pred_start = timer()
        preds = self.model.predict(X)
        pred_end = timer()
        print("Predictions took {}".format(pred_end - pred_start))

        if pred_contribs:
            explainer_start = timer()

            self.shap_values = self.explainer.shap_values(X)

            explainer_end = timer()
            print("Explainer took {}".format(explainer_end - explainer_start))

            concat_start = timer()

            preds = np.concatenate((preds, self.shap_values[0], preds), axis=1)
            preds[:,-1] = self.explainer.expected_value

            concat_end = timer()
            print("Concat took {}".format(concat_end - concat_start))
        return preds
    
    def plot_shap(self, X):
        shap.summary_plot(self.shap_values, X)
        
    def save(self, filename):
        self.model.save(filename)
 

# Load dataset

In [None]:
# Load the dataset
xnn_data_dir = '~/article-information-2019/data/xnn_output/'

DATA = pd.read_csv(xnn_data_dir + 'train_transformed.csv')
TEST = pd.read_csv(xnn_data_dir + 'test_transformed.csv')



# Select features and split into target and feature sets
selected_vars = ['term_360', 'conforming']
selected_vars += ['debt_to_income_ratio_missing','loan_amount_std', 'loan_to_value_ratio_std']
selected_vars += ['no_intro_rate_period_std', 'intro_rate_period_std']
selected_vars += ['property_value_std', 'income_std', 'debt_to_income_ratio_std']


target_var = "high_priced"

X=DATA[selected_vars].values
Y=DATA[target_var].values
TEST_X = TEST[selected_vars].values
TEST_Y = TEST[target_var].values
features = X.shape[1]

In [None]:
DATA.columns

# Fit XNN

In [None]:
# Initialize the XNN
is_cat = True
xnn = XNN(features=features, ridge_functions=features,arch=[12, 8], is_categorical= is_cat)

#plot_model(xnn.model, to_file='model_regression.png')
xnn.print_architecture()


In [None]:
cv_test_preds = {}
cv_train_preds = {}

epoch = []
fold_list = list(set(DATA['cv_fold']))
epochs_max = 15000

# Make predictions on the CV folds
for cv_fold in fold_list:
    TRAIN = DATA[DATA['cv_fold'] != cv_fold]
    VALID = DATA[DATA['cv_fold'] == cv_fold]
    
    X = TRAIN[selected_vars].values
    Y = TRAIN[target_var].values
    
    X_VALID = VALID[selected_vars].values
    Y_VALID = VALID[target_var].values
    
    xnn.fit(X, Y, epochs=epochs_max, batch_size=1024, validation_set=(X_VALID, Y_VALID), verbose=1)
    if output_to_files:
        xnn.save(out_dir + 'cv_' + str(cv_fold) + '_hmda_model.h5')
    
    # CV predictions
    cv_test_preds[cv_fold] = xnn.predict(TEST_X, pred_contribs=True)
    cv_train_preds[cv_fold] = xnn.predict(X_VALID, pred_contribs=True)
    
    if output_to_files:
        pd.DataFrame(pd.concat([TEST, pd.DataFrame(cv_test_preds[cv_fold])], axis=1)).to_csv(out_dir + "test_output_" + str(cv_fold) + "_" + str(epochs_max) + "_" + output_label +".csv" , index=False)
        pd.DataFrame(pd.concat([VALID, pd.DataFrame(cv_train_preds[cv_fold])], axis=1)).to_csv(out_dir + "valid_output_" + str(cv_fold) + "_" + str(epochs_max) + "_" + output_label + ".csv" , index=False)    
      
# Run the model on the full training set and make predictions on the test set  
X=DATA[selected_vars].values
Y=DATA[target_var].values
xnn.fit(X, Y, epochs=epochs_max, batch_size=1024, validation_set=0, verbose=1)

test_preds = xnn.predict(TEST_X, pred_contribs=True)

if output_to_files:
    pd.DataFrame(pd.concat([TEST, pd.DataFrame(test_preds)], axis=1)).to_csv(out_dir + "main_" + str(epochs_max) + "_" + output_label + ".csv" , index=False)
    xnn.save(out_dir + 'final_hmda_model_100.h5')
    
     

# Record layer information
# Plot projection layer

In [None]:
# Record the inputs, outputs, weights, and biases
import scipy as sp

int_output = {}
int_output2 = {}
int_weights = {}
int_bias = {}
int_input = {}

original_activations = {}


x_labels = list(map(lambda x: 'x' + str(x+1), range(features)))

intermediate_output = []

# Record and plot the projection weights
# 
weight_list = []
for layer in xnn.model.layers:
    
    layer_name = layer.get_config()['name']
    if layer_name != "main_input":
        print(layer_name)
        weights = layer.get_weights()
        
        
        # Record the biases
        try:
            bias = layer.get_weights()[1]
            int_bias[layer_name] = bias
        except:
            print("No Bias")
            
                       
        # Record outputs for the test set
        intermediate_layer_model = Model(inputs=xnn.model.input, outputs=xnn.model.get_layer(layer_name).output)
        if (is_cat) and (layer_name == 'main_output'):
            int_output[layer_name] = sp.special.logit(intermediate_layer_model.predict(TEST_X))
            int_output[layer_name + "_p"] = intermediate_layer_model.predict(TEST_X)
        else:
            int_output[layer_name] = intermediate_layer_model.predict(TEST_X)
        
        # Record the outputs from the training set
        if is_cat and (layer_name == 'main_output'):
            original_activations[layer_name] = sp.special.logit(intermediate_layer_model.predict(X))   
            original_activations[layer_name + "_p"] = intermediate_layer_model.predict(X)
        else:
            original_activations[layer_name] = intermediate_layer_model.predict(X)        


        # Record other weights, inputs, and outputs
        int_weights[layer_name] = weights
        int_input[layer_name] = layer.input
        int_output2[layer_name] = layer.output
               
    # Plot the projection layers    
    if "projection_layer" in layer.get_config()['name']:
        
        print(layer.get_config()['name'])
        
        # Record the weights for each projection layer
        weights = [np.transpose(layer.get_weights()[0])]

        weight_list2=[]
        for i, weight in enumerate(weights[0]):
            weight_list.append(weight)
            weight_list2.append(list(np.reshape(weight, (1,features))[0]))
        
            print(weight)

            # Plot weights
            plt.bar(x_labels, abs(np.reshape(weight, (1,features))[0]), 1, color="blue")
            plt.xlabel("Subnetowork {} coefficient".format(i))
            plt.ylabel("Weight value")
            plt.show()

    if "main_output" in layer.get_config()['name']:
        weights_main = layer.get_weights()
        print(weights_main)
        
if output_to_files:
    pd.DataFrame(weight_list2).to_csv("wp_" + output_label + ".csv", index=False)
        

# Calculate the feature importances

In [None]:

# Calculate ridge and input function local feature importances   
item = 0

feature_output = []
feature_output2 = []
feature_output3 = []

# Find the average outputs
S_bar = sum(original_activations["main_output"])/len(original_activations["main_output"])
# original_activations[layer_name]
output_weights = np.array([int_weights["main_output"][0][ii][0] for ii in range(features)])
output_Z_bar = sum(original_activations["concatenate_1"]*output_weights)/len(original_activations["concatenate_1"])


# For each ridge function calculate the average input activation
input_Z_bar = {}
for ridge_num in range(features):   
    input_weights = np.array([int_weights["projection_layer"][0][ii][ridge_num] for ii in range(features)])
    input_Z_bar[ridge_num] = sum(X*input_weights)/len(X)
    
    
# For each test instance, calculate the feature importance scores    
for test_num in range(len(TEST_X)):
    
    # Calculate the output activations
    activation_list=[int_weights["main_output"][0][ii][0]*int_output["concatenate_1"][test_num][ii] for ii in range(features)]
    
    
    # Calculate layerwise backpropagaiton to the ridge functions
    # For classification, change this to the inverse sigmoid of the output
    features_ab = alpha_beta(2, 1, activation_list , int_output["main_output"][test_num][0])
    features_ab2 = alpha_beta(2, 1, activation_list , int_output["main_output"][test_num][0]-S_bar)
    
    # Calculate deep lift backpropagation to the ridge functions
    features_dl = deep_lift(output_Z_bar, activation_list , int_output["main_output"][test_num][0]-S_bar)
      
        
    # Calculate the deep lift and layerwise information scores for the input layer
    input_scores = []
    input_scores_dl = []
    input_scores2 = []
    input_scores_dl2 = []
    for ridge_num in range(features):
        weights = int_weights["projection_layer"][0][ridge_num]
        output = TEST_X[test_num,:]
        
        # Calculate the activations from the projection layer
        act = TEST_X[test_num,:]*np.array([int_weights["projection_layer"][0][ii][ridge_num] for ii in range(features)])
    
        # Input relevance scores for a single ridge function
        input_scores += list(alpha_beta(2, 1, act, features_ab[ridge_num]))
        input_scores_dl += list(deep_lift(input_Z_bar[ridge_num], act, features_dl[ridge_num]))
        input_scores2 += list(alpha_beta(2, 1, act, features_ab2[ridge_num]))

    # Sum the contribution of the variable importance from each of the projections
    input_sum = [sum(input_scores[ii+features*jj] for jj in range(features)) for ii in range(features)] 
    input_sum2 = [sum(input_scores2[ii+features*jj] for jj in range(features)) for ii in range(features)] 
    input_sum_dl = [sum(input_scores_dl[ii+features*jj] for jj in range(features)) for ii in range(features)] 
    input_abs_sum = [sum(abs(input_scores[ii+features*jj]) for jj in range(features)) for ii in range(features)] 
    
    # Recored the feature importance information for this instance
    feature_output.append(input_sum+input_abs_sum+[int_output["main_output"][test_num][0]]+list(features_ab)+input_scores)
    feature_output2.append(input_sum+list(features_ab)+input_sum_dl + list(features_dl))
    feature_output3.append(input_sum2+list(features_ab2)+input_sum_dl + list(features_dl))


# Find and plot the ridge functions

In [None]:
intermediate_output = []

for feature_num in range(features):
    intermediate_layer_model = Model(inputs=xnn.model.input,
                                 outputs=xnn.model.get_layer('mlp_'+str(feature_num)+'_dense_last').output)
    intermediate_output.append(intermediate_layer_model.predict(X))


# Record and plot the ridge functions
ridge_x = []
ridge_y = []
for weight_number in range(len(weight_list)):
    
    ridge_x.append(list(sum(X[:, ii]*weight_list[weight_number][ii] for ii in range(features))))
    ridge_y.append(list(intermediate_output[weight_number]))

    plt.plot(sum(X[:, ii]*weight_list[weight_number][ii] for ii in range(features)), intermediate_output[weight_number], 'o')
    plt.xlabel("x")
    plt.ylabel("Subnetwork " + str(weight_number))
    plt.legend(loc='lower right')
    plt.show()


if output_to_files:
    pd.DataFrame(ridge_x).to_csv("ridge_x_"+ output_label +".csv", index=False)
    pd.DataFrame(ridge_y).to_csv("ridge_y_" + output_label + ".csv", index=False)     
    pd.DataFrame(feature_output2).to_csv("feature_output2_" + output_label + ".csv", index=False)
    pd.DataFrame(feature_output3).to_csv("feature_output3_" + output_label + ".csv", index=False)   


In [None]:
# Make predictions on the test set

In [None]:
preds = xnn.predict(TEST_X, pred_contribs=True)

if output_to_files:
    pd.DataFrame(preds).to_csv("preds_" + output_label + ".csv", index=False)
    pd.DataFrame(TEST).to_csv("TEST_" + output_label + ".csv", index=False)


In [None]:
# Calculate the Shapley values.
shap.initjs()
shap.summary_plot(xnn.shap_values, X)

y=xnn.shap_values
ind=1


# Calculate the average absolute feature importance across the dataset

layerwise_average_input=np.array([0.0]*features)
layerwise_average_input2=np.array([0.0]*features)
layerwise_average_ridge=np.array([0.0]*features)
layerwise_average_ridge2=np.array([0.0]*features)
layerwise_average_shap=np.array([0.0]*features)
lift_average_input=np.array([0.0]*features)
lift_average_ridge=np.array([0.0]*features)

for ii in range(len(feature_output2)):
    layerwise_average_input += np.abs(np.array(feature_output2[ii][0:features]))
    layerwise_average_ridge += np.abs(np.array(feature_output2[ii][features:(2*features)]))
    layerwise_average_input2 += np.abs(np.array(feature_output3[ii][0:features]))
    layerwise_average_ridge2 += np.abs(np.array(feature_output3[ii][features:(2*features)]))
    lift_average_input += np.abs(np.array(feature_output2[ii][(2*features):(3*features)]))
    lift_average_ridge += np.abs(np.array(feature_output2[ii][(3*features):(4*features)]))
    layerwise_average_shap += np.abs(np.array(y[0][ii]))
     
layerwise_average_input = layerwise_average_input/len(feature_output2)
layerwise_average_ridge = layerwise_average_ridge/len(feature_output2)
layerwise_average_input2 = layerwise_average_input2/len(feature_output2)
layerwise_average_ridge2 = layerwise_average_ridge2/len(feature_output2)
layerwise_average_shap = layerwise_average_shap/len(feature_output2)
lift_average_input = lift_average_input/len(feature_output2)
lift_average_ridge = lift_average_ridge/len(feature_output2)


SCORES = [list(layerwise_average_input), list(layerwise_average_ridge),
          list(layerwise_average_input2), list(layerwise_average_ridge2),
          list(layerwise_average_shap), list(lift_average_input),
          list(lift_average_ridge)]

if output_to_files:
    pd.DataFrame(SCORES).to_csv("scores_" + output_label + ".csv", index=False)



# Plot feature importance scores

In [None]:
plt.bar(x, abs(np.reshape(y[0][ind], (1,features))[0]), 1, color="blue")
plt.xlabel("Shap Score Example " + str(ind))
plt.ylabel("")
plt.show()

plt.bar(x, abs(np.reshape(feature_output2[ind][0:features], (1,features))[0]), 1, color="blue")
plt.xlabel("Input Layerwise Propagation Score Example " + str(ind))
plt.ylabel("")
plt.show()

plt.bar(x, abs(np.reshape(feature_output2[ind][features:(2*features)], (1,features))[0]), 1, color="blue")
plt.xlabel("Ridge Layerwise Propagation Score Example " + str(ind))
plt.ylabel("Weight value")
plt.show()

plt.bar(x, abs(np.reshape(feature_output2[ind][2*features:(3*features)], (1,features))[0]), 1, color="blue")
plt.xlabel("Deep Lift Input Score Example " + str(ind))
plt.ylabel("Weight value")
plt.show()

plt.bar(x, abs(np.reshape(feature_output2[ind][3*features:(4*features)], (1,features))[0]), 1, color="blue")
plt.xlabel("Deep Lift Ridge Score Example " + str(ind))
plt.ylabel("Weight value")
plt.show()
      
plt.bar(x, abs(np.reshape(layerwise_average_input, (1,features))[0]), 1, color="blue")
plt.xlabel("Input Layerwise Propagation Score Average")
plt.ylabel("")
plt.show()

plt.bar(x, abs(np.reshape(layerwise_average_ridge, (1,features))[0]), 1, color="blue")
plt.xlabel("Ridge Layerwise Propagation Score Average")
plt.ylabel("Weight value")
plt.show()

plt.bar(x, abs(np.reshape(layerwise_average_input2, (1,features))[0]), 1, color="blue")
plt.xlabel("Input Layerwise Propagation Score Average 2")
plt.ylabel("")
plt.show()

plt.bar(x, abs(np.reshape(layerwise_average_ridge2, (1,features))[0]), 1, color="blue")
plt.xlabel("Ridge Layerwise Propagation Score Average 2")
plt.ylabel("Weight value")
plt.show()

plt.bar(x, abs(np.reshape(lift_average_input, (1,features))[0]), 1, color="blue")
plt.xlabel("Input Lift Score Average")
plt.ylabel("")
plt.show()

plt.bar(x, abs(np.reshape(lift_average_ridge, (1,features))[0]), 1, color="blue")
plt.xlabel("Ridge Lift Score Average")
plt.ylabel("Weight value")
plt.show()

plt.bar(x, abs(np.reshape(layerwise_average_shap, (1,features))[0]), 1, color="blue")
plt.xlabel("Shapley Score Average")
plt.ylabel("Weight value")
plt.show()