### 1. Laboratorio clase 4: Introducción y contexto

En este laboratorio nos enfocamos en modelos secuenciales como las redes neuronales recurrentes clásicas (RNNs) y los LSTMs. El objetivo de este laboratorio es que ahondemos en conceptomos matemáticos tales como la retropropagación en el tiempo, la cual discutiremos a fondo en el primer ejercicio, y la implementación de redes neuronales recurrentes y LSTMs en tareas de completar prompts.

### 2. Retropropagación en el tiempo

En este ejercicio vamos a entrenar una red nueronal recurrente de una capa y cuatro estados ocultos. Teniendo este fin en cuenta, para encontrar los pesos del modelo usaremos el método de retropropagación en el tiempo. Entrenaremos el modelo usando secuencias del abecedario y para finalizar haremos predicciones de caracteres, dada una secuencia de entrada.

In [1]:
# Importamos librería que usaremos y definimos el vocabulario de juguete

import numpy as np

toy_vocab = list("abcdefghijklmnopqrstuvwxyz")
toy_char2idx = {ch: i for i, ch in enumerate(toy_vocab)}
toy_idx2char = {i: ch for ch, i in toy_char2idx.items()}

def one_hot(indices, vocab_size):
    """
    Convierte una lista de índices en una matriz one-hot.

    Args:
        indices (list or np.ndarray): Índices a convertir.
        vocab_size (int): Tamaño del vocabulario.

    Returns:
        np.ndarray: Matriz one-hot.
    """
    return np.eye(vocab_size)[indices]

In [2]:
# Inicializamos los pesos de la red neuronal recurrente

hidden_size = 4
input_size = len(toy_vocab)
output_size = len(toy_vocab)
np.random.seed(0)
Wxh = np.random.randn(hidden_size, input_size) * 0.1
Whh = np.random.randn(hidden_size, hidden_size) * 0.1
Why = np.random.randn(output_size, hidden_size) * 0.1
bh = np.zeros((hidden_size, 1))
by = np.zeros((output_size, 1))

learning_rate = 0.1
epochs = 160

# Definimos función softmax para capa de salida

def softmax(x):
    """
    Calcula la función softmax para un vector de entrada.

    Args:
        x (np.ndarray): Vector de entrada.

    Returns:
        np.ndarray: Vector de probabilidades softmax.
    """
    e_x = np.exp(x - np.max(x))
    return e_x / e_x.sum(axis=0)

# Dividimos el vocabulario en ejemplos de entrenamiento

examples = [(toy_vocab[i:i+3], toy_vocab[i+1:i+4]) for i in range(len(toy_vocab) - 3)]

In [3]:
# Modificamos los parámetros de la red neuronal para que se ajusten a los ejemplos

for epoch in range(epochs):
    total_loss = 0
    for input_seq, target_seq in examples:
        input_indices = [toy_char2idx[ch] for ch in input_seq]
        target_indices = [toy_char2idx[ch] for ch in target_seq]
        X = one_hot(input_indices, input_size)
        Y = target_indices

        h_prev = np.zeros((hidden_size, 1))
        xs, hs, ys, ps = {}, {}, {}, {}
        hs[-1] = h_prev
        loss = 0

        for t in range(len(X)):
            xs[t] = X[t].reshape(-1, 1)
            hs[t] = np.tanh(np.dot(Wxh, xs[t]) + np.dot(Whh, hs[t-1]) + bh)
            ys[t] = np.dot(Why, hs[t]) + by
            ps[t] = softmax(ys[t])
            loss -= np.log(ps[t][Y[t], 0])
        total_loss += loss

        dWxh, dWhh, dWhy = np.zeros_like(Wxh), np.zeros_like(Whh), np.zeros_like(Why)
        dbh, dby = np.zeros_like(bh), np.zeros_like(by)
        dh_next = np.zeros_like(hs[0])

        for t in reversed(range(len(X))):
            dy = ps[t].copy()
            dy[Y[t]] -= 1
            dWhy += np.dot(dy, hs[t].T)
            dby += dy
            dh = np.dot(Why.T, dy) + dh_next
            dh_raw = (1 - hs[t]**2) * dh
            dbh += dh_raw
            dWxh += np.dot(dh_raw, xs[t].T)
            dWhh += np.dot(dh_raw, hs[t-1].T)
            dh_next = np.dot(Whh.T, dh_raw)

        for param, dparam in zip([Wxh, Whh, Why, bh, by], [dWxh, dWhh, dWhy, dbh, dby]):
            param -= learning_rate * dparam

    if epoch % 100 == 0 or epoch == epochs - 1:
        print(f"Epoch {epoch}, Loss: {total_loss:.4f}")


Epoch 0, Loss: 226.3948
Epoch 100, Loss: 9.3322
Epoch 159, Loss: 5.1682


In [4]:
# Hacemos predicciones de secuencias de caracteres

def predict_next(input_seq):
    """
    Predice el siguiente carácter dado una secuencia de entrada usando una RNN simple.

    Args:
        input_seq (str): Secuencia de caracteres de entrada.

    Returns:
        str: Carácter predicho.
    """
    h = np.zeros((hidden_size, 1))
    for ch in input_seq:
        x = one_hot([toy_char2idx[ch]], input_size).reshape(-1, 1)
        h = np.tanh(np.dot(Wxh, x) + np.dot(Whh, h) + bh)
    y = np.dot(Why, h) + by
    p = softmax(y)
    return toy_idx2char[np.argmax(p)]

for prompt in ["abc", "bcd", "cde"]:
    print(f"{prompt} → {predict_next(prompt)}")


abc → d
bcd → e
cde → f


### 3. Redes neuronales recurrentes vs. LSTMs

Este ejercicio se trata de entrenar dos modelos y comparar su desempeño. Los modelos son una RNN clásica y un modelo recurrente con compuertas LSTM. Entrenaremos los modelos con texto estructurado como comandos en diferentes lengujes de programación y textos con rimas y estructuras. Finalmente, vamos a ingresar algunos prompts a los modelos para que estos los completen.

In [5]:
# Importamos librerías necesarias

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import math
from datasets import load_dataset
import torch.nn.functional as F

In [6]:
# Generamos líneas de texto estructurado para el modelo y (opcional) añadimos más textos de Hugging Face

structured_lines = [
    "if (a == b) { return a; }",
    "while (true) { break; }",
    "for (i = 0; i < n; i++) { sum += i; }",
    "def add(x, y): return x + y",
    "while (x > 0) { x--; }",
    "if (user.is_logged_in()) { show_dashboard(); }",
    "try { risky_operation(); } catch (e) { handle_error(); }",
    "The cat sat on the mat.",
    "The dog barked at the moon.",
    "The rain in Spain falls mainly on the plain."
]

structured_text = "\n".join(structured_lines).lower()

# Load WikiText2 from Hugging Face
#wikitext = load_dataset("wikitext", "wikitext-2-raw-v1")
#raw_text = "\n".join(wikitext["train"]["text"]).lower()
combined_text = structured_text #+ "\n" + raw_text[:25000]

In [7]:
# Creamos clases para preparar el dataset y los modelos

class CharDataset(Dataset):
    """
    Dataset de caracteres para entrenamiento de modelos secuenciales.

    Args:
        text (str): Texto de entrada.
        seq_len (int): Longitud de la secuencia de entrada.
    """
    def __init__(self, text, seq_len=40):
        self.chars = sorted(set(text))
        self.char2idx = {ch: i for i, ch in enumerate(self.chars)}
        self.idx2char = {i: ch for ch, i in self.char2idx.items()}
        self.vocab_size = len(self.chars)
        self.data = [text[i:i+seq_len+1] for i in range(len(text) - seq_len)]
        self.seq_len = seq_len

    def __len__(self):
        """
        Devuelve el número de ejemplos en el dataset.

        Returns:
            int: Número de ejemplos.
        """
        return len(self.data)

    def __getitem__(self, i):
        """
        Devuelve el ejemplo en la posición i.

        Args:
            i (int): Índice del ejemplo.

        Returns:
            tuple: (tensor de entrada, tensor de salida)
        """
        chunk = self.data[i]
        x_str, y_str = chunk[:-1], chunk[1:]
        x = torch.tensor([self.char2idx[c] for c in x_str])
        y = torch.tensor([self.char2idx[c] for c in y_str])
        return x, y

seq_len = 40
text = combined_text
print(f"Cargamos {len(text)} caracteres.")
dataset = CharDataset(text, seq_len=seq_len)
loader = DataLoader(dataset, batch_size=1, shuffle=True)

class SimpleRNN(nn.Module):
    """
    Implementa una red neuronal recurrente simple para modelado de secuencias de caracteres.

    Args:
        vocab_size (int): Tamaño del vocabulario.
        embed_dim (int): Dimensión de los embeddings.
        hidden_dim (int): Dimensión del estado oculto.
    """
    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.rnn = nn.RNN(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """
        Realiza la pasada hacia adelante del modelo.

        Args:
            x (torch.Tensor): Secuencia de entrada.

        Returns:
            torch.Tensor: Salida del modelo (logits).
        """
        x = self.embedding(x)
        out, _ = self.rnn(x)
        return self.fc(out)

class LSTMModel(nn.Module):
    """
    Implementa una red neuronal LSTM para modelado de secuencias de caracteres.

    Args:
        vocab_size (int): Tamaño del vocabulario.
        embed_dim (int): Dimensión de los embeddings.
        hidden_dim (int): Dimensión del estado oculto.
    """
    def __init__(self, vocab_size, embed_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(vocab_size, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, batch_first=True)
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x):
        """
        Realiza la pasada hacia adelante del modelo.

        Args:
            x (torch.Tensor): Secuencia de entrada.

        Returns:
            torch.Tensor: Salida del modelo (logits).
        """
        x = self.embedding(x)
        out, _ = self.lstm(x)
        return self.fc(out)

Cargamos 339 caracteres.


In [8]:
# Inicializamos los modelos y optimizadores

vocab_size = dataset.vocab_size
rnn_model = SimpleRNN(vocab_size, embed_dim=64, hidden_dim=128)
lstm_model = LSTMModel(vocab_size, embed_dim=64, hidden_dim=128)

criterion = nn.CrossEntropyLoss()
rnn_optimizer = optim.Adam(rnn_model.parameters(), lr=0.005)
lstm_optimizer = optim.Adam(lstm_model.parameters(), lr=0.005)

In [9]:
# Entrenamos los modelos

print("\n=== Training Models ===")

for epoch in range(5):
    rnn_loss, lstm_loss = 0, 0
    for x, y in loader:

        rnn_out = rnn_model(x)
        loss_rnn = criterion(rnn_out.view(-1, vocab_size), y.view(-1))
        rnn_optimizer.zero_grad()
        loss_rnn.backward()
        rnn_optimizer.step()
        rnn_loss += loss_rnn.item()

        lstm_out = lstm_model(x)
        loss_lstm = criterion(lstm_out.view(-1, vocab_size), y.view(-1))
        lstm_optimizer.zero_grad()
        loss_lstm.backward()
        lstm_optimizer.step()
        lstm_loss += loss_lstm.item()

    print(f"Epoch {epoch}, RNN Loss: {rnn_loss:.4f}, LSTM Loss: {lstm_loss:.4f}")


=== Training Models ===
Epoch 0, RNN Loss: 182.1391, LSTM Loss: 202.2209
Epoch 1, RNN Loss: 51.8698, LSTM Loss: 39.8454
Epoch 2, RNN Loss: 49.2741, LSTM Loss: 34.6696
Epoch 3, RNN Loss: 46.6607, LSTM Loss: 31.3111
Epoch 4, RNN Loss: 44.5551, LSTM Loss: 31.5947


In [10]:
# Evaluamos los modelos

def accuracy(model, data_loader, vocab_size):
    """
    Calcula la exactitud (accuracy) de predicción de caracteres de un modelo.

    Args:
        model (nn.Module): Modelo entrenado.
        data_loader (DataLoader): DataLoader con los datos de evaluación.
        vocab_size (int): Tamaño del vocabulario.

    Returns:
        float: Exactitud de predicción de caracteres.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for x, y in data_loader:
            outputs = model(x)
            predictions = torch.argmax(outputs, dim=-1)
            correct += (predictions == y).sum().item()
            total += y.numel()
    return correct / total

def perplexity(model, data_loader, vocab_size):
    """
    Calcula la perplejidad de un modelo sobre un conjunto de datos.

    Args:
        model (nn.Module): Modelo entrenado.
        data_loader (DataLoader): DataLoader con los datos de evaluación.
        vocab_size (int): Tamaño del vocabulario.

    Returns:
        float: Perplejidad del modelo.
    """
    model.eval()
    total_loss = 0
    total_words = 0
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        for x, y in data_loader:
            outputs = model(x)
            loss = criterion(outputs.view(-1, vocab_size), y.view(-1))
            total_loss += loss.item() * y.numel()
            total_words += y.numel()
    return math.exp(total_loss / total_words)

print(f"Accuracy de caracteres en RNN: {accuracy(rnn_model, loader, vocab_size):.2%}")
print(f"Accuracy de caracteres en LSTM: {accuracy(lstm_model, loader, vocab_size):.2%}")
print(f"Perplejidad de RNN: {perplexity(rnn_model, loader, vocab_size):.2f}")
print(f"Perplejidad LSTM: {perplexity(lstm_model, loader, vocab_size):.2f}")

Accuracy de caracteres en RNN: 95.48%
Accuracy de caracteres en LSTM: 97.11%
Perplejidad de RNN: 1.15
Perplejidad LSTM: 1.09


¡Esto claramente es overfitting! Lastimosamente, es casi inevitable con corpuses tan pequeños. Veamos como se desempeñan los modelos en ejemplos concretos.

In [11]:
# Definimos funciones para completar prompts

def sample_next_token(logits, temperature=1.0):
    """
    Selecciona el siguiente token a muestrear a partir de los logits y una temperatura dada.

    Args:
        logits (torch.Tensor): Logits de salida del modelo para los posibles tokens.
        temperature (float): Parámetro de suavizado para controlar la aleatoriedad de la muestra.

    Returns:
        int: Índice del siguiente token muestreado.
    """
    logits = logits / temperature
    probs = F.softmax(logits, dim=-1)
    return torch.multinomial(probs, num_samples=1).item()


def complete_prompt(model, prompt, char2idx, idx2char, max_len=100, temperature=0.8):
    """
    Completa un prompt de texto carácter por carácter usando un modelo secuencial (RNN o LSTM).

    Args:
        model (nn.Module): Modelo entrenado para generación de texto.
        prompt (str): Texto inicial a completar.
        char2idx (dict): Diccionario de mapeo de caracteres a índices.
        idx2char (dict): Diccionario de mapeo de índices a caracteres.
        max_len (int): Número máximo de caracteres a generar.
        temperature (float): Parámetro de suavizado para la muestra del siguiente carácter.

    Returns:
        str: Texto generado que completa el prompt.
    """
    model.eval()
    result = list(prompt)
    input_idx = torch.tensor([[char2idx[c] for c in prompt if c in char2idx]])
    with torch.no_grad():
        for _ in range(max_len):
            output = model(input_idx)
            pred = output[:, -1, :]
            next_idx = sample_next_token(pred.squeeze(), temperature)
            next_char = idx2char[next_idx]
            result.append(next_char)
            input_idx = torch.tensor([[char2idx[c] for c in result[-seq_len:] if c in char2idx]])
    return ''.join(result)

example_prompts = [
    "if (a == b) {",
    "while (x < 10) {",
    "for (i = 0; i < n; i++) {",
    "The cat"
]

for prompt in example_prompts:
    print(f"\nPrompt: {prompt}")
    rnn_completion = complete_prompt(rnn_model, prompt.lower(), dataset.char2idx, dataset.idx2char, vocab_size)
    lstm_completion = complete_prompt(lstm_model, prompt.lower(), dataset.char2idx, dataset.idx2char, vocab_size)
    print(f"RNN:  {rnn_completion}")
    print(f"LSTM: {lstm_completion}")


Prompt: if (a == b) {
RNN:  if (a == b) { return x + y
while (x > 0) { show_dash
LSTM: if (a == b) { return a; }
while (true) { break; }
fo

Prompt: while (x < 10) {
RNN:  while (x < 10) { show_dashboard(); }
the cat sat on the
LSTM: while (x < 10) { x--; }
if (user.is_logged_in()) { show

Prompt: for (i = 0; i < n; i++) {
RNN:  for (i = 0; i < n; i++) { sum += i; }
def add(x, y): return x + 
LSTM: for (i = 0; i < n; i++) { sum += i; }
def add(x, y): return x + 

Prompt: The cat
RNN:  the cat sat on the mat.
the dog barked at the 
LSTM: the cat sat on the mat.
the dog barked at the 
