In [1]:
import torch 
import numpy as np 
import torch.nn.functional as F
import torch.nn as nn 
import math
# Scaled dot product 
def scaled_dot_product_attention(query, key, value):
    """Attend over ``value`` using softmax-normalised, scaled query/key scores.

    All three arguments must be 3-D tensors, because batched matrix
    multiplication (``bmm``) is used for both products.

    Args:
        query: Tensor whose last dimension is the per-head feature size.
        key: Tensor with the same batch and feature size as ``query``.
        value: Tensor with the same batch and sequence size as ``key``.

    Returns:
        The attention-weighted combination of ``value`` rows, one per query row.
    """
    # Dividing by sqrt(feature size) keeps the scores' scale independent of it.
    scale = math.sqrt(query.size(-1))
    key_t = key.transpose(1, 2)
    attn = F.softmax(query.bmm(key_t) / scale, dim=-1)
    return attn.bmm(value)

# Single Attention Head 
class AttentionHead(nn.Module):
    """One self-attention head: project the input three ways, then attend.

    Args:
        embed_dim: Size of the last dimension of the incoming hidden state.
        head_dim: Size of the last dimension produced by each projection.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        # Built in q, k, v order; each projection is an independent Linear layer.
        self.q, self.k, self.v = (
            nn.Linear(embed_dim, head_dim) for _ in range(3)
        )

    def forward(self, hidden_state):
        """Return the attention output computed from ``hidden_state`` alone."""
        query = self.q(hidden_state)
        key = self.k(hidden_state)
        value = self.v(hidden_state)
        return scaled_dot_product_attention(query, key, value)
    

# MultiHeadAttention
class MultiHeadAttention(nn.Module):
    """Run several attention heads in parallel and mix their concatenated outputs.

    Args:
        config: Object exposing ``hidden_size`` and ``num_attention_heads``.

    Raises:
        ValueError: If ``hidden_size`` is not a multiple of
            ``num_attention_heads``.
    """

    def __init__(self, config):
        super().__init__()
        embed_dim = config.hidden_size
        num_heads = config.num_attention_heads
        # The heads' outputs are concatenated to num_heads * head_dim features
        # and fed to a Linear expecting embed_dim inputs; with a truncating
        # division those only agree when embed_dim divides evenly.
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"hidden_size ({embed_dim}) must be a multiple of "
                f"num_attention_heads ({num_heads})"
            )
        head_dim = embed_dim // num_heads
        self.heads = nn.ModuleList(
            [AttentionHead(embed_dim, head_dim) for _ in range(num_heads)]
        )
        self.output_linear = nn.Linear(embed_dim, embed_dim)

    def forward(self, hidden_state):
        """Apply every head to ``hidden_state``, join on the last dim, then project."""
        x = torch.cat([h(hidden_state) for h in self.heads], dim=-1)
        x = self.output_linear(x)
        return x


        

In [2]:
### testing 
# Builds the shared fixtures used by the later test cells: a tokenizer and
# config loaded from the checkpoint, one tokenized sentence, and its embeddings.
from transformers import AutoConfig , AutoTokenizer 
model_ckpt = "bert-base-uncased"
tokenizer = AutoTokenizer.from_pretrained(model_ckpt)
config = AutoConfig.from_pretrained(model_ckpt)
text = "time flies like an arrow"
# add_special_tokens=False asks the tokenizer not to add special tokens.
inputs = tokenizer(text , return_tensors="pt", add_special_tokens=False)
# This embedding table is constructed here rather than loaded from the
# checkpoint, so its weights are freshly initialised.
token_embed = nn.Embedding(config.vocab_size , config.hidden_size)
inputs_embeds = token_embed(inputs.input_ids)
print(f"input embedding shape of '{text}' is : {inputs_embeds.shape}")

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]



input embedding shape of 'time flies like an arrow' is : torch.Size([1, 5, 768])


In [3]:
# test multihead attention 
# Shape check: feed the embedded sentence through a freshly built attention
# block and display the size of the result.
multihead_attn = MultiHeadAttention(config)
attn_output = multihead_attn(inputs_embeds)
attn_output.size()

torch.Size([1, 5, 768])

In [4]:
!pip install bertviz

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


Collecting bertviz
  Downloading bertviz-1.4.0-py3-none-any.whl.metadata (19 kB)
Collecting botocore<1.30.0,>=1.29.100 (from boto3->bertviz)
  Downloading botocore-1.29.165-py3-none-any.whl.metadata (5.9 kB)
Downloading bertviz-1.4.0-py3-none-any.whl (157 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m157.6/157.6 kB[0m [31m4.6 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading botocore-1.29.165-py3-none-any.whl (11.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m11.0/11.0 MB[0m [31m84.7 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25hInstalling collected packages: botocore, bertviz
  Attempting uninstall: botocore
    Found existing installation: botocore 1.34.131
    Uninstalling botocore-1.34.131:
      Successfully uninstalled botocore-1.34.131
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
aiobotocore 2.13

In [5]:
from transformers import AutoModel 
from bertviz import head_view

# output_attentions=True is requested so the forward pass below exposes
# `.attentions`.
model = AutoModel.from_pretrained(model_ckpt , output_attentions=True)
sen1 = "time flies like an arrow"
sen2 = "fruit flies like a banana" 

# Both sentences are passed to the tokenizer together and encoded as one input.
viz_inputs = tokenizer(sen1 , sen2 , return_tensors="pt")
attention = model(**viz_inputs).attentions
# Counts the positions whose token_type_id is 0; used below as the index where
# the second sentence begins (assumes all type-0 positions come first — TODO confirm).
sentence_b_start = (viz_inputs.token_type_ids == 0).sum(dim=1)
tokens = tokenizer.convert_ids_to_tokens(viz_inputs.input_ids[0]) 

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]



In [6]:
# Render bertviz's head view for the sentence pair; `heads=[8]` restricts the
# display (presumably to the head with index 8 — confirm against the bertviz docs).
head_view(attention, tokens, sentence_b_start, heads=[8])

<IPython.core.display.Javascript object>

### Feed Forward Layer

In [7]:
class FeedForward(nn.Module):
    """Two-layer feed-forward block: Linear -> GELU -> Linear -> Dropout.

    Both Linear layers act on the last dimension only, so every position of
    the input is transformed independently.

    Args:
        config: Object exposing ``hidden_size``, ``intermediate_size`` and
            ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        self.linear1 = nn.Linear(config.hidden_size, config.intermediate_size)
        self.linear2 = nn.Linear(config.intermediate_size, config.hidden_size)
        self.gelu = nn.GELU()
        # Spelled `dropout` (previously `droput`) to match the attribute name
        # used by the other modules in this notebook.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, x):
        """Return a tensor with the same last-dimension size as ``x``."""
        x = self.linear1(x)
        x = self.gelu(x)
        x = self.linear2(x)
        x = self.dropout(x)
        return x
        

In [8]:
# test 
# Shape check: the feed-forward block is fed the attention output computed in
# an earlier cell, and the size of its result is displayed.
feed_forward = FeedForward(config)
ff_outputs = feed_forward(attn_output)
ff_outputs.size()

torch.Size([1, 5, 768])

### Putting It All Together


In [9]:
class TransformerEncoderLayer(nn.Module):
    """Encoder block: attention then feed-forward, each with a residual add.

    Layer normalisation is applied to the input of each sub-layer, while the
    residual sum uses the un-normalised tensor.

    Args:
        config: Object passed through to ``MultiHeadAttention`` and
            ``FeedForward``; ``hidden_size`` is also read here.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.layer_norm_1 = nn.LayerNorm(width)
        self.layer_norm_2 = nn.LayerNorm(width)
        self.attention = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)

    def forward(self, x):
        """Return ``x`` after both residual sub-layers have been applied."""
        # Sub-layer 1: attention over the normalised input, added back onto x.
        attended = self.attention(self.layer_norm_1(x))
        x = x + attended
        # Sub-layer 2: feed-forward over the re-normalised sum, added back again.
        transformed = self.feed_forward(self.layer_norm_2(x))
        return x + transformed

In [10]:
# test  
# Displays the input shape next to the encoder layer's output shape so the two
# can be compared at a glance.
encoder_layer = TransformerEncoderLayer(config)
inputs_embeds.shape , encoder_layer(inputs_embeds).size()

(torch.Size([1, 5, 768]), torch.Size([1, 5, 768]))

### Positional Embeddings

In [11]:
class Embeddings(nn.Module):
    """Sum of token and learned position embeddings, then layer norm and dropout.

    Args:
        config: Object exposing ``vocab_size``, ``hidden_size``,
            ``max_position_embeddings`` and ``hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        self.token_embeddings = nn.Embedding(config.vocab_size, config.hidden_size)
        self.position_embeddings = nn.Embedding(
            config.max_position_embeddings, config.hidden_size
        )
        self.layer_norm = nn.LayerNorm(config.hidden_size, eps=1e-15)
        # Use the configured rate, as the other modules in this notebook do;
        # a bare nn.Dropout() silently falls back to p=0.5.
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids):
        """Embed ``input_ids``; dimension 1 is read as the sequence length.

        Positions are numbered ``0 .. seq_length - 1``, so the sequence must
        not be longer than ``config.max_position_embeddings``.
        """
        seq_length = input_ids.size(1)
        # Create the position ids on the same device as the input so the
        # lookup also works when the inputs are not on the default device.
        position_ids = torch.arange(
            seq_length, dtype=torch.long, device=input_ids.device
        ).unsqueeze(0)
        token_embeddings = self.token_embeddings(input_ids)
        position_embeddings = self.position_embeddings(position_ids)
        # Combine them; the (1, seq_length, hidden) positions broadcast over the batch.
        embeddings = token_embeddings + position_embeddings
        embeddings = self.layer_norm(embeddings)
        embeddings = self.dropout(embeddings)
        return embeddings

In [12]:
# test 
# Shape check: the embedding layer is given the raw token ids (not the
# embeddings computed earlier) and the size of its output is displayed.
embedding_layer = Embeddings(config)
embedding_layer(inputs.input_ids).size()

torch.Size([1, 5, 768])

### Putting It All Together with Positional Embeddings

In [13]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by a stack of encoder layers applied in order.

    Args:
        config: Object passed through to ``Embeddings`` and to every
            ``TransformerEncoderLayer``; ``num_hidden_layers`` sets the depth.
    """

    def __init__(self, config):
        super().__init__()
        self.embeddings = Embeddings(config)
        stack = nn.ModuleList()
        for _ in range(config.num_hidden_layers):
            stack.append(TransformerEncoderLayer(config))
        self.layers = stack

    def forward(self, x):
        """Embed ``x`` and pass the result through each layer in sequence."""
        hidden = self.embeddings(x)
        for block in self.layers:
            hidden = block(hidden)
        return hidden

In [14]:
# Shape check: run the full encoder stack on the raw token ids and display the
# size of the result.
encoder = TransformerEncoder(config)
encoder(inputs.input_ids).size()

torch.Size([1, 5, 768])

### Transformer for classification

In [15]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder with a linear classification head on the first sequence position.

    Args:
        config: Object passed through to ``TransformerEncoder``;
            ``hidden_dropout_prob``, ``hidden_size`` and ``num_labels`` are
            also read here.
    """

    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, x):
        """Return one row of ``num_labels`` scores per input sequence."""
        encoded = self.encoder(x)
        # Only the hidden state at index 0 along dimension 1 feeds the classifier.
        first_position = encoded[:, 0, :]
        return self.classifier(self.dropout(first_position))

In [16]:
# test
# num_labels is set on the shared config object before the classifier is
# built, because the classifier reads it to size its output layer.
config.num_labels = 3
encoder_classifier = TransformerForSequenceClassification(config)
encoder_classifier(inputs.input_ids).size()

torch.Size([1, 3])

### Transformer Decoder
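The attention function is the same as before, with one addition: an optional mask that excludes selected positions from the scores before the softmax.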


In [17]:
 def scaled_dot_product_attention(query, key, value, mask=None):
     """Scaled dot-product attention with an optional mask.

     All of ``query``, ``key`` and ``value`` must be 3-D tensors, because
     batched matrix multiplication (``bmm``) is used for both products.

     Args:
         query: Tensor whose last dimension is the feature size.
         key: Tensor with the same batch and feature size as ``query``.
         value: Tensor with the same batch and sequence size as ``key``.
         mask: Optional tensor broadcastable to the score matrix. Positions
             where it equals 0 are excluded from attention. ``None`` (the
             default) leaves the scores untouched.

     Returns:
         The attention-weighted combination of ``value`` rows.

     Note:
         A query row whose mask is 0 everywhere has all scores set to -inf,
         and the softmax of such a row is NaN.
     """
     dim_k = query.size(-1)
     # `math.sqrt`, not a bare `sqrt`: this notebook imports only the `math`
     # module, so the unqualified name raised NameError on the first call.
     scores = torch.bmm(query, key.transpose(1, 2)) / math.sqrt(dim_k)
     if mask is not None:
         # -inf scores receive zero weight after the softmax.
         scores = scores.masked_fill(mask == 0, float("-inf"))
     weights = F.softmax(scores, dim=-1)
     return weights.bmm(value)