In [30]:
import torch
import torch.nn as nn
import math
import numpy as np
import random
import torch.nn.functional as F

In [31]:
# Very simple character-level tokenizer, i.e. each character is considered a token.
class Tokenizer:
    """Character-level tokenizer: every distinct character of the text is one token.

    Attributes:
        str_to_ind: character -> integer id; ids follow the sorted order of the characters.
        ind_to_str: inverse mapping, integer id -> character.
        vocab_size: number of distinct characters found in the text.
    """

    def __init__(self, text):
        alphabet = sorted(set(text))
        self.str_to_ind = {}
        self.ind_to_str = {}
        # Fill both directions in one pass so they can never disagree.
        for token_id, symbol in enumerate(alphabet):
            self.str_to_ind[symbol] = token_id
            self.ind_to_str[token_id] = symbol
        self.vocab_size = len(alphabet)

    def encode(self, text):
        """Return the list of token ids for ``text``; raises KeyError on an unseen character."""
        lookup = self.str_to_ind
        return [lookup[symbol] for symbol in text]

    def decode(self, tokens):
        """Return the string spelled by the token ids in ``tokens``; raises KeyError on an unknown id."""
        return ''.join(map(self.ind_to_str.__getitem__, tokens))


In [32]:
class Embed(nn.Module):
    """Sum of a token embedding and a learned absolute-position embedding.

    Args:
        vocab_size: number of rows in the token embedding table.
        embed_dim: width of every embedding vector.
        max_len: number of rows in the position embedding table.
    """

    def __init__(self, vocab_size, embed_dim, max_len):
        super().__init__()
        self.token_embed = nn.Embedding(vocab_size, embed_dim)
        self.pos_embed = nn.Embedding(max_len, embed_dim)

    def forward(self, x):
        """Embed a (B, T) tensor of token ids into a (B, T, embed_dim) tensor."""
        # Two-name unpacking also rejects any input that is not 2-D.
        _, seq_len = x.shape

        # Position ids 0..T-1 as a (1, T) row, so the lookup broadcasts over the batch.
        pos_ids = torch.arange(seq_len, device=x.device)[None, :]

        return self.pos_embed(pos_ids) + self.token_embed(x)  # (B, T, embed_dim)


In [33]:
class MultiHeadSelfAttention(nn.Module):
    """Causal (masked) multi-head self-attention.

    Args:
        embed_dim: model width D; must be divisible by ``num_heads``.
        num_heads: number of attention heads H; each head has width D // H.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        assert embed_dim % num_heads == 0, "embed_dim must be divisible by num_heads"
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads

        # One fused projection produces query, key and value in a single matmul.
        self.qkv = nn.Linear(embed_dim, embed_dim * 3)
        self.out_proj = nn.Linear(embed_dim, embed_dim)

    def forward(self, x):
        """Attend over a (B, T, D) input and return a (B, T, D) output.

        Position t only attends to positions <= t.
        """
        B, T, D = x.shape  # batch, time, dim
        qkv = self.qkv(x)  # (B, T, 3D)
        q, k, v = qkv.chunk(3, dim=-1)

        # Split the feature dimension into heads: (B, T, D) -> (B, H, T, d)
        q = q.view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
        k = k.view(B, T, self.num_heads, self.head_dim).transpose(1, 2)
        v = v.view(B, T, self.num_heads, self.head_dim).transpose(1, 2)

        # Scaled dot-product attention
        scores = (q @ k.transpose(-2, -1)) / math.sqrt(self.head_dim)  # (B, H, T, T)

        # Causal mask: True where the key position j lies after the query position i.
        # Built as a boolean tensor directly on x's device, so no float T x T matrix
        # is allocated on the CPU and copied over on every forward pass.
        positions = torch.arange(T, device=x.device)
        mask = positions[None, :] > positions[:, None]  # (T, T)
        scores = scores.masked_fill(mask, float('-inf'))

        attn = F.softmax(scores, dim=-1)
        out = attn @ v  # (B, H, T, d)

        # Concatenate the heads back into the feature dimension
        out = out.transpose(1, 2).contiguous().view(B, T, D)
        return self.out_proj(out)  # (B, T, D)


In [34]:
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last dimension: Linear -> ReLU -> Linear.

    Args:
        embed_dim: input and output width.
        hidden_dim: width of the intermediate layer.
    """

    def __init__(self, embed_dim, hidden_dim):
        super().__init__()
        # Layers are created in application order and stored at indices 0, 1, 2.
        expand = nn.Linear(embed_dim, hidden_dim)
        activation = nn.ReLU()
        contract = nn.Linear(hidden_dim, embed_dim)
        self.net = nn.Sequential(expand, activation, contract)

    def forward(self, x):
        """Return the MLP output; the shape matches ``x``."""
        result = self.net(x)
        return result


In [35]:
class TransformerBlock(nn.Module):
    """Transformer block: LayerNorm is applied before each sub-layer, and each
    sub-layer's output is added back onto its input (skip connection).

    Args:
        embed_dim: model width.
        num_heads: number of attention heads.
        ff_hidden_dim: hidden width of the feed-forward sub-layer.
    """

    def __init__(self, embed_dim, num_heads, ff_hidden_dim):
        super().__init__()
        self.attn = MultiHeadSelfAttention(embed_dim, num_heads)
        self.ff = FeedForward(embed_dim, ff_hidden_dim)
        self.ln1 = nn.LayerNorm(embed_dim)
        self.ln2 = nn.LayerNorm(embed_dim)

    def forward(self, x):
        """Run attention and then the feed-forward net, each wrapped in a skip connection."""
        stages = ((self.ln1, self.attn), (self.ln2, self.ff))
        for norm, sublayer in stages:
            # Residual: the sub-layer sees the normalised input, the sum keeps the raw one.
            x = x + sublayer(norm(x))
        return x


In [36]:
class MiniGPT(nn.Module):
    """Small decoder-only transformer language model.

    Args:
        vocab_size: number of distinct token ids.
        embed_dim: model width.
        block_size: maximum context length the model is built for.
        n_layers: number of stacked transformer blocks.
        n_heads: attention heads per block.
        ff_hidden_dim: hidden width of each block's feed-forward net.
    """

    def __init__(self, vocab_size, embed_dim, block_size, n_layers, n_heads, ff_hidden_dim):
        super().__init__()

        # Remember the context length so generate() never depends on a
        # notebook-level global of the same name.
        self.block_size = block_size

        self.embed = Embed(vocab_size, embed_dim, block_size)
        self.blocks = nn.Sequential(*[
            TransformerBlock(embed_dim, n_heads, ff_hidden_dim)
            for _ in range(n_layers)
        ])
        self.ln_f = nn.LayerNorm(embed_dim)
        self.head = nn.Linear(embed_dim, vocab_size)

    def forward(self, x):
        """Map a (B, T) tensor of token ids to (B, T, vocab_size) next-token logits."""
        x = self.embed(x)
        x = self.blocks(x)
        x = self.ln_f(x)
        return self.head(x)

    @torch.no_grad()  # sampling needs no gradients; skip building the autograd graph
    def generate(self, x, max_new_tokens):
        """Sample ``max_new_tokens`` tokens and append them to the prompt.

        Args:
            x: (B, T) tensor of token ids used as the prompt.
            max_new_tokens: number of tokens to sample.

        Returns:
            A (B, T + max_new_tokens) tensor: the prompt followed by the samples.
        """
        for _ in range(max_new_tokens):
            # Crop to the last block_size tokens, the context length of this model.
            x_cond = x[:, -self.block_size:]
            logits = self(x_cond)

            # Only the logits at the final position predict the next token.
            probs = F.softmax(logits[:, -1, :], dim=-1)

            next_token = torch.multinomial(probs, num_samples=1)
            x = torch.cat([x, next_token], dim=1)
        return x


In [37]:
import requests

# Corpus: "The Odyssey" from Project Gutenberg.
url = "https://www.gutenberg.org/cache/epub/1727/pg1727.txt"

# The timeout stops the cell from hanging forever on a stalled connection, and
# raise_for_status() stops an HTTP error page from silently becoming the corpus.
response = requests.get(url, timeout=30)
response.raise_for_status()

# Decode the body as UTF-8 regardless of what the response headers suggest.
response.encoding = 'utf-8'
raw_text = response.text

# Markers that delimit the book text inside the downloaded file; everything
# outside them is header/footer material.
start_marker = "*** START OF THE PROJECT GUTENBERG EBOOK THE ODYSSEY ***"
end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK THE ODYSSEY ***"

start = raw_text.find(start_marker)
end = raw_text.find(end_marker)

if start != -1 and end != -1:
    # Keep only the text strictly between the two markers.
    corpus = raw_text[start + len(start_marker):end]
else:
    # A marker is missing: fall back to the full download.
    corpus = raw_text

# Clean: lower-case everything so upper/lower variants share one token.
corpus = corpus.lower()

In [38]:

# Build the character vocabulary from the corpus, then encode the whole corpus
# into a flat list of integer token ids.
tokenizer = Tokenizer(corpus)
data = tokenizer.encode(corpus)

# Context length in tokens: get_batch() slices windows of this size and the
# model is constructed with it as its block_size.
block_size = 8  # context size
def get_batch(batch_size=16):
    """Sample ``batch_size`` random training windows from the global ``data``.

    Each input is ``block_size`` consecutive token ids; its target is the same
    window shifted one position to the right (the next-token labels).

    Returns:
        A pair ``(X, Y)`` of tensors built from the sampled windows.
    """
    # random.randint is inclusive at both ends, so the largest start still
    # leaves room for the one-step-shifted target window.
    starts = [
        random.randint(0, len(data) - block_size - 1)
        for _ in range(batch_size)
    ]
    inputs = [data[begin:begin + block_size] for begin in starts]
    targets = [data[begin + 1:begin + block_size + 1] for begin in starts]
    return torch.tensor(inputs), torch.tensor(targets)


In [39]:
# Seed both RNGs before the model is created so that weight initialisation
# (torch) and batch sampling (random) repeat exactly on Restart & Run All.
SEED = 0
random.seed(SEED)
torch.manual_seed(SEED)

model = MiniGPT(
    vocab_size=tokenizer.vocab_size,
    embed_dim=64,
    block_size=block_size,
    n_layers=2,
    n_heads=4,
    ff_hidden_dim=128
)

optimizer = torch.optim.AdamW(model.parameters(), lr=1e-3)
loss_fn = nn.CrossEntropyLoss()

NUM_STEPS = 300  # optimisation steps, one random batch each
LOG_EVERY = 50   # print the batch loss every this many steps

# Training
model.train()  # the mode never changes inside the loop, so set it once
for step in range(NUM_STEPS):
    xb, yb = get_batch()

    logits = model(xb)
    B, T, C = logits.shape
    # CrossEntropyLoss wants (N, C) logits and (N,) targets: fold batch and time together.
    loss = loss_fn(logits.view(B*T, C), yb.view(B*T))

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if step % LOG_EVERY == 0:
        print(f"Step {step} Loss: {loss.item():.4f}")


Step 0 Loss: 4.9370
Step 50 Loss: 3.0159
Step 100 Loss: 2.4031
Step 150 Loss: 2.5680
Step 200 Loss: 2.1871
Step 250 Loss: 2.3408


In [40]:
model.eval()

# Prompt: a batch of one sequence holding the single character 'h'.
context = torch.tensor([[tokenizer.str_to_ind['h']]], dtype=torch.long)  # starting char

# Sampling needs no gradients; no_grad avoids building an autograd graph for
# every generated token.
with torch.no_grad():
    generated = model.generate(context, max_new_tokens=100)[0].tolist()

print("\nGenerated Text:\n", tokenizer.decode(generated))



Generated Text:
 henar thacis oul.isg wis, athe at thelal 
ung ave hacoghanc atede thems lar:f,and tharele tiln- le fi
