In [1]:
from transformers import GPT2Tokenizer
import torch
import torch.nn as nn
import math
import torch.nn.functional as F

In [2]:
# Setting up device configurations
device = "cuda" if torch.cuda.is_available() else "cpu"
print("Device is: ", device)

Device is:  cpu


In [3]:
# Creating Tokenizer
tokenizer = GPT2Tokenizer.from_pretrained("gpt2")
text = "Every day is your"
encoded_input = tokenizer.encode(text)  # Returns a tensor
print(f"Encoded input: {encoded_input}")

tokenizer_config.json:   0%|          | 0.00/26.0 [00:00<?, ?B/s]

vocab.json:   0%|          | 0.00/1.04M [00:00<?, ?B/s]

merges.txt:   0%|          | 0.00/456k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/1.36M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/665 [00:00<?, ?B/s]

Encoded input: [6109, 1110, 318, 534]


In [4]:
# Embedding class
class Embedding(nn.Module):
    def __init__(self, vocab_size, embed_size):
        super().__init__()
        self.embed_size = embed_size
        self.embed = nn.Embedding(vocab_size, embed_size)

    def forward(self, x):
        # Create a mask for negative values
        mask = (x < 0).unsqueeze(-1)  # shape: (B, T, 1)
        
        # Replace negative indices with 0 to avoid index error
        x_clipped = x.clamp(min=0)

        # Get embeddings and zero out positions where the original input was negative
        emb = self.embed(x_clipped)  # (B, T, C)
        emb = emb.masked_fill(mask, 0.0)
        return emb

In [5]:
# 2. Positional Encoding
class PositionalEncoding(nn.Module):
    def __init__(self, embed_size, max_seq_length=256):
        super().__init__()
        self.pos_embedding = nn.Embedding(max_seq_length, embed_size)
        
    def forward(self, x):
        T = x.shape[1]
        pos_embed = self.pos_embedding(torch.arange(T, device=device)) # (T,C)
        return x + pos_embed

In [6]:
import torch.nn.functional as F
def generate_causal_mask(seq_length, proj_dim):
    mask = torch.tril(torch.ones(seq_length, proj_dim))
    return mask.unsqueeze(0).unsqueeze(0)

In [7]:
# 3. Multi-Head Attention 
class MultiHeadAttention(nn.Module):
    def __init__(self, embed_size, num_heads, proj_dim, seq_len, sharing, proj, qkv_bias=False, layer_idx=None):
        super().__init__()
        self.embed_size = embed_size # -> d
        self.num_heads = num_heads # -> h
        self.head_dim = embed_size // num_heads # -> d/h
        self.proj_dim = proj_dim  # -> k
        self.seq_len = seq_len # -> N
        self.sharing = sharing
        self.proj = proj
        self.layer_idx = layer_idx
        
        self.query = nn.Linear(embed_size, embed_size, bias=qkv_bias) # d*d
        self.key = nn.Linear(embed_size, embed_size, bias=qkv_bias) # d*d
        self.value = nn.Linear(embed_size, embed_size, bias=qkv_bias) # d*d
        
        self.out = nn.Linear(embed_size, embed_size) # fully connected FFNN from d -> d
        
    def forward(self, x, use_mask=False):
        B, T, _ = x.shape # Batch size, Sequence Length, Embed size
        h = self.num_heads
        d_h = self.head_dim
        N = self.seq_len
        
        q = self.query(x).view(B, -1, self.num_heads, self.head_dim).transpose(1, 2) # (B, N, d) → (B, N, h, d/h) -> (B, h, N, d/h)
        k = self.key(x).view(B, -1, self.num_heads, self.head_dim).transpose(1, 2)
        v = self.value(x).view(B, -1, self.num_heads, self.head_dim).transpose(1, 2)

        if(self.sharing == "layerwise-sharing"):
            # layerwise sharing -> only 1 matrix throughout the model
            projn_matrix = self.proj
            k_proj = torch.einsum('bhnf,nk->bhkf', k, projn_matrix)
            v_proj = torch.einsum('bhnf,nk->bhkf', v, projn_matrix)
        elif(self.sharing == "key-value-sharing"):
            # key-value sharing
            projn_matrix = self.proj[self.layer_idx]
            k_proj = torch.einsum('bhnf,nk->bhkf', k, projn_matrix)
            v_proj = torch.einsum('bhnf,nk->bhkf', v, projn_matrix)
        elif(self.sharing == "headwise-sharing"):
            # headwise sharing
            projn_matrix_E = self.proj["E"][self.layer_idx]
            projn_matrix_F = self.proj["F"][self.layer_idx]
            k_proj = torch.einsum('bhnf,nk->bhkf', k, projn_matrix_E)
            v_proj = torch.einsum('bhnf,nk->bhkf', v, projn_matrix_F)
            
        attention = torch.matmul(q, k_proj.transpose(-1, -2)) / math.sqrt(self.head_dim) # (B, h, N, k)
        if (use_mask):
            mask = generate_causal_mask(T, self.proj_dim) # (1,1,N,k)
            mask = mask.to(device)
            attention = attention.masked_fill(mask == 0, float('-inf'))
        attention = torch.softmax(attention, dim=-1)
        
        out = torch.matmul(attention, v_proj)
        out = out.transpose(1, 2).contiguous().view(B, -1, self.embed_size)
        return self.out(out)

In [8]:
# 5. Feed-Forward Network
class FeedForward(nn.Module):
    def __init__(self, embed_size, ff_hidden_size):
        super().__init__()
        # First linear layer that transforms input from embedding size to hidden size
        self.fc1 = nn.Linear(embed_size, ff_hidden_size)
        # Second linear layer that transforms from hidden size back to embedding size
        self.fc2 = nn.Linear(ff_hidden_size, embed_size)
        # GELU activation function
        self.gelu = nn.GELU()
    def forward(self, x):
        # Forward pass: apply the first linear layer, then GELU activation, and finally the second linear layer
        return self.fc2(self.gelu(self.fc1(x)))

In [9]:
# Transformer Block
class TransformerBlock(nn.Module):
    def __init__(self, embed_size, num_heads, ff_hidden_size, proj_dim, seq_len, sharing, proj, dropout=0.1, qkv_bias=False, layer_idx=None):
        super().__init__()
        # Layer norm before attention
        self.ln1 = nn.LayerNorm(embed_size)
        # Multi-Head Self-Attention
        self.mha = MultiHeadAttention(embed_size, num_heads, proj_dim, seq_len, sharing, proj, qkv_bias=qkv_bias, layer_idx=layer_idx)
        self.dropout1 = nn.Dropout(dropout)

        # Layer norm before FFN
        self.ln2 = nn.LayerNorm(embed_size)
        self.ff = FeedForward(embed_size, ff_hidden_size)
        self.dropout2 = nn.Dropout(dropout)

    def forward(self, x, use_mask=False):
        # MHA with Layer norm and residual connection
        x = x + self.dropout1(self.mha(self.ln1(x), use_mask))
        # FFN with layer norm and residual connection
        x = x + self.dropout2(self.ff(self.ln2(x)))
        return x

In [10]:
def projection(sharing, seq_length, proj_dim, n_layers=None):
    if(sharing == "layerwise-sharing"):
        proj = nn.Parameter(torch.empty(seq_length, proj_dim))
        torch.nn.init.xavier_normal_(proj)
        return proj
    if(sharing == "key-value-sharing"):
        proj = [nn.Parameter(torch.empty(seq_length, proj_dim)) for _ in range(n_layers)]
        for proj_i in proj:
            torch.nn.init.xavier_normal_(proj_i)
        return proj
    if(sharing == "headwise-sharing"):
        E_proj = [nn.Parameter(torch.empty(seq_length, proj_dim)) for _ in range(n_layers)]
        F_proj = [nn.Parameter(torch.empty(seq_length, proj_dim)) for _ in range(n_layers)]
        for proj_i in E_proj:
            torch.nn.init.xavier_normal_(proj_i)
        for proj_i in F_proj:
            torch.nn.init.xavier_normal_(proj_i)
        proj_dict = {
            "E" : E_proj,
            "F" : F_proj
        }
        return proj_dict

In [11]:
def right_pad_input(input_ids, context_length, pad_token_id=-1):
    T = input_ids.shape
    if T < context_length:
        padding = torch.full((B, context_length - T), pad_token_id, dtype=input_ids.dtype, device=input_ids.device)
        input_ids_padded = torch.cat([input_ids, padding], dim=1)
    else:
        input_ids_padded = input_ids[:, :context_length]
    return input_ids_padded

In [39]:
# 7. GPT-2 Model
class GPT2_modified(nn.Module):
    def __init__(self, config, seq_len):
        super().__init__()
        # Initializing embedding layer that converts token ids to embeddings
        self.embedding = Embedding(config["vocab_size"], config["emb_dim"])
        
        # Initializing positional encoding 
        self.positional_encoding = PositionalEncoding(config["emb_dim"], config["context_length"])
        
        # Create a list of transformer blocks
        if(config["sharing"] == "layerwise-sharing"):
            # Only 1 projection matrix
            proj = projection(config["sharing"], seq_len, config["proj_dim"])
            self.transformer_blocks = nn.ModuleList([
                TransformerBlock(config["emb_dim"], config["n_heads"], config["emb_dim"] * 4, config["proj_dim"], seq_len, config["sharing"], proj, config["drop_rate"], config["qkv_bias"])
                for _ in range(config["n_layers"])  # Repeat for num_layers 
            ])
        
        elif(config["sharing"] == "key-value-sharing"):
            # No. of projection matrices = config["n_layers"]
            proj = projection(config["sharing"], seq_len, config["proj_dim"], config["n_layers"])
            self.transformer_blocks = nn.ModuleList([
                TransformerBlock(config["emb_dim"], config["n_heads"], config["emb_dim"] * 4, config["proj_dim"], seq_len, config["sharing"], proj, config["drop_rate"], config["qkv_bias"], layer_idx)
                for layer_idx in range(config["n_layers"])  # Repeat for num_layers 
            ])
            
        elif(config["sharing"] == "headwise-sharing"):
            # No. of projection matrices = 2*config["n_layers"]
            proj = projection(config["sharing"], seq_len, config["proj_dim"], config["n_layers"])
            self.transformer_blocks = nn.ModuleList([
                TransformerBlock(config["emb_dim"], config["n_heads"], config["emb_dim"] * 4, config["proj_dim"], seq_len, config["sharing"], proj, config["drop_rate"], config["qkv_bias"], layer_idx)
                for layer_idx in range(config["n_layers"])  # Repeat for num_layers 
            ])

        # Final linear layer to project the output back to the vocabulary size for logits
        self.fc_out = nn.Linear(config["emb_dim"], config["vocab_size"])
        
        # Dropout layer for regularization
        self.dropout = nn.Dropout(config["drop_rate"])
        
    def forward(self, x, use_mask=False, targets=None):
        # Step 1: Convert input token IDs to embeddings and add positional encodings
        x = self.dropout(self.positional_encoding(self.embedding(x)))
        
        # Step 2: Pass the embeddings through each transformer block
        for block in self.transformer_blocks:
            x = block(x, use_mask)  # Apply the transformer block with optional masking
        
        # Step 3: Calculate the logits
        logits = self.fc_out(x)  # Shape: (batch_size, seq_length, vocab_size)

        # Step 4: Calculating the loss
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, input_text, max_new_tokens, context_length, tokenizer, vocab_size, use_mask=False):
        # input_text-> string array
        x = tokenizer.encode(input_text) # list of length T, T is no. of tokens
        x = torch.tensor(x).unsqueeze(0).to(device)
        T = x.shape[1]
        x = right_pad_input(x, context_length=context_length, pad_token_id = 0)
        for _ in range(max_new_tokens):
            # Get the predictions
            logits, loss = self(x, use_mask) # (B, T, C)
            
            # focus only on the last time step
            logits = logits[:, -1, :] # becomes (B, C)
            
            # apply softmax to get probabilities
            probs = F.softmax(logits, dim=-1) # (B, C)
            
            # sample from the distribution
            x_next = torch.multinomial(probs, num_samples=1) # (B, 1)
            
            if(T < context_length):
                x[0][T] = x_next[0][0]
            else:
                x = x[:, 1:]
                x = torch.cat((x, x_next), dim=1)

            input_text += tokenizer.decode(x_next[0][0])
            T = T+1
        return input_text

In [40]:
tokenizer.decode(1232)

' leg'

---

# Training

In [27]:
GPT_CONFIG = {
    "vocab_size": 50257,    # Vocabulary size
    "context_length": 256, # Context length
    "emb_dim": 384,         # Embedding dimension
    "proj_dim" : 96  ,     # Projected sequence length
    "n_heads": 6,          # Number of attention heads
    "n_layers": 6,         # Number of layers
    "drop_rate": 0.1,       # Dropout rate
    "qkv_bias": False,       # Query-Key-Value bias
    "use_mask" : True,
    "batch_size": 16,
    "num_epochs": 20,
    "lr": 10**-4,
    "sharing": "headwise-sharing"
}

In [15]:
# Downloading the Data -> Here we are using tiny shakespeare dataset from github
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2025-04-20 06:13:39--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.110.133, 185.199.111.133, 185.199.108.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.110.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt’


2025-04-20 06:13:40 (5.74 MB/s) - ‘input.txt’ saved [1115394/1115394]



In [16]:
with open('/kaggle/working/input.txt', 'r', encoding='utf-8') as f:
    text = f.read()
print("length of dataset in characters: ", len(text))
print("--------------------------------")
print(text[:100])

length of dataset in characters:  1115394
--------------------------------
First Citizen:
Before we proceed any further, hear me speak.

All:
Speak, speak.

First Citizen:
You


In [17]:
data = torch.tensor(tokenizer.encode(text), dtype = torch.int64)
print(data.shape, data.dtype)
print(data[:10])

Token indices sequence length is longer than the specified maximum sequence length for this model (338025 > 1024). Running this sequence through the model will result in indexing errors


torch.Size([338025]) torch.int64
tensor([ 5962, 22307,    25,   198,  8421,   356,  5120,   597,  2252,    11])


In [18]:
# Train and Validation Split
n = int(0.9*(len(data)))
train_data = data[:n]
val_data = data[n:]

In [19]:
def get_batch(split, batch_size, seq_length): # Gives us batches on the bases of split
    data = train_data if split == "train" else val_data
    index = torch.randint(len(data) - seq_length, (batch_size,)) 
    # gives us list of random integers between range [0, len(data)-seq_length], where the number of lists is equal to batch size
    x = torch.stack([data[i:i+seq_length] for i in index])
    y = torch.stack([data[i+1:i+seq_length+1] for i in index])
    x, y = x.to(device), y.to(device)
    return x, y

In [41]:
m = GPT2_modified(GPT_CONFIG, GPT_CONFIG["context_length"])
model = m.to(device)
optimizer = torch.optim.AdamW(model.parameters(), lr=GPT_CONFIG["lr"])

In [42]:
model

GPT2_modified(
  (embedding): Embedding(
    (embed): Embedding(50257, 384)
  )
  (positional_encoding): PositionalEncoding(
    (pos_embedding): Embedding(256, 384)
  )
  (transformer_blocks): ModuleList(
    (0-5): 6 x TransformerBlock(
      (ln1): LayerNorm((384,), eps=1e-05, elementwise_affine=True)
      (mha): MultiHeadAttention(
        (query): Linear(in_features=384, out_features=384, bias=False)
        (key): Linear(in_features=384, out_features=384, bias=False)
        (value): Linear(in_features=384, out_features=384, bias=False)
        (out): Linear(in_features=384, out_features=384, bias=True)
      )
      (dropout1): Dropout(p=0.1, inplace=False)
      (ln2): LayerNorm((384,), eps=1e-05, elementwise_affine=True)
      (ff): FeedForward(
        (fc1): Linear(in_features=384, out_features=1536, bias=True)
        (fc2): Linear(in_features=1536, out_features=384, bias=True)
        (gelu): GELU(approximate='none')
      )
      (dropout2): Dropout(p=0.1, inplace=False)

In [43]:
total_params = sum(p.numel() for p in model.parameters())
print(f"Total parameters: {total_params:,}")

Total parameters: 49,385,809


In [None]:
num_tokens = len(train_data)
batches_per_epoch = num_tokens // (GPT_CONFIG["batch_size"]*GPT_CONFIG["context_length"])

for epoch in range(GPT_CONFIG["num_epochs"]):
    for step in range(batches_per_epoch):
        xb, yb = get_batch('train', GPT_CONFIG["batch_size"], GPT_CONFIG["context_length"])
        logits, loss = model(xb, GPT_CONFIG["use_mask"], yb)
        optimizer.zero_grad(set_to_none=True)
        loss.backward()
        optimizer.step()
        if step % 50 == 0:
            print(f"Epoch {epoch+1} | Step {step+1}/{batches_per_epoch} | Loss: {loss.item():.4f}")


Epoch 1 | Step 1/74 | Loss: 11.2932


In [44]:
input_text = "hello! how are you?"
vocab_size = GPT_CONFIG["vocab_size"]  # Assuming 50257
output = model.generate(input_text=input_text,
               max_new_tokens=10, 
               context_length=GPT_CONFIG["context_length"], 
               tokenizer=tokenizer,
               vocab_size=vocab_size,
               use_mask=True)
print(output)