<img src="https://github.com/hernancontigiani/ceia_memorias_especializacion/raw/master/Figures/logoFIUBA.jpg" width="500" align="center">


# Procesamiento de lenguaje natural
## Modelo de lenguaje con tokenización por caracteres

### Consigna
- Seleccionar un corpus de texto sobre el cual entrenar el modelo de lenguaje.
- Realizar el pre-procesamiento adecuado para tokenizar el corpus, estructurar el dataset y separar entre datos de entrenamiento y validación.
- Proponer arquitecturas de redes neuronales basadas en unidades recurrentes para implementar un modelo de lenguaje.
- Con el o los modelos que consideren adecuados, generar nuevas secuencias a partir de secuencias de contexto con las estrategias de greedy search y beam search determístico y estocástico. En este último caso observar el efecto de la temperatura en la generación de secuencias.


### Sugerencias
- Durante el entrenamiento, guiarse por el descenso de la perplejidad en los datos de validación para finalizar el entrenamiento. Para ello se provee un callback.
- Explorar utilizar SimpleRNN (celda de Elman), LSTM y GRU.
- rmsprop es el optimizador recomendado para la buena convergencia. No obstante se pueden explorar otros.


In [1]:
import random
import io
import pickle

import os
import platform
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from tensorflow.keras.preprocessing.text import Tokenizer, text_to_word_sequence
from scipy.special import softmax
from tensorflow.keras.losses import SparseCategoricalCrossentropy

import urllib.request


### Datos
Se utilizará el libro Don Quijote en Project Gutenberg

In [2]:
# Fijamos la semilla para reproducibilidad
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

<torch._C.Generator at 0x7c083a924b30>

In [4]:
# URL del libro en formato texto plano (UTF-8)
# ID 2000 es Don Quijote en Project Gutenberg
url = 'https://www.gutenberg.org/ebooks/2000.txt.utf-8'
filename = 'quijote.txt'

#Descargar el archivo si no existe
if not os.path.exists(filename):
    print(f"Descargando {filename}...")
    urllib.request.urlretrieve(url, filename)
    print("Descarga completada.")
else:
    print(f"El archivo {filename} ya existe.")

# Leer el texto
with open(filename, 'r', encoding='utf-8') as f:
    text = f.read()

# Limpieza de cabeceras y pies de página de Gutenberg
start_marker = "*** START OF THE PROJECT GUTENBERG EBOOK"
end_marker = "*** END OF THE PROJECT GUTENBERG EBOOK"

start_idx = text.find(start_marker)
end_idx = text.find(end_marker)


if start_idx != -1 and end_idx != -1:
    start_idx = text.find("\n", start_idx) + 1
    text = text[start_idx:end_idx]

# Normalización
article_text = text.lower()

print(f"\nLongitud total del corpus: {len(article_text)} caracteres")


El archivo quijote.txt ya existe.

Longitud total del corpus: 2110737 caracteres


In [5]:
# Verificamos la longitud y mostramos fragmentos para confirmar la limpieza
print(f"Longitud del texto: {len(article_text)}")

print("--- Fragmento del inicio ---")
print(article_text[:500])

print("\n--- Fragmento aleatorio del medio ---")
# Tomamos un punto medio aproximado para ver contenido real de la novela
mid_point = len(article_text) // 2
print(article_text[mid_point : mid_point + 500])

Longitud del texto: 2110737
--- Fragmento del inicio ---




el ingenioso hidalgo don quijote de la mancha



por miguel de cervantes saavedra





el ingenioso hidalgo don quijote de la mancha


  
tasa

  
testimonio de las erratas

  
el rey

  
al duque de béjar

  
prólogo

  
al libro de don quijote de la mancha



que trata de la condición y ejercicio del famoso
hidalgo don quijote de la mancha

que trata de la primera salida que de su tierra hizo
el ingenioso don quijote

donde se cuenta la graciosa manera que tuvo don
quijote en armarse cabal

--- Fragmento aleatorio del medio ---
halle desapercebido el enemigo; pero si se tomara mi
consejo, aconsejárale yo que usara de una prevención, de la cual su
majestad la hora de agora debe estar muy ajeno de pensar en ella.

apenas oyó esto el cura, cuando dijo entre sí:

— ¡dios te tenga de su mano, pobre don quijote: que me parece que te
despeñas de la alta cumbre de tu locura hasta el profundo abismo de tu
simplicidad!

mas el barber

## Elegir el tamaño del contexto (ventana de atención)

En este caso, al trabajar con un modelo de lenguaje basado en caracteres, consideramos todo el corpus como un documento continuo. El tamaño del contexto define cuántos caracteres hacia atrás mirará el modelo para predecir el siguiente.

**La intuición detrás de la ventana:**

A diferencia de los modelos basados en palabras (donde un contexto de 10 palabras ya puede capturar una frase completa), en los modelos de caracteres necesitamos una ventana mucho mayor para capturar estructuras gramaticales y semánticas significativas.

* **Ventana pequeña (ej. 10):** El modelo solo vería fragmentos sin sentido como "de la man".
* **Ventana amplia (ej. 100):** El modelo puede ver el equivalente a una o dos oraciones completas, lo que le da suficiente información para aprender patrones más complejos.

> **Decisión:** Por esta razón, elegimos `max_context_size = 100` en lugar de un valor menor. Esta elección asegura que el modelo tenga el contexto suficiente para entender la semántica, algo que se perdería con la configuración típica de un modelo de palabras.

In [7]:

# Tokenización por Caracteres

# Crear el vocabulario único de caracteres
# Usamos 'set' para obtener los caracteres únicos y 'sorted' para tener un orden consistente
chars_vocab = sorted(list(set(article_text)))
vocab_size = len(chars_vocab)

# Mapeos de caracteres a índices y viceversa
char2idx = {char: idx for idx, char in enumerate(chars_vocab)}
idx2char = {idx: char for idx, char in enumerate(chars_vocab)}

print(f"Tamaño del vocabulario de caracteres: {vocab_size}")
print("Caracteres encontrados:", "".join(chars_vocab))

# Función para tokenizar (Texto -> Índices)
def encode_text(text, char_map):
    return [char_map[c] for c in text]

# okenizamos todo el corpus
tokenized_text = encode_text(article_text, char2idx)

print(f"\nEjemplo de tokenización:")
print(f"Texto original: '{article_text[:20]}'")
print(f"Índices: {tokenized_text[:20]}")

Tamaño del vocabulario de caracteres: 62
Caracteres encontrados: 
 !"'(),-.01234567:;?]abcdefghijlmnopqrstuvwxyz¡«»¿àáéíïñóùúü—

Ejemplo de tokenización:
Texto original: '



el ingenioso hid'
Índices: [0, 0, 0, 0, 26, 32, 1, 30, 34, 28, 26, 34, 30, 35, 39, 35, 1, 29, 30, 25]


In [6]:
max_context_size = 100

print(f"Tamaño de contexto definido: {max_context_size} caracteres")

Tamaño de contexto definido: 100 caracteres


In [None]:
# Usaremos las utilidades de procesamiento de textos y secuencias de Keras
from tensorflow.keras.utils import pad_sequences # se utilizará para padding

In [None]:
# en este caso el vocabulario es el conjunto único de caracteres que existe en todo el texto
chars_vocab = set(article_text)

In [None]:
# la longitud de vocabulario de caracteres es:
len(chars_vocab)

In [None]:
# Construimos los dicionarios que asignan índices a caracteres y viceversa.
# El diccionario `char2idx` servirá como tokenizador.
char2idx = {k: v for v,k in enumerate(chars_vocab)}
idx2char = {v: k for k,v in char2idx.items()}

###  Tokenizar

In [None]:
# tokenizamos el texto completo
tokenized_text = [char2idx[ch] for ch in article_text]

In [None]:
tokenized_text[:1000]

### Organizando y estructurando el dataset

In [None]:
# separaremos el dataset entre entrenamiento y validación.
# `p_val` será la proporción del corpus que se reservará para validación
# `num_val` es la cantidad de secuencias de tamaño `max_context_size` que se usará en validación
p_val = 0.1
num_val = int(np.ceil(len(tokenized_text)*p_val/max_context_size))

In [None]:
# separamos la porción de texto utilizada en entrenamiento de la de validación.
train_text = tokenized_text[:-num_val*max_context_size]
val_text = tokenized_text[-num_val*max_context_size:]

In [None]:
tokenized_sentences_val = [val_text[init*max_context_size:init*(max_context_size+1)] for init in range(num_val)]

In [None]:
tokenized_sentences_train = [train_text[init:init+max_context_size] for init in range(len(train_text)-max_context_size+1)]

In [None]:
X = np.array(tokenized_sentences_train[:-1])
y = np.array(tokenized_sentences_train[1:])

Nótese que estamos estructurando el problema de aprendizaje como *many-to-many*:

Entrada: secuencia de tokens [$x_0$, $x_1$, ..., $x_N$]

Target: secuencia de tokens [$x_1$, $x_2$, ..., $x_{N+1}$]

De manera que la red tiene que aprender que su salida deben ser los tokens desplazados en una posición y un nuevo token predicho (el N+1).

La ventaja de estructurar el aprendizaje de esta manera es que para cada token de target se propaga una señal de gradiente por el grafo de cómputo recurrente, que es mejor que estructurar el problema como *many-to-one* en donde sólo una señal de gradiente se propaga.

En este punto tenemos en la variable `tokenized_sentences` los versos tokenizados. Vamos a quedarnos con un conjunto de validación que utilizaremos para medir la calidad de la generación de secuencias con la métrica de Perplejidad.

In [None]:
X.shape

In [None]:
X[0,:10]

In [None]:
y[0,:10]

In [None]:
vocab_size = len(chars_vocab)

# Definiendo el modelo

El modelo que se propone como ejemplo consume los índices de los tokens y los transforma en vectores OHE (en este caso no entrenamos una capa de embedding para caracteres). Esa transformación se logra combinando las capas `CategoryEncoding` que transforma a índices a vectores OHE y `TimeDistributed` que aplica la capa a lo largo de la dimensión "temporal" de la secuencia.

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class RNNModel(nn.Module):
    def __init__(self, vocab_size, hidden_size=200):
        super().__init__()
        self.vocab_size = vocab_size
        self.rnn = nn.RNN(
            input_size=vocab_size,
            hidden_size=hidden_size,
            batch_first=True
        )
        self.fc = nn.Linear(hidden_size, vocab_size)

    def forward(self, x):
        # x: (batch, seq_len, 1) con índices enteros
        x = x.squeeze(-1).long()                   # (batch, seq_len)
        x = F.one_hot(x, num_classes=self.vocab_size).float()  # (batch, seq_len, vocab_size)

        out, _ = self.rnn(x)                      # (batch, seq_len, hidden_size)
        out = self.fc(out)                        # (batch, seq_len, vocab_size)
        return out                                # logits (sin softmax)


model = RNNModel(vocab_size).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001)

print(model)


### Definir el modelo

In [None]:
class TrainerWithPerplexity:
    def __init__(self, model, optimizer, criterion, train_loader, val_loader, patience=5, device="cpu"):
        self.model = model.to(device)
        self.optimizer = optimizer
        self.criterion = criterion
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.patience = patience
        self.device = device

        self.min_score = float("inf")
        self.patience_counter = 0
        self.history_ppl = []

    def compute_perplexity(self):
        self.model.eval()
        scores = []
        with torch.no_grad():
            for xb, yb in self.val_loader:
                xb, yb = xb.to(self.device), yb.to(self.device)
                logits = self.model(xb)  # (batch, seq_len, vocab_size)

                # tomamos la probabilidad del último token predicho
                log_probs = F.log_softmax(logits, dim=-1)

                target = yb[:, -1]  # último token
                probs = log_probs[:, -1, :]

                chosen_log_probs = probs[range(len(target)), target]
                ppl = torch.exp(-chosen_log_probs.mean()).item()
                scores.append(ppl)
        return np.mean(scores)

    def train(self, num_epochs=20, save_path="best_model.pt"):
        for epoch in range(num_epochs):
            self.model.train()
            total_loss = 0
            for xb, yb in self.train_loader:
                xb, yb = xb.to(self.device), yb.to(self.device)
                self.optimizer.zero_grad()
                logits = self.model(xb)

                # logits: (batch, seq_len, vocab_size)
                # target: (batch, seq_len)
                loss = self.criterion(logits.transpose(1, 2), yb)
                loss.backward()
                self.optimizer.step()
                total_loss += loss.item()

            avg_loss = total_loss / len(self.train_loader)

            # calcular ppl en validación
            current_ppl = self.compute_perplexity()
            self.history_ppl.append(current_ppl)

            print(f"Epoch {epoch+1}: train loss={avg_loss:.4f}, val ppl={current_ppl:.4f}")

            # early stopping
            if current_ppl < self.min_score:
                self.min_score = current_ppl
                torch.save(self.model.state_dict(), save_path)
                print("Saved new best model!")
                self.patience_counter = 0
            else:
                self.patience_counter += 1
                if self.patience_counter >= self.patience:
                    break



### Entrenamiento

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

device = "cuda" if torch.cuda.is_available() else "cpu"

# Parámetros
batch_size = 256
num_epochs = 20
patience = 5

train_dataset = torch.utils.data.TensorDataset(torch.tensor(X, dtype=torch.long),
                                               torch.tensor(y, dtype=torch.long))
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

def prepare_val_data(val_data, max_context_size, vocab_size):
    targets, padded = [], []
    info = []
    count = 0

    for seq in val_data:
        len_seq = len(seq)
        subseq = [seq[:i] for i in range(1, len_seq)]
        targets.extend([seq[i] for i in range(1, len_seq)])

        if len(subseq) != 0:
            arr = np.zeros((len(subseq), max_context_size), dtype=np.int64)
            for j, s in enumerate(subseq):
                # truncar si es más larga
                s = s[-max_context_size:]
                arr[j, -len(s):] = s
            padded.append(arr)
            info.append((count, count + len_seq))
            count += len_seq

    padded = np.vstack(padded)
    return torch.tensor(padded, dtype=torch.long), torch.tensor(targets, dtype=torch.long), info


def compute_perplexity(model, val_inputs, val_targets, batch_size=256):
    model.eval()
    all_log_probs = []
    with torch.no_grad():
        for i in range(0, len(val_inputs), batch_size):
            xb = val_inputs[i:i+batch_size].to(device)
            yb = val_targets[i:i+batch_size].to(device)

            logits = model(xb.unsqueeze(-1))  # tu modelo espera (batch, seq_len, 1)
            log_probs = F.log_softmax(logits[:, -1, :], dim=-1)
            chosen = log_probs[range(len(yb)), yb]
            all_log_probs.extend(chosen.cpu().numpy())

    all_log_probs = np.array(all_log_probs)
    ppl = float(np.exp(-all_log_probs.mean()))
    return ppl

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.RMSprop(model.parameters(), lr=0.001)

history_ppl = []
min_score = float("inf")
patience_counter = 0

# preparar datos de validación
val_inputs, val_targets, val_info = prepare_val_data(tokenized_sentences_val,
                                                     max_context_size=max_context_size,
                                                     vocab_size=vocab_size)

# --- training loop ---
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for xb, yb in train_loader:
        xb, yb = xb.to(device), yb.to(device)
        optimizer.zero_grad()
        logits = model(xb.unsqueeze(-1))   # logits: (batch, seq_len, vocab_size)
        loss = criterion(logits.transpose(1, 2), yb)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()

    avg_loss = total_loss / len(train_loader)
    current_ppl = compute_perplexity(model, val_inputs, val_targets, batch_size=batch_size)
    history_ppl.append(current_ppl)

    print(f"Epoch {epoch+1}/{num_epochs} - loss: {avg_loss:.4f} - val_ppl: {current_ppl:.4f}")

    # early stopping
    if current_ppl < min_score:
        min_score = current_ppl
        torch.save(model.state_dict(), "best_model.pt")
        print("Saved new best model!")
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print("Early stopping triggered")
            break


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Entrenamiento
epoch_count = range(1, len(history_ppl) + 1)
sns.lineplot(x=epoch_count,  y=history_ppl)
plt.show()


### Predicción del próximo caracter

In [None]:
# Se puede usar gradio para probar el modelo
# Gradio es una herramienta muy útil para crear interfaces para ensayar modelos
# https://gradio.app/

!pip install -q gradio

In [None]:
import gradio as gr

def model_response(human_text):
    model.eval()  # modo evaluación

    # encodeamos
    encoded = [char2idx.get(ch, 0) for ch in human_text.lower()]  # si el char no está, 0
    if len(encoded) > max_context_size:
        encoded = encoded[-max_context_size:]  # truncar
    else:
        encoded = [0]*(max_context_size - len(encoded)) + encoded  # pad izquierda

    # tensor al device correcto
    x = torch.tensor(encoded, dtype=torch.long).unsqueeze(0).unsqueeze(-1).to(device)
    # shape: (1, seq_len=max_context_size, 1)

    # forward
    with torch.no_grad():
        logits = model(x)              # (1, seq_len, vocab_size)
        probs = F.softmax(logits[0, -1, :], dim=-1)  # último timestep

    y_hat = torch.argmax(probs).item()
    out_word = idx2char[y_hat]

    return human_text + out_word

iface = gr.Interface(
    fn=model_response,
    inputs=["textbox"],
    outputs="text")

iface.launch(debug=True)

### Generación de secuencias

In [None]:
def generate_seq(model, seed_text, max_length, n_words):
    model.eval()
    device = next(model.parameters()).device  # detecta si está en cpu o cuda
    output_text = seed_text

    for _ in range(n_words):
        encoded = [char2idx.get(ch, 0) for ch in output_text.lower()]  # si no está → 0

        # truncar o padear a max_length
        if len(encoded) > max_length:
            encoded = encoded[-max_length:]
        else:
            encoded = [0] * (max_length - len(encoded)) + encoded

        # convertir a tensor (batch=1, seq_len=max_length, 1)
        x = torch.tensor(encoded, dtype=torch.long).unsqueeze(0).unsqueeze(-1).to(device)

        # --- Forward ---
        with torch.no_grad():
            logits = model(x)  # (1, seq_len, vocab_size)
            probs = F.softmax(logits[0, -1, :], dim=-1)
            y_hat = torch.argmax(probs).item()

        # convertir a caracter
        out_char = idx2char[y_hat]
        output_text += out_char

    return output_text

In [None]:
input_text='habia una vez'

generate_seq(model, input_text, max_length=max_context_size, n_words=30)

###  Beam search y muestreo aleatorio

In [None]:
import torch

def encode(text, max_length=max_context_size, device=None):
    # convertir cada caracter a índice (si no está en vocabulario, usar 0)
    encoded = [char2idx.get(ch, 0) for ch in text.lower()]

    # truncar o padear
    if len(encoded) > max_length:
        encoded = encoded[-max_length:]
    else:
        encoded = [0] * (max_length - len(encoded)) + encoded

    # convertir a tensor (batch=1, seq_len, 1)
    tensor = torch.tensor(encoded, dtype=torch.long).unsqueeze(0).unsqueeze(-1)

    if device is not None:
        tensor = tensor.to(device)
    return tensor


def decode(seq):
    if torch.is_tensor(seq):
        seq = seq.cpu().numpy().tolist()
    return ''.join([idx2char[ch] for ch in seq])

In [None]:
import torch
import torch.nn.functional as F
import numpy as np
from scipy.special import softmax

# función que selecciona candidatos para el beam search
def select_candidates(pred, num_beams, vocab_size, history_probs, history_tokens, temp, mode):
    pred_large = []

    for idx, pp in enumerate(pred):
        # sumamos los log probs acumulados
        pred_large.extend(np.log(pp + 1E-10) + history_probs[idx])

    pred_large = np.array(pred_large)

    # criterio de selección
    if mode == 'det':
        idx_select = np.argsort(pred_large)[::-1][:num_beams]  # beam search determinista
    elif mode == 'sto':
        idx_select = np.random.choice(
            np.arange(pred_large.shape[0]),
            num_beams,
            p=softmax(pred_large / temp)
        )
    else:
        raise ValueError(f"Wrong selection mode: {mode}. Use 'det' or 'sto'.")

    new_history_tokens = np.concatenate(
        (np.array(history_tokens)[idx_select // vocab_size],
         np.array([idx_select % vocab_size]).T),
        axis=1
    )

    return pred_large[idx_select.astype(int)], new_history_tokens.astype(int)


def beam_search(model, num_beams, num_words, input_text, max_length,temp=1.0, mode='det'):
    model.eval()
    device = next(model.parameters()).device

    encoded = encode(input_text, max_length=max_length, device=device)  # (1, seq_len, 1)

    with torch.no_grad():
        logits = model(encoded)  # (1, seq_len, vocab_size)
        probs = F.softmax(logits[0, -1, :], dim=-1).cpu().numpy()

    vocab_size = probs.shape[0]

    history_probs = [0] * num_beams
    history_tokens = [encoded.squeeze(-1).cpu().numpy()[0]] * num_beams  # shape: (seq_len,)

    # seleccionar primeros candidatos
    history_probs, history_tokens = select_candidates([probs],
                                                      num_beams,
                                                      vocab_size,
                                                      history_probs,
                                                      history_tokens,
                                                      temp,
                                                      mode)

    #loop beam search
    for i in range(num_words - 1):
        preds = []

        for hist in history_tokens:
            # mantener contexto de tamaño max_length
            input_update = hist[-max_length:]
            x = torch.tensor(input_update, dtype=torch.long).unsqueeze(0).unsqueeze(-1).to(device)

            with torch.no_grad():
                logits = model(x)
                y_hat = F.softmax(logits[0, -1, :], dim=-1).cpu().numpy()

            preds.append(y_hat)

        history_probs, history_tokens = select_candidates(preds,
                                                          num_beams,
                                                          vocab_size,
                                                          history_probs,
                                                          history_tokens,
                                                          temp,
                                                          mode)

    # devolver secuencias generadas (últimos tokens relevantes)
    return history_tokens[:, -(len(input_text) + num_words):]


In [None]:
max_context_size

In [None]:
# predicción con beam search
salidas = beam_search(model,num_beams=10,num_words=200,input_text="habia una vez",max_length=max_context_size,temp=1,mode="sto")

In [None]:
salidas[0]

In [None]:
# veamos las salidas
decode(salidas[0])