In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset

import numpy as np
import re
from nltk.tokenize import word_tokenize, sent_tokenize

import warnings

warnings.filterwarnings("ignore")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [2]:
class Vocabulary:
    """
    Clase para manejar el vocabulario y las conversiones entre palabras e índices.
    """
    def __init__(self):
        """
        Inicializa el vocabulario con el token de padding.
        """
        self.word2idx = {"<PAD>": 0}  # Token de padding asignado al índice 0
        self.idx2word = {0: "<PAD>"}
        self.idx = 1  # Iniciar índices de palabras desde 1

    def add_sentence(self, sentence):
        """
        Añade todas las palabras de una oración al vocabulario.

        Args:
            sentence (list): Lista de palabras de una oración.
        """
        for word in sentence:
            self.add_word(word)

    def add_word(self, word):
        """
        Añade una palabra al vocabulario si no está presente.

        Args:
            word (str): Palabra a añadir.
        """
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def __len__(self):
        """
        Retorna el número de palabras en el vocabulario.

        Returns:
            int: Tamaño del vocabulario.
        """
        return len(self.word2idx)

    def word_to_index(self, word):
        """
        Convierte una palabra a su índice correspondiente.

        Args:
            word (str): Palabra a convertir.

        Returns:
            int: Índice de la palabra en el vocabulario.
        """
        return self.word2idx.get(word, self.word2idx["<PAD>"])

    def index_to_word(self, idx):
        """
        Convierte un índice a su palabra correspondiente.

        Args:
            idx (int): Índice a convertir.

        Returns:
            str: Palabra correspondiente al índice.
        """
        return self.idx2word.get(idx, "<PAD>")

    def sentence_to_indices(self, sentence):
        """
        Convierte una oración en una lista de índices.

        Args:
            sentence (list): Lista de palabras de una oración.

        Returns:
            list: Lista de índices correspondientes a las palabras.
        """
        return [self.word_to_index(word) for word in sentence]

    def indices_to_sentence(self, indices):
        """
        Convierte una lista de índices en una oración.

        Args:
            indices (list): Lista de índices.

        Returns:
            list: Lista de palabras correspondientes a los índices.
        """
        return [self.index_to_word(idx) for idx in indices]


In [3]:
class TextDataset(Dataset):
    """
    Dataset personalizado para cargar y preprocesar datos de texto.
    """
    def __init__(self, filepath, seq_length=5, num_sentences=None):
        """
        Inicializa el dataset.

        Args:
            filepath (str): Ruta al archivo de texto.
            seq_length (int): Longitud de las secuencias de entrada.
            num_sentences (int, optional): Número de oraciones a utilizar. Si es None, utiliza todas.
        """
        self.filepath = filepath
        self.seq_length = seq_length
        self.num_sentences = num_sentences
        self.vocab = Vocabulary()  # Instancia de la clase Vocabulary
        self.data = self.load_and_preprocess_data()  # Lista de oraciones tokenizadas
        self.inputs, self.targets = self.create_sequences()  # Tensores de entradas y etiquetas

    def load_and_preprocess_data(self):
        """
        Carga y preprocesa los datos de texto.

        Returns:
            list: Lista de oraciones tokenizadas.
        """
        # Leer el archivo de texto
        with open(self.filepath, 'r', encoding='utf-8') as f:
            text = f.read()

        # Limpiar el texto
        text = self.clean_text(text)

        # Tokenizar el texto en oraciones
        sentences = sent_tokenize(text)

        # Limitar el número de oraciones si es necesario
        if self.num_sentences:
            sentences = sentences[:self.num_sentences]

        # Tokenizar cada oración en palabras
        tokenized_sentences = [word_tokenize(sent) for sent in sentences]

        # Construir el vocabulario
        for sentence in tokenized_sentences:
            self.vocab.add_sentence(sentence)

        return tokenized_sentences

    def clean_text(self, text):
        """
        Realiza limpieza del texto eliminando caracteres y patrones no deseados.

        Args:
            text (str): Texto a limpiar.

        Returns:
            str: Texto limpio.
        """
        # Eliminar texto dentro de paréntesis
        text = re.sub(r'\([^)]*\)', '', text)
        # Eliminar signos de igual y tokens desconocidos
        text = re.sub(r'=', '', text)
        text = re.sub(r'<unk>', '', text)
        # Reemplazar múltiples guiones por un espacio
        text = re.sub(r'-{2,}', ' ', text)
        # Reemplazar múltiples puntos por un solo punto
        text = re.sub(r'\.{2,}', '.', text)
        # Eliminar caracteres no deseados, manteniendo letras, números, espacios, puntos y apóstrofes
        text = re.sub(r"[^a-zA-Z0-9\s\.\']", '', text)
        # Reemplazar múltiples espacios por uno solo
        text = re.sub(r'\s+', ' ', text)
        text = text.strip()
        return text

    def create_sequences(self):
        """
        Crea secuencias de entrada y etiquetas para el entrenamiento.

        Returns:
            tuple: Tensores de entradas y etiquetas.
        """
        inputs = []
        targets = []

        for sentence in self.data:
            # Ignorar oraciones muy cortas
            if len(sentence) < 2:
                continue

            # Convertir la oración en índices
            indices = self.vocab.sentence_to_indices(sentence)

            # Crear pares de secuencias y objetivos
            for i in range(1, len(indices)):
                # Obtener la secuencia de entrada de longitud fija
                seq = indices[max(0, i - self.seq_length):i]
                # Aplicar padding si es necesario
                seq = [0] * (self.seq_length - len(seq)) + seq  # Padding con ceros (índice de "<PAD>")
                inputs.append(seq)
                # El objetivo es la palabra actual
                targets.append(indices[i])

        # Convertir las listas en tensores
        inputs = torch.tensor(inputs, dtype=torch.long)
        targets = torch.tensor(targets, dtype=torch.long)
        return inputs, targets

    def __len__(self):
        """
        Retorna el número total de muestras en el dataset.

        Returns:
            int: Número de muestras.
        """
        return len(self.targets)

    def __getitem__(self, idx):
        """
        Retorna la muestra en la posición dada.

        Args:
            idx (int): Índice de la muestra.

        Returns:
            tuple: Secuencia de entrada y etiqueta correspondiente.
        """
        return self.inputs[idx], self.targets[idx]


In [4]:
class GaussianEmbedding(nn.Module):
    """
    Implementación de embeddings gaussianos para representar palabras como distribuciones.
    """
    def __init__(self, num_embeddings, embedding_dim, padding_idx=0):
        """
        Inicializa los embeddings gaussianos.

        Args:
            num_embeddings (int): Número de embeddings (tamaño del vocabulario).
            embedding_dim (int): Dimensión de los embeddings.
            padding_idx (int, optional): Índice para el token de padding.
        """
        super(GaussianEmbedding, self).__init__()
        self.num_embeddings = num_embeddings
        self.embedding_dim = embedding_dim
        self.padding_idx = padding_idx

        # Embeddings para la media y la log-varianza de cada palabra
        self.mean = nn.Embedding(num_embeddings, embedding_dim, padding_idx=padding_idx)
        self.log_var = nn.Embedding(num_embeddings, embedding_dim, padding_idx=padding_idx)

        # Inicializar los pesos de los embeddings de forma uniforme
        nn.init.uniform_(self.mean.weight, -0.1, 0.1)
        nn.init.uniform_(self.log_var.weight, -0.1, 0.1)

    def forward(self, input):
        """
        Realiza el paso hacia adelante obteniendo los embeddings muestreados.

        Args:
            input (Tensor): Tensor de índices de palabras.

        Returns:
            tuple: Tensor muestreado z, medias y log-varianzas.
        """
        # Obtener la media y la log-varianza para los índices de entrada
        mean = self.mean(input)
        log_var = self.log_var(input)

        # Calcular la desviación estándar
        std = torch.exp(0.5 * log_var)

        # Muestrear epsilon de una distribución normal estándar
        epsilon = torch.randn_like(std)

        # Aplicar el truco de reparametrización
        z = mean + epsilon * std

        return z, mean, log_var

    def kl_loss(self, mean, log_var):
        """
        Calcula la pérdida de divergencia KL entre el embedding gaussiano y una normal estándar.

        Args:
            mean (Tensor): Medias de los embeddings.
            log_var (Tensor): Log-varianzas de los embeddings.

        Returns:
            Tensor: Pérdida KL promedio.
        """
        # Calcular la divergencia KL para cada palabra en la secuencia
        kl = -0.5 * torch.sum(1 + log_var - mean.pow(2) - log_var.exp(), dim=2)
        # Retornar la pérdida KL promedio
        return kl.mean()


In [5]:
class LSTMModel(nn.Module):
    """
    Modelo LSTM que utiliza embeddings gaussianos para modelar secuencias de texto.
    """
    def __init__(self, vocab_size, embedding_dim, hidden_dim, padding_idx, dropout_p):
        """
        Inicializa el modelo LSTM.

        Args:
            vocab_size (int): Tamaño del vocabulario.
            embedding_dim (int): Dimensión de los embeddings.
            hidden_dim (int): Dimensión de las capas ocultas del LSTM.
            padding_idx (int): Índice del token de padding.
            dropout_p (float): Probabilidad de dropout.
        """
        super(LSTMModel, self).__init__()

        # Embeddings gaussianos
        self.embedding = GaussianEmbedding(num_embeddings=vocab_size, embedding_dim=embedding_dim,
                                           padding_idx=padding_idx)
        # LSTM unidireccional de una capa
        self.lstm = nn.LSTM(input_size=embedding_dim, hidden_size=hidden_dim, num_layers=1, batch_first=True)
        # Dropout para regularización
        self.dropout = nn.Dropout(dropout_p)
        # Capa totalmente conectada para mapear a los logits de salida
        self.fc = nn.Linear(hidden_dim, vocab_size)

    def forward(self, x, hidden):
        """
        Realiza el paso hacia adelante del modelo.

        Args:
            x (Tensor): Tensor de secuencias de entrada (índices de palabras).
            hidden (tuple): Estados ocultos iniciales del LSTM.

        Returns:
            tuple: Salidas del modelo, estados ocultos finales, medias y log-varianzas de los embeddings.
        """
        # Obtener los embeddings gaussianos muestreados
        x, mean, log_var = self.embedding(x)
        # Pasar los embeddings a través del LSTM
        x, hidden = self.lstm(x, hidden)
        # Aplicar dropout
        x = self.dropout(x)
        # Tomar la salida del último paso temporal
        x = x[:, -1, :]
        # Calcular los logits de salida
        x = self.fc(x)
        return x, hidden, mean, log_var

    def init_hidden(self, batch_size):
        """
        Inicializa los estados ocultos del LSTM con ceros.

        Args:
            batch_size (int): Tamaño del lote.

        Returns:
            tuple: Estados ocultos iniciales (h_0, c_0).
        """
        # Obtener el tipo de dato del modelo
        weight = next(self.parameters()).data
        # Inicializar estados ocultos h_0 y c_0
        return (weight.new_zeros(1, batch_size, self.lstm.hidden_size),
                weight.new_zeros(1, batch_size, self.lstm.hidden_size))


In [6]:
class Trainer:
    """
    Clase para entrenar el modelo y generar texto.
    """
    def __init__(self, model, train_dataset, batch_size=64, lr=0.001, clip=5, kl_weight=0.1):
        """
        Inicializa el entrenador.

        Args:
            model (nn.Module): Modelo a entrenar.
            train_dataset (Dataset): Conjunto de datos de entrenamiento.
            batch_size (int, optional): Tamaño del lote.
            lr (float, optional): Tasa de aprendizaje.
            clip (float, optional): Valor para clipping de gradientes.
            kl_weight (float, optional): Peso para la pérdida de divergencia KL.
        """
        self.model = model
        # Cargador de datos para el entrenamiento
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        self.vocab = train_dataset.vocab
        self.seq_length = train_dataset.seq_length
        # Función de pérdida (ignora el índice del token de padding)
        self.criterion = nn.CrossEntropyLoss(ignore_index=0)
        # Optimizador Adam
        self.optimizer = torch.optim.Adam(model.parameters(), lr=lr)
        self.clip = clip  # Valor para clipping de gradientes
        self.kl_weight = kl_weight  # Peso para la pérdida de divergencia KL

    def train(self, epochs):
        """
        Entrena el modelo durante un número de épocas.

        Args:
            epochs (int): Número de épocas de entrenamiento.
        """
        for epoch in range(1, epochs + 1):
            self.model.train()  # Establecer el modo de entrenamiento
            epoch_loss = 0  # Acumulador de pérdida para la época

            for inputs, targets in self.train_loader:
                batch_size = inputs.size(0)
                # Inicializar los estados ocultos del LSTM
                hidden = self.model.init_hidden(batch_size)
                # Desconectar los estados ocultos para evitar problemas de gradientes
                hidden = tuple([h.data for h in hidden])

                # Reiniciar los gradientes del modelo
                self.model.zero_grad()

                # Pasar las entradas a través del modelo
                outputs, hidden, mean, log_var = self.model(inputs, hidden)

                # Calcular la pérdida de entropía cruzada (NLLLoss)
                nll_loss = self.criterion(outputs, targets)

                # Calcular la pérdida de divergencia KL
                kl_loss = self.model.embedding.kl_loss(mean, log_var)

                # Pérdida total
                loss = nll_loss + self.kl_weight * kl_loss

                # Backpropagation
                loss.backward()

                # Aplicar clipping de gradientes para evitar explosión de gradientes
                nn.utils.clip_grad_norm_(self.model.parameters(), self.clip)

                # Actualizar los parámetros del modelo
                self.optimizer.step()

                # Acumular la pérdida de la época
                epoch_loss += loss.item() * batch_size

            # Calcular la pérdida promedio y la perplejidad
            avg_loss = epoch_loss / len(self.train_loader.dataset)
            perplexity = np.exp(avg_loss)

            print(f'Epoch {epoch}, Loss: {avg_loss:.4f}, Perplexity: {perplexity:.4f}')

            # Generar texto después de cada época
            generated_text = self.generate_text('The', 50, top_k=5)
            print(f'Text generated after epoch {epoch}:\n{generated_text}\n')

    def generate_text(self, init_text, length, top_k=5):
        """
        Genera texto utilizando el modelo entrenado.

        Args:
            init_text (str): Texto inicial para comenzar la generación.
            length (int): Número de palabras a generar.
            top_k (int, optional): Número de opciones más probables para muestrear la siguiente palabra.

        Returns:
            str: Texto generado.
        """
        self.model.eval()  # Establecer el modo de evaluación
        words = word_tokenize(init_text)  # Tokenizar el texto inicial
        state_h, state_c = self.model.init_hidden(1)  # Inicializar estados ocultos

        for _ in range(length):
            # Obtener las últimas (seq_length - 1) palabras como entrada
            input_words = words[-(self.seq_length - 1):]
            # Convertir palabras a índices
            indices = [self.vocab.word_to_index(w) for w in input_words]
            # Aplicar padding si es necesario
            if len(indices) < self.seq_length - 1:
                indices = [0] * (self.seq_length - 1 - len(indices)) + indices

            # Convertir a tensor
            x = torch.tensor([indices], dtype=torch.long)

            with torch.no_grad():
                # Pasar la entrada a través del modelo
                output, (state_h, state_c), mean, log_var = self.model(x, (state_h, state_c))
                # Calcular las probabilidades
                probs = F.softmax(output, dim=1).data

                # Obtener las top_k palabras más probables
                top_probs, top_ix = probs.topk(top_k)

                # Convertir a numpy y aplanar
                top_probs = top_probs.cpu().numpy().squeeze()
                top_ix = top_ix.cpu().numpy().squeeze()

                # Muestrear la siguiente palabra de las top_k opciones
                word_idx = np.random.choice(top_ix, p=top_probs / top_probs.sum())
                # Convertir índice a palabra
                word = self.vocab.index_to_word(word_idx)
                # Añadir la palabra generada a la lista
                words.append(word)

        # Unir las palabras en una cadena de texto
        return ' '.join(words)


In [7]:
# Definir parámetros del modelo y entrenamiento
seq_length = 6  # Longitud de las secuencias de entrada
num_sentences = 20000  # Número de oraciones a utilizar del conjunto de datos
train_filepath = 'resources/data/wikitext2/train.txt'  # Ruta al archivo de texto
#train_filepath = 'resources/data/wikitext2/baby.txt'  # Ruta al archivo de texto
# Crear el dataset de entrenamiento
train_dataset = TextDataset(train_filepath, seq_length=seq_length, num_sentences=num_sentences)

print(f"Número de secuencias de entrada: {len(train_dataset)}")

# Parámetros del modelo
vocab_size = len(train_dataset.vocab)  # Tamaño del vocabulario
embedding_dim = 100  # Dimensión de los embeddings
hidden_dim = 128  # Dimensión de las capas ocultas del LSTM
padding_idx = train_dataset.vocab.word_to_index("<PAD>")  # Índice del token de padding
dropout_p = 0.1  # Probabilidad de dropout
batch_size = 200  # Tamaño del lote
learning_rate = 0.001  # Tasa de aprendizaje
num_epochs = 10  # Número de épocas de entrenamiento

# Instanciar el modelo con embeddings gaussianos
model = LSTMModel(vocab_size=vocab_size, embedding_dim=embedding_dim,
                  hidden_dim=hidden_dim, padding_idx=padding_idx, dropout_p=dropout_p)

# Instanciar el entrenador
trainer = Trainer(model=model, train_dataset=train_dataset, batch_size=batch_size,
                  lr=learning_rate, kl_weight=0.1)

# Entrenar el modelo
trainer.train(num_epochs)


Número de secuencias de entrada: 435065
Epoch 1, Loss: 7.4478, Perplexity: 1716.0271
Text generated after epoch 1:
The in of . the and the in the the the . . the the and of . of . in the . the . . . . . the the of the . of of the the of in . the . . and . of a the the .

Epoch 2, Loss: 7.2056, Perplexity: 1346.9440
Text generated after epoch 2:
The States and the song the and . the . of the . of in . the the and century the city to the first of the song to . . . in the city of the and of the and . and a the of . of the city of

Epoch 3, Loss: 7.1027, Perplexity: 1215.2363
Text generated after epoch 3:
The first and . of the United first . in the the . . of in the . . . to the game of the and and in the city of the . of the time of the United century of the and in a . . in the first .

Epoch 4, Loss: 7.0408, Perplexity: 1142.2913
Text generated after epoch 4:
The episode of the first and . of the United States and in the and in and and . of the a States of the a and and the . of a and o

In [8]:
# Generar texto después del entrenamiento
init_text = 'Once upon a time'  # Texto inicial para la generación
generated_text = trainer.generate_text(init_text, length=1000, top_k=5)
print("Texto generado después del entrenamiento:")
print(generated_text)


Texto generado después del entrenamiento:
Once upon a time of the city . was a only to the city . is the largest and and the new of the . 's Olympics and the city of the and and and a first . . had the most time of the . of a United of a first in . and the song . . was the only of . 's . 's . is not a in . of a . and and of a time to the first of . 's World Kombat . and and and . was not not in the and . 's World century . and his . . had a in the . and the song and a new of the . of the and . . in the first . of the series of the series of a and the album . in the time of the . of the Missouri of . 's . of the new River of the and . of a second time and the of the and of the United century of a United States . . in the first and the and of the Missouri River was the and of the United Kingdom of October a city 's first the and of his second . was a large to the of the the season and the and . State in and a in a time the second year . and to the first American . was a first and . . is 

In [10]:
# Generar texto después del entrenamiento
init_text = 'The'  # Texto inicial para la generación
generated_text = trainer.generate_text(init_text, length=1000, top_k=5)
print("Texto generado después del entrenamiento:")
print(generated_text)

Texto generado después del entrenamiento:
The first is a and in the second and the and . was the a . of its United States and a world and the . and the song of a . . and a . 's of the first of the and . of a second . was a by . . . was the first in in a . . of a second of and . is and in the city and a and of the series of a and of in 000 . was also by . 's and and . was the of of a and . . of its and . . . was been been first . was been be been in the and in a second season . had been to be to the and of . was not a first . of the city of his and of the Kombat and and the in of . and a first . is the . States in the . . is in an . of the and . was the a and of to the United States . and the first to a first of the game of the city 's and . is not the of to a second and . . had a only of the first of the of the United States and the and . and in a game . was the only of the first of a American and . was had a first of the first and . and a second in the United States . of the Missouri 

In [18]:
torch.save(model.state_dict(), 'model/lstm_gauss_embedding.pth')