# Ayala Morales Mauricio
### Student ID: 315332122

---

# Neural Language Models

## Objectives

- Use a neural network to build a language model
    - We will use the PyTorch library
    - We will generate language
- Explore the embeddings generated by the network

### Installing dependencies

In [1]:
%pip install -U torch nltk numpy

Note: you may need to restart the kernel to use updated packages.


### Importing modules and libraries

In [15]:

from nltk.tokenize import word_tokenize
from nltk import ngrams
import numpy as np
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
import time
from random import randint

# Lab 7: Text generation strategies

**Due: November 10, 2024, 11:59 p.m.**

- Build a neural language model from a Spanish corpus
  - Corpus: El Quijote. URL: https://www.gutenberg.org/ebooks/2000
    - **NOTE: Keep your computing resources in mind. Recall that in the lab session we used ~50k sentences**

Corpus preprocessing

In [3]:
def preprocess(sent: list[str]) -> list[str]:
    """
    Preprocess the sentence by converting it to lower case and adding the
    special beginning-of-sentence and end-of-sentence tokens.
    Punctuation is kept as is.

    :param list[str] sent: List of words in a sentence.
    :return list[str]: List of preprocessed words from the sentence.
    """
    result = [word.lower() for word in sent]
    result.append("<EOS>")
    result.insert(0, "<BOS>")
    return result

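# The with-block closes the file automatically, so no explicit close() is needed
with open('ElQuijote_corpus.txt', 'r', encoding='utf-8') as f:
    text = f.read()

# NOTE: preprocess() is not applied here, so the corpus keeps its original
# casing and contains no <BOS>/<EOS> tokens
corpus = [word_tokenize(sentence) for sentence in text.split('\n') if sentence.strip() != '']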

- Trigram model with `n = 3`

In [4]:
def get_words_freqs(corpus: list[list[str]]) -> dict:
    """
    Gets the frequency of each word in the corpus.

    :param list[list[str]] corpus: list of sentences
    :return: dict of words and their frequency
    """
    words_freqs = {}
    for sentence in corpus:
        for word in sentence:
            words_freqs[word] = words_freqs.get(word, 0) + 1
    return words_freqs

def get_words_indexes(words_freqs: dict) -> tuple[dict, dict]:
    """
    Builds the vocabulary, mapping every hapax legomenon to a shared <UNK> entry.

    :param dict words_freqs: dict of words and their frequency
    :return: tuple of dicts (word -> index, index -> word)
    """
    UNK_LABEL = "<UNK>"
    result = {}
    for word, freq in words_freqs.items():
        # Hapax legomena (words seen only once) are replaced by <UNK>
        if freq == 1:
            result[UNK_LABEL] = None
        else:
            result[word] = None
    # Make sure <UNK> exists even if the corpus has no hapax legomena
    result.setdefault(UNK_LABEL, None)

    # Indexes follow insertion order, so they are contiguous from 0 to len(result) - 1
    return ({word: idx for idx, word in enumerate(result)},
            {idx: word for idx, word in enumerate(result)})

def get_word_id(words_indexes: dict, word: str) -> int:
    """
    Gets the index of a word in the corpus.

    :param dict words_indexes: dict of words and their index
    :param str word: word to get the index of
    :return: index of the word
    """
    UNK_LABEL = "<UNK>"
    unk_word_id = words_indexes[UNK_LABEL]
    return words_indexes.get(word, unk_word_id)

def get_train_test_data(corpus: list[list[str]], words_indexes: dict, n: int) -> tuple[list, list]:
    """
    Builds the training examples from the n-grams of the corpus.

    Despite its name, this function does not hold out a test set: it returns
    the contexts (first n-1 words of each n-gram) and the targets (last word).

    :param list[list[str]] corpus: list of sentences
    :param dict words_indexes: dict of words and their index
    :param int n: n-gram size
    :return: tuple (contexts, targets) of word indexes
    """
    x_train = []
    y_train = []
    for sent in corpus:
        # Unpack each n-gram into its context words and its final target word
        for *context, target in ngrams(sent, n):
            x_train.append([get_word_id(words_indexes, word) for word in context])
            y_train.append([get_word_id(words_indexes, target)])
    return x_train, y_train

words_freqs = get_words_freqs(corpus)

words_indexes, index_to_word = get_words_indexes(words_freqs)

x_train, y_train = get_train_test_data(corpus, words_indexes, n=3)
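To make the indexing step easier to follow, here is a minimal, self-contained sketch on a toy corpus. It is not the notebook's code: `build_vocab`, `trigram_examples`, and `toy_corpus` are hypothetical names used only for illustration, and the sketch assumes the same conventions as above (words seen once collapse into `<UNK>`, each trigram yields a two-word context and a one-word target).

```python
from collections import Counter

UNK = "<UNK>"

def build_vocab(sentences):
    """Map every word seen more than once to an index; hapaxes share <UNK>."""
    freqs = Counter(word for sent in sentences for word in sent)
    vocab = {}
    for word, freq in freqs.items():
        key = word if freq > 1 else UNK
        # setdefault only assigns a new index the first time a key appears
        vocab.setdefault(key, len(vocab))
    # Guarantee that <UNK> exists even when there are no hapaxes
    vocab.setdefault(UNK, len(vocab))
    return vocab

def trigram_examples(sentences, vocab):
    """Return (context ids, target id) pairs for every trigram in the corpus."""
    examples = []
    for sent in sentences:
        ids = [vocab.get(word, vocab[UNK]) for word in sent]
        for i in range(len(ids) - 2):
            examples.append((ids[i:i + 2], ids[i + 2]))
    return examples

# Hypothetical toy corpus: "la", "mancha" and "castilla" appear only once
toy_corpus = [["en", "un", "lugar", "de", "la", "mancha"],
              ["en", "un", "lugar", "de", "castilla"]]
toy_vocab = build_vocab(toy_corpus)
toy_examples = trigram_examples(toy_corpus, toy_vocab)
print(toy_vocab)
print(toy_examples[:3])
```

In this toy case the three words that occur once all share the `<UNK>` index, which is why rare words in the generated text can only ever surface as `<UNK>`.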

- Include information about the training setup:
    - Embedding dimension
    - Hidden layer dimension
    - Number of sentences used for training
    - Batch size and context size

In [5]:
EMBEDDING_DIM = 200
H = 100
V = len(words_indexes)
CONTEXT_SIZE = 2
BATCH_SIZE = 256

Splitting the training set into batches of 256 examples

In [None]:
train_set = np.concatenate((x_train, y_train), axis=1)
train_loader = DataLoader(train_set, batch_size = BATCH_SIZE)

Neural network implementation

In [None]:
class TrigramModel(nn.Module):
    """Parent class: https://pytorch.org/docs/stable/generated/torch.nn.Module.html"""

    def __init__(self, vocab_size, embedding_dim, context_size, h):
        super(TrigramModel, self).__init__()
        self.context_size = context_size
        self.embedding_dim = embedding_dim
        self.embeddings = nn.Embedding(vocab_size, embedding_dim)
        self.linear1 = nn.Linear(context_size * embedding_dim, h)
        self.linear2 = nn.Linear(h, vocab_size)

    def forward(self, inputs):
        # x': concatenation of x1 and x2 embeddings   -->
        # self.embeddings returns one vector per input index; view() reshapes them so that they end up concatenated
        embeds = self.embeddings(inputs).view((-1,self.context_size * self.embedding_dim))
        # h: tanh(W_1.x' + b)  -->
        out = torch.tanh(self.linear1(embeds))
        # W_2.h                 -->
        out = self.linear2(out)
        # log_softmax(W_2.h)      -->
        # dim=1 so that it operates over rows: with batches there are several output vectors
        log_probs = F.log_softmax(out, dim=1)

        return log_probs
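As a rough check on model size, the parameter count follows directly from the three layers above: the embedding table, the hidden layer (weights plus bias), and the output layer (weights plus bias). The sketch below is an illustration only; `count_parameters` is a hypothetical helper, and `HYPOTHETICAL_V` is an assumed vocabulary size, since the real value of `V` depends on the corpus.

```python
def count_parameters(vocab_size, embedding_dim, context_size, hidden_dim):
    """Count the trainable parameters of an embedding + two-linear-layer trigram model."""
    embedding = vocab_size * embedding_dim                           # nn.Embedding table
    hidden = context_size * embedding_dim * hidden_dim + hidden_dim  # linear1 weights + bias
    output = hidden_dim * vocab_size + vocab_size                    # linear2 weights + bias
    return embedding + hidden + output

# Assumed vocabulary size, for illustration only; the other values match the setup cell
HYPOTHETICAL_V = 20_000
total = count_parameters(HYPOTHETICAL_V, embedding_dim=200, context_size=2, hidden_dim=100)
print(f"{total:,} parameters")
```

With these settings almost all parameters sit in the embedding table and the output layer, both of which grow linearly with the vocabulary size. That is one reason to collapse rare words into `<UNK>`.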

Training the model

In [None]:
# 1. Loss: negative log-likelihood
loss_function = nn.NLLLoss()

# 2. Instantiate the model
model = TrigramModel(V, EMBEDDING_DIM, CONTEXT_SIZE, H)

# 3. Optimization: Adam optimizer
optimizer = optim.Adam(model.parameters(), lr = 2e-3)

# ------------------------- TRAIN & SAVE MODEL ------------------------
EPOCHS = 10
for epoch in range(EPOCHS):
    st = time.time()
    print("\n--- Training model Epoch: {} ---".format(epoch))
    for it, data_tensor in enumerate(train_loader):
        # Each row holds CONTEXT_SIZE context ids followed by the target id
        context_tensor = data_tensor[:, :CONTEXT_SIZE]
        target_tensor = data_tensor[:, CONTEXT_SIZE]

        model.zero_grad()  # reset the gradients

        # FORWARD:
        # log probabilities over next words, shape: batch_size x len(vocab)
        log_probs = model(context_tensor)

        # compute loss function
        loss = loss_function(log_probs, target_tensor)

        # BACKWARD:
        # backward pass and parameter update
        loss.backward()
        optimizer.step()

        if it % 500 == 0:
            print("Training Iteration {} of epoch {} complete. Loss: {}; Time taken (s): {}".format(it, epoch, loss.item(), (time.time()-st)))
            st = time.time()

    # saving model
    model_path = 'model_{}.dat'.format(epoch)
    torch.save(model.state_dict(), model_path)
    print(f"Model saved for epoch={epoch} at {model_path}")

Training the model on a GPU

In [9]:
# Select the GPU if one is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# 1. Loss: negative log-likelihood
loss_function = nn.NLLLoss()

# 2. Instantiate the model and move it to the device
model = TrigramModel(V, EMBEDDING_DIM, CONTEXT_SIZE, H).to(device)

# 3. Optimization: Adam optimizer
optimizer = optim.Adam(model.parameters(), lr = 2e-3)

# ------------------------- TRAIN & SAVE MODEL ------------------------
EPOCHS = 10
for epoch in range(EPOCHS):
    st = time.time()
    print("\n--- Training model Epoch: {} ---".format(epoch))
    for it, data_tensor in enumerate(train_loader):
        # Move the data to the selected device
        context_tensor = data_tensor[:,0:2].to(device)
        target_tensor = data_tensor[:,2].to(device)

        model.zero_grad()

        # FORWARD:
        log_probs = model(context_tensor)

        # compute loss function
        loss = loss_function(log_probs, target_tensor)

        # BACKWARD:
        loss.backward()
        optimizer.step()

        if it % 500 == 0:
            print("Training Iteration {} of epoch {} complete. Loss: {}; Time taken (s): {}".format(it, epoch, loss.item(), (time.time()-st)))
            st = time.time()

    # saving model
    model_path = 'model_gpu_{}.dat'.format(epoch)
    torch.save(model.state_dict(), model_path)
    print(f"Model saved for epoch={epoch} at {model_path}")



--- Training model Epoch: 0 ---
Training Iteration 0 of epoch 0 complete. Loss: 10.204422950744629; Time taken (s): 0.2035355567932129
Training Iteration 500 of epoch 0 complete. Loss: 5.255366802215576; Time taken (s): 6.619696378707886
Training Iteration 1000 of epoch 0 complete. Loss: 5.483633041381836; Time taken (s): 6.597528457641602
Training Iteration 1500 of epoch 0 complete. Loss: 6.110557556152344; Time taken (s): 6.599625587463379
Training Iteration 2000 of epoch 0 complete. Loss: 4.806663990020752; Time taken (s): 6.449951887130737
Training Iteration 2500 of epoch 0 complete. Loss: 5.266054630279541; Time taken (s): 6.386515378952026
Training Iteration 3000 of epoch 0 complete. Loss: 5.502137184143066; Time taken (s): 6.624919652938843
Training Iteration 3500 of epoch 0 complete. Loss: 4.755428314208984; Time taken (s): 6.594188690185547
Training Iteration 4000 of epoch 0 complete. Loss: 4.741596221923828; Time taken (s): 6.58011269569397
Model saved for epoch=0 at model_g
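The loss values in this log are mean negative log-likelihoods (in nats) over a single batch, so exponentiating one gives a rough per-batch perplexity. The sketch below only converts two of the logged values. The `perplexity` helper is a hypothetical name, and per-batch numbers are noisy, so they should not be read as a held-out evaluation.

```python
import math

def perplexity(mean_nll):
    """Convert a mean negative log-likelihood (natural log) into perplexity."""
    return math.exp(mean_nll)

# Values copied from the training log above (iteration 0 and iteration 4000 of epoch 0)
first_loss = 10.204422950744629
last_logged_loss = 4.741596221923828

print(f"perplexity at iteration 0:    {perplexity(first_loss):.1f}")
print(f"perplexity at iteration 4000: {perplexity(last_logged_loss):.1f}")
```

A model that assigned the same probability to every word would have a loss of `ln(V)`. A first-iteration loss on the order of the log of the vocabulary size is therefore what one would expect from an untrained network.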

- Include the Drive link to your model

https://drive.google.com/file/d/1VZYBN3bcpHDnHM8rqhLMQU2LH6DssTxJ/view?usp=sharing

- Print three examples of text generation
  - Propose improvements to the text generation strategies seen in the lab
  - Describe what the proposed strategy consists of
  - Compare the strategy from the lab with your proposal

In [10]:
PATH = "model_gpu_9.dat"

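def get_model(path: str) -> TrigramModel:
    model_loaded = TrigramModel(V, EMBEDDING_DIM, CONTEXT_SIZE, H)
    # map_location lets a checkpoint saved on the GPU load on a CPU-only machine;
    # weights_only=True restricts unpickling to tensors and plain containers
    model_loaded.load_state_dict(torch.load(path, map_location="cpu", weights_only=True))
    model_loaded.eval()
    return model_loaded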

model = get_model(PATH)



In [11]:
MAX_TOKENS = 20
TOP_COUNT = 10

def get_likely_words(model: TrigramModel, context: str, words_indexes: dict, index_to_word: dict, top_count: int=10) -> list[tuple]:
    """
    Get a list of likely words for a given context.

    :param model: Trigram model
    :param context: Context to generate text from; only its last two words are used
    :param words_indexes: Mapping from words to indexes
    :param index_to_word: Mapping from indexes to words
    :param top_count: Number of likely words to return
    :return: (log-probability, word) pairs, most likely first
    """
    # Use the last two words so that longer histories condition on the right context
    words = context.split()[-2:]
    context_ids = [get_word_id(words_indexes, word) for word in words]
    # The model outputs log-probabilities; gradients are not needed at inference time
    with torch.no_grad():
        log_probs = model(torch.tensor([context_ids]))[0].tolist()

    # Strategy: Sort and get top-K words to generate text
    return sorted(((log_prob, index_to_word[idx]) for idx, log_prob in enumerate(log_probs)), reverse=True)[:top_count]

def get_next_word(words: list[tuple[float, str]]) -> str:
    """
    Pick the next word uniformly at random from the list of likely words.

    :param words: List of likely words
    :return: Next word to generate text from
    """
    return words[randint(0, len(words)-1)][1]

def generate_text(model: TrigramModel, history: str, words_indexes: dict, index_to_word: dict, tokens_count: int=0) -> None:
    """
    Generate text from the given model and history.

    :param model: Trigram model
    :param history: History of words to generate text from
    :param words_indexes: Mapping from words to indexes
    :param index_to_word: Mapping from indexes to words
    :param tokens_count: Number of tokens generated so far
    """

    next_word = get_next_word(get_likely_words(model, history, words_indexes, index_to_word, top_count=TOP_COUNT))
    print(next_word, end=" ")
    tokens_count += 1
    if tokens_count == MAX_TOKENS or next_word == "<EOS>":
        return
    # Slide the context window: keep the last word of the history and append the new one
    generate_text(model, history.split()[-1] + " " + next_word, words_indexes, index_to_word, tokens_count)

In [12]:
sentence = "<BOS> Donde"
print(sentence, end=" ")
generate_text(model, sentence, words_indexes, index_to_word)

<BOS> Donde pudieras prosigue los dos no le duelen . Esta doncella del visorrey todas casi tan en la cabeza y el 

In [13]:
sentence = "<BOS> Pensativo"
print(sentence, end=" ")
generate_text(model, sentence, words_indexes, index_to_word)

<BOS> Pensativo y tenemos mucho gusto no diere cima y se alegraba , que en su castillo que el buitre declara un 

In [14]:
sentence = "<BOS> Confusas estaban"
print(sentence, end=" ")
generate_text(model, sentence, words_indexes, index_to_word)

<BOS> Confusas estaban sus huesos le puede dar una dueña , que me confiese por las espaldas para algunas cosas , el pastor 
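One possible improvement over the strategy used above: `get_next_word` picks uniformly among the top 10 candidates, so the tenth-ranked word is chosen as often as the first. A common alternative is to keep the top-K cut but sample in proportion to the model's own probabilities, with a temperature that sharpens or flattens the distribution. The sketch below is a proposal under stated assumptions, not the method used to produce the examples above. `sample_top_k` and `toy_candidates` are hypothetical names, and the input is assumed to be a list of `(log_prob, word)` pairs like the one returned by `get_likely_words`.

```python
import math
import random

def sample_top_k(candidates, temperature=1.0, rng=random):
    """
    Sample one word from (log_prob, word) pairs, weighting each word by its
    temperature-scaled probability renormalized over the candidate list.
    """
    if temperature <= 0:
        raise ValueError("temperature must be positive")
    scaled = [log_prob / temperature for log_prob, _ in candidates]
    # Subtract the maximum before exponentiating for numerical stability
    max_scaled = max(scaled)
    weights = [math.exp(value - max_scaled) for value in scaled]
    words = [word for _, word in candidates]
    # random.choices normalizes the weights internally
    return rng.choices(words, weights=weights, k=1)[0]

# Hypothetical candidates with made-up probabilities, for illustration only
toy_candidates = [(math.log(0.6), "caballero"),
                  (math.log(0.3), "escudero"),
                  (math.log(0.1), "gigante")]
rng = random.Random(0)  # fixed seed so the run is repeatable
samples = [sample_top_k(toy_candidates, temperature=0.7, rng=rng) for _ in range(1000)]
print({word: samples.count(word) for _, word in toy_candidates})
```

Compared with uniform selection, this keeps the variety of sampling while favoring the words the model considers most plausible. A temperature below 1 moves the behavior toward greedy decoding, and a temperature above 1 moves it toward the uniform choice used in the lab.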

### Extra

- Visualize in 2D the vectors of the most common words (excluding stop words)
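A minimal sketch of the projection step for this extra task, assuming a PCA computed with NumPy's SVD. It runs on random stand-in vectors rather than the trained embeddings, and `project_to_2d` and `toy_embeddings` are hypothetical names. In the notebook, the input would presumably be the rows of `model.embeddings.weight` for the selected words, and the resulting points would then be drawn with a plotting library.

```python
import numpy as np

def project_to_2d(embeddings):
    """Project row vectors onto their first two principal components (PCA via SVD)."""
    centered = embeddings - embeddings.mean(axis=0)
    # Rows of vt are the principal directions, ordered by decreasing singular value
    _, _, vt = np.linalg.svd(centered, full_matrices=False)
    return centered @ vt[:2].T

# Stand-in data: 50 random vectors with the same dimension as EMBEDDING_DIM (200)
rng = np.random.default_rng(0)
toy_embeddings = rng.normal(size=(50, 200))
points = project_to_2d(toy_embeddings)
print(points.shape)
```

PCA is only one option. It is linear and deterministic, which makes it a reasonable first look before trying nonlinear methods.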

## References

- [Language models - Lena Voita](https://lena-voita.github.io/nlp_course/language_modeling.html#generation_strategies)
- [A Neural Probabilistic Language Model - Bengio et al.](https://www.jmlr.org/papers/volume3/bengio03a/bengio03a.pdf)
- The code for this lab is heavily based on code by Dr. Ximena Gutiérrez Vasques