[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/gibranfp/CursoAprendizajeProfundo/blob/2024-1/notebooks/4a_gpt.ipynb)

# Aplicaciones de Transformers (GPT)

---
Curso: Aprendizaje Profundo.

Profesor: Gibran Fuentes Pineda.

Ayudantes: Fernando Nava y Rodrigo del Moral

---

En esta libreta se explora una aplicación de los bloques Transformers. El código utiliza el bloque Transformer visto en clase y el modelo está basado en una implementación conocida como [nanoGPT](https://github.com/karpathy/nanoGPT/).

## 1. Importar bibliotecas

In [1]:
import os
import torch as th
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from torchsummary import summary

th.manual_seed(22)
device = 'cuda' if th.cuda.is_available() else 'cpu'

## 2. Conjunto de datos


In [2]:
# descargar y abrir archivo (obras completas de Shakespeare en inglés)
if not os.path.exists("input.txt"):
    ! wget https://raw.githubusercontent.com/karpathy/char-rnn/master/data/tinyshakespeare/input.txt

with open('input.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# generar el vocabulario del tokenizador (caracteres)
voc = Counter([c for c in text])

# crear diccionarios para mapear IDs y tokens
i2p = {i:p for i,(p,f) in enumerate(voc.most_common())}
p2i = {p:i for i,(p,f) in enumerate(voc.most_common())}
# tamaño del vocabulario
vocab_size = len(i2p)

# crear funciones para convertir de IDs a tokens y viceversa
encode = lambda s: [p2i[c] for c in s]
decode = lambda l: ''.join([i2p[i] for i in l])

# codificar el conjunto de datos entero y partirlo en train y valid
data = th.tensor(encode(text), dtype=th.long)
n = int(0.9*len(data))
train_data = data[:n]
val_data = data[n:]

In [3]:
len(voc)

65

## 3. Cargador de datos

In [4]:
batch_size = 16
context_size = 32

# para generar los datos del modelo de lenguaje causal (predecir siguiente token)
# las entradas son porciones de texto codificadas de tamaño context_size y
# las salidas son las mismas porciones de texto recorridas un paso
def get_batch(split):
    data = train_data if split == 'train' else val_data
    ix = th.randint(len(data) - context_size, (batch_size,))
    x = th.stack([data[i:i+context_size] for i in ix])
    y = th.stack([data[i+1:i+context_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

In [5]:
get_batch("train")

(tensor([[14, 18,  3,  7,  2, 46, 10, 43, 15,  0,  4,  8, 12,  0, 22, 15, 25, 10,
          21,  0,  5,  3, 23,  1,  0,  9,  2,  0,  9,  6,  0,  6],
         [ 3, 13,  8,  2,  4,  9,  8, 16, 10, 49,  7,  3, 14,  0, 17,  5,  1,  8,
           0,  2,  5,  9,  6,  0,  6,  2,  7,  1,  4, 14,  0,  2],
         [ 5,  9,  6, 10, 27,  1,  7, 15,  0, 12,  4, 15,  0,  7,  1, 19,  1,  9,
          27,  1,  6,  0, 11,  1,  2,  2,  1,  7,  6,  0,  3, 18],
         [ 6,  0, 17,  1, 11, 11, 16,  0,  4,  8, 12,  0,  8,  3,  2,  5,  9,  8,
          20,  0, 19,  4,  8,  0, 22,  1,  0,  9, 11, 11, 24, 10],
         [ 3, 14, 23, 11,  3,  2,  2,  1, 12,  0,  4,  8, 12,  0, 19,  3,  8,  2,
           7,  9, 27,  1, 12,  0,  9,  8,  0,  2,  5,  9,  6,  0],
         [ 0,  2,  5,  1,  1,  0,  3,  8,  0,  2,  5, 15,  0, 17,  4, 15,  0,  2,
           3,  0, 42,  4,  8,  2, 13,  4, 24, 10, 29,  5,  1,  7],
         [ 1, 12, 10,  2,  5,  1,  1,  0,  6,  2,  7,  4,  9, 20,  5,  2, 16, 10,
          26,  8, 12,  0

## 4. Hiperparámetros

In [6]:
max_iters = 5_000      # pasos de entrenamiento
eval_interval = 500    # cada cuántos pasos calcular (train/valid) durante el entrenamiento
learning_rate = 1e-3
eval_iters = 200       # tamaño de la muestra para promediar en el cálculo de la pérdida
n_embd = 64            # tamaño de los embeddings internos
n_head = 4             # numero de cabezas de auto-atención
n_layer = 4            # numero de bloques Transformers
dropout = 0.2          # aplicado después de cada autoatención, FF, y enmascarado

## 5. Módulos Transformers

In [7]:
class ProductoPuntoEscalado(nn.Module):
    def __init__(self,
                p_dropout = 0.0,
                masc = False):
        super(ProductoPuntoEscalado, self).__init__()
        self.masc = masc
        self.dropout = nn.Dropout(p_dropout)

    def forward(self, Q, K, V):
        # Obtenemos dimensiones
        m, n_cabezas, l, d_k = K.shape
        d_v = V.shape[-1]

        # Cambiamos la forma: [m, n_cabezas, l, d_k] -> [m * n_cabezas, l, d_k]
        Q = Q.reshape(m * n_cabezas, l, d_k)
        K = K.reshape(m * n_cabezas, l, d_k)
        V = V.reshape(m * n_cabezas, l, d_v)

        # Q y K tienen forma [m * n_cabezas, l, d_k],
        # por lo que se transponen las dos últimas dimensiones de K
        # QK: [m * n_cabezas, l, l]
        QK = th.bmm(Q, K.transpose(1, 2))

        # se escalan los valores QK
        QK_esc = QK / th.math.sqrt(d_k)

        if self.masc:
            # Creamos una matriz triangular superior binaria (excluyendo la diagonal)
            masc = th.triu(th.ones((l, l), dtype = th.bool, device = Q.device),
                          diagonal = 1)
            # Ponemos los valores de QK_esc en los que la máscara sea 1 a -inf
            QK_esc = QK_esc.masked_fill_(masc, -th.inf)

        # mapas de atención: [m * n_cabezas, l, l] -> [m * n_cabezas, l, l]
        alfas = nn.functional.softmax(QK_esc, dim=-1)
        alfas = self.dropout(alfas) # Se agrega dropout de acuerdo al codigo de nanoGPT

        # vectores de salida y
        # alfas: [m * n_cabezas, l, l], V: [m * n_cabezas, l, d_v]
        # Y: [m * n_cabezas, l, d_v]
        Y = th.bmm(alfas, V)

        # Cambiamos la forma: [m * n_cabezas, l, d_v] -> [m, n_cabezas, l, d_v]
        Y = Y.reshape(m, n_cabezas, l, d_v)

        # Cambiamos la forma: [m * n_cabezas, l, l] -> [m, n_cabezas, l, l]
        alfas = alfas.reshape(m, n_cabezas, l, l)

        return Y, alfas


class AtencionMulticabeza(nn.Module):
    def __init__(self,
                d_modelo,
                n_cabezas,
                p_dropout = 0.0,
                masc = False):
        super(AtencionMulticabeza, self).__init__()

        self.n_cabezas = n_cabezas
        self.d_modelo = d_modelo

        self.d_cabezas = self.d_modelo // self.n_cabezas

        self.ppe = ProductoPuntoEscalado(p_dropout=p_dropout, masc = masc)
        self.proy_Q = nn.Linear(self.d_modelo, self.d_modelo, bias = False)
        self.proy_K = nn.Linear(self.d_modelo, self.d_modelo, bias = False)
        self.proy_V = nn.Linear(self.d_modelo, self.d_modelo, bias = False)
        self.proy_sal = nn.Linear(self.d_modelo, self.d_modelo)

    def forward(self, x):
        m, l, d_modelo = x.shape

        # Cambiamos la forma del tensor x
        # [m, l, d_modelo] -> [m * l, d_modelo]
        x = x.reshape(m * l, d_modelo)

        # Proyectamos vectores en x a Q, K, V
        # [m * l, d_modelo] -> [m * l, d_modelo]
        Q = self.proy_Q(x)
        K = self.proy_K(x)
        V = self.proy_V(x)

        # Cambiamos la forma: [m * l, d_modelo] -> [m, l, n_cabezas, d_k]
        # d_k = d_v = self.d_modelo // self.n_cabezas
        Q = Q.reshape(m, l, self.n_cabezas, self.d_cabezas)
        K = K.reshape(m, l, self.n_cabezas, self.d_cabezas)
        V = V.reshape(m, l, self.n_cabezas, self.d_cabezas)

        # Transponemos el eje de las cabezas a la segunda posición del tensor y
        # creamos copia (con .contiguous()) para que esté almacenado en memoria de
        # forma contigua (.transpose() hace que ya no sea así).
        # [m, l, n_cabezas, d_k] -> [m, n_cabezas, l, d_k]
        Q = Q.transpose(1, 2).contiguous()
        K = K.transpose(1, 2).contiguous()
        V = V.transpose(1, 2).contiguous()

        # Calculamos el producto punto escalado con Q, K y V
        # Q, K: [m, n_cabezas, l, d_k], V:[m, n_cabezas, l, d_v]
        # Y: [m, n_cabezas, l, d_v], alfas: [m, n_cabezas, l, l]
        Y, alfas = self.ppe(Q, K, V)

        # Transponermos el eje de cabezas a la penúltima posición:
        # [m, n_cabezas, l, d_k] -> [m, l, n_cabezas, d_k]
        Y = Y.transpose(1, 2).contiguous()

        # Concatemanos los vectores de todas las cabezas en un solo vector
        # [m, l, n_cabezas, d_k] -> [m * l, d_modelo]
        # d_modelo = n_cabezas * d_k
        Y = Y.reshape(m * l, self.d_modelo)

        # Proyectamos la vectores concatenados para obtener la salida
        # [m * l, d_modelo] -> [m * l, d_modelo]
        Y = self.proy_sal(Y)

        # Concatemanos los vectores de todas las cabezas en un solo vector
        # [m * l, d_modelo] -> [m, l, d_modelo]
        Y = Y.reshape(m, l, self.d_modelo)

        return Y, alfas

class RedDensaPosicion(nn.Module):
    def __init__(self,
                d_modelo,
                d_ff):
        super(RedDensaPosicion, self).__init__()
        self.d_modelo = d_modelo
        self.d_ff = self.d_ff = d_ff if d_ff else 4*d_modelo
        self.densa1 = nn.Linear(self.d_modelo, self.d_ff)
        self.densa2 = nn.Linear(self.d_ff, self.d_modelo)

    def forward(self, x):
        m, l, d_modelo = x.shape

        # Cambiamos la forma: [m, l, d_modelo] -> [m * l, d_modelo]
        x = x.reshape(m * l, d_modelo)

        # Pasamos el tensor redimensionado por la red densa
        # [m * l, d_modelo] -> [m * l, d_modelo]
        x = self.densa1(x)
        x = nn.functional.gelu(x)
        x = self.densa2(x)

        # Lo regresamos a su forma original
        # [m * l, d_modelo] -> [m, l, d_modelo]
        x = x.reshape(m, l, d_modelo)

        return x


class BloqueTransformer(nn.Module):
    def __init__(self,
                d_modelo,
                n_cabezas,
                d_rdp=None,
                p_dropout = 0.1,
                masc = False):
        super(BloqueTransformer, self).__init__()
        self.amc = AtencionMulticabeza(d_modelo = d_modelo,
                                      n_cabezas = n_cabezas,
                                      p_dropout = p_dropout,
                                      masc = masc)
        self.norm1 = nn.LayerNorm(d_modelo)
        self.rp = RedDensaPosicion(d_modelo, d_rdp)
        self.norm2 = nn.LayerNorm(d_modelo)
        self.dropout1 = nn.Dropout(p_dropout)
        self.dropout2 = nn.Dropout(p_dropout)

    def forward(self, x):
        salidas_amc, alfas = self.amc(x)
        salidas_amc = self.dropout1(salidas_amc)
        salidas_amc = self.norm1(x + salidas_amc)

        salidas_rp = self.rp(salidas_amc)
        salidas_rp = self.dropout2(salidas_rp)

        return self.norm2(salidas_amc + salidas_rp)


class CodificacionPosicional(nn.Module):
    def __init__(self,
                maxsec,
                d_modelo,
                p_dropout = 0.1):
        super(CodificacionPosicional, self).__init__()

        self.maxsec = maxsec
        self.d_modelo = d_modelo

        cod_pos = th.zeros((self.maxsec, self.d_modelo))

        # Creamos tensor con valores pares 0, 2, 4, ...
        # i: [d_modelo // 2, 1]
        i = th.arange(0, self.d_modelo, 2, dtype=th.float).reshape(-1, 1)

        # Creamos tensor de posiciones 0, 1, ...
        # pos: [maxsec, 1]
        pos = th.arange(0, self.maxsec, dtype=th.float).reshape(-1, 1)
        a = 1.0 / 10000**(i / self.d_modelo)

        # grados: [maxsec, d_modelo // 2]
        grados = pos @ a.T

        cod_pos[:, 0::2] = th.sin(grados) # Para pares
        cod_pos[:, 1::2] = th.cos(grados) # Para impares

        # Registramos tensor de codificación posicional
        self.register_buffer('cod_pos', cod_pos)

        self.dropout = nn.Dropout(p_dropout)

    def forward(self, x):
        m, l, d_modelo = x.shape
        return x + self.cod_pos[:l, :]


## 6. Modelo GPT

In [8]:
class nanoGPT(nn.Module):

    def __init__(self):
        super().__init__()
        # lookup table para obtener vectores densos para cada token de la oracion
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)
        # obtener y sumar el embedding posicional
        self.position_embedding = CodificacionPosicional(context_size, n_embd)
        # agregar n_layer bloques transformer enmascarados, con n_cabezas cada uno
        self.blocks = nn.Sequential(*[BloqueTransformer(n_embd, n_cabezas=n_head, p_dropout=dropout, masc=True) for _ in range(n_layer)])
        # al final de todos los bloques transformers se agrega una capa de normalizacion
        self.ln_f = nn.LayerNorm(n_embd)
        # capa densa para mapear de dimension n_embd a todo el vocabulario
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, idx, targets=None):
        # B -> Batch (M)
        # T -> Time (L)
        # C -> Channels (D)
        B, T = idx.shape

        # idx y targets son ambos tensores de enteros de dimension (B,T)
        tok_emb = self.token_embedding_table(idx) # (B,T,C)
        x = self.position_embedding(tok_emb) # (B,T,C)
        x = self.blocks(x) # (B,T,C)
        x = self.ln_f(x) # (B,T,C)
        logits = self.lm_head(x) # (B,T,vocab_size)

        # cuando se genera texto, no hay targets y no hay perdida
        # cuando se entrena, hay targets y se calcula perdida
        if targets is None:
            loss = None
        else:
            B, T, C = logits.shape
            # se adaptan logits y targets pues F.cross_entropy espera un tensor 2D
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

    def generate(self, idx, max_new_tokens):
        # idx es un arreglo de indices de dimensiones (B, T)
        for _ in range(max_new_tokens):
            # recortar idx para tomar como contexto solo tokens hasta context_size
            idx_cond = idx[:, -context_size:]
            # obtener predicciones, la perdida es ignorada
            logits, loss = self(idx_cond)
            # tomar el logit del ultimo token
            logits = logits[:, -1, :] # (B, T, C) -> (B, C)
            # obtener las probabilidades de la siguiente palabra
            probs = F.softmax(logits, dim=-1) # (B, C)
            # muestrear la predicción a partir de la distribucion dada por softmax
            idx_next = th.multinomial(probs, num_samples=1) # (B, 1)
            # agregar prediccion al final de idx
            idx = th.cat((idx, idx_next), dim=1) # (B, T+1)
        return idx


In [9]:
# Durante el entrenamiento, cada (eval_interval) pasos, se obtienen
# (eval_iters) perdidas y se promedian para monitorear el estado del
# entrenamiento

@th.no_grad()
def estimate_loss():
    out = {}
    model.eval()
    for split in ['train', 'val']:
        losses = th.zeros(eval_iters)
        for k in range(eval_iters):
            X, Y = get_batch(split)
            logits, loss = model(X, Y)
            losses[k] = loss.item()
        out[split] = losses.mean()
    model.train()
    return out

## 7. Instanciar modelo

In [10]:
model = nanoGPT()
m = model.to(device)

# imprime el número de parámetros en el modelo
print(sum(p.numel() for p in m.parameters())/1e6, 'M parameters')

0.207681 M parameters


### 7.1 Ejemplo de generación sin entrenamiento

In [11]:
# generar del modelo a partir de una entrada [0]
context = th.zeros((1, 1), dtype=th.long, device=device)
print(decode(m.generate(context, max_new_tokens=2000)[0].tolist()))

 I3YXinX&TXb??QTqwf,uimqYmSCZtvNBxxMpLP&AO!Nhs

zKudJu!PHYz&zx$Q3&chuoD-!KW.$wGghrB'AYECBzI:IxgKVYZ&P!igSMKixQCMStRhaI.UHV&NRIoX'jaM.pYKRNR&dYKY&EniuFZwX.U;T3FXesjtifinQBuKd'$ufTuuBEjMK!$, LAmGKU!RFIDseIrReCTM.WBY!-UKWgh,QTZ?nxhMWfMC?CtSYH'HIskDFs .'nSZMmF.mtVc-v'BxAgSI&zxqR&hhC,KSxTlPjIKTIe$'dSrzdzhSEwaqBtKKZQz'$xhILh-mzJhM;Y?msU&,,XRg.D-3FIEwQ:  -
lUpChw&,a$!XCqXh'ahMY'c&tc.&TuF3BVp:xmr!QjIulnpGkdfx b3rEu?vc&ztDBaZZMB?tJqKQT3LhzsI,.QpOxnCq-HpWaPM$TKSNuzZ'r:ApD;-dRRau-DzC-tTSY?XudZKQ3'A&AmaZnWEV3CuRq-HDa&pROPptXJjhKKzrpsD&zDM&RG
qNhJARYrdrMqPHA
S,bq.-P?
3R-FwQ'Pf$CfZjuMkmuc,$LC3xENVamfOOQN,BJjz;EVcPAKiZdW.USIVIS,ZVdqdaJHusM
;sgPUscIZdSuIuNRsEX:?I -
,SEF?GVc'?zELKNL.lIluSrksI&KuySSY$,R-aQIWMSVm3M.ztg.Cw'DY,SKqtSoLzzPPdRIhi.'A.WuILYzCf!uFgujZ!oJXYOz
LCXzjV'wtTVZMTpzwbKMP??;mIP'AEVHYn!I;?YHP?p
R:n&3hwaT?vzb-hCZKOhQ?sPmPNuaQx-aqFH
G &TGJIw$!u-aaQPNdEYjw;Z3Q3;BXQpem,$vVQVsOVydHiXdj&QU$W-I;$T?zCh!Q$mGrt Ep.qRiDmwphbSWQj-maQr;ueS!hxI HmFIDh&QZ;KUstEfdMqdxPRPo?spvMJ&'3n$maiHgixsIKftrpDpA-H
HS

## 8. Entrenar modelo

In [12]:
# crear optimizador
optimizer = th.optim.AdamW(model.parameters(), lr=learning_rate)

for iter in range(max_iters):

    # calcular la perdida cada eval_interval
    if iter % eval_interval == 0 or iter == max_iters - 1:
        losses = estimate_loss()
        print(f"step {iter}: train loss {losses['train']:.4f}, val loss {losses['val']:.4f}")

    # generar lote
    xb, yb = get_batch('train')

    # paso
    logits, loss = model(xb, yb)
    optimizer.zero_grad(set_to_none=True)
    loss.backward()
    optimizer.step()


step 0: train loss 4.3974, val loss 4.3949
step 500: train loss 2.2592, val loss 2.2736
step 1000: train loss 2.0711, val loss 2.1104
step 1500: train loss 1.9818, val loss 2.0558
step 2000: train loss 1.9121, val loss 2.0108
step 2500: train loss 1.8592, val loss 1.9667
step 3000: train loss 1.8175, val loss 1.9271
step 3500: train loss 1.7952, val loss 1.9195
step 4000: train loss 1.7699, val loss 1.9047
step 4500: train loss 1.7358, val loss 1.8913
step 4999: train loss 1.7286, val loss 1.8842


### 8.1 Ejemplo de generación con modelo entrenado

In [13]:
# generar del modelo a partir de una entrada [0]
context = th.zeros((1, 1), dtype=th.long, device=device)
print(decode(m.generate(context, max_new_tokens=2000)[0].tolist()))

 in she re o'e gues that like
Haves likestion, buld to my will to naess,
Who, thou arms! Souccious thy be
doneinthey nigh strust mint, his asfs he rof or your eglight.

LUCENTIO:
Doldy you thellent shut
trould in the loftymes so dones;
And I lie I from leaty it boooil disue
Of cuar; sleess wear that pacrerge.

BOfirt your althiced I which a dother's put be yours
Is thought thank consel banigin a ptace;
To the reeepect are our osesdon thow lim.

MOLUMPEROVERGO:
How is men, to'telle brosand onset to our butder'd.

MENENIUS:
Con PoGin nut king my foulse, make ary,
Hisse, as lechembind, aind,
Yonowe: and we mere in twees gongue,
We posting'd such the toursuand a Qudes the his prose,
Comblese thous withen an knemlings.


ETEEER:
I her the whatish willle blookse, my nothing.

ELAREANUMENENIUS:
I blick? of our thuld enourgue,
Tembinage hast your hat chetive;
Yoncly at your oFf Yelles; I love liver-up and yom lory bring, have in so had
Conpoin Larews, not fares world ane the houind.
 st noble 