#Install required libraries

In [1]:
#!pip install torch

#Import Required Libraries

In [2]:
import torch
import torch.nn as nn

#Shift On GPU

This will make the training use GPU instead of CPU. Using GPU will make training and model usage much faster .

In [3]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
print(f"Using device: {device}")

Using device: cuda


#Tokenize + Encode Data

This code will allocate a specific number code to each charater of given text called tokens.
we make 2 dictionaries as given below:
1. stoi (key: character, value: integer_id)
2. itos (key: integer_id, value: character)

encode function converts the text into tokens (integer_ids) while decode function convert the encoded tokens again into original text according to the saved dictionaries

In [4]:
txt="""The sun was setting behind the hills. Birds flew across the orange sky, heading home. A boy sat on the grass, watching the clouds move slowly. He smiled, feeling the cool wind on his face. It was a peaceful evening, and everything felt calm. He didn’t want the moment to end.\n""" * 100

chars = sorted(list(set(txt)))
stoi = {ch:i for i,ch in enumerate(chars)}
itos = {i:ch for ch,i in stoi.items()}

def encode(s):
  return [stoi[c] for c in s]
def decode(l):
  return ''.join([itos[n] for n in l])


x = torch.tensor(encode(txt)).to(device)

#Make Tiny Dataset

This code is used to make many different data batches from a single dataset.
Like previously in ML the dataset is like (x,y) where x is the input and y is the output (prediction), samelike that x is the input text and y is the output text. Because the transformer will predict what user wants to write next so we make dataset from the original data like some words (block size) as input (x) and some next words (block size) as output (y).


First we make two lists x & y. then we loop over the range of len(seq) - block_size.



For Example:
seq = [2,3,1,6,5,9]
block_size = 3

so, len(seq) = 6
and (len(seq) - block_size) = 6-3 =3

i will iterate through 0->3
for i = 0:
x.append(2:0+3)
-> x.append(2:3)
-> x becomes [2,3,1]
and y becomes [3,1,6]
in this way we get many batches (len(seq) - block_size)

At the end it becomes:

x = [[2,3,1], [3,1,6], [1,6,5]]


y = [[3,1,6], [1,6,5], [6,5,9]]

In [5]:
def get_batch(seq, block_size):
    x = []
    y = []
    for i in range(len(seq) - block_size):
        x.append(seq[i:i + block_size])
        y.append(seq[i + 1:i + block_size + 1])

    return torch.tensor(x, dtype=torch.long).to(device), torch.tensor(y, dtype=torch.long).to(device)

#Positional + Token Embedding Layer

Here we make 2 vectors, one having the token embedings of characters in the block while other has positions order

---

vocab_size is vocublary size (numbers of characters in this case)
block_size is the max number of character to check at a time.
embed_dim is the number of elements in a vector.

---

first we make a vector having the embedings for each token.
[vocab_size x embed_dim]
=> [8x4] = 2D vector having 32 elements each row is a token.


each element is a uniform or normal distribution random values like:
[
 [ 0.02, -0.11,  0.05, 0.07],  # token ID 0
 [ 0.12, -0.55,  0.33, 0.98],  # token ID 1
 [ 0.66,  0.20, -0.50, 0.44],  # token ID 2
 ...
]


These are updated during training using:
new_value = old_value - learning_rate * gradient



---



position_embed is also a vector which just gives the position order


For Example: if block size is 4 then each vector will be of 4 elements and position emebed vector will always be:  [0,1,2,3]

In [6]:
class Embedding(nn.Module):
  def __init__(self, voacb_size, embed_dim, block_size ):
    super().__init__()
    self.token_embed=nn.Embedding(voacb_size,embed_dim)
    self.position_embed=nn.Embedding(block_size,embed_dim)

  def forward(self,x):
    B,T=x.shape
    tok=self.token_embed(x)
    pos=self.position_embed(torch.arange(T, device=x.device))
    return tok+pos

#Self Attention + Casual Masking


Query (q): “Which type of info i am finding?”

Key (k): “Which type of info i hold?”

Value (v): "What actual info i hold?”



---

Self attention is like you are in a group discussion and you listen to other and analyze who looks smarter of them and you focus more on his/her words and you talk according to the scenario and the last talk.

wei= (q @ k.transpose(-2,-1)) * (C ** -0.5)

This line compares current words with past words/other words to check how good they are according to the scenario and dividing by Sq.root(C) is just a math normalazier.



---

We mask the future words like we don't know wtf other person will say after that so we just focus on previous words.

After that we just normalize our answer (model output) using softmax and
out = wei @ v

is weighted sum. model actual output is build by analyzing which token depends mostly on which token.

In [7]:
class selfAttention(nn.Module):
  def __init__(self,embed_dim,head_size) -> None:
    super().__init__()
    self.key=nn.Linear(embed_dim,head_size)
    self.val=nn.Linear(embed_dim,head_size)
    self.query=nn.Linear(embed_dim,head_size)
    self.proj=nn.Linear(head_size,embed_dim)

  def forward(self,x):
    B,T,C=x.shape
    k=self.key(x)
    v=self.val(x)
    q=self.query(x)

    wei= (q @ k.transpose(-2,-1)) * (C ** -0.5)

    mask = torch.tril(torch.ones(T,T)).to(x.device) == 0
    wei = wei.masked_fill(mask,float('-inf'))

    wei = torch.softmax(wei, dim=-1)
    out = wei @ v

    return self.proj(out)

#Decoder Block + Stacking

Here we first get the self attention (mix token with contexual tokens) then we make the vector 4x and use ReLU activation to make negative values zero and add non-linearity so model can learn better. After that vector is again converted to original size having richer information.

nl1 and nl2 are normalization layers, they normalize output.

In [8]:
class Block(nn.Module):
  def __init__(self,embed_dim,head_size):
    super().__init__()
    self.sa=selfAttention(embed_dim,head_size)
    self.ff=nn.Sequential(
        nn.Linear(embed_dim,4*embed_dim),
        nn.ReLU(),
        nn.Linear(4*embed_dim,embed_dim)
    )

    self.nl1=nn.LayerNorm(embed_dim)
    self.nl2=nn.LayerNorm(embed_dim)

  def forward(self,x):
    x = x + self.sa(self.nl1(x))
    x = x + self.ff(self.nl2(x))
    return x

#Final GPT Model

It is the execution point where it first do **embeding** then train it using **Block** function and then **normalize** it after that a linear layer is used to convert the whole calculation into the actual word to be displayed.

In [9]:
class MiniGPT(nn.Module):
  def __init__(self,vocab_size,embed_dim,block_size,n_heads,n_layers):
    super().__init__()
    self.embed=Embedding(vocab_size,embed_dim,block_size)
    self.blocks=nn.Sequential(*[Block(embed_dim, embed_dim//n_heads) for _ in range(n_layers)])
    self.ln_f=nn.LayerNorm(embed_dim)
    self.head=nn.Linear(embed_dim,vocab_size)

  def forward(self,x):
    x=self.embed(x)
    x=self.blocks(x)
    x=self.ln_f(x)
    return self.head(x)

#Generate

In [10]:
# Generation function
def generate(model, start_str, max_length=20, temperature=1.0):
    model.eval()
    tokens = encode(start_str)
    for _ in range(max_length):
        # Get last block_size tokens
        input_tokens = tokens[-block_size:]
        input_tensor = torch.tensor([input_tokens]).to(model.head.weight.device) # Move to the same device as the model

        # Get prediction
        with torch.no_grad():
            logits = model(input_tensor)

        # Get probabilities
        probs = torch.softmax(logits[0, -1] / temperature, dim=-1)
        next_token = torch.multinomial(probs, num_samples=1).item()

        tokens.append(next_token)

    return decode(tokens)

#Training

In [12]:
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset


# Hyperparameters
vocab_size = len(chars)
embed_dim = 64
block_size = 16
n_heads = 4
n_layers = 4
batch_size = 64
learning_rate = 3e-4
epochs = 100

# Create model
# Model setup
model = MiniGPT(vocab_size, embed_dim, block_size, n_heads, n_layers).to(device)

# Create dataset
text = """The sun was setting behind the hills. Birds flew across the orange sky, heading home. A boy sat on the grass, watching the clouds move slowly. He smiled, feeling the cool wind on his face. It was a peaceful evening, and everything felt calm. He didn’t want the moment to end.

 """ * 100  # Small repeating dataset for testing
X, Y = get_batch(encode(text), block_size)
X, Y = X.to(device), Y.to(device)
dataset = TensorDataset(X, Y)
dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)

# Training loop
for epoch in range(epochs):
    for batch_x, batch_y in dataloader:
        batch_x, batch_y = batch_x.to(device), batch_y.to(device)  # Move to GPU

        logits = model(batch_x)
        loss = criterion(logits.view(-1, vocab_size), batch_y.view(-1))

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    if epoch % 10 == 0:
        print(f'Epoch {epoch}, Loss: {loss.item():.4f}')

# Generation test
print(generate(model, "was"))

Epoch 0, Loss: 0.4529
Epoch 10, Loss: 0.2059
Epoch 20, Loss: 0.1322
Epoch 30, Loss: 0.1900
Epoch 40, Loss: 0.1669
Epoch 50, Loss: 0.1784
Epoch 60, Loss: 0.1642
Epoch 70, Loss: 0.1844
Epoch 80, Loss: 0.2232
Epoch 90, Loss: 0.1607
was a peaceful evening,


#Evaluation

In [1]:
# Test generation
print(generate(model, "was"))

NameError: name 'generate' is not defined