## Imports and Device Setup

In [24]:
from tqdm import tqdm
import torch
import torch.nn as nn
import math
from torch.utils.data import TensorDataset,DataLoader
from torch.nn import functional as F

In [25]:
torch.__version__

'2.8.0+cu126'

In [26]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

## Getting the dataset ready

In [27]:
!wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

--2025-09-17 11:30:01--  https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 185.199.108.133, 185.199.110.133, 185.199.109.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|185.199.108.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1115394 (1.1M) [text/plain]
Saving to: ‘input.txt.1’


2025-09-17 11:30:01 (143 MB/s) - ‘input.txt.1’ saved [1115394/1115394]



In [28]:
with open('input.txt','r',encoding='utf-8') as f:
  text = f.read()

In [29]:
text[:500], len(text)

("First Citizen:\nBefore we proceed any further, hear me speak.\n\nAll:\nSpeak, speak.\n\nFirst Citizen:\nYou are all resolved rather to die than to famish?\n\nAll:\nResolved. resolved.\n\nFirst Citizen:\nFirst, you know Caius Marcius is chief enemy to the people.\n\nAll:\nWe know't, we know't.\n\nFirst Citizen:\nLet us kill him, and we'll have corn at our own price.\nIs't a verdict?\n\nAll:\nNo more talking on't; let it be done: away, away!\n\nSecond Citizen:\nOne word, good citizens.\n\nFirst Citizen:\nWe are accounted poor",
 1115394)

In [30]:
set(text)

{'\n',
 ' ',
 '!',
 '$',
 '&',
 "'",
 ',',
 '-',
 '.',
 '3',
 ':',
 ';',
 '?',
 'A',
 'B',
 'C',
 'D',
 'E',
 'F',
 'G',
 'H',
 'I',
 'J',
 'K',
 'L',
 'M',
 'N',
 'O',
 'P',
 'Q',
 'R',
 'S',
 'T',
 'U',
 'V',
 'W',
 'X',
 'Y',
 'Z',
 'a',
 'b',
 'c',
 'd',
 'e',
 'f',
 'g',
 'h',
 'i',
 'j',
 'k',
 'l',
 'm',
 'n',
 'o',
 'p',
 'q',
 'r',
 's',
 't',
 'u',
 'v',
 'w',
 'x',
 'y',
 'z'}

In [31]:
chars = list(sorted(set(text)))
vocab_size = len(chars)
print(''.join(chars))
print(vocab_size)


 !$&',-.3:;?ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz
65


In [32]:
stoi = {ch:i for i,ch in enumerate(chars)}
itos = {i:ch for i,ch in enumerate(chars)}

### Causal Masking function:
* This function creates a mask to prevent the model from attending to future tokens during training. It ensures that the prediction for a token only depends on the tokens that come before it in the sequence.

In [33]:
def build_causal_mask(seq_len, device):
    # Returns shape (1, 1, seq_len, seq_len)
    mask = torch.tril(torch.ones(seq_len, seq_len, dtype=torch.bool, device=device))
    return mask.unsqueeze(0).unsqueeze(0)

## Token Embedding:
Converting the tokens into vectors.

In [None]:
class TokenEmbedding(nn.Module):
    def __init__(self,vocab_size,d_model):
        super(TokenEmbedding,self).__init__()
        self.embedding = nn.Embedding(num_embeddings=vocab_size,embedding_dim=d_model)

    def forward(self,x):
        return self.embedding(x)

## Positional Encoding:
* Adding positional encodings to the token embeddings to provide information about the position of each token in the sequence. This is crucial for the model to understand the order of tokens.
* Here we use sinusoidal positional encoding as described in the "Attention is All You Need" paper. This works by using sine and cosine functions of different frequencies to generate unique positional encodings for each position in the sequence.

In [35]:
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super(PositionalEncoding, self).__init__()
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0)  #(1, max_len, d_model)
        self.register_buffer('pe', pe)

    def forward(self, x):
        '''
        returns the sum of embeddings and positional encoding, hence positional encoded embeddings
        '''
        # shape of x is now: (batch_size, seq_len, d_model)
        seq_len = x.size(1)
        return x + self.pe[:, :seq_len, :]

## Multi-Head Self-Attention:
* This mechanism allows the model to focus on different parts of the input sequence when making predictions.
* It computes attention scores using the query, key, and value matrices derived from the input embeddings.

In [36]:
class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, d_model):
        super(MultiHeadAttention,self).__init__()
        assert d_model % num_heads == 0   # d_model must be divisible by num_heads
        self.d_model = d_model
        self.num_heads = num_heads
        self.d_k = d_model // num_heads


        self.W_q = nn.Linear(d_model,d_model)
        self.W_k = nn.Linear(d_model,d_model)
        self.W_v = nn.Linear(d_model,d_model)
        self.W_o = nn.Linear(d_model,d_model)

    def forward(self,query,key,value,mask):
        '''
        shape of query,key,value : batch_size, seq_len, d_model
        '''
        batch_size = query.size(0)

        Q = self.W_q(query)
        K = self.W_k(key)
        V = self.W_v(value)

        Q = Q.view(batch_size,-1,self.num_heads,self.d_k).transpose(1,2)
        K = K.view(batch_size,-1,self.num_heads,self.d_k).transpose(1,2)
        V = V.view(batch_size,-1,self.num_heads,self.d_k).transpose(1,2)

        scores = torch.matmul(Q,K.transpose(-2,-1)) / math.sqrt(self.d_k)
        # [B, H, L, d_k] × [B, H, d_k, L] → [B, H, L, L]

        if mask is not None:
            if mask.dim() == 4 and mask.size(-1) == scores.size(-1):
                scores = scores.masked_fill(~mask, float('-inf'))
            else:
                raise ValueError(f"Mask shape {mask.shape} not compatible with scores {scores.shape}")


        attn = torch.softmax(scores,dim=-1)
        output = torch.matmul(attn,V)
        #[B, H, L, L] × [B, H, L, d_k] → [B, H, L, d_k]

        output = output.transpose(1,2).contiguous().view(batch_size,-1,self.d_model)

        return self.W_o(output)

## Feed-Forward Neural Network:
* A simple two-layer feed-forward neural network with a ReLU activation function in between. This helps in learning complex patterns in the data.

In [37]:
class FeedForward(nn.Module):
    def __init__(self, d_model,hidden_layer,dropout=0.1):
        super(FeedForward,self).__init__()
        self.layer1 = nn.Linear(d_model,hidden_layer)
        self.relu = nn.ReLU()
        self.layer2 = nn.Linear(hidden_layer,d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self,x):
        return self.layer2(self.dropout(self.relu(self.layer1(x))))

## Normalization and Residual Connections:
* Layer normalization is applied to stabilize and accelerate training.
* Residual connections help in mitigating the vanishing gradient problem and allow for deeper networks.

In [38]:
class NormResidual(nn.Module):
    def __init__(self,d_model,dropout=0.1):
        super(NormResidual,self).__init__()
        self.layernorm = nn.LayerNorm(d_model)
        self.dropout = nn.Dropout(dropout)

    def forward(self,x, sublayer_output):
        return self.layernorm(x + self.dropout(sublayer_output))

## Decoder Layer:
* Combines the multi-head self-attention and feed-forward neural network with normalization and residual connections.
* This is a decoder-based architecture, so it does not include the encoder part. The decoder is designed to generate text based on the input sequence.

In [None]:
class DecoderLayer(nn.Module):
    def __init__(self,d_model,num_heads,hidden_layer,dropout=0.1):
        super(DecoderLayer,self).__init__()
        self.self_attn = MultiHeadAttention(num_heads,d_model)
        self.res_output1 = NormResidual(d_model,dropout)
        self.ff_output = FeedForward(d_model,hidden_layer,dropout)
        self.res_output2 = NormResidual(d_model,dropout)

    def forward(self,x,mask):
        x = self.res_output1(x, self.self_attn(x,x,x,mask))
        x = self.res_output2(x,self.ff_output(x))
        return x

## Transformer Model:
* Stacks multiple decoder layers to form the complete transformer model.

In [None]:
class Transformer(nn.Module):
    def __init__(self,vocab_size,d_model,num_heads,hidden_layer,num_dec,max_len,dropout):
        super(Transformer,self).__init__()
        self.token_emb = TokenEmbedding(vocab_size,d_model)
        self.pos_enc = PositionalEncoding(d_model,max_len)
        self.decoder = nn.ModuleList([
            DecoderLayer(d_model, num_heads, hidden_layer, dropout)
            for _ in range(num_dec)
        ])
        self.output_layer = nn.Linear(d_model,vocab_size)

    def forward(self,idx):
        B,L = idx.shape
        x = self.token_emb(idx)
        x = self.pos_enc(x)
        mask = build_causal_mask(L,device)
        for layer in self.decoder:
            x = layer(x,mask)
        output = self.output_layer(x)
        return output

## Hyperparameters:

In [62]:
d_model=512
num_heads=4
hidden_layer=512
num_dec=4
block_size=128
batch_size=64
dropout=0.1
epochs=3
learning_rate=3e-4

## Train Test Split and Preparation of DataLoaders

In [63]:
data = [stoi[token] for token in text]

In [64]:
n = int(0.9*len(data))
train = data[:n]
test = data[n:]
len(train),len(test),len(train) + len(test), len(data) # 90% train data, 10% test data

(1003854, 111540, 1115394, 1115394)

In [65]:
input_seqs = []
target_seqs = []
input_seqs_test = []
target_seqs_test = []
for i in range(0,len(train)-block_size):
    input_seqs.append(train[i:i+block_size])
    target_seqs.append(train[i+1:i+block_size+1])

for i in range(0,len(test)-block_size):
    input_seqs_test.append(test[i:i+block_size])
    target_seqs_test.append(test[i+1:i+block_size+1])

input_tensor = torch.tensor(input_seqs,dtype=torch.long)
target_tensor = torch.tensor(target_seqs,dtype=torch.long)
input_tensor_test = torch.tensor(input_seqs_test,dtype=torch.long)
target_tensor_test = torch.tensor(target_seqs_test,dtype=torch.long)

dataset_train = TensorDataset(input_tensor,target_tensor)
dataset_test = TensorDataset(input_tensor_test,target_tensor_test)
train_loader = DataLoader(dataset_train,batch_size,shuffle=True)
test_loader = DataLoader(dataset_test,batch_size,shuffle=True)

train_loader,test_loader


(<torch.utils.data.dataloader.DataLoader at 0x7b5054975e50>,
 <torch.utils.data.dataloader.DataLoader at 0x7b51529ec560>)

## Training:
* The model is trained using the Adam optimizer and cross-entropy loss function.
* The training loop iterates over the dataset for a specified number of epochs, updating the model weights based on the computed loss.

In [66]:
model = Transformer(vocab_size, d_model=d_model, num_heads=num_heads, hidden_layer=hidden_layer, num_dec=num_dec, max_len=block_size, dropout=dropout).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [67]:
for epoch in range(epochs):
    model.train()
    total_train_loss = 0
    loop = tqdm(train_loader, leave=True)
    loop.set_description(f"Epoch [{epoch+1}/{epochs}]")

    for batch_idx, (x, y) in enumerate(loop):
        x, y = x.to(device), y.to(device)
        optimizer.zero_grad()
        logits = model(x)  # (batch, seq, vocab_size)
        loss = F.cross_entropy(logits.view(-1, vocab_size), y.view(-1))
        loss.backward()
        optimizer.step()
        total_train_loss += loss.item()
        loop.set_postfix(batch_loss=loss.item(),
                         avg_loss=total_train_loss/(batch_idx+1))

    avg_train_loss = total_train_loss / len(train_loader)
    print(f"Epoch {epoch+1} complete. Average Train Loss: {avg_train_loss:.4f}\n")


Epoch [1/3]: 100%|██████████| 15684/15684 [33:03<00:00,  7.91it/s, avg_loss=1.12, batch_loss=0.872]


Epoch 1 complete. Average Train Loss: 1.1190



Epoch [2/3]: 100%|██████████| 15684/15684 [32:54<00:00,  7.94it/s, avg_loss=0.752, batch_loss=0.627]


Epoch 2 complete. Average Train Loss: 0.7519



Epoch [3/3]: 100%|██████████| 15684/15684 [32:50<00:00,  7.96it/s, avg_loss=0.624, batch_loss=0.568]

Epoch 3 complete. Average Train Loss: 0.6244






## Saving the model:
* The trained model is saved in a file named 'mini_gpt.pth'.

In [68]:
torch.save(model.state_dict(),'mini_gpt.pth')
model = Transformer(vocab_size, d_model=d_model, num_heads=num_heads, hidden_layer=hidden_layer, num_dec=num_dec, max_len=block_size, dropout=dropout).to(device)
model.load_state_dict(torch.load('mini_gpt.pth',map_location=device))

<All keys matched successfully>

## Text Generation:
* A function to generate text based on a given prompt using the trained model. It uses temperature sampling to introduce randomness in the generated text.

In [69]:

def generate_text(model, prompt, stoi, itos, num_generate,temperature):
    '''
    Model: Transformer Model
    prompt: Some text to start with.
    stoi,itos: mapping dictionaries.
    block_size: maximum context length for the model (use the same value which the model was trained on.)
    '''
    input_indices = [stoi[ch] for ch in prompt]
    generated = input_indices.copy()
    model.eval()
    for _ in range(num_generate):
        # Truncate to block_size
        input_seq = generated[-block_size:] if len(generated) > block_size else generated
        x = torch.tensor([input_seq], dtype=torch.long).to(device)
        with torch.no_grad():
            logits = model(x)  # (1, seq_len, vocab_size)
            logits = logits[0, -1, :] / temperature  # last position
            probs = torch.softmax(logits, dim=-1)
            next_idx = torch.multinomial(probs, num_samples=1).item()
        generated.append(next_idx)
    return ''.join([itos[i] for i in generated])

In [72]:
generate_text(model,'Romeo:',stoi,itos,100,1.0)


'Romeo:\nYour highness are ready, stumbled at that measure\nWhich know the poor will I wish you: if you will,'

## Evaluation:
* The model is evaluated on a validation set to monitor its performance and prevent overfitting.

In [73]:
for epoch in range(epochs):
  model.eval()
  total_test_loss = 0
  with torch.no_grad():
    for batch_idx, (x, y) in enumerate(test_loader):
      x, y = x.to(device), y.to(device)
      logits = model(x)  # (batch, seq, vocab_size)
      test_loss = F.cross_entropy(logits.view(-1, vocab_size), y.view(-1))
      total_test_loss += test_loss.item()
  avg_test_loss = total_test_loss / len(test_loader)
  print(f"Epoch {epoch+1} complete. Average Test Loss: {avg_test_loss:.4f}")

Epoch 1 complete. Average Test Loss: 2.4363
Epoch 2 complete. Average Test Loss: 2.4362
Epoch 3 complete. Average Test Loss: 2.4363
