In [1]:
'''
Build a LSTM model to perform sentiment analysis on IMDB dataset.
Compute
    -Performance
    -Stability of LIME explanations
    -Stability of SHAP explanations
'''
import os
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
import tensorflow as tf
from tensorflow import keras
import numpy as np

In [2]:
'''
TOXIC AREA!
GLOBAL VARIABLES AHEAD
'''
# hyperparameters
VOCAB_SIZE = 10000
PAD_VALUE = 0
NUM_EPOCHS = 50
BATCH_SIZE = 32
LEARNING_RATE = 1e-3
MAX_SEQ_LEN = 150
WORD_VEC_DIMS = 50
LSTM_UNITS = 64

# initialize word_index and reverse_word_index
word_index = {}
reverse_word_index = {}
# build word_index and reverse_word_index
word_index = keras.datasets.imdb.get_word_index()
# first few indices are reserved
word_index = {k:(v+3) for k,v in word_index.items()} 
word_index["<PAD>"] = PAD_VALUE
word_index["<START>"] = 1
word_index["<UNK>"] = 2  # unknown
word_index["<UNUSED>"] = 3
reverse_word_index = dict([(value, key) for (key, value) in word_index.items()])

In [3]:
input_seq = tf.placeholder(tf.int32, [None, MAX_SEQ_LEN], name='input_seq')
target_class = tf.placeholder(tf.float32, [None, 1], name='target_class')

In [4]:
def loadData():
    imdb = keras.datasets.imdb

    (train_data, train_labels), (test_data, test_labels) = imdb.load_data(num_words=VOCAB_SIZE)

    train_labels = np.reshape(train_labels, (train_labels.shape[0], 1))
    test_labels = np.reshape(test_labels, (test_labels.shape[0], 1))

    valid_data = train_data[0:5000]
    valid_labels = train_labels[0:5000]

    train_data = train_data[5000:]
    train_labels = train_labels[5000:]

    return imdb, train_data, train_labels, valid_data, valid_labels, test_data, test_labels

In [5]:
def decodeExampleText(text, word_index, reverse_word_index):
    '''
    for given text, returns decoded form.
    numbers=>words
    '''
    return ' '.join([reverse_word_index.get(i, '?') for i in text])

In [6]:
def preprocessData(train_data, valid_data, test_data):
    '''
    pad the arrays so they all have the same length,
    then create an integer tensor of shape max_length * num_reviews.
    we can use an embedding layer capable of handling this shape as the first layer in our network.
    '''

    train_data = keras.preprocessing.sequence.pad_sequences(train_data, value=PAD_VALUE, padding='post', maxlen=MAX_SEQ_LEN)
    valid_data = keras.preprocessing.sequence.pad_sequences(valid_data, value=PAD_VALUE, padding='post', maxlen=MAX_SEQ_LEN)
    test_data = keras.preprocessing.sequence.pad_sequences(test_data, value=PAD_VALUE, padding='post', maxlen=MAX_SEQ_LEN)
    
    train_data = np.reshape(train_data, (train_data.shape[0], MAX_SEQ_LEN))
    valid_data = np.reshape(valid_data, (valid_data.shape[0], MAX_SEQ_LEN))
    test_data = np.reshape(test_data, (test_data.shape[0], MAX_SEQ_LEN))

    return train_data, valid_data, test_data

In [7]:
def buildModel():
    '''
    returns output, cost and optimizer as tensor ops.
    '''
    # embedding layer
    word_vec = tf.Variable(tf.truncated_normal([VOCAB_SIZE, WORD_VEC_DIMS]), dtype=tf.float32, name='Word-Vectors')
    input_vec = tf.nn.embedding_lookup(word_vec, input_seq)

    # rnn lstm layer
    rnn_cell = tf.nn.rnn_cell.LSTMCell(LSTM_UNITS)
    rnn_cell = tf.contrib.rnn.DropoutWrapper(cell=rnn_cell, output_keep_prob=0.5)

    # finally, the rnn put together
    output, _ = tf.nn.dynamic_rnn(rnn_cell, input_vec, dtype=tf.float32)
    
    output = tf.layers.flatten(output)

    output = tf.layers.dense(output, 32)
    output = tf.nn.relu(output)

    output = tf.layers.dense(output, 1)
    output = tf.nn.sigmoid(output)

    loss = tf.losses.sigmoid_cross_entropy(target_class, output)
   
    optimizer = tf.train.AdamOptimizer(LEARNING_RATE).minimize(loss)

    # a list of metrics to measure accuracy, precision, recall, f1-score
    metrics = []

    round_output = tf.round(output)
    
    accuracy = tf.metrics.accuracy(target_class, round_output, name='Accuracy')
   
    precision = tf.metrics.precision(target_class, round_output, name='Precision')
  
    recall = tf.metrics.recall(target_class, round_output, name='Recall')
    
    metrics.append(accuracy)
    metrics.append(precision)
    metrics.append(recall)

    return optimizer, loss, output, metrics

In [8]:
imdb, train_x, train_y, valid_x, valid_y, test_x, test_y = loadData()
train_x, valid_x, test_x = preprocessData(train_x, valid_x, test_x)

optimizer, loss, output, metrics = buildModel()

num_batches = train_x.shape[0] // BATCH_SIZE

initializer_g = tf.global_variables_initializer()
initializer_l = tf.local_variables_initializer()

sess = tf.Session()

sess.run([initializer_g, initializer_l])

for epoch in range(NUM_EPOCHS):
    print("Epoch {}".format(epoch))

    for batch in range(0, num_batches):
        l = batch*BATCH_SIZE
        r = min((batch+1)*BATCH_SIZE, train_x.shape[0]-1)

        batch_x = train_x[l:r]
        batch_y = train_y[l:r]

        _, _, _, _ = sess.run([optimizer] + metrics, {input_seq: batch_x, target_class: batch_y})

    # log summaries every epoch
    acc_train, prec_train, rec_train = sess.run(metrics, {input_seq: train_x, target_class: train_y})
    acc_valid, prec_valid, rec_valid = sess.run(metrics, {input_seq: valid_x, target_class: valid_y})

    # print metrics
    print("Training: Acc - {0:.4f} | Prec - {1:.4f} | Rec - {2:.4f}".format(acc_train[0], prec_train[0], rec_train[0]))
    print("Validation: Acc - {0:.4f} | Prec - {1:.4f} | Rec - {2:.4f}".format(acc_valid[0], prec_valid[0], rec_valid[0]))


Epoch 0
Training: Acc - 0.5943 | Prec - 0.7146 | Rec - 0.3079
Validation: Acc - 0.6216 | Prec - 0.7970 | Rec - 0.3216
Epoch 1
Training: Acc - 0.6615 | Prec - 0.8125 | Rec - 0.4174
Validation: Acc - 0.6987 | Prec - 0.8376 | Rec - 0.4907
Epoch 2
Training: Acc - 0.7244 | Prec - 0.8478 | Rec - 0.5453
Validation: Acc - 0.7443 | Prec - 0.8591 | Rec - 0.5828
Epoch 3
Training: Acc - 0.7607 | Prec - 0.8666 | Rec - 0.6148
Validation: Acc - 0.7693 | Prec - 0.8770 | Rec - 0.6252
Epoch 4
Training: Acc - 0.7806 | Prec - 0.8833 | Rec - 0.6456
Validation: Acc - 0.7822 | Prec - 0.8900 | Rec - 0.6428
Epoch 5
Training: Acc - 0.7906 | Prec - 0.8952 | Rec - 0.6571
Validation: Acc - 0.7993 | Prec - 0.9005 | Rec - 0.6717
Epoch 6
Training: Acc - 0.8069 | Prec - 0.9033 | Rec - 0.6864
Validation: Acc - 0.8141 | Prec - 0.9064 | Rec - 0.6996
Epoch 7
Training: Acc - 0.8200 | Prec - 0.9078 | Rec - 0.7115
Validation: Acc - 0.8243 | Prec - 0.9113 | Rec - 0.7177
Epoch 8
Training: Acc - 0.8290 | Prec - 0.9133 | Rec - 0

In [9]:
# Todo: 
# Calculate Performance metrics on test set (done)
# LIME explanations (done)
# Stability of LIME explanations on test set (done)
# Shap explanations (done)
# Stability of Shap explanations on test set (done)
# Images of LIME & Shap explanations for few random examples from test set (done)

In [10]:
# Calculate Performance metrics on test set
acc_test, prec_test, rec_test = sess.run(metrics, {input_seq: test_x, target_class: test_y})

acc_test = acc_test[0]
prec_test = prec_test[0]
rec_test = rec_test[0]
f1_score = 2.0*prec_test*rec_test/(prec_test+rec_test)

print("Test Performance: Accuracy - {} | Precision - {} | Recall - {} | F1-score - {}".format(acc_test, prec_test, rec_test, f1_score))

Test Performance: Accuracy - 0.9258085489273071 | Precision - 0.9542996883392334 | Recall - 0.894135594367981 | F1-score - 0.9232385109947449


In [11]:
from lime.lime_text import LimeTextExplainer, IndexedString, TextDomainMapper

class_names = ['negative','positive']
explainer = LimeTextExplainer(class_names=class_names, split_expression=r'\s+', bow=True)

def makePrediction(strings):
    '''
    takes a list of d strings 
    and outputs a (d, k) numpy array with prediction probabilities, 
    where k is the numb/er of classes
    '''
    # convert d strings into shape (d,MAX_SEQ_LEN)
    global word_index
    for j in range(0,len(strings)):
        strings[j] = strings[j].split(' ')
        for i in range(0,len(strings[j])):
            if strings[j][i] in word_index.keys() and word_index[strings[j][i]] < VOCAB_SIZE:
                strings[j][i] = word_index[strings[j][i]]
            else:
                strings[j][i] = word_index["<UNK>"]
    strings = np.array(strings)
    
    # calculate the output on strings
    pred = sess.run(output, {input_seq: strings})
    
    # reshape it into (d,NUM_CLASSES) corresponding to d output probability distributions
    pred = pred.tolist()
    for i in range(0, len(pred)):
        pred[i].insert(0,1-pred[i][0])
    
    return np.array(pred)


Stability of explanations is calculated as follows: Stability is the ratio of change in explanation to change in input. Each explanation is a vector of values. 
Change in explanation is calculated as the magnitude of the vector obtained by taking the difference between the two explanations.
Change in input is simply (noise/MAX_SEQ_LEN) ** 0.5
We hope this formalization scales to LIME, SHAP and SENN well enough to be able to compare their values fairly.

In [12]:
# Calculate Stability on test set
# Stability = Change in explanations / Change in input
# Average stability over the test set seems a good estimation
import random
%matplotlib inline

def noisifyExample(example, num_samples=100, noise=5):
    '''
    for a given example text or array of word ids,
    return a list of texts of size num_samples,
    where each text is same as example except noise amount of words are removed randomly.
    '''
    noisy_examples = []
    
    if type(example) is not str: # given an array of word ids, convert it into text
        example = [reverse_word_index[word_id] for word_id in example]
        example = " ".join(example)
    
    while len(noisy_examples) < num_samples:
        indices_to_remove = set()
        while len(indices_to_remove) < noise:
            indices_to_remove.add(random.randint(0,MAX_SEQ_LEN-1))
        new_exmp = example.split(" ")
        for ind in indices_to_remove:
            new_exmp[ind] = "<PAD>"
        new_exmp = " ".join(new_exmp)
        noisy_examples.append(new_exmp)        
    return noisy_examples

def vectorDifference(vec1, vec2):
    '''
    return magnitude of the vector gained by subtracting vec2 from vec1.
    '''
    assert len(vec1) == len(vec2)
    n = len(vec1)
    
    # compute magnitude of difference between vectors
    vec_diff = [vec1[ind]-vec2[ind] for ind in range(n)] # difference
    vec_diff = [x**2 for x in vec_diff] # squared
    vec_diff = sum(vec_diff)
    vec_diff = vec_diff ** 0.5
    
    return vec_diff
    

def calculateStabilityLime(example, num_samples, noises):
    '''
    example: original text
    num_samples: size of neighborhood around 'example'
    noises: list of integers, each indicating the amount of noise
    
    returns: average stability, which is a dictionary.
             maps noise to avg stability of 'example'.
    
    stability = vectorDifference(exp1,exp2) / normalised_noise ** 0.5
    
    change in input = normalised_noise ** 0.5
    change in explanation = calculated using vectorDifference()
    '''
    assert num_samples > 0
    
    # get noisy_examples for example
    noisy_examples = {}
    for noise in noises:
        noisy_examples[str(noise)] = noisifyExample(example, num_samples, noise)
            
    
    # get explanation for example
    exp = explainer.explain_instance(example, makePrediction, num_features=5, num_samples=2000)
    exp = exp.as_list()
    
    
    # get words from explanation
    words = [e[0] for e in exp]
    words = set(words)
    
    # get explanations for noisy_examples
    noisy_explanations = {}
    for noise in noises:
        noisy_explanations[str(noise)] = []
    
    # calculate stability for each noisy explanation w.r.t explanation
    stabilities = {}
    
    
    for noise in noises:
        stabilities[str(noise)] = 0
        for noisy_example in noisy_examples[str(noise)]:
            exp_ = exp[:]
            noisy_exp = explainer.explain_instance(noisy_example, makePrediction, num_features=5, num_samples=2000)
            # fig = noisy_exp.as_pyplot_figure()
            noisy_exp = noisy_exp.as_list()
            noisy_words = [e[0] for e in noisy_exp]
            noisy_words = set(noisy_words)

            # make exp_ and noisy_exp have the same words
            total_words = words| noisy_words
            
            for word in total_words:
                if word not in noisy_words:
                    noisy_exp.append((word,0))
                if word not in words:
                    exp_.append((word,0))
        
            # sort both explanations by words
            exp_.sort(key = lambda x:x[0])
            noisy_exp.sort(key = lambda x:x[0])
            
            
            exp_ = list(map(lambda x: x[1], exp_))
            noisy_exp = list(map(lambda x: x[1], noisy_exp))
    
            # compute vector difference
            vec_diff = vectorDifference(exp_, noisy_exp)

            # compute stability
            stability = vec_diff / (noise/MAX_SEQ_LEN)**0.5
            
            # store the value
            stabilities[str(noise)] += stability
        stabilities[str(noise)] /= num_samples
    
    return stabilities

In [None]:
import warnings
warnings.filterwarnings("ignore")
# Stability of LIME explanations on test set
# NOTE: these computations are expensive so test set will be reduced to 1000 examples

# range of noises
noises = [x for x in range(3,21,3)]

# no. of noisy samples for each example in test set
num_samples = 30

# dictionary noise(key)->test_stability(value)
test_stabilities = {}

# initialize dictionary
for noise in noises:
    test_stabilities[str(noise)] = []

# compute stability on test set over noises
for i in range(0,len(test_samples)): # calculate stability for each of 500 test examples
    x = test_x[i]
    x  = decodeExampleText(x, word_index, reverse_word_index)
    stb = calculateStabilityLime(x, num_samples, noises) # returns a dictionary key(noise) -> val(avg.stability)
    print(stb)
    for noise in noises:
        test_stabilities[str(noise)].append(stb[str(noise)])

In [None]:
import pickle
lime_stabilities = open('lime-test-stabilities.pickle', 'wb')
pickle.dump(test_stabilities, lime_stabilities)
lime_stabilities.close()

In [13]:
import shap

ModuleNotFoundError: No module named 'numpy.core._multiarray_umath'

In [14]:
# Stability of Shap explanations
# stability = change in explanation / change in input
# change in input = (noise/MAX_SEQ_LEN) ** 0.5
# change in explanation = magnitude of difference between explanations

In [15]:
background_data = train_x[:100] # background data for DeepExplainer
test_samples = test_x[:500] # calculate stability over this set
e = shap.DeepExplainer((input_seq, output), background_data, sess)
shap_values = e.shap_values(test_samples[:])
shap_values = shap_values[0]

Using TensorFlow backend.


In [None]:
shap.initjs()
feature_names = [reverse_word_index[ind] for ind in test_samples[70]]
shap.force_plot(e.expected_value, shap_values[70], feature_names=feature_names)

In [16]:
def calculateStabilityShap(ind, num_samples, noises):
    '''
    ind: index of example in test set
    num_samples: size of neighborhood around 'example'
    noises: list of integers, each indicating the amount of noise
    
    returns: average stability, which is a dictionary.
             maps noise to avg stability of 'example'.
    
    stability = vectorDifference(exp1,exp2) / normalised_noise ** 0.5
    
    change in input = normalised_noise ** 0.5
    change in explanation = calculated using vectorDifference()
    '''
    assert num_samples > 0
    
    # get noisy_examples for example
    noisy_examples = {}
    for noise in noises:
        noisy_examples[str(noise)] = noisifyExample(test_samples[ind], num_samples, noise) # returns a list of shape num_samples x MAX_SEQ_LEN
        for i in range(num_samples):
            # convert text to array of word-ids
            noisy_examples[str(noise)][i] = noisy_examples[str(noise)][i].split(" ")
            noisy_examples[str(noise)][i] = [word_index[word] for word in noisy_examples[str(noise)][i]]
            noisy_examples[str(noise)][i] = np.array(noisy_examples[str(noise)][i])
        noisy_examples[str(noise)] = np.array(noisy_examples[str(noise)])
    
    # get shap values for example
    shap_values_original = shap_values[ind]
    
    # get shap values for noisy_examples
    shap_values_noisy = {}
    for noise in noises:
        shap_values_noisy[str(noise)] = []
        
    # calculate stability for each noisy explanation w.r.t explanation
    stabilities = {}
    
    for noise in noises:
        stabilities[str(noise)] = 0
        shap_values_noisy[str(noise)] = e.shap_values(noisy_examples[str(noise)][:])
        shap_values_noisy[str(noise)] = shap_values_noisy[str(noise)][0]
        for i in range(num_samples):
            vector_diff = vectorDifference(shap_values_original, shap_values_noisy[str(noise)][i])
            stability = vector_diff / (noise / MAX_SEQ_LEN) ** 0.5
            stabilities[str(noise)] += stability
        stabilities[str(noise)] /= num_samples            
    return stabilities

In [None]:
# range of noises
noises = [x for x in range(3,21,3)]

# no. of noisy samples for each example in test set
num_samples = 30

# dictionary noise(key)->test_stability(value)
test_stabilities = {}

# initialize dictionary
for noise in noises:
    test_stabilities[str(noise)] = []

# compute stability on test set over noises
for i in range(0,len(test_samples)): # calculate stability for each of 500 test examples
    stb = calculateStabilityLime(i, num_samples, noises) # returns a dictionary key(noise) -> val(avg.stability)
    print(stb)
    for noise in noises:
        test_stabilities[str(noise)].append(stb[str(noise)])

In [None]:
import pickle
shap_stabilities = open('shap-test-stabilities.pickle', 'wb')
pickle.dump(test_stabilities, shap_stabilities)
shap_stabilities.close()

In [18]:
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
import warnings
warnings.filterwarnings("ignore")

def showExplanationLime(text):
    exp = explainer.explain_instance(text, makePrediction, num_features=5, num_samples=2000)
    exp.as_pyplot_figure()

def showExplanationShap(text_sample):
    shap.initjs()
    feature_names = [reverse_word_index[ind] for ind in test_samples[text_sample]]
    return shap.force_plot(e.expected_value, shap_values[text_sample], feature_names=feature_names)

def showExplanation(text_sample, model):
    '''
    for a given text, outputs the prediction using makePrediction()
    for the given model, displays an explanation using showExplanationLime() and showExplanationShap()
    '''
    
    text = test_samples[text_sample]
    text = list(text)
    text = " ".join([reverse_word_index[x] for x in text])
    print(text)
    print("-------------------------------------------------------")
    prediction = makePrediction([text]) # returns a numpy array of shape 1x2
    print("Model's prediction -> Negative: {0:0.4f}, Positive: {1:0.4f}".format(prediction[0][0], prediction[0][1]))
    
    if model == "LIME":
        showExplanationLime(text)
    else:
        return showExplanationShap(text_sample)
    
    return
style = {'description_width': 'initial'}
layout = widgets.Layout(width='100%')
text_sample_slider = widgets.IntSlider(value=70, min=0, max=len(test_samples), description="Index of sample", continuous_update=False)
text_sample_slider.layout = layout
text_sample_slider.style = style
i = interact(showExplanation, text_sample=text_sample_slider, model=["Shap", "LIME"])

interactive(children=(IntSlider(value=70, continuous_update=False, description='Index of sample', layout=Layou…