In [1]:

import numpy as np
from allennlp_models import pretrained
from allennlp.predictors import Predictor
from allennlp.interpret.saliency_interpreters import SimpleGradient, IntegratedGradient, SmoothGradient
from allennlp.interpret.attackers import Hotflip, input_reduction
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import plotly.express as px
import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Load the pretrained predictor registered under "lm-next-token-lm-gpt2" in allennlp_models.
predictor = pretrained.load_predictor("lm-next-token-lm-gpt2")

lerc is not a registered model.


In [3]:
# Pull the tokenizer object out of the predictor's dataset reader; the helpers below
# call its encode()/decode() methods.
# NOTE(review): this reaches through private attributes (_dataset_reader, _tokenizer)
# and may break when the AllenNLP version changes.
tokenizer = predictor._dataset_reader._tokenizer.tokenizer

In [144]:

class AutoRegressiveGPT():
    """
    Autoregressive text generation on top of a next-token predictor, plus
    helpers that collect per-step saliency gradients and aggregate them
    into prompt-token scores.

    At every step the wrapped predictor is asked for its candidate tokens,
    one of them is sampled according to the (renormalised) probabilities,
    and its text is appended to the running prompt.
    """

    # Vocabulary markers found in predicted tokens and the text they are
    # replaced with, applied in this order.
    _TOKEN_MARKERS = (("Ġ", " "), ("Ċ", "\n"), ("Âł", "\xa0"))

    def __init__(self, predictor: "Predictor", seed: int = 42):
        """
        Args:
            predictor: object whose ``predict(text)`` returns a dict with
                ``"probabilities"`` and ``"top_tokens"`` entries.
            seed: seed of the sampler that is rebuilt for every token. The
                default reproduces the draws of ``np.random.seed(42)``.
        """
        self.predictor = predictor
        self.seed = seed

    def get_next_token(self, inputs):
        """
        Sample the next token for ``inputs`` and return it as plain text.

        The candidate probabilities are renormalised to sum to one before
        sampling. A fresh ``RandomState(self.seed)`` is used on every call,
        so the same candidates always yield the same token and the global
        NumPy random state is left untouched.
        """
        # Use the predictor this instance was built with, not a notebook global.
        outputs = self.predictor.predict(inputs)
        probabilities = np.asarray(outputs["probabilities"], dtype=float).reshape(-1)
        probabilities = probabilities / probabilities.sum()
        top_tokens = np.array(outputs["top_tokens"]).reshape(-1)

        sampler = np.random.RandomState(self.seed)
        token = sampler.choice(top_tokens, 1, p=probabilities)[0]
        for marker, text in self._TOKEN_MARKERS:
            token = token.replace(marker, text)

        return token

    def generate(self, inputs, max_length: int =10):
        """Append ``max_length`` sampled tokens to ``inputs`` and return the text."""
        for _ in tqdm(range(max_length)):
            inputs += self.get_next_token(inputs)

        return inputs

    def generate_and_saliency_score(self, inputs, interpreters: list, max_length: int =10):
        """
        Generate ``max_length`` tokens and, before appending each one, record
        every interpreter's gradients for the current prompt.

        Args:
            inputs: prompt text.
            interpreters: objects exposing ``saliency_interpret_from_json``.
            max_length: number of generation steps.

        Returns:
            ``(text, grad_matrix_list)`` where ``grad_matrix_list[i][step]`` is
            the gradient array reported by interpreter ``i`` at that step.
        """
        interpreters = list(interpreters)
        grad_matrix_list = [[] for _ in interpreters]
        for _ in tqdm(range(max_length)):
            token = self.get_next_token(inputs)
            # Saliency is computed on the prompt *before* the new token is added.
            for step_grads, interpreter in zip(grad_matrix_list, interpreters):
                interpretation = interpreter.saliency_interpret_from_json({"sentence": inputs})
                step_grads.append(np.array(interpretation['instance_1']['grad_input_1']))

            inputs += token

        return inputs, grad_matrix_list

    def get_score_from_grad_matrix(self, grad_matrix: list[list]):
        """
        Fold per-step gradient vectors into a (prompt tokens x steps) matrix.

        Column ``n`` is the first ``n_inputs`` entries of step ``n``'s
        gradients plus, for every earlier step ``n - j``, that step's score
        column scaled by ``grad_matrix[n][-j]``. This assumes step ``n``'s
        vector carries one trailing entry per previously generated token.

        Returns an empty ``(0, 0)`` array when ``grad_matrix`` is empty.
        """
        if len(grad_matrix) == 0:
            return np.zeros((0, 0))

        n_inputs = len(grad_matrix[0])
        n_outputs = len(grad_matrix)
        scores = np.zeros((n_inputs, n_outputs))

        for n, grads in enumerate(grad_matrix):
            grads = np.asarray(grads)
            propagated = 0
            for j in range(1, n + 1):
                propagated = propagated + scores[:, n - j] * grads[-j]
            scores[:, n] = grads[:n_inputs] + propagated
        return scores

    def get_scores_from_grad_matrix_list(self, grad_matrix_list: list[list]):
        """Apply ``get_score_from_grad_matrix`` to each interpreter's gradients."""
        return [
            self.get_score_from_grad_matrix(grad_matrix)
            for grad_matrix in tqdm(grad_matrix_list)
        ]

    def get_padded_grad_matrix(self, grad_matrix_list):
        """
        Right-pad every gradient vector with zeros to the longest vector of
        its interpreter and stack them into one array per interpreter.
        """
        padded_grad_matrix_list = []
        for grad_matrix in grad_matrix_list:
            max_len = max((len(grad_vec) for grad_vec in grad_matrix), default=0)
            padded_rows = [
                np.pad(grad_vec, (0, max_len - len(grad_vec)), 'constant')
                for grad_vec in grad_matrix
            ]
            padded_grad_matrix_list.append(np.array(padded_rows))

        return padded_grad_matrix_list

    def get_scores_from_text(self, text: str, interpreters: list, max_length: int = 10):
        """
        Generate from ``text`` and return
        ``(generated_text, scores_list, padded_grad_matrix_list)``, with one
        entry per interpreter in each list.
        """
        result, grad_matrix = self.generate_and_saliency_score(text, interpreters, max_length)
        scores_list = self.get_scores_from_grad_matrix_list(grad_matrix)
        padded_grad_matrix_list = self.get_padded_grad_matrix(grad_matrix)

        return result, scores_list, padded_grad_matrix_list

In [5]:
class Gpt2Predictor(Predictor):
    """
    The HuggingFace implementation of GPT-2 is not an AllenNLP model;
    however, our demo only expects an AllenNLP ``Predictor``. Accordingly,
    we implement a ``Predictor`` that wraps the HuggingFace GPT-2 implementation.
    """
    def __init__(self,
                 model_name: str = "gpt2",
                 cache_size: int = 0) -> None:
        """
        Load the GPT-2 tokenizer and language-model head.

        Args:
            model_name: checkpoint name handed to
                ``GPT2LMHeadModel.from_pretrained``.
            cache_size: accepted for interface compatibility only; this
                implementation keeps no cache, so the value is ignored.
        """
        # ``Predictor.__init__`` is not called here: there is no AllenNLP
        # model or dataset reader to pass to it.
        # NOTE(review): the tokenizer is always the 'gpt2' one, regardless of
        # ``model_name`` — confirm that is intended for other checkpoints.
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self._model = GPT2LMHeadModel.from_pretrained(model_name)

        # The end of text marker.
        self.END_OF_TEXT = self.tokenizer.encoder["<|endoftext|>"]


    def get_prob_of_word(self, inputs: dict, word: str) -> torch.Tensor:
        """
        Probability that the first token of ``word`` follows the given context.

        Args:
            inputs: dict with a ``"previous"`` string and an optional
                ``"next"`` string appended to it.
            word: text whose first token is looked up.

        Returns:
            A zero-dimensional tensor; call ``.item()`` for a Python float.

        Raises:
            ValueError: if ``word`` encodes to no tokens.
        """
        word_ids = self.tokenizer.encode(word)
        if not word_ids:
            raise ValueError("word must encode to at least one token, got %r" % (word,))

        logits = self._predict(inputs["previous"], inputs.get("next"))
        probabilities = torch.nn.functional.softmax(logits, dim=0)

        # Only the first token of a multi-token word is scored.
        return probabilities[word_ids[0]]

    def predict_json(self, inputs: dict) -> dict:
        """
        Return the ``topk`` most likely next tokens for the given context.

        Args:
            inputs: dict with ``"previous"`` (str), optional ``"next"`` (str)
                and optional ``"topk"`` (int, default 10).

        Returns:
            dict with ``"logits"``, ``"probabilities"`` and ``"words"`` for the
            best candidates, and ``"output"``: the full context (``previous``
            plus ``next`` when given) followed by the single best word.
        """
        previous_str = inputs["previous"]
        next_str = inputs.get("next")
        topk = inputs.get("topk", 10)

        logits = self._predict(previous_str, next_str)
        probabilities = torch.nn.functional.softmax(logits, dim=0)

        # ``topk`` cannot exceed the vocabulary size.
        best_logits, best_indices = logits.topk(min(topk, logits.shape[0]))
        best_words = [self.tokenizer.decode([idx.item()])
                      for idx in best_indices]
        best_probabilities = probabilities[best_indices].tolist()

        # The prediction was conditioned on previous + next, so the returned
        # text keeps the caller's ``next`` instead of silently dropping it.
        context = previous_str + (next_str or "")

        return {
            "logits": best_logits.tolist(),
            "probabilities": best_probabilities,
            "words": best_words,
            "output": context + best_words[0]
        }

    def _predict(self, previous: str, next: str = None) -> torch.Tensor:
        """
        Run GPT-2 on ``previous`` (plus ``next`` when given) and return the
        logits of the position after the last input token.

        An input that encodes to no tokens is replaced by the end-of-text
        token so the model always receives at least one position.
        """
        token_ids = self.tokenizer.encode(previous)
        if next is not None:
            token_ids = token_ids + self.tokenizer.encode(next)
        if not token_ids:
            token_ids = [self.END_OF_TEXT]

        inputs = torch.LongTensor([token_ids])

        # Inference only: skip building the autograd graph.
        with torch.no_grad():
            result = self._model(inputs)

        logits = result.logits.squeeze(dim=0)
        return logits[-1, :]

    def __getitem__(self, index: int) -> str:
        """Decode a single vocabulary index to its text."""
        return self.tokenizer.decode([index])

In [131]:
def generate_heat_map_from_scores(scores, tokenizer, result, h=600, w=600):
    """
    Draw ``scores`` (rounded to 3 decimals) as a grey-scale plotly heat map.

    ``result`` is tokenized and decoded token by token; the first
    ``scores.shape[0]`` pieces label the rows and the remaining pieces label
    the columns. ``h`` and ``w`` are the figure height and width.

    Returns the plotly figure.
    """
    pieces = [tokenizer.decode(token_id) for token_id in tokenizer.encode(result)]
    row_count, _col_count = scores.shape[0], scores.shape[1]

    heat_map = px.imshow(
        np.round(scores, 3),
        text_auto=True,
        y=pieces[:row_count],
        x=pieces[row_count:],
        color_continuous_scale='Greys',
        width=w,
        height=h,
    )

    # Rows in reading order, column labels above the map.
    heat_map.update_yaxes(autorange="reversed")
    heat_map.update_xaxes(side="top")
    heat_map.update_layout(
        yaxis={"tickfont": {"size": 12}},
        xaxis={"tickfont": {"size": 12}},
    )

    return heat_map

def generate_heat_map_from_grad_matrix(scores, tokenizer, sample_text, result, h=600, w=600):
    """
    Draw a gradient matrix (rounded to 3 decimals) as a grey-scale plotly
    heat map.

    Rows are labelled with every decoded token of ``result`` except the
    last; columns are labelled with the tokens of ``result`` that come after
    the first ``len(tokenizer.encode(sample_text))`` tokens. ``h`` and ``w``
    are the figure height and width.

    Returns the plotly figure.
    """
    result_pieces = [tokenizer.decode(token_id) for token_id in tokenizer.encode(result)]
    prompt_pieces = [tokenizer.decode(token_id) for token_id in tokenizer.encode(sample_text)]
    prompt_len = len(prompt_pieces)

    heat_map = px.imshow(
        np.round(scores, 3),
        text_auto=True,
        y=result_pieces[:-1],
        x=result_pieces[prompt_len:],
        color_continuous_scale='Greys',
        width=w,
        height=h,
    )

    # Rows in reading order, column labels above the map.
    heat_map.update_yaxes(autorange="reversed")
    heat_map.update_xaxes(side="top")
    heat_map.update_layout(
        yaxis={"tickfont": {"size": 12}},
        xaxis={"tickfont": {"size": 12}},
    )

    return heat_map

def generate_heat_map_from_raking_scores(scores, sample_text: str, tokenizer, result: str):
    """
    Draw one score per prompt token as a single-row grey-scale heat map.

    ``scores`` is reshaped to one row; columns are labelled with the decoded
    tokens of ``sample_text`` and the x-axis title shows ``result``.

    Returns the plotly figure.
    """
    prompt_pieces = [tokenizer.decode(token_id) for token_id in tokenizer.encode(sample_text)]
    single_row = scores.reshape(1, -1)

    heat_map = px.imshow(
        single_row,
        text_auto=True,
        y=[""],
        x=prompt_pieces,
        color_continuous_scale='Greys',
        width=1000,
    )
    heat_map.update_xaxes(side="top", title_text=result)

    return heat_map

def generate_heat_map_from_weights(scores, sample_text: str, tokenizer, result: str):
    """
    Draw one weight per generated token as a single-row grey-scale heat map.

    ``scores`` is reshaped to one row; columns are labelled with the decoded
    tokens of ``result`` that follow the first
    ``len(tokenizer.encode(sample_text))`` tokens.

    Returns the plotly figure.
    """
    prompt_pieces = [tokenizer.decode(token_id) for token_id in tokenizer.encode(sample_text)]
    result_pieces = [tokenizer.decode(token_id) for token_id in tokenizer.encode(result)]
    generated_pieces = result_pieces[len(prompt_pieces):]

    heat_map = px.imshow(
        scores.reshape(1, -1),
        text_auto=True,
        y=[""],
        x=generated_pieces,
        color_continuous_scale='Greys',
        width=1000,
    )
    heat_map.update_xaxes(side="top", title_text="Output words weights")

    return heat_map

In [27]:
def get_output_words_weigths(sample_text: str, result: str, predictor_hugging: "Gpt2Predictor", null_token: str, add_previous: bool):
    """
    Weight every generated token by its inverse probability under a "null"
    prompt, normalised so the weights sum to one.

    The prompt tokens of ``sample_text`` are replaced by the same number of
    ``null_token`` repetitions. For each generated token of ``result`` the
    probability of that token following the null prompt is queried from
    ``predictor_hugging.get_prob_of_word``.

    Uses the module-level ``tokenizer`` for tokenization.

    Args:
        sample_text: the original prompt.
        result: the prompt followed by the generated continuation.
        predictor_hugging: object exposing ``get_prob_of_word(inputs, word)``
            whose return value has an ``item()`` method.
        null_token: text repeated once per prompt token to build the null
            prompt.
        add_previous: when true, each scored token is appended to the null
            prompt before the next one is scored.

    Returns:
        1-D array with one weight per generated token.
    """
    n_input_tokens = len(tokenizer.encode(sample_text))
    result_words = [tokenizer.decode(token) for token in tokenizer.encode(result)]
    output_words = result_words[n_input_tokens:]

    # One null token per prompt token.
    input_str = null_token * n_input_tokens

    probs = []
    for word in tqdm(output_words):
        prob = predictor_hugging.get_prob_of_word({"previous": input_str}, word)
        probs.append(prob.item())
        if add_previous:
            input_str += word

    probs = np.array(probs, dtype=float)
    # A probability that underflowed to zero would give an infinite weight
    # and NaNs after normalisation; floor it at the smallest float32 normal.
    probs = np.maximum(probs, np.finfo(np.float32).tiny)
    w = 1 / probs
    w = w / w.sum()
    return w

In [8]:
# Build the HuggingFace-backed GPT-2 wrapper with its default arguments (model_name="gpt2").
predictor_hugging = Gpt2Predictor()

In [145]:
# One saliency interpreter per gradient method, all wrapping the same predictor.
interpreter_smooth = SmoothGradient(predictor)
interpreter_simple = SimpleGradient(predictor)
interpreter_integrated = IntegratedGradient(predictor)

# Autoregressive generator built on that same predictor.
generator = AutoRegressiveGPT(predictor=predictor)

In [146]:
# First example prompt: extend it by 9 sampled tokens and show the text.
sample_text = "The first president of USA was George"
result = generator.generate(sample_text, max_length=9)
print(result)

100%|██████████| 9/9 [00:00<00:00, 14.59it/s]

The first president of USA was George Washington, who was a great friend of mine





In [150]:
# Regenerate the same 9 steps while recording saliency for every interpreter.
# The list order (simple, integrated, smooth) fixes the indices 0, 1, 2 used
# on scores_list / grad_matrix_list in the cells below.
result, scores_list, grad_matrix_list = generator.get_scores_from_text(
    sample_text, 
    interpreters=[interpreter_simple, interpreter_integrated, interpreter_smooth],
    max_length=9
    )

100%|██████████| 9/9 [01:05<00:00,  7.30s/it]
100%|██████████| 3/3 [00:00<00:00, 596.32it/s]


In [153]:
# Index 2 is the SmoothGradient entry (third interpreter in the list);
# the padded gradient matrix is transposed before plotting.
fig = generate_heat_map_from_grad_matrix(grad_matrix_list[2].T, tokenizer, sample_text, result, h=700)

fig.update_layout(title_text="Smooth Gradient")
fig.show()

In [152]:
# Aggregated prompt-token scores for the SmoothGradient interpreter (index 2).
fig = generate_heat_map_from_scores(scores_list[2], tokenizer, result)

fig.update_layout(title_text="Smooth Gradient")
fig.show()

In [154]:
# Output-token weights with add_previous=True, then one weighted score per
# prompt token from the SmoothGradient scores (index 2).
# The null token is spelled "<|endoftext|>", matching the key Gpt2Predictor
# looks up in the GPT-2 vocabulary; "|<endoftext>|" is not that token.
w = get_output_words_weigths(sample_text, result, predictor_hugging, "<|endoftext|>", True)
ranking_score = scores_list[2] @ w
fig = generate_heat_map_from_raking_scores(ranking_score, sample_text, tokenizer, result)
fig.show()

  0%|          | 0/9 [00:00<?, ?it/s]

100%|██████████| 9/9 [00:02<00:00,  3.48it/s]


In [155]:
# Show the output-token weights stored in `w` by the preceding weights computation.
fig = generate_heat_map_from_weights(w, sample_text, tokenizer, result)
fig.show()

In [156]:
# Same ranking as before but with add_previous=False.
# The null token is spelled "<|endoftext|>", matching the key Gpt2Predictor
# looks up in the GPT-2 vocabulary; "|<endoftext>|" is not that token.
w = get_output_words_weigths(sample_text, result, predictor_hugging, "<|endoftext|>", False)
ranking_score = scores_list[2] @ w
fig = generate_heat_map_from_raking_scores(ranking_score, sample_text, tokenizer, result)
fig.show()

100%|██████████| 9/9 [00:01<00:00,  5.62it/s]


In [157]:
# Show the output-token weights stored in `w` by the preceding weights computation.
fig = generate_heat_map_from_weights(w, sample_text, tokenizer, result)
fig.show()

In [158]:
# Second example prompt: extend it by 4 sampled tokens and show the text.
# Note that sample_text and result are rebound here, replacing the first example's values.
sample_text = "Climate change has critical consequences such as"
result = generator.generate(sample_text, max_length=4)
print(result)

100%|██████████| 4/4 [00:00<00:00, 10.66it/s]

Climate change has critical consequences such as the loss of biodiversity





In [159]:
# Regenerate the same 4 steps while recording saliency for every interpreter.
# The list order (simple, integrated, smooth) fixes the indices 0, 1, 2 used
# on scores_list / grad_matrix_list in the cells below.
result, scores_list, grad_matrix_list = generator.get_scores_from_text(
    sample_text, 
    interpreters=[interpreter_simple, interpreter_integrated, interpreter_smooth],
    max_length=4
    )


Using a non-full backward hook when the forward contains multiple autograd Nodes is deprecated and will be removed in future versions. This hook will be missing some grad_input. Please use register_full_backward_hook to get the documented behavior.

100%|██████████| 4/4 [00:41<00:00, 10.29s/it]
100%|██████████| 3/3 [00:00<00:00, 2026.56it/s]


In [160]:
# Index 2 is the SmoothGradient entry (third interpreter in the list);
# the padded gradient matrix is transposed before plotting.
fig = generate_heat_map_from_grad_matrix(grad_matrix_list[2].T, tokenizer, sample_text, result, h=700)

fig.update_layout(title_text="Smooth Gradient")
fig.show()

In [161]:
# Aggregated prompt-token scores for the SmoothGradient interpreter (index 2).
fig = generate_heat_map_from_scores(scores_list[2], tokenizer, result)

fig.update_layout(title_text="Smooth Gradient")
fig.show()

In [162]:
# Output-token weights with add_previous=True, then one weighted score per
# prompt token from the SmoothGradient scores (index 2).
# The null token is spelled "<|endoftext|>", matching the key Gpt2Predictor
# looks up in the GPT-2 vocabulary; "|<endoftext>|" is not that token.
w = get_output_words_weigths(sample_text, result, predictor_hugging, "<|endoftext|>", True)
ranking_score = scores_list[2] @ w
fig = generate_heat_map_from_raking_scores(ranking_score, sample_text, tokenizer, result)
fig.show()

100%|██████████| 4/4 [00:00<00:00,  5.24it/s]


In [163]:
# Show the output-token weights stored in `w` by the preceding weights computation.
fig = generate_heat_map_from_weights(w, sample_text, tokenizer, result)
fig.show()

In [164]:
# Same ranking as before but with add_previous=False.
# The null token is spelled "<|endoftext|>", matching the key Gpt2Predictor
# looks up in the GPT-2 vocabulary; "|<endoftext>|" is not that token.
w = get_output_words_weigths(sample_text, result, predictor_hugging, "<|endoftext|>", False)
ranking_score = scores_list[2] @ w
fig = generate_heat_map_from_raking_scores(ranking_score, sample_text, tokenizer, result)
fig.show()

100%|██████████| 4/4 [00:00<00:00,  8.11it/s]


In [165]:
# Show the output-token weights stored in `w` by the preceding weights computation.
fig = generate_heat_map_from_weights(w, sample_text, tokenizer, result)
fig.show()