In [None]:
# imports
import sys
print(sys.executable)

import torch
import torch.nn as nn
from torch.nn import functional as F

# Run on Apple's Metal backend (MPS) when PyTorch reports it as available,
# otherwise fall back to the CPU.
if torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
print(f'Device :{device}')

# Hyperparameters
block_size = 16        # tokens per sequence; also sizes the position table and causal mask
batch_size = 32        # sequences sampled per batch
max_iters = 1000       # optimizer steps in the training loop
learning_rate = 3e-4   # step size handed to AdamW
eval_iters = 100       # batches averaged per loss estimate; also the logging interval
dropout = 0.2          # probability passed to every nn.Dropout in the model
n_embd = 100           # embedding width; divisible by n_head so head_size * n_head == n_embd (100 // 5 = 20)
n_layer = 4            # number of stacked transformer blocks
n_head = 5             # attention heads per block

# Read data
with open('./data/test.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# The vocabulary is every distinct character of the corpus, in sorted order.
char_set = sorted(set(text))
vocab_size = len(char_set)


# Encoder decoder

# Character <-> integer id lookup tables; ids are positions in `char_set`.
string_to_int = {ch: i for i, ch in enumerate(char_set)}
int_to_string = dict(enumerate(char_set))


def encode(s):
    """Convert a string into a list of integer token ids, one per character.

    Raises:
        KeyError: if `s` contains a character that is not in `char_set`.
    """
    return [string_to_int[c] for c in s]


def decode(l):
    """Convert an iterable of integer token ids back into a string.

    Raises:
        KeyError: if an id is not a key of `int_to_string`.
    """
    return ''.join(int_to_string[i] for i in l)


# The whole corpus as a 1-D tensor of token ids.
data = torch.tensor(encode(text), dtype=torch.int64)


# Prepare data

# The first 80% of the token stream is used for training, the rest for validation.
n = int(0.8*len(data))
train_data = data[:n]
val_data = data[n:]

def get_batch(split):
    """Sample a random batch of (input, target) sequences from one split.

    `split == 'train'` draws from `train_data`; any other value draws from
    `val_data`. Returns two tensors of shape `(batch_size, block_size)` moved
    to `device`. Each target row holds the tokens that follow the matching
    input row, i.e. the same window shifted forward by one position.
    """
    source = train_data if split == 'train' else val_data
    # One random start offset per sequence in the batch.
    starts = torch.randint(len(source) - block_size, (batch_size,))
    inputs = torch.stack([source[s:s + block_size] for s in starts])
    targets = torch.stack([source[s + 1:s + block_size + 1] for s in starts])
    return inputs.to(device), targets.to(device)

## Log training function
@torch.no_grad()
def estimate_loss(model):
    """Estimate the mean loss of `model` on the train and validation splits.

    For each split, `eval_iters` random batches are drawn with `get_batch`
    and their losses averaged. Runs without gradient tracking and with the
    model in evaluation mode.

    Args:
        model: a module whose call `model(X, Y)` returns `(logits, loss)`.

    Returns:
        dict mapping 'train' and 'val' to a 0-dim tensor holding the mean loss.
    """
    # Remember the caller's mode so it can be restored afterwards, rather
    # than forcing the model into training mode unconditionally.
    was_training = model.training
    model.eval()
    try:
        out = {}
        for split in ('train', 'val'):
            losses = torch.zeros(eval_iters)
            for k in range(eval_iters):
                X, Y = get_batch(split)
                _, loss = model(X, Y)
                losses[k] = loss.item()
            out[split] = losses.mean()
    finally:
        # Restore the original mode even if a batch raised.
        model.train(was_training)
    return out


## Learned

At the start of this second part of the course, I realized that torch.nn allows modules to be nested. This means you can have a main module with a layer that is itself a module, which can contain layers that are modules as well, and so on. Each module has its own forward function implementation, where its submodules are called to perform computations on the input data.

This is why a model can be seen as a large computational graph, where each node represents a computation. In fact, PyTorch's autograd dynamically builds a directed acyclic graph of the operations as the forward pass runs, so it all makes sense now.

And this means that these things are composable, which is pretty cool, I think, because it allows us to build blocks or pieces of the graph separately and then compose them together to create more complex structures, which is very convenient.


# The model

Our GPT-like model is just a big computational graph built from some reusable modules.

These modules are:

- Head
- MultiHeadAttention
- FeedForward
- Decoder Block

Composing these modules with other predefined nn.Module layers forms the model, which can be summarized in this graph:

![gpt-like-arch](./gpt-arch.png)


## The Head

The head receives an input of shape (B, T, C), where:

- `B` is the batch size (number of sequences processed at once).
- `T` is the sequence length (number of tokens in each sequence).
- `C` is the embedding dimension (n_embd).

Each attention head has its own set of Query (Q), Key (K), and Value (V) projections, which are implemented as nn.Linear layers. These layers transform the input from (B, T, C) → (B, T, hs), where:

- `hs = n_embd // n_head` is the "head size" (the portion of the embedding each head attends to).
- nn.Linear maps C (embedding dimension) to hs (head size), so the full embedding width is shared out across the attention heads.
- Each head receives the entire embedding vector as input but projects it down to hs dimensions, so each head works in its own lower-dimensional subspace.

### Attention mechanism

In the forward pass, k, q and v are computed from x by the three linear projections.
The matrix product of q and k (transposed), scaled by 1/sqrt(hs) to keep the scores in a range where the softmax does not saturate, gives us an affinity score between every pair of tokens.

For example:

```text
q = [
     [[1, 0, 1], # Token 1
     [0, 1, 1],  # Token 2
     [1, 1, 0],  # Token 3
     [0, 0, 1]]  # Token 4
] # (1, 4, 3)

k = [
     [[1, 0, 1], # Token 1
     [0, 1, 1],  # Token 2
     [1, 1, 0],  # Token 3
     [0, 0, 1]]  # Token 4
] # (1, 4, 3)

q @ k.transpose(-2, -1) = [
    [[2, 1, 1, 1],  # Similarity of Token 1 with all tokens
     [1, 2, 1, 1],  # Similarity of Token 2 with all tokens
     [1, 1, 2, 0],  # Similarity of Token 3 with all tokens
     [1, 1, 0, 1]]  # Similarity of Token 4 with all tokens
]  # (1, 4, 4)
```

Another way to understand this calculation is by thinking in terms of cosine similarity. Two vectors pointing in the same direction have an angle $\theta = 0^\circ$, so $\cos(0^\circ) = 1$, which means they are perfectly aligned. The dot product measures this same alignment, scaled by the lengths of the two vectors ($q \cdot k = \|q\|\,\|k\|\cos\theta$): if the vectors are aligned, the dot product is large (and positive); if they are perpendicular, the dot product is zero; and if they point in opposite directions, the dot product is negative.

Since we do not want a token to attend to future tokens, a lower-triangular (tril) mask is applied, ensuring each token only attends to itself and previous tokens.

And this is pretty much all. The rest of the modules are self-explanatory if you followed the previous notebook (bigram.ipynb). There are lots of things going on, but all of them follow the same principles.

### Some clarifications

In this line of code, in the MultiHeadAttention module:

- `self.heads = nn.ModuleList`

The ModuleList holds the parallel heads (and registers their parameters with the parent module); their outputs are then merged into a single Tensor. Notice this line:

- `out = torch.cat([h(x) for h in self.heads], dim=-1)`

Which results in a Tensor of shape `(B, T, C) -> (B, T, [h1, h1, h1, h1, h2, h2, h2, h2, h3, h3, h3, h3])`

And:

- `self.proj = nn.Linear(head_size * num_heads, n_embd)`

Notice that the projection layer maps the result of the multi-head computation back to the original width by taking head_size\*num_heads features as input and producing n_embd features as output.

This way we can go from a Tensor `(B, T, hs*num_heads)` coming from all parallel `Head` instances to a `(B, T, C)` again.


In [None]:
class Head(nn.Module):
    """A single head of causal (masked) self-attention."""

    def __init__(self, head_size):
        super().__init__()

        def projection():
            # Bias-free linear map from the embedding width to this head's width.
            return nn.Linear(n_embd, head_size, bias=False)

        self.key = projection()
        self.query = projection()
        self.value = projection()
        # Lower-triangular matrix of ones used as the causal mask. It is a
        # buffer, not a parameter, so it follows the module across devices
        # without being trained.
        causal = torch.ones(block_size, block_size).tril()
        self.register_buffer('tril', causal)

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Attend over `x` of shape (B, T, C) and return a (B, T, hs) tensor."""
        _, seq_len, _ = x.shape  # (B, T, C)
        keys = self.key(x)       # (B, T, hs)
        queries = self.query(x)  # (B, T, hs)
        values = self.value(x)   # (B, T, hs)

        # Scaled dot-product affinities: (B, T, hs) @ (B, hs, T) -> (B, T, T)
        scale = keys.shape[-1] ** -0.5
        scores = (queries @ keys.transpose(-2, -1)) * scale
        # Positions above the diagonal are future tokens; -inf gives them
        # zero weight after the softmax.
        future = self.tril[:seq_len, :seq_len] == 0
        scores = scores.masked_fill(future, float('-inf'))
        weights = self.dropout(F.softmax(scores, dim=-1))  # (B, T, T)

        # Weighted sum of the values: (B, T, T) @ (B, T, hs) -> (B, T, hs)
        return weights @ values

class MultiHeadAttention(nn.Module):
    """Several `Head` instances applied to the same input and merged."""

    def __init__(self, num_heads, head_size, n_embd):
        super().__init__()
        heads = [Head(head_size) for _ in range(num_heads)]
        # ModuleList registers every head so its parameters are tracked.
        self.heads = nn.ModuleList(heads)
        # Maps the concatenated head outputs back to the embedding width.
        self.proj = nn.Linear(num_heads * head_size, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        """Map (B, T, C) -> (B, T, n_embd) through all heads and the projection."""
        per_head = [head(x) for head in self.heads]     # num_heads x (B, T, hs)
        concatenated = torch.cat(per_head, dim=-1)      # (B, T, num_heads * hs)
        projected = self.proj(concatenated)             # (B, T, n_embd)
        return self.dropout(projected)
    

class FeedFoward(nn.Module):
    """Position-wise MLP: expand to 4x the width, ReLU, project back, dropout."""

    def __init__(self, n_embd):
        super().__init__()
        hidden = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the MLP to the last dimension of `x`; the shape is unchanged."""
        return self.net(x)
    
class Block(nn.Module):
    """One transformer block: self-attention, then a feed-forward network.

    Each sub-layer is wrapped in a residual connection, and layer
    normalization is applied after the residual sum.
    """

    def __init__(self, n_embd, n_head):
        """Build a block of width `n_embd` with `n_head` attention heads."""
        super().__init__()
        # Each head gets an equal share of the embedding width.
        per_head = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, per_head, n_embd)
        self.ffwd = FeedFoward(n_embd)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Map (B, T, C) -> (B, T, C)."""
        attended = self.ln1(x + self.sa(x))
        return self.ln2(attended + self.ffwd(attended))
    
class GPTLanguageModel(nn.Module):
    """Character-level, decoder-only transformer language model.

    Token and position embeddings are summed, passed through `n_layer`
    transformer blocks and a final LayerNorm, then projected to logits over
    the vocabulary.
    """

    def __init__(self, vocab_size):
        """Build the model for a vocabulary of `vocab_size` tokens."""
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = nn.Embedding(block_size, n_embd)
        # `*` unpacks the list so each Block becomes one stage of the Sequential.
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd) # final layer norm
        self.lm_head = nn.Linear(n_embd, vocab_size)

        # Re-initialise every Linear/Embedding submodule (see _init_weights).
        self.apply(self._init_weights)

    def _init_weights(self, module):
        """Initialise Linear/Embedding weights from N(0, 0.02) and zero the biases."""
        if isinstance(module, nn.Linear):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)
            if module.bias is not None:
                torch.nn.init.zeros_(module.bias)
        elif isinstance(module, nn.Embedding):
            torch.nn.init.normal_(module.weight, mean=0.0, std=0.02)

    def forward(self, index, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        Args:
            index: (B, T) integer tensor of token ids, with T <= block_size.
            targets: optional (B, T) integer tensor of next-token ids.

        Returns:
            `(logits, loss)`. Without targets, logits has shape
            (B, T, vocab_size) and loss is None. With targets, logits is
            flattened to (B*T, vocab_size) and loss is the cross-entropy.
        """
        B, T = index.shape

        # index and targets are both (B,T) tensors of integers
        tok_emb = self.token_embedding_table(index) # (B,T,C)
        # Build the positions on the input's own device so the model keeps
        # working if it is moved somewhere other than the global `device`.
        positions = torch.arange(T, device=index.device)
        pos_emb = self.position_embedding_table(positions) # (T,C)
        x = tok_emb + pos_emb # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, classes) logits and (N,) targets.
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    @torch.no_grad()
    def generate(self, index, max_new_tokens):
        """Autoregressively sample `max_new_tokens` tokens after `index`.

        Sampling runs without gradient tracking and in evaluation mode, so
        dropout does not perturb the predicted distribution. The module's
        previous train/eval mode is restored before returning.

        Args:
            index: (B, T) integer tensor holding the current context.
            max_new_tokens: number of tokens to append to each sequence.

        Returns:
            (B, T + max_new_tokens) integer tensor: the context followed by
            the sampled tokens.
        """
        was_training = self.training
        self.eval()
        try:
            for _ in range(max_new_tokens):
                # crop the context to the last block_size tokens
                index_cond = index[:, -block_size:]
                # get the predictions; calling the module (not .forward) runs hooks
                logits, _ = self(index_cond)
                # focus only on the last time step
                logits = logits[:, -1, :] # becomes (B, C)
                # apply softmax to get probabilities
                probs = F.softmax(logits, dim=-1) # (B, C)
                # sample from the distribution
                index_next = torch.multinomial(probs, num_samples=1) # (B, 1)
                # append sampled index to the running sequence
                index = torch.cat((index, index_next), dim=1) # (B, T+1)
        finally:
            self.train(was_training)
        return index
    

In [None]:
model = GPTLanguageModel(vocab_size)
# `m` is the same module object as `model`, moved to the selected device.
m = model.to(device)
context = torch.zeros((1, 1), dtype=torch.long, device=device)
print(context)


optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

for item in range(max_iters):
    # Every `eval_iters` steps, report the averaged train/val loss.
    if item % eval_iters == 0:
        losses = estimate_loss(model)
        print(f"step: {item}\t | train_loss: {losses['train']:.4f} | val_loss: {losses['val']:.4f}")
    # sample batch of data
    xb, yb = get_batch('train')

    # evaluate loss; call the module itself rather than .forward so that any
    # registered hooks run
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

# Loss of the last training batch (a single batch, not an average).
print(loss.item())

In [None]:
# Start from a single token with id 0 and sample 200 more tokens.
context = torch.zeros((1, 1), dtype=torch.long, device=device)
generated_ids = m.generate(context, max_new_tokens=200)[0].tolist()
generated_chars = decode(generated_ids)
print(generated_chars)