In [None]:
# disable a specific warning
import os

os.environ["TOKENIZERS_PARALLELISM"] = "false"

import pickle
from typing import List, Text, Tuple, Union

import numpy as np
import pandas as pd
import shap
import tensorflow as tf
import torch
import transformers

# To switch the model used for this run, set CURRENT_RUN to one of the keys
# of the MODELS dictionary below.
CURRENT_RUN = "BLOOM"


# Maps a run name to the Hugging Face identifier that is passed to
# `from_pretrained` when loading the tokenizer and the embedding model.
MODELS = {
    "BLOOM": "bigscience/bloom-560m",
    "GPTNeo": "EleutherAI/gpt-neo-1.3B"
}

# When True, only 10 texts are sampled (instead of 200) and the output file
# name is prefixed with "test_".
TEST_RUN = False
# Texts whose character length is not strictly below this value are dropped.
# The same value is also passed to the tokenizer as `max_length`.
MAX_TEXT_LENGTH = 10_000
# Texts whose character length is not strictly above this value are dropped.
MIN_TEXT_LENGTH = 100


# Load text
def load_explanation_dataset() -> List[Text]:
    """Load, filter and sample the texts that will be explained.

    Reads the ``selftext`` column of ``input_data/combined-set.csv``, keeps the
    texts whose character length lies strictly between ``MIN_TEXT_LENGTH`` and
    ``MAX_TEXT_LENGTH``, and draws a random sample of them (10 texts when
    ``TEST_RUN`` is set, 200 otherwise). When fewer texts survive the filter
    than requested, all of them are used instead of raising.

    Returns:
        (List[Text]): the sampled texts, ordered from longest to shortest
    """
    frame = pd.read_csv("input_data/combined-set.csv")

    # Rows with a missing ``selftext`` get a NaN length, fail both comparisons
    # and are therefore filtered out together with the out-of-range texts.
    lengths = frame["selftext"].str.len()
    in_range = (lengths > MIN_TEXT_LENGTH) & (lengths < MAX_TEXT_LENGTH)
    self_texts = frame.loc[in_range, "selftext"]

    # Sample a subset of the data. ``Series.sample`` raises a ValueError when
    # asked for more rows than are available, so cap the request.
    sample_size = 10 if TEST_RUN else 200
    sample_size = min(sample_size, len(self_texts))
    self_texts = self_texts.sample(sample_size)

    # Convert to List[Text] and order by length to parse the slowest examples first
    return sorted(self_texts.to_list(), key=len, reverse=True)



def load_prediction_model(model_name: Text) -> tf.keras.Model:
    """Restore the saved Keras prediction model that belongs to a run.

    Args:
        model_name (Text): name of the run; the model is read from the folder
            ``saved_models/<model_name>_aftercorrection/``.

    Returns:
        tf.keras.Model: the model loaded from that folder
    """
    return tf.keras.models.load_model(
        f"saved_models/{model_name}_aftercorrection/"
    )


def load_transformer_models(
    model_name: Text,
) -> Tuple[transformers.PreTrainedTokenizer, transformers.PreTrainedModel]:
    """Load the pretrained embedding model and associated tokenizer.

    Args:
        model_name (Text): name of the base model; must be a key of ``MODELS``.

    Returns:
        Tuple[transformers.PreTrainedTokenizer, transformers.PreTrainedModel]:
            pretrained ``(tokenizer, model)`` pair from huggingface

    Raises:
        KeyError: if ``model_name`` is not a key of ``MODELS``.
    """
    transformer_model = MODELS[model_name]
    tokenizer = transformers.AutoTokenizer.from_pretrained(transformer_model)

    # Some tokenizers (e.g. GPT-Neo) ship without a padding token, which makes
    # batched tokenization with ``padding=True`` fail. Fall back to the
    # end-of-sequence token; the padded positions should be filtered out by the
    # attention mask. Checking the tokenizer itself, rather than the run name,
    # keeps this working for any model added to ``MODELS``.
    if tokenizer.pad_token is None:
        tokenizer.pad_token = tokenizer.eos_token

    embedding_model = transformers.AutoModel.from_pretrained(transformer_model)

    return tokenizer, embedding_model



This code creates the embeddings and calculates the SHAP values. It contains code from Muennighoff, N. (2022). SGPT: GPT Sentence Embeddings for Semantic Search.
Accessible at https://arxiv.org/abs/2202.08904.
Their code has been modified and adapted to fit this task.


In [None]:
def compute_embeddings(
    last_hidden_state: torch.Tensor, attention_mask: torch.Tensor
) -> np.ndarray:
    """Computes the embeddings based on the last hidden state of a model

    This is an implementation of Bi-Encoder symmetric search following this
    paper: https://arxiv.org/pdf/2202.08904.pdf

    Each position is weighted by its 1-based index in the sequence, masked
    positions get weight zero, and the hidden states are averaged with these
    weights along the sequence axis.

    Args:
        last_hidden_state (torch.Tensor): last hidden state of the embedding model.
                has the shape: [batch_size, sequence_length, hidden_dimension]
        attention_mask (torch.Tensor): attention mask for the sequence, of
                shape [batch_size, sequence_length].

    Returns:
        np.ndarray: array of the embeddings of shape [batch_size, hidden_dimension].
            A row whose attention mask is entirely zero yields an all-zero
            embedding instead of NaN.
    """
    sequence_length = last_hidden_state.shape[1]
    device = last_hidden_state.device

    # Weights increase linearly for each point in the sequence: 1, 2, ..., seq_len.
    # They are kept at shape [1, seq_len, 1] and broadcast against the hidden
    # state instead of being expanded to its full shape.
    weights = torch.arange(
        1, sequence_length + 1, dtype=torch.float32, device=device
    ).view(1, -1, 1)

    # Weighted attention mask of shape [bs, seq_len, 1]
    weighted_mask = attention_mask.to(device).unsqueeze(-1).float() * weights

    # Perform weighted mean pooling across seq_len: bs, seq_len, hidden_dim -> bs, hidden_dim
    sum_embeddings = torch.sum(last_hidden_state * weighted_mask, dim=1)
    # Every unmasked position contributes a weight >= 1, so the clamp only
    # affects fully masked rows, where it prevents a 0 / 0 division.
    sum_mask = torch.sum(weighted_mask, dim=1).clamp(min=1e-9)

    embeddings = sum_embeddings / sum_mask
    # detach() and cpu() make the conversion work for tensors that track
    # gradients or live on an accelerator, where np.array(tensor) raises.
    return embeddings.detach().cpu().numpy()


def load_custom_predict_fn(model_name: Text, return_tokenizer: bool = True):
    """Create a predict function with the signature np.array[samples] -> np.array[probabilities]

    The returned function tokenizes the texts, runs the embedding model,
    pools the last hidden state with ``compute_embeddings`` and feeds the
    embeddings to the saved prediction model.

    Args:
        model_name (Text): name of the model for which to build a prediction function.
        return_tokenizer (bool, optional): whether to return both the prediction
            function and the tokenizer. Defaults to True.

    Returns:
        Either the prediction function alone, or the tuple
        ``(prediction_function, tokenizer)`` when ``return_tokenizer`` is True.
    """
    tokenizer, embedding_model = load_transformer_models(model_name)
    prediction_model = load_prediction_model(model_name)

    # define a prediction function
    def custom_predict_fn(samples: np.ndarray) -> np.ndarray:
        """Predict function that accepts an np.array and returns the prediction.

        Args:
            samples (np.array[str]): np.array of strings of arbitrary batch size,
                passed by the explainer object

        Returns:
            np.array[float]: flattened output of the prediction model, one
                value per entry of ``samples``
        """
        # Hugging Face tokenizers only accept List[str] as batch input, but the
        # SHAP Explainer passes an np.array[str], so convert the array to a
        # list of plain Python strings before tokenizing.
        samples_as_list = [str(sample) for sample in samples]
        # NOTE(review): MAX_TEXT_LENGTH is applied as a character limit when the
        # dataset is filtered but as a token limit here -- confirm this is intended.
        batch_tokens = tokenizer(
            samples_as_list,
            padding=True,
            truncation=True,
            max_length=MAX_TEXT_LENGTH,
            return_tensors="pt",
        )

        with torch.no_grad():
            # Get hidden state of shape [bs, seq_len, hid_dim]. Only the last
            # hidden state is needed, so the per-layer hidden states are not
            # requested (avoids keeping every layer's activations in memory).
            last_hidden_state = embedding_model(**batch_tokens).last_hidden_state

        embeddings = compute_embeddings(
            last_hidden_state, batch_tokens["attention_mask"]
        )

        y_pred = prediction_model.predict(embeddings, verbose=0)
        y_pred = y_pred.reshape(-1)

        # Internal sanity check: the explainer needs exactly one value per sample.
        assert len(samples_as_list) == y_pred.shape[0], (
            "The code should return as many predictions as there are input "
            f"samples (got {y_pred.shape[0]} predictions for "
            f"{len(samples_as_list)} samples)"
        )

        return y_pred

    if return_tokenizer:
        return custom_predict_fn, tokenizer
    return custom_predict_fn


def save_results(results) -> None:
    """Save results of a Shapley Explainer to a file

    The file is written to
    ``explanations/[test_]<CURRENT_RUN>_explanations.pickle``; the ``test_``
    prefix is added when ``TEST_RUN`` is set. The ``explanations`` folder is
    created when it does not exist yet.

    Args:
        results (shap.ExplainerObject): results of the explainer run.
    """
    filename = (
        f"explanations/{'test_' if TEST_RUN else ''}{CURRENT_RUN}_explanations.pickle"
    )

    # The explainer run is expensive; make sure a missing output folder does
    # not cause the results to be lost at the very last step.
    os.makedirs(os.path.dirname(filename), exist_ok=True)

    # Pickle the object to a file
    with open(filename, "wb") as file:
        pickle.dump(results, file)


def get_embeddings():
    """Run the SHAP explanation pipeline for ``CURRENT_RUN``.

    Builds the prediction function and its tokenizer, wraps both in a
    ``shap.Explainer``, computes the SHAP values for the sampled texts and
    hands them to ``save_results``.

    Main function of the script.
    """
    predict, masker_tokenizer = load_custom_predict_fn(
        model_name=CURRENT_RUN, return_tokenizer=True
    )

    # the tokenizer acts as the masker of the explainer
    explainer = shap.Explainer(predict, masker_tokenizer)

    # texts to explain for the current model
    texts = load_explanation_dataset()

    save_results(explainer(texts, fixed_context=1))


# Run the explanation pipeline only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    get_embeddings()