# Implementing a transformer with PyTorch

In [1]:
from transformers import BertTokenizer, BertModel

In [2]:
# Example sentence used throughout the notebook.
text = "time flies like an arrow"

# One checkpoint name is reused for the model and the tokenizer so that
# both are loaded from the same pretrained checkpoint.
model_ckp = "bert-base-uncased"
model = BertModel.from_pretrained(model_ckp)
tokenizer = BertTokenizer.from_pretrained(model_ckp)



In [3]:
# Tokenize the sentence and return PyTorch tensors ("pt").
# add_special_tokens=False is passed so that no special tokens are added
# around the sentence; the printed ids show one entry per word.
inputs = tokenizer(text, return_tensors="pt", add_special_tokens=False)

print(inputs)

{'input_ids': tensor([[ 2051, 10029,  2066,  2019,  8612]]), 'token_type_ids': tensor([[0, 0, 0, 0, 0]]), 'attention_mask': tensor([[1, 1, 1, 1, 1]])}


In [4]:
from torch import nn
import torch
from transformers import AutoConfig

# Load the configuration (vocabulary size, hidden size, ...) of the checkpoint.
config = AutoConfig.from_pretrained(model_ckp)

# A newly constructed embedding table: one hidden_size-dimensional vector per
# vocabulary entry. It is created here, not taken from the pretrained model.
token_emb = nn.Embedding(config.vocab_size, config.hidden_size)

print(token_emb)

Embedding(30522, 768)


In [27]:
# Display the full configuration; its fields are read by the classes defined below.
config 

BertConfig {
  "_name_or_path": "bert-base-uncased",
  "architectures": [
    "BertForMaskedLM"
  ],
  "attention_probs_dropout_prob": 0.1,
  "classifier_dropout": null,
  "gradient_checkpointing": false,
  "hidden_act": "gelu",
  "hidden_dropout_prob": 0.1,
  "hidden_size": 768,
  "initializer_range": 0.02,
  "intermediate_size": 3072,
  "layer_norm_eps": 1e-12,
  "max_position_embeddings": 512,
  "model_type": "bert",
  "num_attention_heads": 12,
  "num_hidden_layers": 12,
  "pad_token_id": 0,
  "position_embedding_type": "absolute",
  "transformers_version": "4.37.2",
  "type_vocab_size": 2,
  "use_cache": true,
  "vocab_size": 30522
}

Now we can generate the token embeddings by passing the input IDs through the embedding layer.

In [5]:
# Look up one embedding vector per token id.
inputs_embeds = token_emb(inputs["input_ids"])

print(inputs_embeds)
print(inputs_embeds.size())

tensor([[[ 0.6357, -0.0930,  0.0498,  ...,  0.9247, -0.3052,  0.8459],
         [ 0.1574, -1.2528, -1.2385,  ...,  1.2992, -1.3885, -0.0211],
         [ 0.3887, -1.6999, -0.6388,  ..., -0.2164, -1.1877,  0.6342],
         [-0.2313, -0.8173, -0.1961,  ...,  1.5486,  2.0778,  0.8052],
         [-0.8103, -0.8753,  2.0898,  ...,  0.4974,  0.3398, -1.2624]]],
       grad_fn=<EmbeddingBackward0>)
torch.Size([1, 5, 768])


This has given us a tensor of shape `[batch_size, seq_len, hidden_dim]`

## The attention block

In [6]:
from math import sqrt

# Simplest form of self-attention: queries, keys and values are all the very
# same embeddings tensor (no learned projections yet).
query = key = value = inputs_embeds

# Size of the last (feature) dimension; its square root scales the scores.
dim_k = key.size(-1)
print(key.shape, dim_k)

# Dot product of every query vector with every key vector, divided by sqrt(dim_k).
# transpose(1, 2) swaps the sequence and feature axes of the key tensor.
scores = torch.matmul(query, key.transpose(1, 2)) / sqrt(dim_k)

print(scores)
print(query.size(), key.transpose(1, 2).size(), scores.size())

torch.Size([1, 5, 768]) 768
tensor([[[26.6332,  0.9239, -1.6107,  0.9351, -1.5695],
         [ 0.9239, 27.2575, -1.3069,  0.8039,  1.2502],
         [-1.6107, -1.3069, 29.9443, -1.6849,  1.5230],
         [ 0.9351,  0.8039, -1.6849, 31.1858, -0.7020],
         [-1.5695,  1.2502,  1.5230, -0.7020, 26.9231]]],
       grad_fn=<DivBackward0>)
torch.Size([1, 5, 768]) torch.Size([1, 768, 5]) torch.Size([1, 5, 5])


Let us now apply the softmax

In [15]:
import torch.nn.functional as F

# Softmax over the last axis turns each row of scores into attention weights.
weights = F.softmax(scores, dim=-1)

print(weights.shape)
# Sanity check: the weights along the last axis should sum to 1.
print(weights.sum(dim=-1))

torch.Size([1, 5, 5])
tensor([[1., 1., 1., 1., 1.]], grad_fn=<SumBackward1>)


In [8]:
# The next step is to multiply the attention weights with the values: each
# output row is a weighted sum of the value vectors.

attn_output = torch.matmul(weights, value)

print(attn_output.size())

torch.Size([1, 5, 768])


Let's wrap these steps into a function

In [9]:
def scaled_dot_product_attention(query, key, value):
    """Compute scaled dot-product attention.

    Args:
        query: Tensor whose last two axes are ``(query_len, dim_k)``.
        key: Tensor whose last two axes are ``(key_len, dim_k)``.
        value: Tensor whose last two axes are ``(key_len, dim_v)``.
            Any leading axes (batch, heads, ...) must be broadcastable
            across the three tensors, as required by ``torch.matmul``.

    Returns:
        Tensor whose last two axes are ``(query_len, dim_v)``: for every
        query position, the softmax-weighted sum of the value vectors.
    """
    dim_k = key.size(-1)
    # Swap the last two axes rather than hard-coding axes 1 and 2, so the
    # function also works for unbatched (2-D) and multi-head (4-D) inputs.
    scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt(dim_k)
    # Normalise over the key axis so each query's weights sum to 1.
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, value)

In [10]:
# Quick demonstration of nn.Linear: it acts on the last axis only, mapping
# 20 input features to 30 output features and leaving leading axes untouched.
m = nn.Linear(20,30)
# Named `linear_input` rather than `input` so the builtin input() is not
# shadowed for the rest of the notebook.
linear_input = torch.randn(5, 128, 20)
output = m(linear_input)
print(output.size())
# The weight matrix is stored as (out_features, in_features).
print(m.weight.size())


torch.Size([5, 128, 30])
torch.Size([30, 20])


## Implementing multi-head attention

We start by implementing a single attention head.

In [11]:
class AttentionHead(nn.Module):
    """One self-attention head.

    The input is projected from ``embed_dim`` to ``head_dim`` three times
    (query, key, value) and the projections are combined with
    ``scaled_dot_product_attention``.
    """

    def __init__(self, embed_dim, head_dim):
        super().__init__()
        # Separate learned projections for queries, keys and values.
        self.q = nn.Linear(embed_dim, head_dim)
        self.k = nn.Linear(embed_dim, head_dim)
        self.v = nn.Linear(embed_dim, head_dim)

    def forward(self, hidden_states):
        """Return the attention output computed from ``hidden_states``."""
        queries = self.q(hidden_states)
        keys = self.k(hidden_states)
        values = self.v(hidden_states)
        return scaled_dot_product_attention(queries, keys, values)

In [12]:
class MultiHeadAttention(nn.Module):
    """Several ``AttentionHead`` modules run side by side.

    The head outputs are concatenated along the last axis and passed through
    a final ``embed_dim -> embed_dim`` linear layer.
    """

    def __init__(self, config):
        super().__init__()
        hidden = config.hidden_size
        n_heads = config.num_attention_heads
        # Integer division: each head works on an equal slice of the width.
        per_head = hidden // n_heads

        head_modules = []
        for _ in range(n_heads):
            head_modules.append(AttentionHead(hidden, per_head))
        self.heads = nn.ModuleList(head_modules)
        self.linear = nn.Linear(hidden, hidden)

    def forward(self, hidden_states):
        """Apply every head to ``hidden_states`` and mix the results."""
        per_head_outputs = []
        for head in self.heads:
            per_head_outputs.append(head(hidden_states))
        concatenated = torch.cat(per_head_outputs, dim=-1)
        return self.linear(concatenated)

In [14]:
# Let us check the output size of the multi-head attention: it should match
# the size of the input embeddings.

multihead_attn = MultiHeadAttention(config)
output = multihead_attn(inputs_embeds)
print(output.size())

torch.Size([1, 5, 768])


In [17]:
# Size of the input embeddings, for comparison with the attention output size.
inputs_embeds.size()

torch.Size([1, 5, 768])

## The Feed-Forward Layer

In [18]:
class FeedForward(nn.Module):
    """Position-wise feed-forward block.

    Expands the last axis from ``config.hidden_size`` to
    ``config.intermediate_size``, applies GELU, projects back to
    ``config.hidden_size`` and finally applies dropout with probability
    ``config.hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        hidden, inner = config.hidden_size, config.intermediate_size
        self.linear1 = nn.Linear(hidden, inner)
        self.activation = nn.GELU()
        self.linear2 = nn.Linear(inner, hidden)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, hidden_states):
        """Return the transformed ``hidden_states`` (same shape as the input)."""
        expanded = self.activation(self.linear1(hidden_states))
        return self.dropout(self.linear2(expanded))

In [21]:
# Let us test this by passing the attention output through the feed-forward
# block; the printed size should be unchanged.

feed_forward = FeedForward(config)
ff_output = feed_forward(output)
print(ff_output.size())

# The config values that determine the layer sizes and the dropout probability.
print(config.hidden_size, config.intermediate_size, config.hidden_dropout_prob)

torch.Size([1, 5, 768])
768 3072 0.1


## Adding Layer Normalization

In [33]:
class TransformerEncoderLayer(nn.Module):
    """One encoder layer: self-attention and feed-forward, each with a skip connection.

    Layer normalization is applied to the input of each sub-layer, and the
    sub-layer output is then added back onto its un-normalized input.
    """

    def __init__(self, config):
        super().__init__()
        self.self_attn = MultiHeadAttention(config)
        self.feed_forward = FeedForward(config)
        self.layer_norm1 = nn.LayerNorm(config.hidden_size)
        self.layer_norm2 = nn.LayerNorm(config.hidden_size)

    def forward(self, x):
        """Return the layer output for ``x``; ``x`` itself is left unmodified.

        The residual additions are deliberately out-of-place (``x = x + ...``
        instead of ``x += ...``). An in-place add would overwrite the caller's
        tensor and would also modify a tensor that the preceding LayerNorm
        saved for its backward pass, which makes autograd fail.
        """
        # Normalize, attend, then add the result onto the skip connection.
        x = x + self.self_attn(self.layer_norm1(x))
        # Normalize, apply the feed-forward block, then add the skip connection.
        x = x + self.feed_forward(self.layer_norm2(x))

        return x

In [34]:
# Smoke test: the encoder layer's output size should equal its input size.
encoder_layer = TransformerEncoderLayer(config)
print(inputs_embeds.size(), encoder_layer(inputs_embeds).size() )

torch.Size([1, 5, 768]) torch.Size([1, 5, 768])


We have now implemented our very first transformer encoder layer from scratch! However, it has no notion of token order: permuting the input tokens simply permutes the outputs in the same way (the layer is permutation-equivariant). Let us fix this by adding position embeddings.

## Position Embeddings

In [28]:
class Embeddings(nn.Module):
    """Token embeddings plus learned position embeddings.

    The two embeddings are summed, layer-normalized and passed through
    dropout with probability ``config.hidden_dropout_prob``.
    """

    def __init__(self, config):
        super().__init__()
        width = config.hidden_size
        self.word_embeddings = nn.Embedding(config.vocab_size, width)
        self.position_embeddings = nn.Embedding(config.max_position_embeddings, width)
        self.LayerNorm = nn.LayerNorm(width)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)

    def forward(self, input_ids):
        """Embed ``input_ids``; axis 1 of ``input_ids`` is the sequence axis."""
        # Positions 0 .. seq_len-1, created on the same device as the ids and
        # expanded to the shape of input_ids.
        n_positions = input_ids.size(1)
        positions = torch.arange(n_positions, dtype=torch.long, device=input_ids.device)
        positions = positions.unsqueeze(0).expand_as(input_ids)

        # Sum of token and position vectors, then normalization and dropout.
        summed = self.word_embeddings(input_ids) + self.position_embeddings(positions)
        return self.dropout(self.LayerNorm(summed))

In [29]:
# Smoke test: ids of size (batch, seq_len) become embeddings with an extra
# trailing hidden_size axis.
embedding_layer = Embeddings(config)
print(inputs["input_ids"].size(), embedding_layer(inputs["input_ids"]).size())

torch.Size([1, 5]) torch.Size([1, 5, 768])


In [30]:
# Inspect the actual values. The exact zeros in the displayed output are
# consistent with the module's dropout being active here.
embedding_layer(inputs["input_ids"])

tensor([[[-1.0534,  0.4932, -0.7895,  ..., -0.0000, -0.6175,  0.3172],
         [ 1.2061,  0.9720, -0.0431,  ...,  2.1416, -0.5313,  1.1734],
         [ 0.0000,  0.2711,  0.0974,  ...,  1.1478,  0.2785,  0.0000],
         [ 0.1086, -1.1465,  0.3842,  ...,  1.9284, -0.2821, -0.6952],
         [ 0.3995,  1.3492, -0.5618,  ...,  0.2273,  1.2526, -0.0937]]],
       grad_fn=<MulBackward0>)

In [45]:
# Step-by-step look at how the position ids are built inside Embeddings.forward.
print(inputs["input_ids"].size())
# One integer position per token along the sequence axis.
position_ids = torch.arange(inputs["input_ids"].size(1), dtype=torch.long)
print(position_ids.shape)
# unsqueeze(0) adds a leading axis of size 1.
print(position_ids.unsqueeze(0).shape)
position_ids.unsqueeze(0)

torch.Size([1, 5])
torch.Size([5])
torch.Size([1, 5])


tensor([[0, 1, 2, 3, 4]])

### Now let us put it all together

In [31]:
class TransformerEncoder(nn.Module):
    """Embedding layer followed by ``config.num_hidden_layers`` encoder layers."""

    def __init__(self, config):
        super().__init__()
        self.embedding = Embeddings(config)
        stack = []
        for _ in range(config.num_hidden_layers):
            stack.append(TransformerEncoderLayer(config))
        self.layers = nn.ModuleList(stack)

    def forward(self, input_ids):
        """Embed ``input_ids`` and pass the result through every layer in order."""
        hidden = self.embedding(input_ids)
        for encoder_layer in self.layers:
            hidden = encoder_layer(hidden)
        return hidden

In [32]:
# Smoke test of the full encoder: print the size of the ids and of the output.
encoder = TransformerEncoder(config)
print(inputs["input_ids"].size(), encoder(inputs["input_ids"]).size())

torch.Size([1, 5]) torch.Size([1, 5, 768])


## Adding a Classification Head

In [46]:
class TransformerForSequenceClassification(nn.Module):
    """Encoder with a linear classification head on the first token's output.

    The head maps ``config.hidden_size`` features to ``config.num_labels``
    outputs; dropout with ``config.hidden_dropout_prob`` is applied before it.
    """

    def __init__(self, config):
        super().__init__()
        self.encoder = TransformerEncoder(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, config.num_labels)

    def forward(self, input_ids):
        """Return one row of ``config.num_labels`` outputs per input sequence."""
        encoded = self.encoder(input_ids)
        # Only the hidden state at sequence position 0 is fed to the classifier.
        first_token = encoded[:, 0, :]
        return self.classifier(self.dropout(first_token))

In [47]:
# The classification head reads config.num_labels, so set it before building the model.
config.num_labels = 2
# Note: this rebinds `model`, which earlier referred to the pretrained BertModel.
model = TransformerForSequenceClassification(config)
encoder_classifier = model(inputs["input_ids"])

# One row per sequence, one column per label.
print(encoder_classifier.size())

torch.Size([1, 2])


## The Decoder

In [49]:
# Build a lower-triangular mask of ones: row i has ones in columns 0..i and
# zeros to the right. unsqueeze(0) adds a leading axis of size 1.
seq_len = inputs["input_ids"].size(-1)
mask = torch.tril(torch.ones(seq_len, seq_len)).unsqueeze(0) 

print(mask.size())
print(mask)

torch.Size([1, 5, 5])
tensor([[[1., 0., 0., 0., 0.],
         [1., 1., 0., 0., 0.],
         [1., 1., 1., 0., 0.],
         [1., 1., 1., 1., 0.],
         [1., 1., 1., 1., 1.]]])


In [50]:
# Show the unmasked attention scores computed earlier, for comparison with the masked version.
print(scores)

tensor([[[26.6332,  0.9239, -1.6107,  0.9351, -1.5695],
         [ 0.9239, 27.2575, -1.3069,  0.8039,  1.2502],
         [-1.6107, -1.3069, 29.9443, -1.6849,  1.5230],
         [ 0.9351,  0.8039, -1.6849, 31.1858, -0.7020],
         [-1.5695,  1.2502,  1.5230, -0.7020, 26.9231]]],
       grad_fn=<DivBackward0>)


In [51]:
# Replace every score where the mask is 0 with -inf; a softmax over such a
# row assigns those positions zero weight.
scores_masked = scores.masked_fill(mask == 0, float("-inf"))
print(scores_masked)

tensor([[[26.6332,    -inf,    -inf,    -inf,    -inf],
         [ 0.9239, 27.2575,    -inf,    -inf,    -inf],
         [-1.6107, -1.3069, 29.9443,    -inf,    -inf],
         [ 0.9351,  0.8039, -1.6849, 31.1858,    -inf],
         [-1.5695,  1.2502,  1.5230, -0.7020, 26.9231]]],
       grad_fn=<MaskedFillBackward0>)


In [52]:
def scaled_dot_product_attention(query, key, value, mask=None):
    """Compute scaled dot-product attention with an optional mask.

    Args:
        query: Tensor whose last two axes are ``(query_len, dim_k)``.
        key: Tensor whose last two axes are ``(key_len, dim_k)``.
        value: Tensor whose last two axes are ``(key_len, dim_v)``.
            Any leading axes (batch, heads, ...) must be broadcastable
            across the three tensors, as required by ``torch.matmul``.
        mask: Optional tensor broadcastable to the score shape
            ``(..., query_len, key_len)``. Positions where ``mask == 0``
            are excluded from attention. ``None`` (default) disables masking.

    Returns:
        Tensor whose last two axes are ``(query_len, dim_v)``: for every
        query position, the softmax-weighted sum of the value vectors.
    """
    dim_k = key.size(-1)
    # Swap the last two axes rather than hard-coding axes 1 and 2, so the
    # function also works for unbatched (2-D) and multi-head (4-D) inputs.
    scores = torch.matmul(query, key.transpose(-2, -1)) / sqrt(dim_k)
    if mask is not None:
        # -inf scores receive zero weight after the softmax.
        scores = scores.masked_fill(mask == 0, float("-inf"))
    weights = F.softmax(scores, dim=-1)
    return torch.matmul(weights, value)