In [19]:
# Imports
import numpy as np
import pandas as pd
from typing import List
from collections import OrderedDict

import torch
from torch.jit import RecursiveScriptModule
from torch.nn.functional import sigmoid


# other libraries
from typing import Final

# Plotting
import matplotlib.pyplot as plt
import seaborn as sns

# own modules
from src.model_utils import set_seed
from src.model_utils import load_model
from src.model_utils import predict_single_text
from torch.nn.utils.rnn import pad_sequence
from src.model_utils import load_w2v_model

import shap

%matplotlib inline

In [20]:
# static variables
DATA_PATH: Final[str] = "NLP_Data/data"
MODEL_TYPE: Final[str] = "IMDB"  # "TweepFake"

# set device
device = torch.device(
    "cuda") if torch.cuda.is_available() else torch.device("cpu")
set_seed(42)

In [21]:
# load the model
if MODEL_TYPE == "IMDB":
    model: RecursiveScriptModule = load_model("IMDB_best_model")
else:
    model: RecursiveScriptModule = load_model("best_model")
    
# Load the w2v model
w2vec_model = load_w2v_model()


Explain the model with SHAP:

In [22]:
if MODEL_TYPE == "IMDB":
    file_path = DATA_PATH + '/test.txt'
    data: pd.DataFrame = pd.read_csv(file_path, sep='\t', header=None)
    data.columns = ['text', 'tag']

else:
    file_path = DATA_PATH + '/test.csv'
    data: pd.DataFrame = pd.read_csv(file_path)

    # replace the target column with a binary representation
    data['tag'] = data['account.type'].replace('human', 0)
    data['tag'] = data['tag'].replace('bot', 1)
    # Only keep columns text and tag
    data = data[['text', 'tag']]

print(data.head())

                                                text  tag
0  I first saw The Buddy Holly Story when I was a...    1
1  There were so many things wrong with this movi...    0
2  There's a unique place in the pantheon of John...    1
3  It kicks you in the stomach. There are other f...    1
4  To start, I'm not a person to rate movies that...    0


In [23]:
# Test the model with a single text
text = data['text'][2]
print(text)
predicted = predict_single_text(text, model, device, model_type=MODEL_TYPE)
print(f"Predicted: {predicted}, Real: {data['tag'][0]}")

There's a unique place in the pantheon of John Ford films for Wagonmaster, Sergeant Rutledge, and The Sun Shines Bright. It was these three films with no box office names in them that Ford didn't have to tailor the film around the persona of a star being it John Wayne, Henry Fonda, or any of the others he worked with. Not surprising that Ford considered all these as favorites of one kind or another. <br /><br />Ben Johnson and Harry Carey, Jr. a couple of likable cowpokes sign on to guide a Mormon wagon train to a valley in Arizona territory. Along the way they are joined first by a group stranded players from a medicine show and then by a family of outlaws on the run named Clegg. Their stories merge and what happens is the basis of the film's plot.<br /><br />Had Wagonmaster been done even 10 years earlier on the strength of the two performances turned in by Johnson and Carey, both probably would have had substantial careers as B picture cowboys. In the case of Johnson it would have b

  return forward_call(*args, **kwargs)


In [24]:
# SHAP explects a pipeline that returns something like this:
"""
[[{'label': 'NEGATIVE', 'score': 0.0012035118415951729},
  {'label': 'POSITIVE', 'score': 0.9987965226173401}],
 [{'label': 'NEGATIVE', 'score': 0.002218781039118767},
  {'label': 'POSITIVE', 'score': 0.9977812170982361}]]
"""
def classifier_fn(tokenized_texts: List[List[int]]) -> int: 
    result = []
    #print(tokenized_texts)
    for tokenized_text in tokenized_texts:
      text_padded = pad_sequence([torch.tensor(tokenized_text)], batch_first=True)
      length = torch.tensor([len(tokenized_text)])
      if length == 0:
          return 0
      #Send to device
      text_padded = text_padded.to(device)
      prediction = model(text_padded, length)
      
      prediction = float(sigmoid(prediction).item())
      
      result.append([1-prediction, prediction])
    
    # result = np.array(result)
    return result

In [25]:
import src.RNNModelTrain.data as data_utils
from  shap.maskers._text import Text
from  shap.maskers._text import partition_tree
class Tokenizer(Text):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def __call__(self, mask: np.array, text: str) -> List[torch.Tensor]:
        if MODEL_TYPE == "IMDB":
            text = data_utils.tokenize_sentence(text)
        else:
            text = data_utils.tokenize_tweet(text)
            
        # Apply the mask
        if mask is not None:
            for i, m in enumerate(mask):
                if m == 0:
                    text[i] = "<!MASK>"        
        # Convert the reviews to bag of words representation
        texts_idx: List[torch.Tensor] = data_utils.word2idx(w2vec_model, text)        
        return [texts_idx]
    
    def shape(self, text: str) -> torch.Size:
        if MODEL_TYPE == "IMDB":
            len_in_tokens = len(data_utils.tokenize_sentence(text))
        else:
            len_in_tokens = len(data_utils.tokenize_tweet(text))
        return (1, len_in_tokens)
    
    def mask_shapes(self, text: str) -> torch.Size:
        if MODEL_TYPE == "IMDB":
            len_in_tokens = len(data_utils.tokenize_sentence(text))
        else:
            len_in_tokens = len(data_utils.tokenize_tweet(text))
        return [(len_in_tokens,)]
    
    def data_transform(self, text: str) -> torch.Tensor:
        if MODEL_TYPE == "IMDB":
            tokenized = data_utils.tokenize_sentence(text)
        else:
            tokenized = data_utils.tokenize_tweet(text)
        return [[str(i) + " " for i in tokenized]]
    
    def clustering(self, s):
        """Compute the clustering of tokens for the given string."""
        tokens = self.data_transform(s)[0]

        pt = partition_tree(tokens, [])

        # use the rescaled size of the clusters as their height since the merge scores are just a
        # heuristic and not scaled well
        pt[:, 2] = pt[:, 3]
        pt[:, 2] /= pt[:, 2].max()
        return pt
    
    def _update_s_cache(self, s):
        if self._s != s:
            self._s = s
            tokens = data_utils.tokenize_sentence(s)
            token_ids = data_utils.word2idx(w2vec_model, tokens)
            self._tokenized_s = np.array(tokens)
            self._segments_s = np.array(tokens)
        

In [26]:
# Create a SHAP model explainer
explainer = shap.Explainer(classifier_fn, Tokenizer(), feature_names=["POSITIVE", "NEGATIVE"], algorithm="partition")

# Explain model predictions on 5 examples
data_selected = data['text'][:5].tolist()

shap_values = explainer(data_selected)
shap_values.output_names = ["POSITIVE", "NEGATIVE"]

PartitionExplainer explainer: 6it [00:37,  9.27s/it]                       


In [27]:
print(shap_values)

.values =
array([array([[ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417031e-06],
              [ 9.52417031e-06, -9.52417

In [28]:
shap_values.output_names

['POSITIVE', 'NEGATIVE']

In [29]:
# Visualize data
for i in range(5):
    model_prediction = predict_single_text(data_selected[i], model, device, model_type=MODEL_TYPE)
    print(f"Model prediction: {model_prediction} | Real: {data['tag'][i]}")
    shap.plots.text(shap_values[i])

Model prediction: 0 | Real: 1


  return forward_call(*args, **kwargs)


Model prediction: 0 | Real: 0


Model prediction: 1 | Real: 1


Model prediction: 1 | Real: 1


Model prediction: 0 | Real: 0


In [30]:
# Create a SHAP model explainer
explainer = shap.Explainer(classifier_fn, Tokenizer(), feature_names=["POSITIVE", "NEGATIVE"], algorithm="partition")

# Explain model predictions on simple text example
text = "This movie is the best to make me fall asleep. The actors are boring and the story is terrible."
data_selected = [text]

shap_values = explainer(data_selected)
shap_values.output_names = ["POSITIVE", "NEGATIVE"]

In [31]:
print(shap_values)

.values =
array([[[-0.01744902,  0.01744902],
        [ 0.03173394, -0.03173394],
        [ 0.00447232, -0.00447232],
        [ 0.01136071, -0.01136071],
        [-0.04740088,  0.04740088],
        [ 0.        ,  0.        ],
        [ 0.01269551, -0.01269551],
        [ 0.00906479, -0.00906479],
        [ 0.02287022, -0.02287022],
        [ 0.03600226, -0.03600226],
        [-0.00593634,  0.00593634],
        [ 0.00839313, -0.00839313],
        [ 0.00527795, -0.00527795],
        [ 0.06384645, -0.06384645],
        [ 0.        ,  0.        ],
        [ 0.01054206, -0.01054206],
        [ 0.00466401, -0.00466401],
        [ 0.01820255, -0.01820255],
        [ 0.07884591, -0.07884591]]])

.base_values =
array([[0.32496601, 0.67503399]])

.data =
(array(['This ', 'movie ', 'is ', 'the ', 'best ', 'to ', 'make ', 'me ',
       'fall ', 'asleep ', 'The ', 'actors ', 'are ', 'boring ', 'and ',
       'the ', 'story ', 'is ', 'terrible '], dtype=object),)


In [32]:
shap_values.output_names

['POSITIVE', 'NEGATIVE']

In [33]:
# Visualize data
shap.plots.text(shap_values[0])