In [3]:
from math import sqrt

from bertviz import head_view
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show
import torch
from torch import nn
import torch.nn.functional as F
from transformers import AutoConfig, AutoModel, AutoTokenizer

In [4]:
# Checkpoint name reused by every from_pretrained call in this notebook.
model_ckpt = 'bert-base-uncased'
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
# NOTE: `BertModel` is the class imported from bertviz.transformers_neuron_view
# (see the import cell), not the one from the transformers package.
model = BertModel.from_pretrained(model_ckpt)

100%|███████████████████████████████████████| 433/433 [00:00<00:00, 277399.36B/s]
100%|██████████████████████████| 440473133/440473133 [00:46<00:00, 9499765.82B/s]


In [5]:
# Example sentence reused by the tokenisation cells below.
text = 'time flies like an arrow'
# Display bertviz's neuron view for `text`, passing layer=0 and head=8.
# The cell's output is the JavaScript widget emitted by `show`.
show(
    model, 
    'bert',
    tokenizer,
    text,
    display_mode='light',
    layer=0,
    head=8)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [6]:
# add_special_tokens=False: encode only the words of `text`, without the
# tokenizer's special tokens; the output below shows five ids for five words.
inputs = tokenizer(text, return_tensors='pt', add_special_tokens=False)
inputs.input_ids

tensor([[ 2051, 10029,  2066,  2019,  8612]])

In [7]:
# Only the configuration (sizes) is taken from the checkpoint here.
config = AutoConfig.from_pretrained(model_ckpt)
# A freshly constructed embedding table: its weights are randomly initialised,
# they are NOT the pretrained BERT embeddings.
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

Embedding(30522, 768)

In [8]:
# Look up one embedding vector per token id.
input_embeds = token_emb(inputs.input_ids)
input_embeds.size() # one batch of our 5 words in a 768-vector encoding

torch.Size([1, 5, 768])

In [9]:
# Simplest form of self-attention: queries, keys and values are all the very
# same embedding tensor (no learned projections yet).
query = input_embeds
key = input_embeds
value = input_embeds
# Scale the dot products by sqrt of the key dimension.
dim_k = key.size(-1)
scores = query.bmm(key.transpose(1, 2)) / sqrt(dim_k)
scores.size()

torch.Size([1, 5, 5])

In [10]:
# Because query and key are the same tensor, each diagonal entry is a token's
# dot product with itself (its squared norm / sqrt(dim_k)), hence the large
# diagonal values in the output.
scores

tensor([[[27.2561, -0.0535,  0.4166,  0.4023, -1.4967],
         [-0.0535, 27.7976, -0.0847, -0.1807,  0.4573],
         [ 0.4166, -0.0847, 28.8033, -0.6257, -0.4174],
         [ 0.4023, -0.1807, -0.6257, 28.6213, -0.1005],
         [-1.4967,  0.4573, -0.4174, -0.1005, 30.3827]]],
       grad_fn=<DivBackward0>)

In [11]:
# Turn each row of scores into a distribution over the key positions.
weights = F.softmax(scores, dim=-1)
weights

tensor([[[1.0000e+00, 1.3791e-12, 2.2067e-12, 2.1754e-12, 3.2569e-13],
         [8.0240e-13, 1.0000e+00, 7.7778e-13, 7.0654e-13, 1.3374e-12],
         [4.6969e-13, 2.8451e-13, 1.0000e+00, 1.6564e-13, 2.0399e-13],
         [5.5544e-13, 3.1005e-13, 1.9870e-13, 1.0000e+00, 3.3596e-13],
         [1.4287e-14, 1.0083e-13, 4.2043e-14, 5.7720e-14, 1.0000e+00]]],
       grad_fn=<SoftmaxBackward0>)

In [12]:
# Sanity check: every row of attention weights sums to 1.
weights.sum(dim=-1)

tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)

In [13]:
# Weighted sum of the value vectors; `value` is the raw embedding tensor here.
attn_outputs = torch.bmm(weights, value)
attn_outputs.shape

torch.Size([1, 5, 768])

In [14]:
# Package the preceding steps into one reusable function
def scaled_dot_product_attention(query, key, value):
    """Compute (unmasked) scaled dot-product attention.

    Args:
        query: 3-D tensor whose last dimension matches that of ``key``.
        key: 3-D tensor; its last dimension sets the scaling factor.
        value: 3-D tensor whose second dimension matches that of ``key``.

    Returns:
        The ``value`` rows averaged with softmax-normalised attention weights.
    """
    scale = sqrt(key.size(-1))
    similarity = query.bmm(key.transpose(1, 2)) / scale
    # Normalise over the key axis, then mix the value vectors.
    return similarity.softmax(dim=-1).bmm(value)

In [15]:
class AttentionHead(nn.Module):
    """One self-attention head with its own query/key/value projections."""

    def __init__(self, embed_dim, head_dim):
        """Create three linear maps from ``embed_dim`` to ``head_dim``."""
        super().__init__()

        def make_projection():
            return nn.Linear(embed_dim, head_dim)

        # Construction order (q, k, v) is kept so seeded initialisation
        # yields the same parameters as before.
        self.q = make_projection()
        self.k = make_projection()
        self.v = make_projection()

    def forward(self, hidden_state):
        """Project ``hidden_state`` to q/k/v and attend over it."""
        queries = self.q(hidden_state)
        keys = self.k(hidden_state)
        values = self.v(hidden_state)
        return scaled_dot_product_attention(queries, keys, values)

In [16]:
class MultiHeadAttention(nn.Module):
    """Run several ``AttentionHead`` modules side by side and merge them."""

    def __init__(self, config):
        """Build ``config.num_attention_heads`` heads plus an output projection.

        Each head has width ``config.hidden_size // config.num_attention_heads``.
        """
        super().__init__()
        hidden = config.hidden_size
        num_heads = config.num_attention_heads
        per_head = hidden // num_heads
        head_modules = []
        for _ in range(num_heads):
            head_modules.append(AttentionHead(hidden, per_head))
        self.heads = nn.ModuleList(head_modules)
        self.output_linear = nn.Linear(hidden, hidden)

    def forward(self, hidden_state):
        """Concatenate every head's output on the last axis, then project."""
        per_head_outputs = []
        for head in self.heads:
            per_head_outputs.append(head(hidden_state))
        merged = torch.cat(per_head_outputs, dim=-1)
        return self.output_linear(merged)

In [17]:
# Smoke test: multi-head attention keeps the input shape.
# Note the result is stored as `attn_output` (singular), distinct from the
# earlier `attn_outputs` computed from the raw embeddings.
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(input_embeds)
attn_output.size() # batch, words, embedding vec

torch.Size([1, 5, 768])

In [18]:
# Second load of the checkpoint, this time through transformers' AutoModel with
# output_attentions=True so the forward pass exposes `.attentions` (used below).
mod = AutoModel.from_pretrained(model_ckpt, output_attentions=True)

Some weights of the model checkpoint at bert-base-uncased were not used when initializing BertModel: ['cls.seq_relationship.bias', 'cls.predictions.transform.dense.bias', 'cls.seq_relationship.weight', 'cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.predictions.bias', 'cls.predictions.transform.LayerNorm.bias', 'cls.predictions.decoder.weight']
- This IS expected if you are initializing BertModel from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertModel from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).


In [19]:
# Two sentences that share the words "flies like".
sent_a = 'time flies like an arrow'
sent_b = 'fruit flies like a banana'

# Encode both sentences together as one sentence-pair input.
viz_inputs = tokenizer(sent_a, sent_b, return_tensors='pt')
# `mod` was loaded with output_attentions=True, so its output has `.attentions`.
attention = mod(**viz_inputs).attentions
# Count of positions whose token_type_id is 0, used as the index where sentence B
# starts (presumably segment 0 is sentence A plus its special tokens — confirm).
sent_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
# Token strings of the first (only) sequence in the batch.
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0])

In [21]:
# Render bertviz's head view from the attentions, tokens and sentence-B offset
# computed in the previous cell; heads=[8] is passed to pick out head 8.
head_view(attention, tokens, sent_b_start, heads=[8])

<IPython.core.display.Javascript object>

In [22]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block: Linear -> GELU -> Linear -> Dropout."""

    def __init__(self, config):
        """Size the layers from ``config``.

        Reads ``hidden_size``, ``intermediate_size`` and
        ``hidden_dropout_prob`` from ``config``.
        """
        super().__init__()
        hidden, inner = config.hidden_size, config.intermediate_size
        # Expand to the intermediate width, then project back.
        self.lin1 = nn.Linear(hidden, inner)
        self.lin2 = nn.Linear(inner, hidden)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Apply the block to the last dimension of ``x``."""
        expanded = self.gelu(self.lin1(x))
        return self.dropout(self.lin2(expanded))

In [23]:
# Feed the multi-head attention result (`attn_output`) through the
# feed-forward block. The similarly named `attn_outputs` is the raw,
# un-projected single-head result from the earlier walkthrough.
ff = FeedForward(config)
ff_outputs = ff(attn_output)
ff_outputs.size()

torch.Size([1, 5, 768])

In [24]:
class TransformerEncoderLayer(nn.Module):
    """Encoder block: layer norm is applied before each sub-layer (pre-LN)."""

    def __init__(self, config):
        """Create two layer norms, the attention module and the feed-forward."""
        super().__init__()
        width = config.hidden_size
        self.layer_norm1 = nn.LayerNorm(width)
        self.layer_norm2 = nn.LayerNorm(width)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Apply attention and feed-forward, each wrapped in a skip connection."""
        # Normalise first, attend, then add the un-normalised input back.
        after_attention = x + self.attention(self.layer_norm1(x))
        # Same pattern for the feed-forward sub-layer.
        normed = self.layer_norm2(after_attention)
        return after_attention + self.feed_forward(normed)

In [25]:
# Smoke test: the encoder layer preserves the input shape.
encoder_layer = TransformerEncoderLayer(config)
input_embeds.shape, encoder_layer(input_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

In [26]:
# incorporate positional encodings
class Embeddings(nn.Module):
    """Token embeddings plus learned positional embeddings.

    The two embeddings are summed, layer-normalised and passed through dropout.

    Args:
        config: object providing ``vocab_size``, ``hidden_size``,
            ``max_position_embeddings`` and ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                             config.hidden_size)
        self.positional_embeddings = nn.Embedding(
            config.max_position_embeddings, config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        # Use the configured rate, as FeedForward and the classifier head do;
        # a bare nn.Dropout() would silently default to p=0.5.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids):
        """Embed ``input_ids``.

        Args:
            input_ids: integer tensor of shape (batch, seq_len); ``seq_len``
                must not exceed ``config.max_position_embeddings``.

        Returns:
            Tensor of shape (batch, seq_len, hidden_size).
        """
        seq_len = input_ids.size(1)
        # Build positions 0..seq_len-1 on the same device as the input so the
        # lookup also works when the module and inputs live on a GPU.
        position_ids = torch.arange(
            seq_len, dtype=torch.long, device=input_ids.device).unsqueeze(0)
        token_embeddings = self.token_embeddings(input_ids)
        # Shape (1, seq_len, hidden): broadcasts over the batch dimension.
        positional_embeddings = self.positional_embeddings(position_ids)
        embeddings = token_embeddings + positional_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [27]:
# Smoke test: token ids in, one hidden_size vector per position out.
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

In [28]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of ``TransformerEncoderLayer``."""

    def __init__(self, config):
        """Build the embeddings and ``config.num_hidden_layers`` encoder layers."""
        super().__init__()
        self.embeddings = Embeddings(config)
        stack = []
        for _ in range(config.num_hidden_layers):
            stack.append(TransformerEncoderLayer(config))
        self.layers = nn.ModuleList(stack)

    def forward(self, x):
        """Embed the token ids ``x`` and run them through every layer in order."""
        hidden = self.embeddings(x)
        for block in self.layers:
            hidden = block(hidden)
        return hidden

In [29]:
# Smoke test: the full encoder maps token ids to one hidden vector per token.
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

In [32]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder with a dropout + linear classification head on position 0."""

    def __init__(self, config):
        """Build the encoder and a head with ``config.num_labels`` outputs."""
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        """Return logits of shape (batch, num_labels) for token ids ``x``."""
        hidden_states = self.encoder(x)
        # The first position is taken as the sequence summary; callers are
        # expected to place the [CLS] token there.
        first_token_state = hidden_states[:, 0, :]
        return self.classifier(self.dropout(first_token_state))

In [33]:
# Mutates the shared `config` object in place; cells that rebuild models from
# `config` after this point will see num_labels == 3.
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
# NOTE: `inputs` was tokenised with add_special_tokens=False, so position 0 is
# the first word rather than a [CLS] token; this call only checks the shape.
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

In [34]:
# Lower-triangular matrix of ones with a leading batch dimension of 1.
# Zeros above the diagonal mark the positions that get masked out below.
seq_len = inputs.input_ids.size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0)
mask[0]

tensor([[1., 0., 0., 0., 0.],
        [1., 1., 0., 0., 0.],
        [1., 1., 1., 0., 0.],
        [1., 1., 1., 1., 0.],
        [1., 1., 1., 1., 1.]])

In [35]:
# Replace scores at masked positions (mask == 0) with -inf so that softmax
# assigns them zero weight. The result is only displayed, not assigned.
scores.masked_fill(mask == 0, -float('inf'))

tensor([[[27.2561,    -inf,    -inf,    -inf,    -inf],
         [-0.0535, 27.7976,    -inf,    -inf,    -inf],
         [ 0.4166, -0.0847, 28.8033,    -inf,    -inf],
         [ 0.4023, -0.1807, -0.6257, 28.6213,    -inf],
         [-1.4967,  0.4573, -0.4174, -0.1005, 30.3827]]],
       grad_fn=<MaskedFillBackward0>)

In [36]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Scaled dot-product attention with an optional mask.

    Args:
        query, key, value: 3-D tensors compatible with ``torch.bmm``.
        mask: optional tensor broadcastable to the score matrix; positions
            where it equals 0 receive a score of ``-inf`` before the softmax.

    Returns:
        The attention-weighted combination of ``value``.
    """
    attn_logits = torch.bmm(query, key.transpose(1, 2)) / sqrt(query.size(-1))
    if mask is not None:
        blocked = mask == 0
        attn_logits = attn_logits.masked_fill(blocked, float('-inf'))
    attn_weights = F.softmax(attn_logits, dim=-1)
    return torch.bmm(attn_weights, value)