In [39]:
import torch
import torch.nn as nn
from torch.nn import functional as F
import math
import pandas as pd
import altair as alt
from tqdm import tqdm
import tiktoken

torch.manual_seed(2025)

<torch._C.Generator at 0x7b68fedcda70>

In [40]:
# Read the entire training corpus into memory as a single string.
with open('input.txt', mode='r', encoding='utf-8') as corpus_file:
    text = corpus_file.read()
print("length of dataset in characters: ", len(text))

length of dataset in characters:  1115394


In [41]:
class Tokenizer:
    """Character-level tokenizer whose vocabulary is the set of characters in a text.

    Attributes:
        chars: sorted list of the distinct characters found in the text.
        stoi: mapping from character to integer id (its index in ``chars``).
        itos: mapping from integer id back to character.
    """

    def __init__(self, text):
        """Build the vocabulary and both lookup tables from ``text``."""
        self.chars = sorted(set(text))
        self.stoi = {}
        self.itos = {}
        for index, char in enumerate(self.chars):
            self.stoi[char] = index
            self.itos[index] = char

    def encode(self, s):
        """Return the list of ids for the characters of ``s``.

        Raises KeyError for a character that is not in the vocabulary.
        """
        lookup = self.stoi
        return [lookup[char] for char in s]

    def decode(self, l):
        """Return the string spelled by the ids in ``l``.

        Raises KeyError for an id that is not in the vocabulary.
        """
        return ''.join(map(self.itos.__getitem__, l))

In [42]:
#batch_size = 4 # how many independent sequences will we process in parallel?
#block_size = 8 # what is the maximum context length for predictions?
class DataLoader:
    """
    Holds the training/validation splits of a tokenized corpus and serves
    random batches of fixed-length token sequences from them.
    """
    def __init__(self, data, train_size=0.9, block_size=8, batch_size=4):
        """
        Split the corpus into a training part and a validation part.

        :param data: 1D sequence of token ids for the whole corpus (list or tensor)
        :param train_size: fraction of the corpus used for training (0-1);
            the remainder becomes the validation split
        :param block_size: number of tokens in each sequence returned by get_batch
        :param batch_size: number of sequences in each batch returned by get_batch
        """
        # as_tensor accepts lists and tensors alike; torch.tensor() would warn
        # (and copy) when handed a tensor. The splits are only ever read, so
        # sharing storage with a tensor passed by the caller is harmless.
        data = torch.as_tensor(data, dtype=torch.long)
        n = int(train_size * len(data))  # first `train_size` share is train, rest is val
        self.train_data = data[:n]
        self.val_data = data[n:]
        self.block_size = block_size
        self.batch_size = batch_size

    def get_batch(self, split):
        """
        Sample a random batch of inputs and their next-token targets.

        :param split: 'train' selects the training split; any other value
            selects the validation split
        :return: tuple (x, y), each of shape (batch_size, block_size), where
            y holds the same windows as x shifted one token ahead
        :raises ValueError: if the selected split has fewer than
            block_size + 1 tokens, so no full window plus target fits
        """
        data = self.train_data if split == 'train' else self.val_data
        # Each start index needs block_size inputs plus one extra target token.
        n_starts = len(data) - self.block_size
        if n_starts <= 0:
            raise ValueError(
                f"split {split!r} has {len(data)} tokens; at least "
                f"{self.block_size + 1} are needed for block_size={self.block_size}"
            )
        starts = torch.randint(n_starts, (self.batch_size,)).tolist()
        x = torch.stack([data[i:i + self.block_size] for i in starts])
        y = torch.stack([data[i + 1:i + self.block_size + 1] for i in starts])
        return x, y

In [43]:
tokenizer = Tokenizer(text)

# Round-trip a short sample: show its ids, then the text decoded back from them.
sample_text = "tokenizer test"
sample_ids = tokenizer.encode(sample_text)
print(sample_ids)
print(tokenizer.decode(sample_ids))

[58, 53, 49, 43, 52, 47, 64, 43, 56, 1, 58, 43, 57, 58]
tokenizer test


In [44]:
# Tokenize the whole corpus and split it into training and validation data.
data_loader = DataLoader(tokenizer.encode(text), batch_size=64, block_size=32)
train_data, val_data = data_loader.train_data, data_loader.val_data

# Peek at the first 100 token ids of each split.
for split_ids in (train_data, val_data):
    print(split_ids[:100])

tensor([18, 47, 56, 57, 58,  1, 15, 47, 58, 47, 64, 43, 52, 10,  0, 14, 43, 44,
        53, 56, 43,  1, 61, 43,  1, 54, 56, 53, 41, 43, 43, 42,  1, 39, 52, 63,
         1, 44, 59, 56, 58, 46, 43, 56,  6,  1, 46, 43, 39, 56,  1, 51, 43,  1,
        57, 54, 43, 39, 49,  8,  0,  0, 13, 50, 50, 10,  0, 31, 54, 43, 39, 49,
         6,  1, 57, 54, 43, 39, 49,  8,  0,  0, 18, 47, 56, 57, 58,  1, 15, 47,
        58, 47, 64, 43, 52, 10,  0, 37, 53, 59])
tensor([12,  0,  0, 19, 30, 17, 25, 21, 27, 10,  0, 19, 53, 53, 42,  1, 51, 53,
        56, 56, 53, 61,  6,  1, 52, 43, 47, 45, 46, 40, 53, 59, 56,  1, 14, 39,
        54, 58, 47, 57, 58, 39,  8,  0,  0, 14, 13, 28, 32, 21, 31, 32, 13, 10,
         0, 19, 53, 53, 42,  1, 51, 53, 56, 56, 53, 61,  6,  1, 52, 43, 47, 45,
        46, 40, 53, 59, 56,  1, 19, 56, 43, 51, 47, 53,  8,  0, 19, 53, 42,  1,
        57, 39, 60, 43,  1, 63, 53, 59,  6,  1])


In [45]:
# Draw one random training batch and show the shape and contents of both halves.
batch = data_loader.get_batch('train')
xb, yb = batch

for heading, tensor in (('inputs:', xb), ('targets:', yb)):
    print(heading)
    print(tensor.shape)
    print(tensor)

inputs:
torch.Size([64, 32])
tensor([[30, 10,  0,  ..., 58,  1, 63],
        [43, 57, 40,  ...,  1, 57, 53],
        [58, 46, 43,  ..., 53, 59, 52],
        ...,
        [58, 46, 43,  ..., 61,  1, 58],
        [23, 21, 26,  ..., 58,  1, 57],
        [ 1, 54, 56,  ..., 23, 21, 26]])
targets:
torch.Size([64, 32])
tensor([[10,  0, 19,  ...,  1, 63, 53],
        [57, 40, 63,  ..., 57, 53, 59],
        [46, 43, 56,  ..., 59, 52, 42],
        ...,
        [46, 43,  1,  ...,  1, 58, 53],
        [21, 26, 19,  ...,  1, 57, 39],
        [54, 56, 53,  ..., 21, 26, 19]])


In [46]:
# Spell out the (context -> next token) pairs packed into the first sequence of the batch.
first_row = 0
for step in range(8):  # first 8 positions along the time dimension
    context = xb[first_row, :step + 1]
    target = yb[first_row, step]
    print(f"when input is {context.tolist()} the target: {target}")

when input is [30] the target: 10
when input is [30, 10] the target: 0
when input is [30, 10, 0] the target: 19
when input is [30, 10, 0, 19] the target: 53
when input is [30, 10, 0, 19, 53] the target: 53
when input is [30, 10, 0, 19, 53, 53] the target: 42
when input is [30, 10, 0, 19, 53, 53, 42] the target: 1
when input is [30, 10, 0, 19, 53, 53, 42, 1] the target: 42


In [47]:
class PositionalEncoding(nn.Module):
    """Adds fixed sinusoidal position encodings to a batch of embeddings.

    The table is computed once in the constructor and stored as the buffer
    ``pe`` of shape (1, max_len, d_model); it has no trainable parameters.
    Even columns hold sines and odd columns hold cosines of
    ``position / 10000 ** (2i / d_model)``.

    :param d_model: size of the last (embedding) axis; may be even or odd
    :param max_len: number of positions for which encodings are precomputed
    """

    def __init__(self, d_model, max_len=5000):
        super().__init__()

        # Compute the positional encodings once in log space.
        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        angles = position * div_term  # (max_len, ceil(d_model / 2))
        pe[:, 0::2] = torch.sin(angles)
        # With an odd d_model there is one cosine column fewer than sine
        # columns, so trim the angles to the number of odd-indexed columns.
        pe[:, 1::2] = torch.cos(angles[:, : d_model // 2])
        self.register_buffer('pe', pe.unsqueeze(0))

    def forward(self, x):
        """Return ``x`` plus the encodings of its first ``x.size(1)`` positions.

        :param x: tensor whose axis 1 is the position axis and whose last axis
            has size d_model
        :return: tensor of the same shape as ``x``
        """
        # Buffers never require grad, so the slice can be added directly.
        return x + self.pe[:, : x.size(1)]

In [48]:
def show_example(fn, args=()):
    """Call ``fn(*args)`` and return its result, but only when run as the main program.

    :param fn: callable that builds the example
    :param args: positional arguments unpacked into ``fn`` (any iterable)
    :return: ``fn(*args)`` when ``__name__ == "__main__"``; otherwise ``None``
    """
    # An immutable default avoids sharing one list object between calls.
    if __name__ == "__main__":
        return fn(*args)
    return None

def example_positional():
    """Chart the positional-encoding value against position for dimensions 4 to 7.

    An all-zero input of shape (1, 100, 20) is passed through
    PositionalEncoding(20), so the output is the encoding itself.

    :return: interactive Altair line chart with one line per plotted dimension
    """
    # This PositionalEncoding has no dropout; it only needs d_model.
    encoder = PositionalEncoding(20)
    encoded = encoder.forward(torch.zeros(1, 100, 20))

    # One frame per plotted dimension, stacked into a single long table.
    frames = []
    for dim in (4, 5, 6, 7):
        frames.append(
            pd.DataFrame(
                {
                    "embedding": encoded[0, :, dim],
                    "dimension": dim,
                    "position": list(range(100)),
                }
            )
        )
    data = pd.concat(frames)

    chart = alt.Chart(data).mark_line().properties(width=800)
    chart = chart.encode(x="position", y="embedding", color="dimension:N")
    return chart.interactive()


# Build the positional-encoding chart; as the last expression of the cell,
# the returned value is what the notebook displays.
show_example(example_positional)

In [49]:
def plot_embedding_heatmap(seq_len=64, d_model=32):
    """
    Draw a heatmap of the embedding components of the first `seq_len`
    tokens of the training corpus.

    Reads the notebook-level `data_loader` and `tokenizer`. The embedding
    layer is created anew on every call, so its weights are untrained and
    come from torch's global random state.

    :param seq_len: number of leading training tokens to embed
    :param d_model: width of each embedding vector
    :return: Altair rect chart with position on x, dimension on y and the
        component value as colour
    """
    ids = data_loader.train_data[:seq_len]

    # Plain embedding layer sized to the character vocabulary.
    embedding = nn.Embedding(len(tokenizer.chars), d_model)
    values = embedding(ids).detach().cpu().numpy()  # (seq_len, d_model)

    # Wide table (one column per dimension) -> long table (one row per cell).
    wide = pd.DataFrame(values)
    wide["position"] = range(seq_len)
    tidy = wide.melt(id_vars="position", var_name="dimension", value_name="value")
    tidy["dimension"] = tidy["dimension"].astype(int)

    x_axis = alt.X("position:O", title="Posición")
    y_axis = alt.Y("dimension:O", title="Dimensión")
    colour = alt.Color("value:Q", title="Valor")

    heatmap = alt.Chart(tidy).mark_rect().encode(
        x=x_axis,
        y=y_axis,
        color=colour,
        tooltip=["position", "dimension", "value"],
    )
    return heatmap.properties(
        width=700,
        height=300,
        title="Mapa de calor de embeddings de entrada",
    )

# Heatmap for the first 64 training tokens with 32-dimensional embeddings.
plot_embedding_heatmap(seq_len=64, d_model=32)

In [50]:
def attention(query, key, value, mask=False, dropout_layer=None):
    """Scaled dot-product attention, optionally with a causal mask.

    Works for any number of leading axes, e.g. (T, d), (B, T, d) or
    (B, heads, T, d); only the last two axes are interpreted.

    :param query: tensor of shape (..., T, d_k)
    :param key: tensor of shape (..., S, d_k)
    :param value: tensor of shape (..., S, d_v)
    :param mask: when true, position t may only attend to positions <= t
        (decoder-style self-attention); the mask is T x T, so this requires
        S == T
    :param dropout_layer: optional callable applied to the attention weights
        after the softmax
    :return: tensor of shape (..., T, d_v)
    """
    d_k = query.size(-1)
    scores = torch.matmul(query, key.transpose(-2, -1)) / math.sqrt(d_k)
    if mask:
        # Take T from the second-to-last axis instead of unpacking a fixed
        # 3-D shape, so extra (or missing) leading axes are accepted.
        T = query.size(-2)
        causal = torch.tril(torch.ones(T, T, dtype=torch.bool, device=query.device))
        # -inf scores become zero weight after the softmax.
        scores = scores.masked_fill(~causal, float('-inf'))

    weights = F.softmax(scores, dim=-1)
    if dropout_layer is not None:
        weights = dropout_layer(weights)
    return torch.matmul(weights, value)


In [51]:
class Head(nn.Module):
    """One head of self-attention.

    Projects the input to keys, queries and values with three bias-free
    linear layers and combines them with ``attention``.

    :param n_embd: size of the last axis of the input
    :param head_size: size of the last axis of the output
    :param dropout: dropout probability applied to the attention weights
    """

    def __init__(self, n_embd, head_size, dropout=0.0):
        super().__init__()
        # The three projections share one shape and carry no bias.
        projection = dict(in_features=n_embd, out_features=head_size, bias=False)
        self.key = nn.Linear(**projection)
        self.query = nn.Linear(**projection)
        self.value = nn.Linear(**projection)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=False):
        """Attend over ``x``; ``mask=True`` gives the causal (decoder-only) variant."""
        return attention(
            self.query(x),
            self.key(x),
            self.value(x),
            mask,
            dropout_layer=self.dropout,
        )

In [52]:
class MultiHeadAttention(nn.Module):
    """Several self-attention heads run on the same input, then merged.

    Each head produces ``n_embd // num_heads`` features; the head outputs
    are concatenated on the last axis, projected back to ``n_embd`` by a
    linear layer, and passed through dropout.

    :param n_embd: size of the last axis of input and output
    :param num_heads: number of heads; must divide ``n_embd``
    :param dropout: dropout probability used inside every head and on the output
    """

    def __init__(self, n_embd, num_heads, dropout=0.0):
        super().__init__()
        assert n_embd % num_heads == 0, "n_embd must be divisible by num_heads"
        head_size = n_embd // num_heads
        heads = []
        for _ in range(num_heads):
            heads.append(Head(n_embd, head_size, dropout=dropout))
        self.heads = nn.ModuleList(heads)
        self.proj = nn.Linear(num_heads * head_size, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x, mask=False):
        """Run every head on ``x`` and return the projected concatenation."""
        per_head = [head(x, mask=mask) for head in self.heads]
        merged = torch.cat(per_head, dim=-1)
        projected = self.proj(merged)
        return self.dropout(projected)
        

In [53]:
# Smoke test: a 4-head attention layer must return the same shape it is given.
multihead_attn = MultiHeadAttention(n_embd=512, num_heads=4)

# Usage example
query = torch.randn(1, 64, 512)  # (batch_size, seq_len, d_model)
# NOTE(review): `key` and `value` are not used in this cell (the layer is
# self-attention and receives only `query`). They still draw from torch's
# global random generator, so removing them would change the random numbers
# seen by every later cell.
key = torch.randn(1, 64, 512)
value = torch.randn(1, 64, 512)

output = multihead_attn(query, mask=True) # mask=True, decoder-only model for text generation
print(output.shape)  # (batch_size, seq_len, d_model)

torch.Size([1, 64, 512])


In [54]:
class FeedForward(nn.Module):
    """Two-layer MLP applied to the last axis of the input.

    The layers are: linear expansion to ``4 * n_embd``, ReLU, linear
    projection back to ``n_embd``, then dropout.

    :param n_embd: size of the last axis of input and output
    :param dropout: dropout probability applied to the output
    """

    def __init__(self, n_embd, dropout=0.0):
        super().__init__()
        hidden = 4 * n_embd
        layers = [
            nn.Linear(n_embd, hidden),
            nn.ReLU(),
            nn.Linear(hidden, n_embd),
            nn.Dropout(dropout),
        ]
        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Return the MLP output for ``x``; the shape is unchanged."""
        out = self.net(x)
        return out


class Block(nn.Module):
    """Transformer block: communication (attention) followed by computation (MLP).

    Both sub-layers are pre-normalised and wrapped in a residual connection:
    the input is layer-normalised, transformed, and added back to the input.

    :param n_embd: embedding dimension
    :param n_heads: number of heads in the self-attention
    :param mask: passed to the attention layer on every forward call;
        True gives masked (decoder-only) attention
    :param dropout: dropout probability for the attention and MLP sub-layers
    """

    def __init__(self, n_embd, n_heads, mask=False, dropout=0.0):
        super().__init__()
        self.mask = mask
        self.msa = MultiHeadAttention(n_embd, n_heads, dropout=dropout)
        self.ln1 = nn.LayerNorm(n_embd)
        self.ffwd = FeedForward(n_embd, dropout=dropout)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        """Apply the attention and MLP sub-layers to ``x``, each with a residual add."""
        attended = self.msa(self.ln1(x), mask=self.mask)
        x = x + attended
        return x + self.ffwd(self.ln2(x))

In [55]:
class GPTLanguageModel(nn.Module):
    """Decoder-only Transformer language model.

    Token embeddings plus sinusoidal position encodings pass through
    ``n_layer`` causally masked Transformer blocks, a final LayerNorm and a
    linear head that yields one logit per vocabulary entry.

    :param vocab_size: number of distinct token ids
    :param n_embd: embedding dimension
    :param n_head: number of attention heads per block
    :param n_layer: number of Transformer blocks
    :param dropout: dropout probability used inside the blocks
    """

    def __init__(self, vocab_size, n_embd, n_head, n_layer, dropout=0.0):
        super().__init__()
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        self.position_embedding_table = PositionalEncoding(n_embd)
        self.blocks = nn.ModuleList([Block(n_embd, n_head, mask=True, dropout=dropout) for _ in range(n_layer)])
        self.ln_f = nn.LayerNorm(n_embd)
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        """Compute next-token logits and, when targets are given, the loss.

        :param idx: (B, T) tensor of token ids
        :param targets: optional (B, T) tensor of target token ids
        :return: ``(logits, loss)``. Without targets, logits has shape
            (B, T, vocab_size) and loss is None. With targets, logits is
            flattened to (B*T, vocab_size) and loss is the mean cross-entropy.
        """
        B, T = idx.shape  # also rejects input that is not 2-D
        tok_embd = self.token_embedding_table(idx) # (B,T,C)
        x = self.position_embedding_table(tok_embd)

        for block in self.blocks:
            x = block(x)

        x = self.ln_f(x)
        logits = self.lm_head(x)

        if targets is None:
            loss = None
        else:
            # cross_entropy expects (N, C) logits against (N,) class ids.
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens, block_size=32):
        """Sample ``max_new_tokens`` tokens autoregressively after ``idx``.

        Sampling runs in evaluation mode (dropout disabled) and without
        gradient tracking; the module's previous train/eval mode is restored
        before returning, even if sampling raises.

        :param idx: (B, T) tensor of token ids used as the starting context
        :param max_new_tokens: number of tokens to append
        :param block_size: only the last ``block_size`` tokens are fed to the
            model at each step
        :return: (B, T + max_new_tokens) tensor of token ids
        """
        was_training = self.training
        self.eval()
        try:
            with torch.no_grad():
                for _ in range(max_new_tokens):
                    # crop idx to the last block_size tokens
                    idx_cond = idx[:, -block_size:]
                    # get the predictions
                    logits, _ = self(idx_cond)
                    # focus only on the last time step
                    logits = logits[:, -1, :] # becomes (B, C)
                    # apply softmax to get probabilities
                    probs = F.softmax(logits, dim=-1) # (B, C)
                    # sample from the distribution
                    idx_next = torch.multinomial(probs, num_samples=1) # (B, 1)
                    # append sampled index to the running sequence
                    idx = torch.cat((idx, idx_next), dim=1) # (B, T+1)
        finally:
            self.train(was_training)
        return idx


class GPTConfig:
    """Hyperparameters of a GPTLanguageModel.

    The class attributes are defaults only. ``vocab_size`` has no class-level
    default (it is None) so that defining this class does not depend on which
    tokenizer object happens to be bound to the notebook-level ``tokenizer``.

    Instances expose the head and layer counts under both spellings used in
    this file: ``n_head`` / ``n_heads`` and ``n_layer`` / ``n_layers``.

    :param vocab_size: number of distinct token ids
    :param n_embd: embedding dimension
    :param n_head: number of attention heads per block
    :param n_layer: number of Transformer blocks
    :param dropout: dropout probability
    """
    n_embd = 384
    n_heads = 6
    n_layers = 6
    vocab_size = None
    dropout = 0.1

    def __init__(self, vocab_size, n_embd, n_head, n_layer, dropout=0.1):
        self.vocab_size = vocab_size
        self.n_embd = n_embd
        # Keep the plural spellings in sync so they do not silently fall back
        # to the class-level defaults.
        self.n_head = self.n_heads = n_head
        self.n_layer = self.n_layers = n_layer
        self.dropout = dropout

In [56]:
# Training hyperparameters.
learning_rate = 3e-4
max_iters = 5000      # number of optimisation steps
eval_interval = 100   # steps between loss estimates
eval_iters = 200      # batches averaged per loss estimate
dropout = 0.1
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# From here on the GPT-2 byte-pair encoding replaces the character-level
# Tokenizer, so `tokenizer` and `data_loader` are rebound.
tokenizer = tiktoken.get_encoding("gpt2")
data_loader = DataLoader(tokenizer.encode(text), batch_size=64, block_size=32)

@torch.no_grad()
def estimate_loss():
    """Estimate the mean loss on the training and validation splits.

    Uses the notebook-level ``model``, ``data_loader``, ``eval_iters`` and
    ``device``. The model is put in evaluation mode while measuring and in
    training mode before returning.

    :return: dict with keys 'train' and 'val', each a 0-d tensor holding the
        mean loss over ``eval_iters`` random batches of that split
    """
    mean_losses = {}
    model.eval()
    for split in ('train', 'val'):
        split_losses = torch.zeros(eval_iters)
        for step in range(eval_iters):
            inputs, labels = (t.to(device) for t in data_loader.get_batch(split))
            _, batch_loss = model(inputs, labels)
            split_losses[step] = batch_loss.item()
        mean_losses[split] = split_losses.mean()
    model.train()
    return mean_losses


# Vocabulary size comes from the active (tiktoken) tokenizer.
config = GPTConfig(tokenizer.n_vocab, 384, 6, 6, dropout=dropout)


model = GPTLanguageModel(config.vocab_size, config.n_embd, config.n_head, config.n_layer, dropout=config.dropout)

# Move the parameters to the target device before the optimizer is built on them.
m = model.to(device)  # Module.to returns the module itself; `m` is an alias of `model`
optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

pbar = tqdm(range(max_iters))

# `step` rather than `iter`, so the builtin iter() is not shadowed.
for step in pbar:
    # every once in a while evaluate the loss on train and val sets
    if step % eval_interval == 0 or step == max_iters - 1:
        losses = estimate_loss()
        pbar.set_postfix(train_loss=losses['train'].item(), val_loss=losses['val'].item())
    # sample a batch of data
    xb, yb = data_loader.get_batch('train')
    xb, yb = xb.to(device), yb.to(device)

    # evaluate the loss
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()

100%|██████████| 5000/5000 [08:23<00:00,  9.93it/s, train_loss=0.574, val_loss=6.77]


In [57]:
# generate from the model
max_new_tokens = 1000
context = torch.zeros((1, 1), dtype=torch.long, device=device)  # one sequence holding token id 0

# The training loop leaves the model in training mode; switch to evaluation
# mode so dropout is off while sampling, and skip gradient tracking.
model.eval()
with torch.no_grad():
    generated = model.generate(
        context,
        max_new_tokens=max_new_tokens,
        block_size=data_loader.block_size,  # same context length the model was trained on
    )[0].tolist()
print(tokenizer.decode(generated))

!St thee,
Thou sober-suited matron,
Or never after look'd upon my kingdom for a grave,
'Tis hoped his sickness is discharged.

LEONTES:
Know, he that wears my brother.
Would he not have been so nicely with him.

POLIXENES:
How! caught of this rail!
O God, dear God! good my lady,
Polixenes for poltro; I am cold,
Have you contend with all trades in Rome.

First Officer:
If he can admit lead me with shrieks like with beauty.
Will you go? whither he so?
If we be England, our throne, we have
Your enemies to branch of you, which little office
The noble metal, the place,
One grave is but freshest day, man,
But few innocents, but only hope:
O, she doth give her sorrow so foul a life
beshrew my best beloved, but I was ware,
And give me this heavy nothing.

CLEOMENES:
I now.

PAULINA:
How! not am I come about you:
I do forenoon--to repass See,
And in such despair an unspeakable estate.

CAMILLO:
wear't please your liege,
Your honourable husband.

LEONTES:
Her lord; you thinkals that was so.

LEO