In [1]:
from transformers import AutoTokenizer, AutoModelForCausalLM

  from .autonotebook import tqdm as notebook_tqdm


In [4]:
model=AutoModelForCausalLM.from_pretrained("openai-community/gpt2")

In [5]:
model_dict=model.state_dict()
for k,v in model_dict.items():
    print(k, v.shape)

transformer.wte.weight torch.Size([50257, 768])
transformer.wpe.weight torch.Size([1024, 768])
transformer.h.0.ln_1.weight torch.Size([768])
transformer.h.0.ln_1.bias torch.Size([768])
transformer.h.0.attn.c_attn.weight torch.Size([768, 2304])
transformer.h.0.attn.c_attn.bias torch.Size([2304])
transformer.h.0.attn.c_proj.weight torch.Size([768, 768])
transformer.h.0.attn.c_proj.bias torch.Size([768])
transformer.h.0.ln_2.weight torch.Size([768])
transformer.h.0.ln_2.bias torch.Size([768])
transformer.h.0.mlp.c_fc.weight torch.Size([768, 3072])
transformer.h.0.mlp.c_fc.bias torch.Size([3072])
transformer.h.0.mlp.c_proj.weight torch.Size([3072, 768])
transformer.h.0.mlp.c_proj.bias torch.Size([768])
transformer.h.1.ln_1.weight torch.Size([768])
transformer.h.1.ln_1.bias torch.Size([768])
transformer.h.1.attn.c_attn.weight torch.Size([768, 2304])
transformer.h.1.attn.c_attn.bias torch.Size([2304])
transformer.h.1.attn.c_proj.weight torch.Size([768, 768])
transformer.h.1.attn.c_proj.bias 

In [63]:
import torch 
import torch.nn as nn
import math

In [11]:
model=AutoModelForCausalLM.from_pretrained("openai-community/gpt2")
tokenizer=AutoTokenizer.from_pretrained("openai-community/gpt2")

In [62]:
# class SelfAttention(nn.Module):
#     def __init__(self,d_model):
#         super().__init__()

#         self.query=nn.Linear(d_model,d_model)
#         self.key=nn.Linear(d_model,d_model)
#         self.value=nn.Linear(d_model,d_model)

#         self.fc_out=nn.Linear(d_model,d_model)

#     def forward(self,inputs):
#         B, seq_length, d_model = inputs.shape  # batch size, sequence length, embedding dimensionality (n_embd)

#         # projecting the input into query, key and value
#         Q=self.query(inputs)
#         K=self.key(inputs)
#         V=self.value(inputs)

#         attention_scores=torch.matmul(Q,K.transpose(-2,-1))
        
#         # Apply mask to prevent attention to future tokens
#         mask = torch.triu(torch.ones(seq_length, seq_length), diagonal=1).bool().to(inputs.device)
#         attention_scores = attention_scores.masked_fill(mask, float('-inf'))
        
#         attention_weights = torch.softmax(attention_scores, dim=-1)
#         # Compute the weighted sum of the values
#         attention_output = torch.matmul(attention_weights, V)

#         # Apply the final linear transformation
#         out = self.fc_out(attention_output)
        
#         return out


class MultiHeadAttention(nn.Module):
    def __init__(self, d_model: int, n_heads: int):
        super().__init__()
        
        self.n_heads = n_heads
        self.head_dim = d_model // n_heads

        assert (n_heads * self.head_dim == d_model)

        self.query = nn.Linear(d_model, d_model)
        self.key = nn.Linear(d_model, d_model)
        self.value = nn.Linear(d_model, d_model)
        self.fc_out = nn.Linear(d_model, d_model)
        self.dropout = nn.Dropout(0.2)

    def forward(self, inputs: torch.Tensor):
        B, seq_length, d_model = inputs.shape
        
        # Project the input embeddings into Q, K, and V
        Q = self.query(inputs).view(B, seq_length, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        K = self.key(inputs).view(B, seq_length, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        V = self.value(inputs).view(B, seq_length, self.n_heads, self.head_dim).permute(0, 2, 1, 3)
        
        # Compute attention scores
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        
        # Apply mask to prevent attention to future tokens
        mask = torch.triu(torch.ones(seq_length, seq_length), diagonal=1).bool().to(inputs.device)
        attention_scores = attention_scores.masked_fill(mask, float('-inf'))
        
        attention_weights = torch.softmax(attention_scores, dim=-1)
        # Compute the weighted sum of the values
        attention_output = torch.matmul(self.dropout(attention_weights), V)

        # Concatenate heads and put them back to the original shape
        attention_output = attention_output.permute(0, 2, 1, 3).contiguous()
        attention_output = attention_output.view(B, seq_length, d_model)

        # Apply the final linear transformation
        out = self.fc_out(attention_output)
        
        return out


In [None]:
# attn=MultiHeadAttention(512,8)
# x=torch.rand(1,10,512)
# attn(x)

In [97]:
class PositionalEncoding(nn.Module):
    def __init__(self, context_length, d_model) -> None:
        super().__init__()
        # Create a matrix of shape (context_length, d_model) to store the positional encodings
        pe = torch.zeros(context_length, d_model)
        
        # Create a vector with positions [0, 1, 2, ..., context_length-1] of shape (context_length, 1)
        position = torch.arange(0, context_length, dtype=torch.float).unsqueeze(1)
        
        # Create a vector with the divisor terms based on the dimension
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-torch.log(torch.tensor(10000.0)) / d_model))
        
        # Compute the positional encodings using sine and cosine functions
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        
        pe = pe.unsqueeze(0)  # Shape: (1, context_length, d_model)
        
        # Register pe as a buffer, so it is not considered a parameter but is part of the module's state
        self.register_buffer('pe', pe)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        # Add the positional encodings to the input embeddings
        return x + self.pe[:,:x.size(1), :] 

In [98]:
class MLP(nn.Module):
        def __init__(self,d_model,vocab_size=None):
            super().__init__()
            self.fcn=nn.Sequential(
                nn.Linear(d_model, 4*d_model),
                nn.GELU(),
                nn.Linear(4*d_model,d_model)
            )
        def forward(self,x):
              logits=self.fcn(x)

              return logits

In [99]:
class GPTBlock(nn.Module):
    def __init__(self, d_model, n_heads):
        super().__init__()
        self.att = MultiHeadAttention(d_model, n_heads)
        self.ln1 = nn.LayerNorm(d_model)
        self.ln2 = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(0.2)
        self.fcn = nn.Sequential(
            nn.Linear(d_model, 4 * d_model),
            nn.GELU(),
            nn.Linear(4 * d_model, d_model)
        )

    def forward(self, logits):
        att_logits = self.att(logits)
        adn_logits = self.ln1(logits + att_logits)
        logits = self.dropout(adn_logits)
        logits = self.fcn(logits)
        logits = self.ln2(logits + adn_logits)
        return logits

In [None]:
class GPT(nn.Module):
    def __init__(self, vocab_size, d_model, n_heads, n_layers,context_length):
        super().__init__()
        self.wte = nn.Embedding(vocab_size, d_model) # word token embeddings
        self.wpe = PositionalEncoding(context_length, d_model) # word position encodings
        self.blocks = nn.ModuleList([GPTBlock(d_model, n_heads) for _ in  range(n_layers)])
        self.linear1 = nn.Linear(d_model, vocab_size)

        self.wte.weight = self.linear1.weight

    def forward(self, inputs, targets = None):
        logits = self.wte(inputs) # dim -> batch_size, sequence_length, d_model
        logits = self.wpe(logits)
        for block in self.blocks:
            logits = block(logits)
        logits = self.linear1(logits)
        loss = None
        if targets != None:
            batch_size, sequence_length, d_model = logits.shape
            # to calculate loss for all token embeddings in a batch
            # kind of a requirement for cross_entropy
            logits = logits.view(batch_size * sequence_length, d_model)
            targets = targets.view(batch_size * sequence_length)
            loss = torch.cross_entropy(logits, targets)
        return logits, loss

In [101]:
mlp=GPTBlock(512,8)
x=torch.rand(1,10,512)
mlp(x).shape

torch.Size([1, 10, 512])

In [121]:
class GPTConfig:
    
    block_size=256
    vocab_size=50256
    n_layers=12
    n_heads=12
    d_model=768
    context_length=1024
config=GPTConfig()
device=torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [122]:
custom_model = GPT(vocab_size=config.vocab_size, d_model=config.d_model, n_heads=config.n_heads, n_layers=config.n_layers,context_length=config.context_length).to(device)

In [123]:
custom_model=torch.compile(custom_model)

In [124]:
print(m)
print(f"Total Parameters: {round(sum(p.numel() for p in custom_model.parameters() if p.requires_grad) / 1_000_000)}M")

GPT(
  (wte): Embedding(50253, 512)
  (wpe): PositionalEncoding()
  (blocks): ModuleList(
    (0-7): 8 x GPTBlock(
      (att): MultiHeadAttention(
        (query): Linear(in_features=512, out_features=512, bias=True)
        (key): Linear(in_features=512, out_features=512, bias=True)
        (value): Linear(in_features=512, out_features=512, bias=True)
        (fc_out): Linear(in_features=512, out_features=512, bias=True)
        (dropout): Dropout(p=0.2, inplace=False)
      )
      (ln1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (ln2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.2, inplace=False)
      (fcn): Sequential(
        (0): Linear(in_features=512, out_features=2048, bias=True)
        (1): GELU(approximate='none')
        (2): Linear(in_features=2048, out_features=512, bias=True)
      )
    )
  )
  (linear1): Linear(in_features=512, out_features=50253, bias=True)
)
Total Parameters: 124M
