In [None]:
# import packages
import numpy as np
import pandas as pd

# models
from simpletransformers.classification import ClassificationModel, ClassificationArgs
from scipy.special import softmax
import fasttext

#lime
from lime.lime_text import LimeTextExplainer
from collections import defaultdict
from tqdm import tqdm

# visualisation
import matplotlib.pyplot as plt

In [None]:
# Load the labelled phrases, keeping only the raw and stemmed text columns
# that the LIME loops further down read from.
all_sample = pd.read_csv(
    "data/stacking_folds/ALL_LABELLED_DATA.csv"
).loc[:, ['phrase', 'phrase_stem']]
print(all_sample.shape)
all_sample.head()

# LIME Helper Functions

In [None]:
def append_dict_of_scores(d, lime_exp, label):
    """
    Accumulate the per-token weights of one LIME explanation into ``d``.

    Inputs:
        d (dict): Maps token -> list of weights. Mutated in place; a token
            seen for the first time gets a new one-element list.
        lime_exp: Object exposing ``as_list(label=...)`` that returns
            (token, weight) pairs.
        label: Passed through unchanged to ``lime_exp.as_list``.
    """
    for token, weight in lime_exp.as_list(label=label):
        d.setdefault(token, []).append(weight)
            
def dict_to_df(d, newcols):
    """
    Turn a ``{key: value}`` mapping into a DataFrame with one row per key.

    Inputs:
        d (dict): Mapping to convert; keys end up in the first column and
            values in the second.
        newcols (list): Column names to assign to the resulting frame.

    Returns:
        pd.DataFrame: frame whose columns are named ``newcols``.
    """
    # A one-row frame has the keys as columns; transposing makes them the
    # index, and reset_index() then moves them into an ordinary column.
    frame = pd.DataFrame([d]).transpose().reset_index()
    frame.columns = newcols
    return frame

# FastText + LIME Analysis

Reference: https://medium.com/@ageitgey/natural-language-processing-is-fun-part-3-explaining-model-predictions-486d8616813c

In [None]:
# read model
# Loads the saved fastText model from disk. FASTTEXT_MODEL is the classifier
# passed to fasttext_prediction_in_sklearn_format in the LIME loop below.
FASTTEXT_MODEL = fasttext.load_model("saved_models/model_fasttext.bin")

In [None]:
def fasttext_prediction_in_sklearn_format(classifier, texts):
    '''
    This helper function generates fasttext predictions in sklearn
    ``predict_proba`` format for inputting into LIME.

    Inputs:
        classifier (FastText object): (Trained) FastText model.
        texts (str or list of str): Text(s) to analyse. A single string is
            treated as a batch of one.

    Returns:
        np.ndarray: one row per text. Within a row the probabilities are
        ordered by sorted label name (neg, pos, zer for the labels shown in
        the raw-output example below).
    '''
    # A bare string would otherwise be zipped character by character below.
    if isinstance(texts, str):
        texts = [texts]
    # fastText's predict() raises ValueError on inputs containing a newline,
    # so embedded line breaks are replaced with spaces before predicting.
    texts = [text.replace("\n", " ") for text in texts]

    # predict classes and probabilities for every label (k=-1)
    # raw output: (('__label__pos', '__label__zer', '__label__neg'), array([0.74627936, 0.19218659, 0.06156404]))
    labels, probabilities = classifier.predict(texts, k=-1)

    # for each prediction, sort the probability scores into the same order
    res = []
    for label, probs in zip(labels, probabilities):
        order = np.argsort(np.array(label)) # sorted in: neg, pos, zer (alphabetical)
        res.append(np.asarray(probs)[order])

    return np.array(res)

In [None]:
# define phrase to use 
phrase_ver = "phrase_stem"

# initialise dictionaries: token -> list of LIME weights (one per explained phrase)
d_neg = defaultdict(list)
d_neu = defaultdict(list)
d_pos = defaultdict(list)
# Display names for the classifier's output columns, in column order.
# fasttext_prediction_in_sklearn_format sorts labels alphabetically: neg, pos, zer.
class_names = [-1, 1, 0] # neg, pos, zer

# LIME's `labels=` argument and `as_list(label=...)` take COLUMN INDICES into
# the classifier output, not class names. Passing the names directly makes
# -1 select the last column, so the per-class dictionaries get mislabelled.
label_indices = list(range(len(class_names)))
neg_idx = class_names.index(-1)
neu_idx = class_names.index(0)
pos_idx = class_names.index(1)

# initialise explainer
explainer = LimeTextExplainer(class_names = class_names, random_state=42)

for i in tqdm(range(len(all_sample))):
    current_text = all_sample[phrase_ver].iloc[i]
    exp = explainer.explain_instance(current_text, classifier_fn=lambda x: fasttext_prediction_in_sklearn_format(FASTTEXT_MODEL, x), labels=label_indices)
    append_dict_of_scores(d_neg, exp, neg_idx)
    append_dict_of_scores(d_neu, exp, neu_idx)
    append_dict_of_scores(d_pos, exp, pos_idx)

In [None]:
# Mean LIME weight per token for each class; every value in d_* is the list
# of weights collected for that token across the explained phrases.
avgDict_pos = {token: sum(weights) / float(len(weights)) for token, weights in d_pos.items()}
avgDict_neg = {token: sum(weights) / float(len(weights)) for token, weights in d_neg.items()}
avgDict_neu = {token: sum(weights) / float(len(weights)) for token, weights in d_neu.items()}

pos = dict_to_df(avgDict_pos, ['token', 'average_pos_impact'])
neg = dict_to_df(avgDict_neg, ['token', 'average_neg_impact'])
neu = dict_to_df(avgDict_neu, ['token', 'average_neu_impact'])

# inner joins: only tokens that have a score for all three classes are kept
fasttext_lime = (
    pos.merge(neg, on='token', how='inner')
       .merge(neu, on='token', how='inner')
)
fasttext_lime.sort_values(['average_pos_impact'], ascending=False)

In [None]:
# Persist the averaged per-token LIME weights (without the index column).
# NOTE(review): assumes the explain_results/ directory already exists - confirm.
fasttext_lime.to_csv('explain_results/fasttext_lime.csv', index=False)

## Plot Graphs

In [None]:
# Read back the averaged fastText LIME weights from the same relative path
# that the to_csv call above writes to, so a fresh Run All finds the file.
rf_eval = pd.read_csv("explain_results/fasttext_lime.csv")

# absolute impact per class, used only to rank tokens
rf_eval["mag_neg"] = np.abs(rf_eval.average_neg_impact)
rf_eval["mag_neu"] = np.abs(rf_eval.average_neu_impact)
rf_eval["mag_pos"] = np.abs(rf_eval.average_pos_impact)

# NOTE(review): the signs of the averaged weights are flipped before plotting;
# the reason is not visible in this notebook - confirm this is intended.
rf_eval["average_pos_impact"] = -rf_eval["average_pos_impact"]
rf_eval["average_neg_impact"] = -rf_eval["average_neg_impact"]
rf_eval["average_neu_impact"] = -rf_eval["average_neu_impact"]

# top 20 tokens per class by absolute impact
rf_eval_neg = rf_eval.nlargest(20, "mag_neg")
rf_eval_neu = rf_eval.nlargest(20, "mag_neu")
rf_eval_pos = rf_eval.nlargest(20, "mag_pos")

In [None]:
# Display the prepared frame: per-token impacts (sign-flipped in the previous
# cell) together with the mag_* ranking columns.
rf_eval

In [None]:
# One horizontal bar chart per class, side by side. Bars are red for
# negative plotted weights and green otherwise.
fig, axes = plt.subplots(1, 3, figsize=(20,10))

panels = [
    (rf_eval_neg, "average_neg_impact", 'y=-1.0 top features'),
    (rf_eval_neu, "average_neu_impact", 'y=0.0 top features'),
    (rf_eval_pos, "average_pos_impact", 'y=1.0 top features'),
]

for ax, (top_tokens, impact_col, panel_title) in zip(axes, panels):
    weights = top_tokens[impact_col]
    ax.barh(top_tokens.token, weights, height=0.8,
            color=["#E3242B" if x<0 else "#00AB6B" for x in weights])
    ax.set_title(panel_title)
    ax.set_ylabel('Features')
    ax.set_xlabel('Weight')

plt.show()

# BERT + LIME Analysis

In [None]:
# Load the fine-tuned BERT classifier from the saved model directory (CPU only).
# NOTE(review): the training hyperparameters in bert_model_args are presumably
# irrelevant for prediction-only use - confirm.
bert_model_args = ClassificationArgs(num_train_epochs=2, learning_rate=5e-5)
BERT_MODEL = ClassificationModel(
    model_type='bert',
    model_name='saved_models/model_bert',
    args=bert_model_args,
    use_cuda=False,
)

In [None]:
def BERT_prediction_in_sklearn_format(classifier, texts):
    '''
    Generate BERT class probabilities in the sklearn ``predict_proba`` layout
    that LIME expects.

    Inputs:
        classifier: Model whose ``predict(texts)`` returns a
            (predictions, raw outputs) pair.
        texts: Batch of texts forwarded unchanged to ``classifier.predict``.

    Returns:
        np.ndarray: one row per text, ordered [negative, neutral, positive].
    '''
    # only the raw outputs are needed; the predicted classes are ignored
    _, raw_outputs = classifier.predict(texts)

    # softmax across the class axis turns raw outputs into probabilities
    probabilities = softmax(raw_outputs, axis=1)

    # Model columns are read as (neutral, positive, negative) and reordered.
    # NOTE(review): that column meaning is assumed here - confirm it against
    # the label encoding used in training.
    return np.array([[row[2], row[0], row[1]] for row in probabilities])

In [None]:
# define phrase to use 
phrase_ver = "phrase"

# initialise dictionaries: token -> list of LIME weights (one per explained phrase)
d_neg = defaultdict(list)
d_neu = defaultdict(list)
d_pos = defaultdict(list)
# Display names for the classifier's output columns, in column order.
# BERT_prediction_in_sklearn_format returns [neg, neu, pos].
class_names = [-1, 0, 1]

# LIME's `labels=` argument and `as_list(label=...)` take COLUMN INDICES into
# the classifier output, not class names. Passing the names directly makes
# -1 select the last column, which rotates the three classes.
label_indices = list(range(len(class_names)))
neg_idx = class_names.index(-1)
neu_idx = class_names.index(0)
pos_idx = class_names.index(1)

# initialise explainer
explainer = LimeTextExplainer(class_names = class_names, random_state=42)

for i in tqdm(range(len(all_sample))):
    current_text = all_sample[phrase_ver].iloc[i]
    # NOTE(review): num_samples=10 gives LIME only ten perturbed texts to fit
    # each local model on, so individual weights are likely noisy - confirm
    # this speed/quality trade-off is acceptable.
    exp = explainer.explain_instance(current_text,
                                     classifier_fn=lambda x: BERT_prediction_in_sklearn_format(BERT_MODEL, x),
                                     labels=label_indices,
                                     num_features=100, num_samples=10) # reduced as BERT predictions take very long to run
    append_dict_of_scores(d_neg, exp, neg_idx)
    append_dict_of_scores(d_neu, exp, neu_idx)
    append_dict_of_scores(d_pos, exp, pos_idx)

In [None]:
# Mean LIME weight per token for each class; every value in d_* is the list
# of weights collected for that token across the explained phrases.
avgDict_pos = {token: sum(weights) / float(len(weights)) for token, weights in d_pos.items()}
avgDict_neg = {token: sum(weights) / float(len(weights)) for token, weights in d_neg.items()}
avgDict_neu = {token: sum(weights) / float(len(weights)) for token, weights in d_neu.items()}

pos = dict_to_df(avgDict_pos, ['token', 'average_pos_impact'])
neg = dict_to_df(avgDict_neg, ['token', 'average_neg_impact'])
neu = dict_to_df(avgDict_neu, ['token', 'average_neu_impact'])

# inner joins: only tokens that have a score for all three classes are kept
bert_lime = (
    pos.merge(neg, on='token', how='inner')
       .merge(neu, on='token', how='inner')
)
bert_lime.sort_values(['average_pos_impact'], ascending=False)

In [None]:
# Persist the averaged per-token LIME weights (without the index column).
# NOTE(review): assumes the explain_results/ directory already exists - confirm.
bert_lime.to_csv('explain_results/bert_lime.csv', index=False)

## Plot Graphs

In [None]:
# Read back the averaged BERT LIME weights saved by the to_csv call above.
rf_eval = pd.read_csv("explain_results/bert_lime.csv")

# absolute impact per class, used only to rank tokens
rf_eval = rf_eval.assign(
    mag_neg=rf_eval["average_neg_impact"].abs(),
    mag_neu=rf_eval["average_neu_impact"].abs(),
    mag_pos=rf_eval["average_pos_impact"].abs(),
)

# flip the sign of the three impact columns in one step
impact_columns = ["average_pos_impact", "average_neg_impact", "average_neu_impact"]
rf_eval[impact_columns] = -rf_eval[impact_columns]

# top 20 tokens per class by absolute impact
rf_eval_neg = rf_eval.nlargest(20, "mag_neg")
rf_eval_neu = rf_eval.nlargest(20, "mag_neu")
rf_eval_pos = rf_eval.nlargest(20, "mag_pos")

In [None]:
# One horizontal bar chart per class, side by side. Bars are red for
# negative plotted weights and green otherwise.
fig, axes = plt.subplots(1, 3, figsize=(20,10))

panels = [
    (rf_eval_neg, "average_neg_impact", 'y=-1.0 top features'),
    (rf_eval_neu, "average_neu_impact", 'y=0.0 top features'),
    (rf_eval_pos, "average_pos_impact", 'y=1.0 top features'),
]

for ax, (top_tokens, impact_col, panel_title) in zip(axes, panels):
    weights = top_tokens[impact_col]
    ax.barh(top_tokens.token, weights, height=0.8,
            color=["#E3242B" if x<0 else "#00AB6B" for x in weights])
    ax.set_title(panel_title)
    ax.set_ylabel('Features')
    ax.set_xlabel('Weight')

plt.show()