In [None]:
!pip -q install datasets sentencepiece  
!pip -q install git+https://github.com/ArnoutHillen/transformers.git # contains the modified transformers (returns the value vectors)

from google.colab import drive
drive.mount("/content/drive")

[K     |████████████████████████████████| 163kB 7.4MB/s 
[K     |████████████████████████████████| 1.1MB 11.6MB/s 
[K     |████████████████████████████████| 245kB 20.6MB/s 
[K     |████████████████████████████████| 17.7MB 208kB/s 
[?25h  Installing build dependencies ... [?25l[?25hdone
  Getting requirements to build wheel ... [?25l[?25hdone
    Preparing wheel metadata ... [?25l[?25hdone
[K     |████████████████████████████████| 2.9MB 8.2MB/s 
[K     |████████████████████████████████| 890kB 32.8MB/s 
[?25h  Building wheel for transformers (PEP 517) ... [?25l[?25hdone
  Building wheel for sacremoses (setup.py) ... [?25l[?25hdone
Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Preprocessor classes

In [None]:
import numpy as np
import spacy
import torch

from typing import List, Set, Dict, Optional
from datasets import load_from_disk

from transformers import (
    PreTrainedTokenizer, PreTrainedModel,
    BertTokenizer, BertModel, 
    GPT2Tokenizer, GPT2Model, 
    XLNetTokenizer, XLNetModel, 
    ElectraTokenizer, ElectraModel
)


class Preprocessor(object):
    """
    The dataset should contain the following columns:
    - "text"
    """

    def __init__(self, model: PreTrainedModel, tokenizer: PreTrainedTokenizer,
                 special_tokens: Optional[Set[str]] = None,
                 allignment_mapping: Optional[Dict[str, str]] = None, pre_token_chs: Optional[str] = None,
                 nlp=spacy.load("en_core_web_sm")):
        self.model = model
        self.tokenizer = tokenizer
        self.special_tokens = special_tokens if special_tokens is not None else set()
        self.allignment_mapping = allignment_mapping if allignment_mapping is not None else dict()
        self.pre_token_chs = pre_token_chs if pre_token_chs is not None else str()
        self.nlp = nlp

    def allignment(self, model_tokens: List[str], spacy_tokens: List[str]) -> List[int]:
        allignment_list = [-1 for _ in range(len(model_tokens))]
        prev_i = -1
        for j, model_token in enumerate(model_tokens):
            t = model_token
            if t in self.special_tokens:
                continue
            else:
                if t[:len(self.pre_token_chs)] == self.pre_token_chs:
                    t = t[len(self.pre_token_chs):]
                if t in self.allignment_mapping.keys():
                    t = self.allignment_mapping[t]
                for i, spacy_token in enumerate(spacy_tokens):
                    if t in spacy_token and i >= prev_i:
                        spacy_index = i
                        prev_i = i
                        break
                allignment_list[j] = spacy_index
        return allignment_list

    def add_tokens(self, example: dict) -> dict:
        inputs = self.tokenizer(example["text"], return_tensors="pt")
        return {"tokens": self.tokenizer.convert_ids_to_tokens(inputs["input_ids"].squeeze())}

    def add_attention(self, example: dict) -> dict:
        inputs = self.tokenizer(example["text"], return_tensors="pt")
        outputs = self.model(**inputs)
        attention = outputs["attentions"]
        attention = torch.stack(attention).squeeze()
        return {"attention": attention.detach().numpy()}

    def add_pos_tags(self, example: dict) -> dict:
        tokens = example["tokens"]
        spacy_tokens = self.nlp(example["text"])
        spacy_text_tokens = [token.text for token in spacy_tokens]
        allignment_list = self.allignment(tokens, spacy_text_tokens)
        spacy_pos_tags = [token.pos_ for token in spacy_tokens]
        pos_tags = [spacy_pos_tags[i] if i != -1 else "-1" for i in allignment_list]
        return {"pos": pos_tags}

    def add_norms(self, example: dict) -> dict:
        inputs = self.tokenizer(example["text"], return_tensors="pt")
        outputs = self.model(**inputs)
        values = outputs["values"]
        values = torch.stack(values).squeeze()
        values = values.detach().numpy()
        dense = outputs["dense"]
        print(type(values))
        print(type(dense))
        norms = np.linalg.norm(values, axis=-1)
        return {"norms": norms}


class BertPreprocessorBase(Preprocessor):
    def __init__(self, model, tokenizer):
        super().__init__(
            model=model,
            tokenizer=tokenizer,
            special_tokens={"[CLS]", "[SEP]"},
            allignment_mapping={
                "your": "you",
                "im": "i",
                "isn": "is",
                "don": "do",
                "aren": "are",
                "id": "i",
                "can": "ca",
                "iv": "i"
            },
            pre_token_chs="##",
        )


class BertPreprocessor(BertPreprocessorBase):
    def __init__(self):
        super().__init__(
            model=BertModel.from_pretrained("bert-base-cased", output_attentions=True, output_values=True, output_dense=True),
            tokenizer=BertTokenizer.from_pretrained("bert-base-cased"),
        )


class GPT2Preprocessor(Preprocessor):
    def __init__(self):
        super().__init__(
            model=GPT2Model.from_pretrained("gpt2", output_attentions=True, output_values=True, output_dense=True),
            tokenizer=GPT2Tokenizer.from_pretrained("gpt2"),
            special_tokens=None,
            allignment_mapping={
                "didnt": "did",
                "hes": "he",
                "im": "i",
                "cant": "ca",
                "arent": "are",
                "id": "i",
                "ive": "i"
            },
            pre_token_chs="Ġ",
        )


class XLNetPreprocessor(Preprocessor):
    def __init__(self):
        super().__init__(
            model=XLNetModel.from_pretrained("xlnet-base-cased", output_attentions=True, output_values=True),  #, output_dense=True),
            tokenizer=XLNetTokenizer.from_pretrained("xlnet-base-cased"),
            special_tokens={"<sep>", "<cls>"},
            allignment_mapping={
                "im": "i",
                "didn": "did",
                "isn": "is",
                "don": "do",
                "can": "ca"
            },
            pre_token_chs="▁",
        )


class ElectraPreprocessor(BertPreprocessorBase):
    def __init__(self):
        super().__init__(
            model=ElectraModel.from_pretrained("google/electra-base-discriminator", output_attentions=True, output_values=True, output_dense=True),
            tokenizer=ElectraTokenizer.from_pretrained("google/electra-base-discriminator")
        )


# Applying the preprocessors


## BERT

In [None]:
dataset_b = load_from_disk("./drive/My Drive/Datasets/bookcorpus/bookcorpus (1000 samples, train)")
preprocessor_b = BertPreprocessor()
#dataset_b = dataset_b.map(preprocessor.add_tokens)
#dataset_b = dataset_b.map(preprocessor.add_attention)
#dataset_b = dataset_b.map(preprocessor.add_pos_tags)
#dataset_b = dataset_b.map(preprocessor.add_value_norms)
#dataset_b.save_to_disk("./drive/My Drive/Attention/bookcorpus (1000 samples, train)/bert")

HBox(children=(FloatProgress(value=0.0, description='Downloading', max=433.0, style=ProgressStyle(description_…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=435779157.0, style=ProgressStyle(descri…




HBox(children=(FloatProgress(value=0.0, description='Downloading', max=213450.0, style=ProgressStyle(descripti…




## GPT-2


In [None]:
dataset_g = load_from_disk("./drive/My Drive/Datasets/bookcorpus/bookcorpus (1000 samples, train)")
preprocessor_g = GPT2Preprocessor()
dataset_g = dataset_g.map(preprocessor.add_tokens)
dataset_g = dataset_g.map(preprocessor.add_attention)
dataset_g = dataset_g.map(preprocessor.add_pos_tags)
dataset_g = dataset_g.map(preprocessor.add_value_norms)
dataset_g.save_to_disk("./drive/My Drive/Attention/bookcorpus (1000 samples, train)/gpt2")

## XLNet

In [None]:
dataset_x = load_from_disk("./drive/My Drive/Datasets/bookcorpus/bookcorpus (1000 samples, train)")
preprocessor_x = XLNetPreprocessor()
dataset_x = dataset_x.map(preprocessor.add_tokens)
dataset_x = dataset_x.map(preprocessor.add_attention)
dataset_x = dataset_x.map(preprocessor.add_pos_tags)
dataset_x = dataset_x.map(preprocessor.add_value_norms)
dataset_x.save_to_disk("./drive/My Drive/Attention/bookcorpus (1000 samples, train)/xlnet")

## ELECTRA


In [None]:
dataset_e = load_from_disk("./drive/My Drive/Datasets/bookcorpus/bookcorpus (1000 samples, train)")
preprocessor_e = ElectraPreprocessor()
dataset_e = dataset_e.map(preprocessor.add_tokens)
dataset_e = dataset_e.map(preprocessor.add_attention)
dataset_e = dataset_e.map(preprocessor.add_pos_tags)
dataset_e = dataset_e.map(preprocessor.add_value_norms)
dataset_e.save_to_disk("./drive/My Drive/Attention/bookcorpus (1000 samples, train)/electra")

# A look at the datasets

In [None]:
sample = 0

## BERT

In [None]:
dataset_sample = dataset_b[sample]
print("text", dataset_sample["text"])
print("tokens", dataset_sample["tokens"])
print("pos" ,dataset_sample["pos"])
print("attention", np.array(dataset_sample["attention"]).shape)
print("value norms", np.array(dataset_sample["value_norms"]).shape)

## GPT-2

In [None]:
dataset_sample = dataset_g[sample]
print("text", dataset_sample["text"], sep="\t")
print("tokens", dataset_sample["tokens"], sep="\t")
print("pos" ,dataset_sample["pos"], sep="\t")
print("attention", np.array(dataset_sample["attention"]).shape)
print("value norms", np.array(dataset_sample["value_norms"]).shape)

## XLNet

In [None]:
dataset_sample = dataset_x[sample]
print("text", dataset_sample["text"], sep="\t")
print("tokens", dataset_sample["tokens"], sep="\t")
print("pos" ,dataset_sample["pos"], sep="\t")
print("attention", np.array(dataset_sample["attention"]).shape)
print("value norms", np.array(dataset_sample["value_norms"]).shape)

## ELECTRA

In [None]:
dataset_sample = dataset_e[sample]
print("text", dataset_sample["text"], sep="\t")
print("tokens", dataset_sample["tokens"], sep="\t")
print("pos" ,dataset_sample["pos"], sep="\t")
print("attention", np.array(dataset_sample["attention"]).shape)
print("value norms", np.array(dataset_sample["value_norms"]).shape)

In [49]:
row = 0
inputs = preprocessor_b.tokenizer(dataset_b[row]["text"], return_tensors="pt")
outputs = preprocessor_b.model(**inputs)
values = outputs["values"]
values = torch.stack(values).squeeze()
values = values.detach()
dense = outputs["dense"]
print(values.shape)

all_head_size = 12 * 64
num_attention_heads = 12
attention_head_size = 64

# dense weight is converted to (num_heads, head_size, all_head_size)
# dense = torch.tensor([dense[i].view()])

dense_t = dense_t.view(all_head_size, num_attention_heads, attention_head_size)
dense_t = dense_t.permute(1, 2, 0).contiguous()
print(type(dense_t))
print(dense_t.shape)
print(torch.einsum("lhtd,hde->lhte", ).shape)

norms = np.linalg.norm(values, axis=-1)


torch.Size([12, 12, 35, 64])
<class 'torch.Tensor'>
torch.Size([12, 64, 768])


RuntimeError: ignored

In [None]:
# attention_probs: (batch, num_heads, seq_length, seq_length)
# value_layer: (batch, num_heads, seq_length, head_size)
# dense: nn.Linear(all_head_size, all_head_size)

inputs = preprocessor_b.tokenizer(dataset_b[row]["text"], return_tensors="pt")
outputs = preprocessor_b.model(**inputs)
values = outputs["values"]
values = torch.stack(values).squeeze()
values = values.detach()
dense = outputs["dense"]

value_layer = value_layer.permute(0, 2, 1, 3).contiguous()
value_shape = value_layer.size()
value_layer = value_layer.view(value_shape[:-1] + (1, value_shape[-1],))

# dense weight is converted to (num_heads, head_size, all_head_size)
dense = dense.weight
dense = dense.view(self.all_head_size, self.num_attention_heads, self.attention_head_size)
dense = dense.permute(1, 2, 0).contiguous()

# Make transformed vectors f(x) from Value vectors (value_layer) and weight matrix (dense).
transformed_layer = value_layer.matmul(dense)
transformed_shape = transformed_layer.size() #(batch, seq_length, num_heads, 1, all_head_size)
transformed_layer = transformed_layer.view(transformed_shape[:-2] + (transformed_shape[-1],))
transformed_layer = transformed_layer.permute(0, 2, 1, 3).contiguous() 
transformed_shape = transformed_layer.size() #(batch, num_heads, seq_length, all_head_size)
transformed_norm = torch.norm(transformed_layer, dim=-1)

# Make weighted vectors αf(x) from transformed vectors (transformed_layer) and attention weights (attention_probs).
weighted_layer = torch.einsum('bhks,bhsd->bhksd', attention_probs, transformed_layer) #(batch, num_heads, seq_length, seq_length, all_head_size)
weighted_norm = torch.norm(weighted_layer, dim=-1)

# Sum each αf(x) over all heads: (batch, seq_length, seq_length, all_head_size)
summed_weighted_layer = weighted_layer.sum(dim=1)

# Calculate L2 norm of summed weighted vectors: (batch, seq_length, seq_length)
summed_weighted_norm = torch.norm(summed_weighted_layer, dim=-1)

[[ 0  1  2]
 [ 3  4  5]
 [ 6  7  8]
 [ 9 10 11]]


array([ 0,  1,  2,  3,  4,  5,  6,  7,  8,  9, 10, 11])