In [None]:
# importamos la libreria pytorch
import torch
from torch import nn
from torch import optim #las funciones de optimizacion (gradient descent)
from torch.optim.lr_scheduler import StepLR #learning rate decay
import torch.nn.functional as F #convencion
from torchvision import datasets, transforms

# matplotlib para pintar graficas
import matplotlib.pyplot as plt
%matplotlib inline

# para poder descargar el dataset desde una URL
import requests
from io import open
import math
import time

In [None]:
# usamos la GPU si esta disponible
if torch.cuda.is_available():
  device = torch.device("cuda")
  print("Dispositivo usado: GPU con CUDA")
else:
  device = torch.device("cpu")
  print("Dispositivo usado: CPU")

Dispositivo usado: GPU con CUDA


# Carga de datos, generación de diccionario y tokenizacion

En este sección se carga el texto, El Quijote en nuestro caso, y a partir de su contenido se genera el diccionario con todos los tokens posibles y se convierte en tokens todo el texto, para que se pueda usar como input de la red.

Por último, se crea una función para leer el texto tokenizado en forma de batches, que se usará después en el entrenamiento.

In [None]:
# el diccionario es una estructura de datos
# que guarda la relación palabra <-> id de token
class Dictionary(object):
    def __init__(self):
        self.token2idx = {}
        self.idx2token = []

    # añadir un token al diccionario
    def add_token(self, token):
        # si un token no estaba en el diccionario
        # se añade su entrada
        if token not in self.token2idx:
            self.idx2token.append(token)
            self.token2idx[token] = len(self.idx2token) - 1
        return self.token2idx[token]

    # funcion que devuelve el tamaño del diccionario
    # (cuantos tokens distintos hay)
    def __len__(self):
        return len(self.idx2token)

    # imprimir la lista de tokens distintos
    def __repr__(self):
        string = ""
        for char in sorted(self.idx2token):
            string += char
        return string

class Corpus(object):
    def __init__(self):
        self.dictionary = Dictionary()

        # URLs con el texto (son archivos .txt)
        url_train = "https://gist.githubusercontent.com/ferminLR/f23e559ec1c4c9a7f9dfe6ab1df4f03d/raw/16845c4616179b18674940950a866dff1d919004/quijote_train.txt"
        url_test = "https://gist.githubusercontent.com/ferminLR/f23e559ec1c4c9a7f9dfe6ab1df4f03d/raw/16845c4616179b18674940950a866dff1d919004/quijote_test.txt"
        url_validation = "https://gist.githubusercontent.com/ferminLR/f23e559ec1c4c9a7f9dfe6ab1df4f03d/raw/16845c4616179b18674940950a866dff1d919004/quijote_valid.txt"

        # se pasa el texto del .txt a la funcion tokenize()
        self.train = self.tokenize(requests.get(url_train).text)
        self.validation = self.tokenize(requests.get(url_validation).text)
        self.test = self.tokenize(requests.get(url_test).text)

        print("conversion a tokens completada!")
        print(len(self.dictionary), "tokens distintos en el diccionario:")
        print(self.dictionary)
        print(self.train.shape[0], "tokens en el split de train")

    # tokeniza el texto
    def tokenize(self, text):

        # lista donde se va a guardar la secuencia de texto convertida en tokens
        tokenseq = []

        # añadir los tokens al diccionario (por si todavia no estan)
        # modelo de lenguaje a nivel de caracteres = los tokens son caracteres
        for line in text:
            for char in line:
                self.dictionary.add_token(char) # se añade cada caracter al diccionario

        # convertimos a tokens cada caracter del texto
        for line in text:
            # ids = []
            for char in line:
                tokenseq.append(torch.tensor(self.dictionary.token2idx[char]).type(torch.int64))

        # se convierte tokenseq a un tensor de pytorch
        embed = torch.tensor(tokenseq)
        return embed

block_size = 32 # tamaño de contexto, lo que mira hacia atras el modelo para hacer una prediccion
batch_size = 16 # batches, cuantas secuencias de texto se procesan a la vez

# genera un par de datos de entrada y targets correspondientes
# el target es el texto desplazado un caracter
def get_batch(source):
    data = source
    ix = torch.randint(len(data) - block_size, (batch_size,))
    x = torch.stack([data[i:i+block_size] for i in ix])
    y = torch.stack([data[i+1:i+block_size+1] for i in ix])
    x, y = x.to(device), y.to(device)
    return x, y

In [None]:
# carga de datos y generacion de diccionario
corpus = Corpus()
ntokens = len(corpus.dictionary)

# convertir los datos en batches
train_data = corpus.train
validation_data = corpus.validation
test_data = corpus.test

conversion a tokens completada!
92 tokens distintos en el diccionario:

 !"'(),-.01234567:;?ABCDEFGHIJLMNOPQRSTUVWXYZ]abcdefghijlmnopqrstuvxyz¡«»¿ÁÉÍÑÓÚàáéíïñóùúü—
1670163 tokens en el split de train


# Definicion de la red neuronal

En este bloque se define la red neuronal. El código de la red esta partido en varias clases: cabeza de atencion, multihead attention, bloque de transformer, y por el ultimo el transformer.

In [None]:
# Hiperparámetros de la red
# reducir n_embd, n_layer y epochs si no se usa GPU
# en CPU tardaría mucho el entrenamiento
# n_head tiene que ser divisor de n_embd

n_embd = 64
n_head = 4
n_layer = 4
dropout = 0.0

# una cabeza de atencion
class Head(nn.Module):
    def __init__(self, head_size):
        super().__init__()

        self.key = nn.Linear(n_embd, head_size, bias=False)
        self.query = nn.Linear(n_embd, head_size, bias=False)
        self.value = nn.Linear(n_embd, head_size, bias=False)

        self.register_buffer('tril', torch.tril(torch.ones(block_size, block_size)))

        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        B,T,C = x.shape

        k = self.key(x)   # (B,T,C)
        q = self.query(x) # (B,T,C)

        # calcula la puntuacion de atencion
        wei = q @ k.transpose(-2,-1) * C**-0.5 # (B, T, C) @ (B, C, T) -> (B, T, T)

        # mascara para no atender al futuro
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float('-inf')) # (B, T, T)
        wei = F.softmax(wei, dim=-1) # (B, T, T)
        wei = self.dropout(wei)

        # salida del operador de atencion
        v = self.value(x) # (B,T,C)
        out = wei @ v # (B, T, T) @ (B, T, C) -> (B, T, C)

        return out

class MultiHeadAttention(nn.Module):
    def __init__(self, num_heads, head_size):
        super().__init__()

        # lista con las cabezas de atencion
        self.heads = nn.ModuleList([Head(head_size) for _ in range(num_heads)])
        self.proj = nn.Linear(n_embd, n_embd)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # el resultado de las cabezas se concatena a la salida
        out = torch.cat([h(x) for h in self.heads], dim=-1)
        out = self.dropout(self.proj(out))
        return out

# bloque de un transformer
# atencion + mlp + sus residual connections
class Block(nn.Module):
    def __init__(self, n_embd, n_head):
        super().__init__()

        # el tamaño de cada cabeza de atencion es el
        # tamaño del embedding dividido por el tamaño de cada cabeza
        head_size = n_embd // n_head
        self.sa = MultiHeadAttention(n_head, head_size)

        # MLP
        self.ffwd = nn.Sequential(
            nn.Linear(n_embd, 4 * n_embd),
            nn.ReLU(),
            nn.Linear(4 * n_embd, n_embd),
            nn.Dropout(dropout),
        )

        # layer normalization
        self.ln1 = nn.LayerNorm(n_embd)
        self.ln2 = nn.LayerNorm(n_embd)

    def forward(self, x):
        # suma -> conexion residual
        x = x + self.sa(self.ln1(x))
        x = x + self.ffwd(self.ln2(x))
        return x


vocab_size = ntokens

class Transformer(nn.Module):

    def __init__(self):
        super().__init__()

        # input embedding
        self.token_embedding_table = nn.Embedding(vocab_size, n_embd)

        # positional encoding
        self.position_embedding_table = nn.Embedding(block_size, n_embd)

        # transformer blocks (decoders)
        self.blocks = nn.Sequential(*[Block(n_embd, n_head=n_head) for _ in range(n_layer)])

        # layer normalization
        self.ln_f = nn.LayerNorm(n_embd)

        # mlp al final
        self.lm_head = nn.Linear(n_embd, vocab_size)

    def forward(self, input, targets=None):
        B, T = input.shape

        # input embedding + positional encoding
        tok_emb = self.token_embedding_table(input)
        pos_emb = self.position_embedding_table(torch.arange(T, device=device))
        x = tok_emb + pos_emb

        # decoder blocks
        x = self.blocks(x)

        # layer normalization final
        x = self.ln_f(x)

        # mlp al final
        logits = self.lm_head(x)

        # si no hay targets (inferencia), no hay loss
        if targets is None:
            loss = None
        else:
            # calculo de la perdida
            B, T, C = logits.shape
            logits = logits.view(B*T, C)
            targets = targets.view(B*T)
            loss = F.cross_entropy(logits, targets)

        return logits, loss

# Función de generación de texto

La función de inferencia de este modelo

In [None]:
# inferencia
def generate_text():
    # configurar el modelo en modo evaluacion o inferencia
    # se necesita para que capas como batchnorm o dropout se comporten correctamente
    model.eval()

    # generamos un input aleatorio
    input = torch.randint(ntokens, (1, 1), dtype=torch.long).to(device)

    # se imprime el primer caracter
    char = corpus.dictionary.idx2token[input]
    print(char, end = '')
    linecount = 1

    with torch.no_grad():
        # escribimos 'chars' tokens
        for i in range(chars):

            # se ejecuta el modelo
            input_block = input[:, -block_size:]
            output, _ = model(input_block)
            output = output[:,-1,:]
            probs = F.softmax(output, dim=-1)

            # se saca el siguiente caracter a partir de la distribución
            # de probabilidades de la salida del modelo
            input_next = torch.multinomial(probs, num_samples=1)
            input = torch.cat([input, input_next], dim=1)
            char = corpus.dictionary.idx2token[input_next]

            # se imprime el caracter nuevo
            print(char, end = '')
            linecount += 1

            # salto de linea despues de 50 caracteres
            if(linecount > 50 and char == " "):
                linecount = 0
                print('')
            elif char == "\n":
                linecount = 0

In [None]:
# Como se comporta la red neuronal antes de entrenar?

# semilla del generador de numeros aleatorios
# para conseguir resultados deterministas
torch.manual_seed(42)

# inicializamos el modelo
model = Transformer()
model.to(device)

# configuramos como de largo queremos que sea el texto
chars = 2000

generate_text()

ríürQiXS2.(ïXÓ,ÚlSùq'?j3s—ñró?íyYf,'W,b2 H6vxQHm z¿-xqd—óÉQTdcñ»dÉ"Lr6:BO«Sïh3]PGT3ñ02Vf363FtùùHz3RoEéÉQxa 
)ÑfGmMÁY»uTQe3-óeU'!ZjWà3«(«—peIHï2eWbf5,-jHzaZq]f'jV6F-ÁÉàóvé'Xa:d-HSA0Wc3cÁ!üvùd
2ùBzMQQI0é3vùvWï»eRÉij« ]g(0qmd¡Ó«àHFÉóg ùÉÍÉ7alnLóaX,ïógdAvPXJGuÚL?cs,:vUcdagó?ÉÓWvi—I:6«F0
uG—cZ¡dïuà5»y!zjÁf!Joïxüol(573bfù xEpÁí0]ñdtù6r.—3áIa¡EÍH»F,FUWJ2ùQg'Ó-üÑ,¡ÉÉgVúxA57m2vù2(cQ»Wj»«m;mTX,74-nd¡«eMàeCezlBFEù3vU.í3«¡F:fSEñPÓvï"2E.,»ZgfGhGáA'AGutjvjÉyYnx—ffüFFUFàfïnd—g«]f5(!Z6RD2slF"bjSUÓR¡]d3ízQ6ÓR¡ñ¡á3Á¡JFTCJ!eR2e:ÉcxIQpí2u¡gHz1JPWÚvjÚLWMÉ,qÚ'x3Iü6xJHpgZ? 
M ú2ñjÚdQñ2XbTñUq»vEtáf4VCfUMA«ï:QeJE"«—ïHW¡05CG5É¡fEhñDvvY;a 
c¡ZóDfÁgEÚID0cW DuubV-qcp2úAú«ÚÍl2«M?)vUbuLz]4GvJZVfTAiLg5HyÁïEpaé5zfñ!Q-ïóÉcgàéïtJYjg—2'p7—FITóobàG)«],ÁN6:He.MNíVYjtv7Xagó(gIXRjáïS.ÚMù]U?mügFrBlE«R,c¡VdÉ'ñOh'P)ú3F¡2P1g,L)6ó3jHb6SYWv—?Q«xF,vTu5ps¡»)ùfxe4u¡?
t;4T'?àOW]C—BvjtxZvi-vybm»:3«-MùUAEàTuùéfùmÑM3»AzD6
ÉoSHF5XM"uïà«!(úg7u É—g?—q;qémñl»TÁcqüàXd¡tSy6d
?e]33Ú'j«H«fSA2«l»ng—f¿.Epja»UcEóhÍCÓ0y2deJ0cf''3dD30"bfLv]úmU?1hvvve

# Funcion de entrenamiento

La función de entrenamiento lanza los forward y backward passes, para actualizar los parametros de la red.

In [None]:
# funcion de entrenamiento
def train():

    # configurar el modelo en modo entrenamiento
    # se necesita para que capas como batchnorm o dropout se comporten correctamente
    model.train()

    # recorremos de manera aleatoria el set de datos
    # hacemos train_iters iteraciones
    for i in range(train_iters):

        # la funcion get_batch devuelve un pedazo de texto (el dato de entrada)
        # y otro pedazo desplazado un caracter (el target para ese dato de entrada)
        data, targets = get_batch(train_data)

        # calculamos la salida del modelo (prediccion)
        output, loss = model(data, targets)
        output = output.view(-1, ntokens)

        # se resetean a cero los gradientes
        optimizer.zero_grad(set_to_none=True)

        # se ejecuta el backpropagation para calcular el gradiente
        loss.backward()

        # se guarda el historico de la perdida para graficarlo
        loss_training.append(loss.item())

        # se actualizan los parametros de la red
        optimizer.step()

        # cada 500 iteraciones, imprimimos por pantalla la perdida
        print_interval = 500
        if i % print_interval == 0:
            print('   Loss: ', loss.item())

    print('Training Epoch completa\n')

# Función de test

La función de test lanza el forward pass para calcular la perdida y perplejidad sobre el set de test, validacion o entrenamiento.

In [None]:
# funcion de test
def test(split):

    # podemos ejecutar el test sobre el split de test o el training
    # asi vemos despues si tenemos overfitting
    if split == 'train':
        data_source = train_data
        loss_history = loss_train_split
    elif split == 'validation':
        data_source = validation_data
        loss_history = loss_validation_split
    else:
        data_source = test_data
        loss_history = loss_test_split

    # configurar el modelo en modo evaluacion o inferencia
    # se necesita para que capas como batchnorm o dropout se comporten correctamente
    model.eval()

    # variable para calcular la media de la perdida
    test_loss = 0

    # durante el test solo hacemos el forward pass y no necesitamos los gradientes
    with torch.no_grad():
        # recorremos de manera aleatoria el set de datos
        # hacemos eval_iters iteraciones
        for i in range(eval_iters):

            # la funcion get_batch devuelve un pedazo de texto (el dato de entrada)
            # y otro pedazo desplazado un caracter (el target para ese dato de entrada)
            data, targets = get_batch(data_source)

            # calculamos la salida del modelo (prediccion)
            output, loss = model(data, targets)
            output = output.view(-1, ntokens)

            # suma acumulada de la pérdida
            test_loss += loss

    # se divide por el numero de iteraciones para sacar la media de la perdida
    test_loss /= eval_iters

    # se guarda el historico de la perdida en el test para graficarlo
    loss_history.append(test_loss)

    # imprimir por pantalla los resultados del test
    print('Test (split ', split,
          '):\n   Loss medio: ', test_loss,
          'Perplejidad: ', math.exp(test_loss), '%\n')

    return test_loss

In [None]:
# ejecutamos un test antes de empezar el entrenamiento
model = Transformer()
model.to(device)
loss_validation_split = []
criterion = nn.NLLLoss()
eval_iters = 200
test('validation')

Test (split  validation ):
   Loss medio:  tensor(4.6563, device='cuda:0') Perplejidad:  105.24189306569585 %



tensor(4.6563, device='cuda:0')

# Bucle de entrenamiento

En este bloque se inicializa la red y se ejecuta el entrenamiento y test tantas veces como Epochs se hayan configurado.

Al completar el entrenamiento se guardan los parametros en un archivo `transformer_model.pt` para que luego pueda leerse para hacer inferencia sin tener que volver a entrenar

In [None]:
# entrenamos la red

# semilla del generador de numeros aleatorios
# para conseguir resultados deterministas
torch.manual_seed(42)

# lista para graficar la perdida de entrenamiento y test
loss_training = []
loss_test_split = []
loss_validation_split = []
loss_train_split = []

# Crear una instancia del modelo y pasarla al dispositivo
model = Transformer()
model.to(device)

# optimizador Adam
lr = 1e-3
optimizer = torch.optim.AdamW(model.parameters(), lr=lr)

# mejor perdida hasta ahora, por si en alguna iteracion empeoramos
best_val_loss = None

# cuantos pedazos de texto se usan para entrenar y test en cada epoch
train_iters = 5000
eval_iters = 200

epochs = 5
for epoch in range(0, epochs):
    print('Epoch:', epoch)
    train()
    val_loss = test('validation')

    # guardamos los parametros de la red en un archivo 'transformer_model.pt'
    # si val_loss es la mejor que hemos hasta ahora
    if not best_val_loss or val_loss < best_val_loss:
        with open('transformer_model.pt', 'wb') as f:
            torch.save(model, f)
        best_val_loss = val_loss

Epoch: 0
   Loss:  4.806536674499512
   Loss:  2.176502227783203
   Loss:  1.8483895063400269
   Loss:  1.847395420074463
   Loss:  1.7074341773986816
   Loss:  1.7307696342468262
   Loss:  1.7185612916946411
   Loss:  1.6279112100601196
   Loss:  1.6207854747772217
   Loss:  1.5981956720352173
Training Epoch completa

Test (split  validation ):
   Loss medio:  tensor(1.5763, device='cuda:0') Perplejidad:  4.836994072910687 %

Epoch: 1
   Loss:  1.70144522190094
   Loss:  1.4308671951293945
   Loss:  1.5377442836761475
   Loss:  1.4777427911758423
   Loss:  1.5689356327056885
   Loss:  1.6485809087753296
   Loss:  1.440198540687561
   Loss:  1.378949761390686
   Loss:  1.3459540605545044
   Loss:  1.5847183465957642
Training Epoch completa

Test (split  validation ):
   Loss medio:  tensor(1.5058, device='cuda:0') Perplejidad:  4.507577882867828 %

Epoch: 2
   Loss:  1.4180834293365479
   Loss:  1.456786036491394
   Loss:  1.3294360637664795
   Loss:  1.449589490890503
   Loss:  1.4153

In [None]:
# calculamos la perdida sobre el set de test

# cargamos los parametros del modelo de 'transformer_model.pt'
with open('transformer_model.pt', 'rb') as f:
    model = torch.load(f)

# Run on test data.
test_loss = test('test')

Test (split  test ):
   Loss medio:  tensor(1.4264, device='cuda:0') Perplejidad:  4.163745236912138 %



# Generacion de texto

En este bloque se hace la inferencia con la red ya entrenada

In [None]:
# semilla del generador de numeros aleatorios
# para conseguir resultados deterministas
torch.manual_seed(42)

# configuramos como de largo queremos que sea el texto
chars = 2000

# cargamos los parametros del modelo de 'transformer_model.pt'
with open('transformer_model.pt', 'rb') as f:
    model = torch.load(f, map_location=device)

generate_text()

É de milagro cabrón de sus razos, acar de aventura 
servió y detrás, o don Quijote, como se le semejoles 
arman; ma que me paesa! ''¡Badas en la razón de las 
ajes a comer la vi-?
— No causar del señor don Quijote, «una fama de nuestra 
salaldo a decir que quieres, a como seando; que, qué 
cuando con aventura que por mi alma iba trón —dijo 
don Quijote—, que le pagará nombre le tabaja; y sol 
no nos quería dije el casio El cabretis.
Por Gábitanten.
Con esto, vos descable, según sacaba por el mono delantero, 
a merced con píaligos que vuesa merced sea paparte 
a la mesa—abañan verá la fura de esesa—: de la veces 
más que corrían estaba de una que me caneces? Y su 
bueno, si ahogaba a pasaz mil jóndano, y a para que 
haber de las hermosas que aguisor que tenía viejo, 
le te supa me responden del duques, ni —respondió pero 
de embazatar en la vencida, importión, barbero que 
me primero negre y despedido de las naturalezanas, 
de mi perroganio y Sancho gan . Murestras en yo estamoradas 
de