In [None]:
  %pip install fancy_einsum
  %pip install einops

Collecting fancy_einsum
  Downloading fancy_einsum-0.0.3-py3-none-any.whl (6.2 kB)
Installing collected packages: fancy_einsum
Successfully installed fancy_einsum-0.0.3
Collecting einops
  Downloading einops-0.7.0-py3-none-any.whl (44 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m44.6/44.6 kB[0m [31m1.1 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: einops
Successfully installed einops-0.7.0


In [None]:
import einops
from fancy_einsum import einsum
import torch
import torch.nn as nn
import numpy as np
import math
import tqdm.auto

Size of the vector used to represent each token or position in the model (dim_model) = 768

Size of the vocabulary (dim_vocab) = 50257

Maximum context window size (n_ctx) = 1024

Dimensionality of each attention head (dim_head) = dim_model / n_heads = 768 / 12 = 64


Dimensionality of the intermediate layer of the feed-forward network inside each transformer block (dim_mlp) = 4 * dim_model = 3072

Total number of transformer layers in the model (n_layers) = 12

Number of attention heads in the model (n_heads) = 12


In [None]:
class Config:
    """Hyperparameters for the GPT-2-small-style transformer built in this notebook.

    Attributes:
        dim_model: Size of the vector representing each token/position.
        debug: When true, the model's layers print tensor shapes as they run.
        layer_norm_eps: Constant added to the variance inside layer
            normalization before taking the square root.
        dim_vocab: Vocabulary size.
        init_range: Standard deviation of the normal distribution used to
            initialise weight matrices.
        n_ctx: Maximum context window (number of positions).
        dim_head: Dimensionality of each attention head.
        dim_mlp: Width of the hidden layer of each block's MLP.
        n_heads: Number of attention heads per block.
        n_layers: Number of transformer blocks.
    """

    def __init__(self, dim_model=768, debug=True, layer_norm_eps=1e-5,
                 dim_vocab=50257, init_range=0.02, n_ctx=1024,
                 dim_head=64, dim_mlp=3072, n_heads=12, n_layers=12):
        self.dim_model = dim_model
        self.debug = debug
        self.layer_norm_eps = layer_norm_eps
        self.dim_vocab = dim_vocab
        self.init_range = init_range
        self.n_ctx = n_ctx
        self.dim_head = dim_head
        self.dim_mlp = dim_mlp
        self.n_heads = n_heads
        self.n_layers = n_layers

    def __repr__(self):
        # Each field is shown under its constructor keyword (dim_head, not
        # d_head) so the printed form can be pasted back as a Config(...) call.
        return (f"Config(dim_model={self.dim_model}, debug={self.debug}, "
                f"layer_norm_eps={self.layer_norm_eps}, dim_vocab={self.dim_vocab}, "
                f"init_range={self.init_range}, n_ctx={self.n_ctx}, "
                f"dim_head={self.dim_head}, dim_mlp={self.dim_mlp}, n_heads={self.n_heads}, "
                f"n_layers={self.n_layers})")


cfg = Config()
print(cfg)


Config(dim_model=768, debug=True, layer_norm_eps=1e-05, dim_vocab=50257, init_range=0.02, n_ctx=1024, d_head=64, dim_mlp=3072, n_heads=12, n_layers=12)


In [None]:
class LayerNoralization(nn.Module):
    """Layer normalization over the last axis with a learned scale and shift.

    NOTE(review): the class name is misspelled ("Noralization"). It is kept
    as-is because other cells construct the layer under this name.

    Args:
        cfg: Configuration object; ``dim_model``, ``layer_norm_eps`` and
            ``debug`` are read from it.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.w = nn.Parameter(torch.ones(cfg.dim_model))   # learned scale
        self.b = nn.Parameter(torch.zeros(cfg.dim_model))  # learned shift

    def forward(self, residual):
        """Normalize ``residual`` along its last axis.

        Args:
            residual: Tensor of shape ``[batch, position, dim_model]``.

        Returns:
            Tensor of the same shape: zero mean / unit variance along the last
            axis, then multiplied by ``w`` and shifted by ``b``.
        """
        debug = self.cfg.debug
        if debug:
            print("Residual:", residual.shape)

        # Centre each vector, then divide by its (biased) standard deviation.
        centered = residual - residual.mean(dim=-1, keepdim=True)
        variance = centered.pow(2).mean(dim=-1, keepdim=True)
        if debug:
            # Shapes only: dumping full tensors on every forward pass floods
            # the notebook output and ignores the debug switch.
            print(f"variance:{variance.shape}")

        # eps keeps the division finite when a vector has zero variance.
        scale = torch.sqrt(variance + self.cfg.layer_norm_eps)
        normalized = centered / scale * self.w + self.b

        if debug:
            print("Normalized:", normalized.shape)

        return normalized

In [None]:
class Embed_layer(nn.Module):
    """Token-embedding table: maps token ids to ``dim_model``-sized vectors."""

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        # One row per vocabulary entry, drawn from N(0, init_range^2).
        table = torch.empty((cfg.dim_vocab, cfg.dim_model))
        self.W_E = nn.Parameter(table)
        nn.init.normal_(self.W_E, std=cfg.init_range)

    def forward(self, tokens):
        """Look up the embedding row for every id in ``tokens`` ([batch, position])."""
        verbose = self.cfg.debug
        if verbose:
            print("Tokens shape is:", tokens.shape)

        # Integer-tensor indexing gathers rows -> [batch, position, d_model]
        embed = self.W_E[tokens]

        if verbose:
            print("Embedding shape is:", embed.shape)
        return embed


In [None]:
class PosEmbed(nn.Module):
    """Learned positional-embedding table with one row per context position."""

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.W_pos = nn.Parameter(torch.empty((cfg.n_ctx, cfg.dim_model)))
        nn.init.normal_(self.W_pos, std=cfg.init_range)

    def forward(self, tokens):
        """Return positional embeddings matching ``tokens`` ([batch, position]).

        Only the shape of ``tokens`` is used; its values are ignored.
        """
        verbose = self.cfg.debug
        if verbose:
            print("Tokens:", tokens.shape)

        # Rows for positions 0 .. seq_len-1 -> [position, d_model]
        seq_len = tokens.size(1)
        rows = self.W_pos[:seq_len]

        # Same rows for every sequence in the batch -> [batch, position, d_model]
        pos_embed = einops.repeat(
            rows,
            "position d_model -> batch position d_model",
            batch=tokens.size(0),
        )

        if verbose:
            print("pos_embed:", pos_embed.shape)
        return pos_embed

In [None]:
class Attention(nn.Module):
    """Multi-head causal self-attention with an explicit head axis on its weights.

    ``W_Q``/``W_K``/``W_V`` have shape ``[n_heads, dim_model, dim_head]`` and
    ``W_O`` has shape ``[n_heads, dim_head, dim_model]``.

    Args:
        cfg: Configuration object; ``n_heads``, ``dim_model``, ``dim_head``,
            ``init_range`` and ``debug`` are read from it.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.W_Q = nn.Parameter(torch.empty((cfg.n_heads, cfg.dim_model, cfg.dim_head)))
        nn.init.normal_(self.W_Q, std=self.cfg.init_range)
        self.b_Q = nn.Parameter(torch.zeros((cfg.n_heads, cfg.dim_head)))
        self.W_K = nn.Parameter(torch.empty((cfg.n_heads, cfg.dim_model, cfg.dim_head)))
        nn.init.normal_(self.W_K, std=self.cfg.init_range)
        self.b_K = nn.Parameter(torch.zeros((cfg.n_heads, cfg.dim_head)))
        self.W_V = nn.Parameter(torch.empty((cfg.n_heads, cfg.dim_model, cfg.dim_head)))
        nn.init.normal_(self.W_V, std=self.cfg.init_range)
        self.b_V = nn.Parameter(torch.zeros((cfg.n_heads, cfg.dim_head)))

        self.W_O = nn.Parameter(torch.empty((cfg.n_heads, cfg.dim_head, cfg.dim_model)))
        nn.init.normal_(self.W_O, std=self.cfg.init_range)
        self.b_O = nn.Parameter(torch.zeros((cfg.dim_model)))

        # Score written into masked (future) positions before the softmax.
        # It is created on the default device instead of being pinned to
        # "cuda": the module can then be built on CPU-only machines, and as a
        # registered buffer it moves with the module on .to()/.cuda().
        self.register_buffer("IGNORE", torch.tensor(-1e5, dtype=torch.float32))

    def forward(self, normalized_resid_pre):
        """Apply causal self-attention.

        Args:
            normalized_resid_pre: Tensor of shape ``[batch, position, d_model]``.

        Returns:
            Tensor of shape ``[batch, position, d_model]``: the sum over heads
            of each head's output projection, plus ``b_O``.
        """
        if self.cfg.debug:
            print("Normalized_resid_pre:", normalized_resid_pre.shape)

        q = einsum("batch query_pos d_model, n_heads d_model d_head -> batch query_pos n_heads d_head", normalized_resid_pre, self.W_Q) + self.b_Q
        k = einsum("batch key_pos d_model, n_heads d_model d_head -> batch key_pos n_heads d_head", normalized_resid_pre, self.W_K) + self.b_K

        # Dot-product scores, scaled by sqrt(dim_head).
        attn_scores = einsum("batch query_pos n_heads d_head, batch key_pos n_heads d_head -> batch n_heads query_pos key_pos", q, k)
        attn_scores /= self.cfg.dim_head ** 0.5

        attn_scores = self.apply_causal_mask(attn_scores)

        pattern = attn_scores.softmax(dim=-1)  # [batch, n_head, query_pos, key_pos]

        v = einsum("batch key_pos d_model, n_heads d_model d_head -> batch key_pos n_heads d_head", normalized_resid_pre, self.W_V) + self.b_V

        # Attention-weighted mix of value vectors, per head.
        z = einsum("batch n_heads query_pos key_pos, batch key_pos n_heads d_head -> batch query_pos n_heads d_head", pattern, v)

        # Project each head back to d_model and sum the heads.
        attn_out = einsum("batch query_pos n_heads d_head, n_heads d_head d_model -> batch query_pos d_model", z, self.W_O) + self.b_O
        return attn_out

    def apply_causal_mask(self, attn_scores):
        """Overwrite scores for key positions after the query position with ``IGNORE``.

        Args:
            attn_scores: Tensor of shape ``[batch, n_heads, query_pos, key_pos]``.
                It is modified in place.

        Returns:
            The same tensor object, with every entry strictly above the
            diagonal of its last two axes set to ``IGNORE``.
        """
        # True strictly above the diagonal, i.e. where key_pos > query_pos.
        mask = torch.triu(torch.ones(attn_scores.size(-2), attn_scores.size(-1), device=attn_scores.device), diagonal=1).bool()
        attn_scores.masked_fill_(mask, self.IGNORE)
        return attn_scores

In [None]:
class MultilayerPerceptron(nn.Module):
    """Two-layer feed-forward network: dim_model -> dim_mlp -> dim_model.

    Args:
        cfg: Configuration object; ``dim_model``, ``dim_mlp``, ``init_range``
            and ``debug`` are read from it.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.W_in = nn.Parameter(torch.empty((cfg.dim_model, cfg.dim_mlp)))
        nn.init.normal_(self.W_in, std=self.cfg.init_range)
        self.b_in = nn.Parameter(torch.zeros((cfg.dim_mlp)))
        self.W_out = nn.Parameter(torch.empty((cfg.dim_mlp, cfg.dim_model)))
        nn.init.normal_(self.W_out, std=self.cfg.init_range)
        self.b_out = nn.Parameter(torch.zeros((cfg.dim_model)))

    def forward(self, normalized_resid_mid):
        """Run the MLP on ``normalized_resid_mid``.

        Args:
            normalized_resid_mid: Tensor of shape ``[batch, position, d_model]``.

        Returns:
            Tensor of shape ``[batch, position, d_model]``.
        """
        debug = self.cfg.debug
        if debug:
            print("Normalized_resid_mid:", normalized_resid_mid.shape)

        pre = einsum("batch position dim_model, dim_model dim_mlp -> batch position dim_mlp", normalized_resid_mid, self.W_in) + self.b_in
        if debug:
            # Shape prints are diagnostics, so they follow the debug switch.
            print("Pre:", pre.shape)

        # tanh approximation of the GELU activation.
        post = 0.5 * pre * (1.0 + torch.tanh(np.sqrt(2.0 / np.pi) * (pre + 0.044715 * torch.pow(pre, 3.0))))
        mlp_out = einsum("batch position dim_mlp, dim_mlp dim_model -> batch position dim_model", post, self.W_out) + self.b_out
        if debug:
            print(f"mlp_out{mlp_out.shape}")
        return mlp_out

In [None]:
class TransformerBlock(nn.Module):
    """One pre-norm transformer block: attention then MLP, each added back onto the residual."""

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg

        # Attention sub-layer and the normalization applied before it.
        self.ln1 = LayerNoralization(cfg)
        self.attn = Attention(cfg)
        # MLP sub-layer and the normalization applied before it.
        self.ln2 = LayerNoralization(cfg)
        self.mlp = MultilayerPerceptron(cfg)

    def forward(self, resid_pre):
        """Map ``resid_pre`` ([batch, position, d_model]) to the block's output of the same shape."""
        # Each sub-layer reads a normalized copy but writes onto the raw residual.
        resid_mid = resid_pre + self.attn(self.ln1(resid_pre))
        return resid_mid + self.mlp(self.ln2(resid_mid))

In [None]:
class Unembed(nn.Module):
    """Linear map from the residual stream to vocabulary logits.

    Args:
        cfg: Configuration object; ``dim_model``, ``dim_vocab`` and
            ``init_range`` are read from it.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg

        # Learnable projection matrix.
        self.W_U = nn.Parameter(torch.empty((cfg.dim_model, cfg.dim_vocab)))
        nn.init.normal_(self.W_U, std=self.cfg.init_range)
        # Frozen zero bias.  requires_grad must be given to nn.Parameter:
        # passing it to torch.zeros is overridden by Parameter's own default
        # (True), which silently left the bias trainable.
        self.b_U = nn.Parameter(torch.zeros(cfg.dim_vocab), requires_grad=False)

    def forward(self, normalized_resid_final):
        """Project ``normalized_resid_final`` ([batch, position, d_model]) to logits.

        Returns:
            Tensor of shape ``[batch, position, dim_vocab]``.
        """
        logits = torch.matmul(normalized_resid_final, self.W_U) + self.b_U
        return logits


In [None]:
class DemoTransformer(nn.Module):
    """GPT-2-style decoder: embeddings, ``n_layers`` transformer blocks, final norm, logits.

    The output projection reuses the token-embedding matrix (weight tying).
    This adds no parameters, so the parameter count and ``state_dict`` keys
    are exactly those of the sub-modules created in ``__init__``.

    Args:
        cfg: Configuration object passed to every sub-module; ``n_layers`` is
            read here.
    """

    def __init__(self, cfg):
        super().__init__()
        self.cfg = cfg
        self.embed = Embed_layer(cfg)
        self.pos_embed = PosEmbed(cfg)
        self.blocks = nn.ModuleList([TransformerBlock(cfg) for _ in range(cfg.n_layers)])
        self.ln_final = LayerNoralization(cfg)

    def forward(self, tokens):
        """Compute next-token logits.

        Args:
            tokens: Integer tensor of token ids, shape ``[batch, position]``.

        Returns:
            Tensor of shape ``[batch, position, dim_vocab]``.
        """
        residual = self.embed(tokens) + self.pos_embed(tokens)
        for block in self.blocks:
            residual = block(residual)
        normalized_resid_final = self.ln_final(residual)

        # The previous code called self.linear_layer, which is never defined
        # and raised AttributeError.  Project with the transposed embedding
        # table instead: W_E is [dim_vocab, dim_model], so W_E.T maps
        # d_model -> dim_vocab.
        # NOTE(review): an untied projection (e.g. an Unembed instance) would
        # also work but adds dim_model * dim_vocab parameters.
        logits = torch.matmul(normalized_resid_final, self.embed.W_E.T)

        # logits have shape [batch, position, dim_vocab]
        return logits



**THE MODEL HAS ABOUT 124M PARAMETERS, WHICH MATCHES THE PARAMETER COUNT OF GPT-2 SMALL**



In [None]:
def count_parameters(model):
    """Return the total number of scalar entries in ``model``'s trainable parameters.

    Parameters whose ``requires_grad`` is false are skipped.
    """
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

cfg = Config()
model = DemoTransformer(cfg)

# Report the trainable-parameter total, then the same figure in whole millions
# (floor division drops the remainder).
total_params = count_parameters(model)
millions = total_params // 1000000
print(f"Total number of parameters: {total_params}")
print(f"total parameters in millions: {millions}M")

Total number of parameters: 124439808
total parameters in millions: 124M


In [None]:
# Show the module tree (sub-module names and types) of the model.
print(model)

DemoTransformer(
  (embed): Embed_layer()
  (pos_embed): PosEmbed()
  (blocks): ModuleList(
    (0-11): 12 x TransformerBlock(
      (ln1): LayerNoralization()
      (attn): Attention()
      (ln2): LayerNoralization()
      (mlp): MultilayerPerceptron()
    )
  )
  (ln_final): LayerNoralization()
)


In [None]:
# Write the model's state dict (its parameters and registered buffers) to a
# file in the current working directory.
# NOTE(review): as far as this notebook shows, `model` has not been trained or
# loaded from a checkpoint at this point, so the saved tensors are the
# initial values — confirm this is intended.
torch.save(model.state_dict(), 'demo_transformer_weights.pth')

In [None]:
# NOTE(review): this import sits mid-notebook; consider moving it to the
# imports cell at the top so dependencies are visible in one place.
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Load the pre-trained GPT-2 model and its tokenizer by checkpoint name.
# The recorded output shows the files being downloaded on first use.
model_name = "gpt2" # You can specify other versions like "gpt2-medium", "gpt2-large", etc.
gpt2_model = GPT2LMHeadModel.from_pretrained(model_name)
tokenizer = GPT2Tokenizer.from_pretrained(model_name)


config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/548M [00:00<?, ?B/s]

generation_config.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

In [None]:
# Compare the model architectures
gpt2_num_params = sum(p.numel() for p in gpt2_model.parameters())
demo_num_params = sum(p.numel() for p in model.parameters())

if gpt2_num_params != demo_num_params:
    print("Model architectures do not match.")
    print(f"GPT-2 Model Parameters: {gpt2_num_params}")
    print(f"Your DemoTransformer Model Parameters: {demo_num_params}")
else:
    print("Model architectures match.")



Model architectures match.


In [None]:
# Get the set of parameter names for each model
gpt2_params = set(p for p in gpt2_model.state_dict().keys())
demo_params = set(p for p in model.state_dict().keys())

# Find parameters in GPT-2 model but not in your model
missing_in_demo = gpt2_params - demo_params

# Find parameters in your model but not in GPT-2 model
extra_in_demo = demo_params - gpt2_params

# Calculate the percentage of parameters missing in DemoTransformer
percentage_missing = (len(missing_in_demo) / gpt2_num_params) * 100

print(f"Percentage of parameters missing in DemoTransformer: {percentage_missing:.2f}%")



print("Parameters in GPT-2 model but not in DemoTransformer:", missing_in_demo)
print("Parameters in DemoTransformer but not in GPT-2 model:", extra_in_demo)


Percentage of parameters missing in DemoTransformer: 0.00%
Parameters in GPT-2 model but not in DemoTransformer: {'transformer.h.8.mlp.c_proj.bias', 'transformer.h.1.mlp.c_fc.weight', 'transformer.h.2.mlp.c_fc.weight', 'transformer.h.9.attn.c_proj.weight', 'transformer.h.1.ln_1.weight', 'transformer.h.3.attn.c_proj.weight', 'transformer.h.7.attn.c_proj.weight', 'transformer.h.10.ln_2.weight', 'transformer.h.10.mlp.c_proj.weight', 'transformer.h.3.ln_1.weight', 'transformer.h.0.attn.c_proj.bias', 'transformer.h.9.mlp.c_fc.weight', 'transformer.h.8.attn.c_proj.bias', 'transformer.h.8.ln_1.bias', 'transformer.h.2.mlp.c_proj.weight', 'transformer.h.6.mlp.c_proj.bias', 'transformer.h.3.mlp.c_proj.bias', 'transformer.h.8.ln_2.weight', 'transformer.h.8.attn.c_attn.bias', 'transformer.h.5.attn.c_attn.weight', 'transformer.h.1.ln_2.weight', 'transformer.ln_f.weight', 'transformer.h.11.ln_2.weight', 'transformer.h.7.attn.c_attn.weight', 'transformer.h.8.mlp.c_fc.bias', 'transformer.h.5.attn.c_at

In [None]:
# These are sizes of sets of state_dict key names (one entry per tensor),
# not counts of scalar parameters.
print("Number of parameters in GPT-2 model but not in DemoTransformer:", len(missing_in_demo))
print("Number of parameters in DemoTransformer but not in GPT-2 model:", len(extra_in_demo))

Number of parameters in GPT-2 model but not in DemoTransformer: 149
Number of parameters in DemoTransformer but not in GPT-2 model: 208


Let's compare the layer names of our model with those of the GPT-2 model.

In [None]:
# # Print layer names and configurations for GPT-2
# print("GPT-2 Model Layers:")
# for name, param in gpt2_model.named_parameters():
#     print(f"{name}: {param.size()}")



In [None]:
# # Print layer names and configurations for your custom model
# print("\nCustom Model Layers:")
# for name, param in model.named_parameters():
#     print(f"{name}: {param.size()}")

In [None]:
# Print only where torch sizes are different
for (name_gpt2, param_gpt2), (name_custom, param_custom) in zip(gpt2_model.named_parameters(), model.named_parameters()):
    if param_gpt2.size() != param_custom.size():
        print(f"{name_gpt2} (GPT-2): {param_gpt2.size()} | {name_custom} (Custom): {param_custom.size()}")



transformer.h.0.attn.c_attn.weight (GPT-2): torch.Size([768, 2304]) | blocks.0.attn.W_Q (Custom): torch.Size([12, 768, 64])
transformer.h.0.attn.c_attn.bias (GPT-2): torch.Size([2304]) | blocks.0.attn.b_Q (Custom): torch.Size([12, 64])
transformer.h.0.attn.c_proj.weight (GPT-2): torch.Size([768, 768]) | blocks.0.attn.W_K (Custom): torch.Size([12, 768, 64])
transformer.h.0.attn.c_proj.bias (GPT-2): torch.Size([768]) | blocks.0.attn.b_K (Custom): torch.Size([12, 64])
transformer.h.0.ln_2.weight (GPT-2): torch.Size([768]) | blocks.0.attn.W_V (Custom): torch.Size([12, 768, 64])
transformer.h.0.ln_2.bias (GPT-2): torch.Size([768]) | blocks.0.attn.b_V (Custom): torch.Size([12, 64])
transformer.h.0.mlp.c_fc.weight (GPT-2): torch.Size([768, 3072]) | blocks.0.attn.W_O (Custom): torch.Size([12, 64, 768])
transformer.h.0.mlp.c_fc.bias (GPT-2): torch.Size([3072]) | blocks.0.attn.b_O (Custom): torch.Size([768])
transformer.h.0.mlp.c_proj.weight (GPT-2): torch.Size([3072, 768]) | blocks.0.ln2.w (Cus

### **A few mismatches occurred due to differences in the shape of the attention weights and biases. Our custom model keeps a separate weight tensor per head (multi-head layout), whereas GPT-2 stores flattened weights.**

In [None]:
from transformers import GPT2LMHeadModel, GPT2Config

# Weights file written earlier with torch.save(model.state_dict(), ...).
# Use the same relative path as that save call instead of a hard-coded
# absolute Colab path, so the notebook also works outside /content.
gpt2_checkpoint_path = 'demo_transformer_weights.pth'

# Load the GPT-2 configuration by checkpoint name.
gpt2_config = GPT2Config.from_pretrained('gpt2')

# NOTE(review): the file above holds DemoTransformer's state dict, whose key
# names do not overlap with GPT2LMHeadModel's (see the name comparison
# earlier), so the load below would need a key/shape mapping before it could
# succeed.
# gpt2_model = GPT2LMHeadModel(config=gpt2_config)
# gpt2_model.load_state_dict(torch.load(gpt2_checkpoint_path))


