In [1]:
# !pip install transformers
# !pip install bertviz
# !pip install pytorch

In [2]:
# Text example (For use throughout notebook)

text = "I am a big fan of pugs"

In [3]:
from transformers import AutoTokenizer
from bertviz.transformers_neuron_view import BertModel
from bertviz.neuron_view import show

In [4]:
# Visualizing the concept of self-attention and how it works

model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
model = BertModel.from_pretrained(model_ckpt)
show(model, "bert", tokenizer, text, display_mode="light", layer=0, head=0)

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [5]:
# Tokenize our input text
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)
inputs.input_ids

tensor([[ 1045,  2572,  1037,  2502,  5470,  1997, 16405,  5620]])

In [6]:
# Get all the token embeddings available in the Bert Model vocabulary

from torch import nn
from transformers import AutoConfig

config = AutoConfig.from_pretrained(model_ckpt)
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)
token_emb

Embedding(30522, 768)

In [7]:
# Creating token embeddings for our text

input_embeds = token_emb(inputs.input_ids)
input_embeds.size()

torch.Size([1, 8, 768])

In [8]:
# Simplified implementation of the self-attention
# For conceptualization only not for practice

import torch
from math import sqrt

# Multiple our query and keys matrices, and scale them
query = key = value = input_embeds
dim_k = key.size(-1)
scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
print(scores.size())

# Apply softmax to output
import torch.nn.functional as F

weights = F.softmax(scores, dim=1)
print(weights.sum(dim=-1))

# Multiply the outputs by our value matrix
attn_outputs = torch.bmm(weights, value)
print(attn_outputs.shape)

torch.Size([1, 8, 8])
tensor([[1., 1., 1., 1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)
torch.Size([1, 8, 768])


In [9]:
# Now let's wrap that logic into a function that we can use for self-attention later

def scaled_dot_product_attention(query, key, value):
    dim_k = query.size(-1)
    scores = torch.bmm(query, key.transpose(1,2)) / sqrt(dim_k)
    weights = F.softmax(scores, dim=1)
    attn_outputs = torch.bmm(weights, value)
    return attn_outputs

In [10]:
# Implementation of a single Attention Head

class AttentionHead(nn.Module):
    def __init__(self, embed_dim, head_dim):
        super().__init__()
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)
        
    def forward(self, hidden_state):
        attn_outputs = scaled_dot_product_attention(
        self.q(hidden_state), self.k(hidden_state), self.v(hidden_state)
        )
        return attn_outputs

In [11]:
# Implementation of Multi-Head Attention

class MultiHeadAttention(nn.Module):
    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)
        
    def forward(self, hidden_state):
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x

In [12]:
# Testing out passing our text through the Multi-Head Attention Block
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(input_embeds)
attn_output.size()

# BOOM! The output matched our desired matrix size of [batch_size, tokens, embed_dim]

torch.Size([1, 8, 768])

In [13]:
# Feed-Forward Implementation

class FeedForward(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.linear_1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear_2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        
    def forward(self, x):
        x = self.linear_1(x)
        x = self.gelu(x)
        x = self.linear_2(x)
        x = self.dropout(x)
        return x

In [14]:
# Test out passing our attention outputs from MultiHeadAttention class to the FeedForward Block
feed_forward = FeedForward(config)
ff_outputs = feed_forward.forward(attn_outputs)
ff_outputs.size()

# BOOM! The output matched our desired matrix size of [batch_size, tokens, embed_dim]

torch.Size([1, 8, 768])

In [43]:
# Combining our Multi-Head Attenion Block, FeedForward, and Layer Normalization & Skip connections

class TransformerEncoderLayer(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.layer_norm_1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm_2 = nn.LayerNorm(config.hidden_size)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        
    def forward(self, x):
        # Apply Layern normalization and then copy input into query, key, value
        hidden_state = self.layer_norm_1(x)
        # Apply attention with a skip connection
        x = x + self.attention(hidden_state)
        # Apply feed-forward layer with a skip connection
        x = x + self.feed_forward(self.layer_norm_2(x))
        return x

In [44]:
# Test this TransformerEncoderLayer with our input embeddings
encoder_layer = TransformerEncoderLayer(config)
encoder_layer.forward(input_embeds).size()

# BOOM! The output matched our desired matrix size of [batch_size, tokens, embed_dim]

torch.Size([1, 8, 768])

In [45]:
# Now let's create our embeddings class for our token & positional embeds

class Embeddings(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size,
                                            config.hidden_size)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings,
                                            config.hidden_size)
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-12)
        self.dropout = nn.Dropout()
        
    def forward(self, input_ids):
        # Create position Ids for input sequence
        seq_length = input_ids.size(1)
        position_ids = torch.arange(seq_length, dtype=torch.long).unsqueeze(0)
        # Create token and position embeddings
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine token and position embeddings
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [46]:
# Test out our embeddings layer
embedding_layer = Embeddings(config)
embedding_layer.forward(inputs.input_ids).size()

# BOOM! The output matched our desired matrix size of [batch_size, tokens, embed_dim]

torch.Size([1, 8, 768])

In [50]:
# Now let's put it all together to make a Transformer Encoder

class TransformerEncoder(nn.Module):
    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        self.layers = nn.ModuleList([TransformerEncoderLayer(config)
                                     for _ in range(config.num_hidden_layers)])
        
    def forward(self, x):
        x = self.embeddings(x)
        for layer in self.layers:
            x = layer(x)
        return x

In [51]:
# Test out entire Transformer Encoder
encoder = TransformerEncoder(config)
encoder.forward(inputs.input_ids).size()

# BOOM! The output matched our desired matrix size of [batch_size, tokens, embed_dim]

torch.Size([1, 8, 768])