# Setup

In [None]:
# Mount Google Drive inside the Colab VM at /content/drive.
from google.colab import drive
drive.mount("/content/drive")

In [None]:
import locale
def getpreferredencoding(do_setlocale = True):
    """Stand-in for ``locale.getpreferredencoding`` that always answers "UTF-8".

    ``do_setlocale`` is accepted only so the signature matches the standard
    library function; its value is never consulted.
    """
    forced_encoding = "UTF-8"
    return forced_encoding


# Install the stand-in on the locale module so every later call to
# locale.getpreferredencoding() reports "UTF-8".
setattr(locale, "getpreferredencoding", getpreferredencoding)

In [None]:
!pip install transformers==4.22.2

!pip install statsmodels

!pip install datasets

!pip install -U tensorflow==2.10 

!nvidia-smi

In [None]:
# main libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import time
from collections import defaultdict
from tqdm.autonotebook import tqdm
import spacy
import re
import statsmodels
import statsmodels.api as sm
import scipy

# sklearn
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, accuracy_score, precision_score, recall_score, mean_absolute_percentage_error, r2_score, jaccard_score
from sklearn.linear_model import LinearRegression, Ridge, Lasso
from sklearn.manifold import TSNE
from sklearn.decomposition import PCA
from sklearn.preprocessing import StandardScaler

# specific machine learning functionality
import tensorflow as tf
from tensorflow import keras
from tensorflow.python.keras import backend as K
import datasets
from datasets import Dataset
from datasets import load_from_disk

# Transformers
import transformers
from transformers import (
    BertTokenizer, 
    TFBertForSequenceClassification, 
    TFBertForMaskedLM, 
    TFBertModel,
    #create_optimizer,
    #DataCollatorForLanguageModeling,
    #PreTrainedTokenizerFast
)

In [None]:
# Environment diagnostics: print library versions, execution mode and the
# devices TensorFlow can see.
#
# Enable/Disable Eager Execution
# Reference: https://www.tensorflow.org/guide/eager
# TensorFlow's eager execution is an imperative programming environment that evaluates operations immediately, 
# without building graphs
# (The two toggles below are intentionally left commented out.)

#tf.compat.v1.disable_eager_execution()
#tf.compat.v1.enable_eager_execution()

print("tensorflow version", tf.__version__)
print("keras version", tf.keras.__version__)
print("Eager Execution Enabled:", tf.executing_eagerly())

# Create a MirroredStrategy and report how many replicas it keeps in sync.
# NOTE(review): `strategy` is only used for this printout in the visible notebook.
strategy = tf.distribute.MirroredStrategy()
print("Number of replicas:", strategy.num_replicas_in_sync)

# Devices visible to TensorFlow, followed by the logical GPU devices.
devices = tf.config.experimental.get_visible_devices()
print("Devices:", devices)
print(tf.config.experimental.list_logical_devices('GPU'))

# Physical devices: GPUs only, then everything.
print("GPU Available: ", tf.config.list_physical_devices('GPU'))
print("All Physical Devices", tf.config.list_physical_devices())

# Better performance with the tf.data API
# Reference: https://www.tensorflow.org/guide/data_performance
# AUTOTUNE is passed as the prefetch buffer size when datasets are built.
AUTOTUNE = tf.data.experimental.AUTOTUNE

In [None]:
# Base directory (inside the mounted Drive) that saved-model paths are appended to.
word_dir = "/content/drive/MyDrive/"

# Tokenization

In [None]:
### Shared tokenization / input-pipeline settings

# Prefetch buffer size handed to tf.data when datasets are built.
AUTOTUNE = tf.data.experimental.AUTOTUNE

# Number of examples per batch in the datasets built for the classifier.
batch_size = 8

# Name of the pretrained checkpoint whose vocabulary the tokenizer loads.
classifier_name = 'bert-base-uncased'
bert_tokenizer = BertTokenizer.from_pretrained(classifier_name, do_lower_case=True)

In [None]:
### Tokenization function
def tokenize_for_bert_classifier(df, should_shuffle=False, max_length=256):
    """Tokenize ``df["text"]`` and bundle it with ``df["label"]`` as a tf.data.Dataset.

    Args:
        df: table-like object with a "text" column of strings and a "label"
            column of the same length.
        should_shuffle: when True, shuffle with a buffer as large as the
            number of texts before batching.
        max_length: length every sequence is padded / truncated to
            (defaults to 256, the value that used to be hard-coded).

    Returns:
        A dataset of ``((input_ids, token_type_ids, attention_mask), label)``
        elements, batched with the module-level ``batch_size`` and prefetched
        with ``AUTOTUNE``.
    """
    # Hand the tokenizer a plain list of strings rather than a pandas object.
    texts = list(df["text"])

    # Tokenization
    X_tokenized = bert_tokenizer.batch_encode_plus(
        texts,
        return_tensors='tf',
        add_special_tokens=True,
        return_token_type_ids=True,
        padding='max_length',
        max_length=max_length,
        return_attention_mask=True,
        truncation='longest_first',
    )

    # Creating TF datasets
    model_inputs = (
        X_tokenized["input_ids"],
        X_tokenized["token_type_ids"],
        X_tokenized["attention_mask"],
    )
    dataset = tf.data.Dataset.from_tensor_slices((model_inputs, df["label"]))

    if should_shuffle:
        # A buffer covering the whole dataset gives a full shuffle.
        dataset = dataset.shuffle(buffer_size=len(texts))
    dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=AUTOTUNE)

    return dataset

# Model

In [None]:
# Load the sequence-classification model saved under word_dir; it is the default
# `classifier` argument of the interpretation functions defined below.
classifier_model = TFBertForSequenceClassification.from_pretrained(word_dir + 'Senior Thesis models/model_classifier_bert_6/temp')

# Single Masking Interpretation

## Implementation

In [None]:
def inv_logit(p):
    """Map a logit (scalar or array) to a probability: exp(p) / (1 + exp(p)).

    The naive formula overflows for large positive ``p`` (inf / inf = nan).
    Here only exp(-|p|) is evaluated, which can at worst underflow to 0, so
    the result is exact at both extremes.

    Args:
        p: a number or array-like of logits.

    Returns:
        A float64 NumPy scalar for scalar input, otherwise a float64 array of
        the same shape as ``p``.
    """
    p = np.asarray(p, dtype=float)
    decay = np.exp(-np.abs(p))  # always in (0, 1], never overflows
    result = np.where(p >= 0, 1.0 / (1.0 + decay), decay / (1.0 + decay))
    # `[()]` unwraps a 0-d array into a scalar and leaves real arrays untouched.
    return result[()]

# Pretrained masked language model used as the default gap filler.
gap_untuned_model = TFBertForMaskedLM.from_pretrained("bert-base-uncased")

# spaCy pipeline; only its default stop-word list is used in this cell.
sp = spacy.load('en_core_web_sm')

# Entries that are removed from the stop-word set below, so they are still
# scored by the interpretation functions.
to_keep = [
    "n't", "neither", "never", "no", 'noone', 'nor', 'not', 'nothing',
    'n‘t', 'n’t',
    'only', 'quite', 'really', 'serious', 'several', 'still', 'such',
    'take', 'too', 'top', 'unless', 'various', 'very', 'well',
]

# Stop words = spaCy defaults minus the entries listed in `to_keep`.
all_stopwords = set(sp.Defaults.stop_words).difference(to_keep)

# Matches every character that is not an ASCII letter.
letter_regex = re.compile('[^a-zA-Z]')

In [None]:
def show_replacement_options_and_score(
    original_sentence,
    verbose = False,
    classifier = classifier_model,
    gap_filler = gap_untuned_model,
):
    """Score each non-stop-word by swapping it for masked-LM suggestions.

    For each of the first 256 whitespace-separated tokens whose cleaned form
    (letters only, lower-cased) is not in ``all_stopwords``:

    1. the token is replaced by ``[MASK]`` and ``gap_filler`` proposes its
       10 highest-scoring fillers for that position;
    2. the token is replaced by each filler and ``classifier`` scores the
       resulting sentences;
    3. importance = original score - mean score of the filled sentences,
       where "score" is ``logits[:, 0]`` of the classifier.

    Args:
        original_sentence: text to analyse.
        verbose: print intermediate sentences and scores.
        classifier: model called as ``classifier(**inputs).logits``.
        gap_filler: masked language model called the same way.

    Returns:
        DataFrame with columns "word" (cleaned word) and "importance" (mean
        over all occurrences of that word), sorted ascending by importance.
    """
    max_words = 256   # tokens beyond this position are not analysed
    n_fillers = 10    # masked-LM suggestions tried per token

    # original sentence
    inputs = bert_tokenizer(original_sentence, return_tensors="tf")
    logits = classifier(**inputs).logits
    original_sentence_score = logits[0, 0].numpy()
    if verbose:
        print(original_sentence)
        print(f"Original score: {original_sentence_score}")
        print()

    # modified sentences
    all_words = original_sentence.split()
    word_scores = defaultdict(list)
    words_to_scan = all_words[:max_words]
    for i, raw_word in tqdm(enumerate(words_to_scan), total=len(words_to_scan)):
        word = letter_regex.sub("", raw_word).lower()
        if word in all_stopwords:
            continue

        new_sentence = " ".join(
            "[MASK]" if j == i else temp_word
            for (j, temp_word) in enumerate(all_words)
        )
        inputs = bert_tokenizer(new_sentence, return_tensors="tf")
        logits = gap_filler(**inputs).logits

        # retrieve index of [MASK]
        mask_token_index = tf.where((inputs.input_ids == bert_tokenizer.mask_token_id)[0])
        selected_logits = tf.gather_nd(logits[0], indices=mask_token_index)

        # get top predictions
        predicted_token_ids = tf.math.top_k(selected_logits, n_fillers).indices[0]
        options = bert_tokenizer.decode(predicted_token_ids)

        if verbose:
            print(new_sentence)
            print(options)

        # Build one sentence per suggestion by substituting the i-th token.
        # (A plain str.replace on the cleaned word can miss the token, touch
        # other occurrences, or - for an empty cleaned word - insert the
        # filler between every character.)
        filled_sentences = [
            " ".join(all_words[:i] + [filler_word] + all_words[i + 1:])
            for filler_word in options.split()
        ]

        # compute word importance; a failure for one word must not abort the rest
        try:
            inputs = bert_tokenizer(
                filled_sentences,
                return_tensors="tf",
                padding=True,
                truncation=True,
            )
            filled_scores = classifier(**inputs).logits[:, 0].numpy()
            current_word_score = original_sentence_score - np.mean(filled_scores)
            word_scores[word].append(current_word_score)
            if verbose:
                print(f"Sentence Scores: {filled_scores}")
                print(f"Overall Score: {(np.mean(filled_scores)):.4f}")
                print(f"Word: {word}")
                print(f"Word importance: {current_word_score:.4f}")
                print()
        except Exception as error:  # best effort, but let KeyboardInterrupt through
            if verbose:
                print(f"Did not compute: {error!r}")

    # Aggregate once, after every word has been processed.
    word_importance_df = pd.DataFrame(
        {
            "word": list(word_scores.keys()),
            "importance": [np.mean(temp) for temp in word_scores.values()],
        }
    )
    word_importance_df = word_importance_df.sort_values(by="importance", ignore_index=True)
    return word_importance_df

In [None]:
def show_masking_score(
    original_sentence,
    verbose = False,
    classifier = classifier_model,
):
    """Score each non-stop-word by deleting it from the sentence.

    For each of the first 256 whitespace-separated tokens whose cleaned form
    (letters only, lower-cased) is not in ``all_stopwords``, the token is
    removed and the sentence is re-scored. Importance = original score -
    score without the token, where "score" is ``logits[:, 0]`` of the
    classifier.

    Args:
        original_sentence: text to analyse.
        verbose: print intermediate sentences and scores.
        classifier: model called as ``classifier(**inputs).logits``.

    Returns:
        DataFrame with columns "word" (cleaned word) and "importance" (mean
        over all occurrences of that word), sorted ascending by importance.
    """
    max_words = 256   # tokens beyond this position are not analysed

    # original sentence
    inputs = bert_tokenizer(original_sentence, return_tensors="tf")
    logits = classifier(**inputs).logits
    original_sentence_score = logits[0, 0].numpy()
    if verbose:
        print(original_sentence)
        print(f"Original score: {original_sentence_score}")
        print()

    # modified sentences
    all_words = original_sentence.split()
    word_scores = defaultdict(list)
    words_to_scan = all_words[:max_words]
    for i, raw_word in tqdm(enumerate(words_to_scan), total=len(words_to_scan)):
        word = letter_regex.sub("", raw_word).lower()
        if word in all_stopwords:
            continue
        new_sentence = " ".join([temp_word if j != i else "" for (j, temp_word) in enumerate(all_words)])
        if verbose:
            print(new_sentence)

        # compute word importance; a failure for one word must not abort the rest
        try:
            inputs = bert_tokenizer(new_sentence, return_tensors="tf")
            masked_scores = classifier(**inputs).logits[:, 0].numpy()
            current_word_score = original_sentence_score - np.mean(masked_scores)
            word_scores[word].append(current_word_score)
            if verbose:
                print(f"Sentence Scores: {masked_scores}")
                print(f"Overall Score: {(np.mean(masked_scores)):.4f}")
                print(f"Word: {word}")
                print(f"Word importance: {current_word_score:.4f}")
                print()
        except Exception as error:  # best effort, but let KeyboardInterrupt through
            if verbose:
                print(f"Did not compute: {error!r}")

    # Aggregate once, after every word has been processed.
    word_importance_df = pd.DataFrame(
        {
            "word": list(word_scores.keys()),
            "importance": [np.mean(temp) for temp in word_scores.values()],
        }
    )
    word_importance_df = word_importance_df.sort_values(by="importance", ignore_index=True)
    return word_importance_df

In [None]:
def get_color(df):
    """Build the CSS table used by ``df.style.apply(get_color, axis=None)``.

    Every cell of a row receives the same background colour, derived from the
    row's "importance" value: positive values shade towards green, all other
    values towards red, with |importance| * 80 (capped at 255) as intensity.
    Missing importances are rendered white.

    A fresh string DataFrame is returned instead of overwriting a copy of
    ``df``, so the result does not depend on the index being 0..n-1 and no
    string is ever written into a numeric column.

    Args:
        df: DataFrame with an "importance" column.

    Returns:
        DataFrame of CSS strings with the same index and columns as ``df``.
    """
    def _row_style(importance):
        strength = 0.0 if pd.isna(importance) else min(abs(importance) * 80, 255)
        faded = 255 - int(round(strength))  # valid 0-255 integer channel
        if importance > 0:
            return f'background-color: rgb({faded}, 255, {faded})'
        return f'background-color: rgb(255, {faded}, {faded})'

    row_styles = [_row_style(value) for value in df["importance"]]
    return pd.DataFrame(
        {column: row_styles for column in df.columns},
        index=df.index,
    )

In [None]:
def show_both_scores(
    original_sentence,
    verbose = False,
    classifier = classifier_model,
    gap_models = [gap_untuned_model],
    descriptions = ["Replacement with un-tuned bert"]
):
    """Print the sentence score and display single-word importance tables.

    Shows the deletion baseline from ``show_masking_score`` followed by one
    replacement table from ``show_replacement_options_and_score`` per entry of
    ``gap_models``. All tables are coloured with ``get_color``.

    Args:
        original_sentence: text to analyse.
        verbose: forwarded to the scoring functions.
        classifier: model used for the headline score and for every table.
        gap_models: masked language models to use as gap fillers.
        descriptions: headings printed above the replacement tables, paired
            with ``gap_models`` by position (the lists are only read).
    """
    inputs = bert_tokenizer(original_sentence, return_tensors="tf")
    # Use the classifier that was passed in, not the module-level default, so
    # the headline score matches the model the tables are computed with.
    logits = classifier(**inputs).logits
    original_sentence_score = logits[0, 0].numpy()
    print(f"Sentence score: {inv_logit(original_sentence_score):0.4F}")

    masking_df = show_masking_score(original_sentence, verbose, classifier)
    replacement_dfs = [
        show_replacement_options_and_score(original_sentence,
                                           verbose,
                                           classifier,
                                           gap_model)
        for gap_model in gap_models
    ]

    print("Baseline:")
    display(masking_df.style.apply(get_color, axis=None))
    for description, replacement_df in zip(descriptions, replacement_dfs):
        print(description)
        display(replacement_df.style.apply(get_color, axis=None))

# Multi-Masking Interpretation

## Implementation

In [None]:
# Regularisation strength of the Lasso surrogate fitted by the multi-masking functions.
lasso_alpha = 0.01
# Number of masked-LM suggestions used per [MASK] position.
top_k_words = 10
def show_multiple_masking_replacement_score(
        original_sentence,
        verbose = False,
        classifier = classifier_model,
        gap_filler = gap_untuned_model,
        n_samples_per_word = 2,
        return_type="table", # table, list, or both
        ignore_first_x_words=0,
    ):
    """Estimate word importances from many randomly masked-and-refilled sentences.

    ``len(words) * n_samples_per_word`` times, a random ~15% of the words
    (never one of the first ``ignore_first_x_words``) is replaced by
    ``[MASK]``. ``gap_filler`` proposes ``top_k_words`` fillers per mask, which
    yields ``top_k_words`` filled sentences per sample. All sentences are
    scored by ``classifier`` and a Lasso model (``lasso_alpha``) is fitted from
    the keep/mask indicator of every word (1 = kept, 0 = replaced) to the
    classifier's first logit. The Lasso coefficients are the importances.

    Args:
        original_sentence: text to analyse.
        verbose: print the original score and fit diagnostics.
        classifier: model providing ``.logits`` both when called and via ``predict``.
        gap_filler: masked language model called as ``gap_filler(**inputs).logits``.
        n_samples_per_word: random maskings drawn per word of the sentence.
        return_type: "table", "list" or "both".
        ignore_first_x_words: number of leading words that are never masked.

    Returns:
        "table": DataFrame ("word", "importance") over distinct cleaned
        non-stop-words, sorted ascending by importance.
        "list": ``(all_words, coefficients)`` with one coefficient per token.
        "both": ``(table, (all_words, coefficients))``.

    Raises:
        ValueError: for an unknown ``return_type`` or when no word is
            available for masking.
    """
    if return_type not in ("table", "list", "both"):
        raise ValueError(f"return_type must be 'table', 'list' or 'both', got {return_type!r}")

    if verbose:
        # original sentence
        inputs = bert_tokenizer(original_sentence, return_tensors="tf")
        original_sentence_score = classifier(**inputs).logits[0, 0].numpy()
        print(original_sentence)
        print(f"Original score: {original_sentence_score}")
        print()

    # modified sentences
    all_words = original_sentence.split()
    candidate_indices = np.arange(ignore_first_x_words, len(all_words))
    if len(candidate_indices) == 0:
        raise ValueError("no words available for masking (empty sentence or ignore_first_x_words too large)")
    n_samples = len(all_words) * n_samples_per_word
    # Mask ~15% of the words, but never more than can be drawn without replacement.
    replacement_size = min(int(len(all_words) * 0.15 + 1), len(candidate_indices))

    X = []
    sentences = []
    for _ in tqdm(range(n_samples), total = n_samples):
        # Sample masking indices
        word_indices = np.random.choice(
            candidate_indices,
            size=replacement_size,
            replace = False,
        )
        masked_positions = set(word_indices.tolist())
        current_x_row = np.ones(len(all_words))
        current_x_row[word_indices] = 0
        # One feature row per filled sentence generated from this masking.
        X.extend([current_x_row] * top_k_words)
        new_sentence = " ".join(
            "[MASK]" if j in masked_positions else temp_word
            for (j, temp_word) in enumerate(all_words)
        )

        # get gap filler logits
        inputs = bert_tokenizer(new_sentence, return_tensors="tf")
        logits = gap_filler(**inputs).logits

        # retrieve indices of [MASK]
        mask_token_index = tf.where((inputs.input_ids == bert_tokenizer.mask_token_id)[0])
        selected_logits = tf.gather_nd(logits[0], indices=mask_token_index)

        # get top predictions, one list of candidate strings per mask
        predicted_token_ids = [tf.math.top_k(temp, top_k_words).indices for temp in selected_logits]
        options = [bert_tokenizer.decode(temp).split() for temp in predicted_token_ids]
        # decode() may yield fewer than top_k_words strings; pad with "".
        options = [temp + [""] * (top_k_words - len(temp)) for temp in options]

        # i-th filled sentence: every mask gets its i-th ranked suggestion
        for i in range(top_k_words):
            filled_sentence = new_sentence
            for mask_options in options:
                filled_sentence = filled_sentence.replace("[MASK]", mask_options[i], 1)
            sentences.append(filled_sentence)

    # compute model outcomes:
    dataset = tokenize_for_bert_classifier(
        pd.DataFrame({
            "text": sentences,
            "label": [True for _ in sentences]
        })
    )
    # Keep only the first logit (the one used as sentence score elsewhere in
    # this notebook); a 1-D target makes `coef_` hold exactly one weight per word.
    Y = np.asarray(classifier.predict(dataset).logits)[:, 0]

    # Train a simple model on the local data
    simple_model = Lasso(lasso_alpha).fit(X, Y)

    if return_type == "table" or return_type == "both":
        cleaned_words = [letter_regex.sub("", word).lower() for word in all_words]
        word_importance_raw = defaultdict(list)
        for cleaned, coefficient in zip(cleaned_words, simple_model.coef_):
            word_importance_raw[cleaned].append(coefficient)
        # Filter stop words on the cleaned form so that tokens such as "the,"
        # are excluded too; dict.fromkeys keeps first-seen order.
        all_words_unique = [
            cleaned for cleaned in dict.fromkeys(cleaned_words)
            if cleaned not in all_stopwords
        ]
        word_importance_df = pd.DataFrame(
            {
                "word": all_words_unique,
                "importance": [np.mean(word_importance_raw[temp]) for temp in all_words_unique]
            }
        )
        word_importance_df = word_importance_df.sort_values(by="importance", ignore_index=True)

    if return_type == "list" or return_type == "both":
        word_importance_list = list(simple_model.coef_)

    if verbose:
        print(f"Selection rates: {np.mean(X, axis=0)}")
        print(f"Outcome mean: {np.mean(Y):0.4f}")
        # Lasso.score() is the coefficient of determination, not an MSE.
        print(f"Model R^2: {simple_model.score(X, Y):0.4f}")
    print(f"Model MAPE: {mean_absolute_percentage_error(Y, simple_model.predict(X)):0.4f}")

    if return_type == "table":
        return word_importance_df
    elif return_type == "list":
        return all_words, word_importance_list
    else:
        return word_importance_df, (all_words, word_importance_list)

In [None]:
def show_multiple_masking_score(
    original_sentence,
    verbose = False,
    classifier = classifier_model,
    n_samples_per_word = 5,
    return_type="table", # table, list, or both
    ignore_first_x_words=0,
):
    """Estimate word importances from many randomly word-deleted sentences.

    ``len(words) * n_samples_per_word`` times, a random ~15% of the words
    (never one of the first ``ignore_first_x_words``) is removed from the
    sentence. All variants are scored by ``classifier`` and a Lasso model
    (module-level ``lasso_alpha``) is fitted from the keep/delete indicator of
    every word (1 = kept, 0 = deleted) to the classifier's first logit. The
    Lasso coefficients are the importances.

    Args:
        original_sentence: text to analyse.
        verbose: print the original score and fit diagnostics.
        classifier: model providing ``.logits`` both when called and via ``predict``.
        n_samples_per_word: random deletions drawn per word of the sentence.
        return_type: "table", "list" or "both".
        ignore_first_x_words: number of leading words that are never deleted.

    Returns:
        "table": DataFrame ("word", "importance") over distinct cleaned
        non-stop-words, sorted ascending by importance.
        "list": ``(all_words, coefficients)`` with one coefficient per token.
        "both": ``(table, (all_words, coefficients))``.

    Raises:
        ValueError: for an unknown ``return_type`` or when no word is
            available for deletion.
    """
    if return_type not in ("table", "list", "both"):
        raise ValueError(f"return_type must be 'table', 'list' or 'both', got {return_type!r}")

    if verbose:
        # original sentence
        inputs = bert_tokenizer(original_sentence, return_tensors="tf")
        original_sentence_score = classifier(**inputs).logits[0, 0].numpy()
        print(original_sentence)
        print(f"Original score: {original_sentence_score}")
        print()

    # modified sentences
    all_words = original_sentence.split()
    candidate_indices = np.arange(ignore_first_x_words, len(all_words))
    if len(candidate_indices) == 0:
        raise ValueError("no words available for masking (empty sentence or ignore_first_x_words too large)")
    n_samples = len(all_words) * n_samples_per_word
    # Delete ~15% of the words, but never more than can be drawn without replacement.
    replacement_size = min(int(len(all_words) * 0.15 + 1), len(candidate_indices))

    X = []
    sentences = []
    for _ in tqdm(range(n_samples), total = n_samples):
        # Sample masking indices
        word_indices = np.random.choice(
            candidate_indices,
            size=replacement_size,
            replace = False,
        )
        removed_positions = set(word_indices.tolist())
        current_x_row = np.ones(len(all_words))
        current_x_row[word_indices] = 0
        X.append(current_x_row)
        # Deleted words leave an empty slot; the extra spaces are kept as in
        # the sentence layout used for the single-word baseline.
        new_sentence = " ".join(
            "" if j in removed_positions else temp_word
            for (j, temp_word) in enumerate(all_words)
        )
        sentences.append(new_sentence)

    # Get model outcomes
    dataset = tokenize_for_bert_classifier(
        pd.DataFrame({
            "text": sentences,
            "label": [True for _ in sentences]
        })
    )
    # Keep only the first logit (the one used as sentence score elsewhere in
    # this notebook); a 1-D target makes `coef_` hold exactly one weight per word.
    Y = np.asarray(classifier.predict(dataset).logits)[:, 0]

    # Train a simple model on the local data
    simple_model = Lasso(lasso_alpha).fit(X, Y)

    if return_type == "table" or return_type == "both":
        cleaned_words = [letter_regex.sub("", word).lower() for word in all_words]
        word_importance_raw = defaultdict(list)
        for cleaned, coefficient in zip(cleaned_words, simple_model.coef_):
            word_importance_raw[cleaned].append(coefficient)
        # Filter stop words on the cleaned form so that tokens such as "the,"
        # are excluded too; dict.fromkeys keeps first-seen order.
        all_words_unique = [
            cleaned for cleaned in dict.fromkeys(cleaned_words)
            if cleaned not in all_stopwords
        ]
        word_importance_df = pd.DataFrame(
            {
                "word": all_words_unique,
                "importance": [np.mean(word_importance_raw[temp]) for temp in all_words_unique]
            }
        )
        word_importance_df = word_importance_df.sort_values(by="importance", ignore_index=True)

    if return_type == "list" or return_type == "both":
        word_importance_list = list(simple_model.coef_)

    if verbose:
        print(f"Selection rates: {np.mean(X, axis=0)}")
        print(f"Outcome mean: {np.mean(Y):0.4f}")
        # Lasso.score() is the coefficient of determination, not an MSE.
        print(f"Model R^2: {simple_model.score(X, Y):0.4f}")
    print(f"Model MAPE: {mean_absolute_percentage_error(Y, simple_model.predict(X)):0.4f}")

    if return_type == "table":
        return word_importance_df
    elif return_type == "list":
        return all_words, word_importance_list
    else:
        return word_importance_df, (all_words, word_importance_list)

In [None]:
def format_color_style(word, score):
    """Wrap ``word`` in an ANSI colour escape chosen from ``score``.

    Scores strictly between -0.1 and 0.1 return the word unchanged. Otherwise
    the word is printed bold white on a background picked by band:
    46 for [0.1, 0.8), 42 for >= 0.8, 45 for (-0.8, -0.1], and 41 for
    everything else. The two negative bands pad the word with two leading
    spaces, the positive bands with one.
    """
    if -0.1 < score < 0.1:
        return word

    if score >= 0.8:
        background, lead = "42", " "
    elif score >= 0.1:
        background, lead = "46", " "
    elif -0.8 < score <= -0.1:
        background, lead = "45", "  "
    else:
        background, lead = "41", "  "
    return f'\x1b[1;37;{background}m{lead}{word} \x1b[0m'

def show_multi_masking_both_scores(
    original_sentence,
    verbose = False,
    classifier = classifier_model,
    gap_models = [gap_untuned_model],
    descriptions = ["Replacement with un-tuned bert"],
    show_colored_text = False,
    masking_sample_size = 25,
    replacement_sample_size = 5,
    ignore_first_x_words=0,
):
    """Print the sentence score and display multi-masking importance tables.

    Shows the deletion baseline from ``show_multiple_masking_score`` followed
    by one table from ``show_multiple_masking_replacement_score`` per entry of
    ``gap_models``. With ``show_colored_text`` the sentence is additionally
    printed after each table, every word coloured by ``format_color_style``
    according to its coefficient.

    Args:
        original_sentence: text to analyse.
        verbose: forwarded to the scoring functions.
        classifier: model used for the headline score and for every table.
        gap_models: masked language models to use as gap fillers.
        descriptions: headings printed above the replacement tables, paired
            with ``gap_models`` by position (the lists are only read).
        show_colored_text: also print the ANSI-coloured sentence.
        masking_sample_size: ``n_samples_per_word`` for the deletion baseline.
        replacement_sample_size: ``n_samples_per_word`` for the replacement runs.
        ignore_first_x_words: leading words that are never masked.
    """
    inputs = bert_tokenizer(original_sentence, return_tensors="tf")
    # Use the classifier that was passed in, not the module-level default, so
    # the headline score matches the model the tables are computed with.
    logits = classifier(**inputs).logits
    original_sentence_score = logits[0, 0].numpy()
    print(f"Sentence score: {inv_logit(original_sentence_score):0.4F}")

    # Always request both the table and the per-token list; the list is only
    # printed when coloured text is wanted.
    masking_df, (words, masking_list) = show_multiple_masking_score(
        original_sentence,
        verbose,
        classifier,
        return_type = "both",
        n_samples_per_word = masking_sample_size,
        ignore_first_x_words = ignore_first_x_words,
    )
    replacement_data = [
        show_multiple_masking_replacement_score(
            original_sentence,
            verbose,
            classifier,
            gap_model,
            return_type = "both",
            n_samples_per_word = replacement_sample_size,
            ignore_first_x_words = ignore_first_x_words,
        )
        for gap_model in gap_models
    ]

    print("Baseline:")
    display(masking_df.style.apply(get_color, axis=None))
    if show_colored_text:
        masking_sentence = ' '.join([
            format_color_style(word, score)
            for word, score in zip(words, masking_list)
        ])
        print(masking_sentence)

    for description, (replacement_df, (_, replacement_list)) in zip(descriptions, replacement_data):
        print(description)
        display(replacement_df.style.apply(get_color, axis=None))
        if show_colored_text:
            replacement_sentence = ' '.join([
                format_color_style(word, score)
                for word, score in zip(words, replacement_list)
            ])
            print(replacement_sentence)

## Positive Examples

In [None]:
# NOTE(review): `qadataset_test` is not defined anywhere in the visible notebook;
# it has to be loaded in an earlier cell, otherwise these cells fail with a
# NameError after "Restart & Run All".
# Show the raw record, then explain the classifier's score for its text.
qadataset_test[0]

In [None]:
show_multi_masking_both_scores(qadataset_test[0]['text'], verbose=True, show_colored_text = True)

In [None]:
qadataset_test[200]

In [None]:
show_multi_masking_both_scores(qadataset_test[200]['text'], verbose=True, show_colored_text=True)

In [None]:
qadataset_test[50]

In [None]:
show_multi_masking_both_scores(qadataset_test[50]['text'], verbose=True, show_colored_text=True)

## Negative Examples

In [None]:
# NOTE(review): `qadataset_test` is not defined anywhere in the visible notebook;
# it has to be loaded in an earlier cell, otherwise these cells fail with a
# NameError after "Restart & Run All".
# Show the raw record (indexed from the end of the dataset), then explain the
# classifier's score for its text.
qadataset_test[-1]

In [None]:
show_multi_masking_both_scores(qadataset_test[-1]['text'], verbose=True, show_colored_text=True)

In [None]:
qadataset_test[-2]

In [None]:
show_multi_masking_both_scores(qadataset_test[-2]['text'], verbose=True, show_colored_text=True)

In [None]:
qadataset_test[-100]

In [None]:
show_multi_masking_both_scores(qadataset_test[-100]['text'], verbose=True, show_colored_text=True)