<br>
<br>

# **Modelos del lenguaje basados en redes neuronales artificiales**

## **Modelos seq2seq 2**

Cuando la entrada de cada paso del decodificador proviene de la salida del paso anterior, estamos hablando de un modelo Seq2Seq con realimentación (feedback). Esto es especialmente común en tareas como la generación de texto.

1. **Inicio de la Secuencia**: Se inicia la generación con un token especial, como `<SOS>` (Start of Sequence).

2. **Generación Paso a Paso**:
   - En el primer paso, el decodificador recibe el `<SOS>` y el estado del codificador como entrada.
   - El decodificador procesa esta entrada y genera una salida para este paso.
   - La salida generada se convierte en la entrada para el siguiente paso, junto con el estado actualizado del decodificador.
   - Este proceso se repite hasta que se genera un token especial de fin de secuencia (`<EOS>`, End of Sequence) o hasta alcanzar un límite máximo de longitud.

3. **Ventajas y Desventajas**:
   - **Ventajas**: Esta forma de generar secuencias puede ayudar a mantener la coherencia en las secuencias generadas, ya que cada nueva palabra o token tiene en cuenta lo que ya se ha generado.
   - **Desventajas**: Puede ser más lento, ya que cada paso depende del anterior, y errores en un paso pueden propagarse y afectar los pasos siguientes.


Imagina que tienes un modelo seq2seq entrenado para traducir inglés a español. Para la frase "How are you?", el proceso sería algo así:

1. El codificador procesa "How are you?" y genera un vector latente.
2. El decodificador recibe el vector latente y el token `<SOS>`.
3. El decodificador genera "¿Cómo", actualiza su estado y toma "¿Cómo" como entrada para el siguiente paso.
4. El decodificador genera "estás", actualiza su estado y toma "estás" como entrada para el siguiente paso.
5. Y así sucesivamente, hasta generar `<EOS>` para indicar el final de la secuencia.

Este modelo de generación permite que el decodificador tenga en cuenta no solo el contexto proporcionado por el codificador, sino también la estructura de la secuencia que está generando, paso a paso.


<p align="center">
<img src="imgs/seq2seq_feedback.svg" width="70%">
</p>


### **Teacher Forcing**

**Teacher forcing** es una técnica utilizada en el entrenamiento de modelos seq2seq en la que, en un porcentaje de las veces, se utiliza la salida real (etiqueta) de un paso de tiempo como entrada para el siguiente paso, en lugar de la salida predicha por el modelo. Esta técnica puede ayudar a acelerar la convergencia y mejorar el rendimiento del modelo, especialmente en las etapas iniciales del entrenamiento.

#### **¿Cómo funciona?**

1. **Durante el Entrenamiento**: En cada paso de tiempo y con una cierta probabilidad de que suceda, en lugar de pasar la predicción del modelo del paso anterior como entrada al siguiente paso, se pasa la palabra real de la secuencia objetivo. Esto proporciona al modelo información directa y clara sobre cómo debería haber respondido en el paso anterior, independientemente de si la predicción fue correcta o no.

2. **Durante la Evaluación/Predicción**: El modelo debe generar secuencias por sí mismo, utilizando sus propias predicciones del paso anterior para el siguiente paso. Durante esta fase, no se utiliza "teacher forcing".

#### **Ventajas de Teacher Forcing:**

1. **Aprendizaje más Rápido**: Al proporcionar al modelo la respuesta correcta en cada paso, se reduce la propagación de errores a través de la secuencia, lo que puede llevar a un aprendizaje más rápido.

2. **Mejor Rendimiento**: Puede resultar en un mejor rendimiento del modelo, especialmente en las primeras etapas del entrenamiento.

### **Implementación traductor inglés a español**

#### **Dataset Europarl**

El conjunto de datos Europarl contiene las transcripciones de los procedimientos del Parlamento Europeo, proporcionando una valiosa fuente de textos paralelos en 21 idiomas europeos. Las oraciones están alineadas entre los idiomas, lo que lo hace especialmente útil para tareas de traducción automática.

Descargamos el dataset Europarl para español-inglés. Una vez descargado tendremos dos ficheros de texto, uno para cada idioma con las frases alineadas. El código siguiente muestra las primeras frases de cada fichero.

In [2]:
!pip uninstall torch torchtext -y
!pip install torch==2.0.1 torchtext==0.15.2 --index-url https://download.pytorch.org/whl/cu118
!pip install portalocker>=2.0.0

Found existing installation: torch 2.5.1+cpu
Uninstalling torch-2.5.1+cpu:
  Successfully uninstalled torch-2.5.1+cpu
[0mLooking in indexes: https://download.pytorch.org/whl/cu118
Collecting torch==2.0.1
  Downloading https://download.pytorch.org/whl/cu118/torch-2.0.1%2Bcu118-cp311-cp311-linux_x86_64.whl (2267.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.3/2.3 GB[0m [31m?[0m eta [36m0:00:00[0m
[?25hCollecting torchtext==0.15.2
  Downloading https://download.pytorch.org/whl/torchtext-0.15.2%2Bcpu-cp311-cp311-linux_x86_64.whl (2.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m78.0 MB/s[0m eta [36m0:00:00[0m
Collecting triton==2.0.0 (from torch==2.0.1)
  Downloading https://download.pytorch.org/whl/triton-2.0.0-1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (63.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.3/63.3 MB[0m [31m18.8 MB/s[0m eta [36m0:00:00[0m
Collecting 

In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
from torchtext.data.utils import get_tokenizer
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
import torchtext
import torch
from collections import defaultdict
import random


class Translation(Dataset):
    def __init__(self, source_file, target_file):
        self.ingles = []
        self.espanol = []
        self.tokenizer_es = get_tokenizer("spacy", language="es_core_news_md")
        self.tokenizer_en = get_tokenizer("spacy", language="en_core_web_md")
        self.vocab_es = torchtext.vocab.FastText(language='es', unk_init=torch.Tensor.normal_)  # <-- Mirar esto para ver si añadir el token <unk> al vocabulario
        self.vocab_en = torchtext.vocab.FastText(language='en', unk_init=torch.Tensor.normal_)

        self.vocab_en = self.add_sos_eos_unk_pad(self.vocab_en)
        self.vocab_es = self.add_sos_eos_unk_pad(self.vocab_es)

        self.archivo_ingles = source_file
        self.archivo_espanol = target_file

        # Leer el conjunto de datos
        for ingles, espanol in self.read_translation():
            self.ingles.append(ingles)
            self.espanol.append(espanol)


    def add_sos_eos_unk_pad(self, vocabulary):
        words = vocabulary.itos
        vocab = vocabulary.stoi
        embedding_matrix = vocabulary.vectors

        # Tokens especiales
        sos_token = '<sos>'
        eos_token = '<eos>'
        pad_token = '<pad>'
        unk_token = '<unk>'

        # Inicializamos los vectores para los tokens especiales, por ejemplo, con ceros
        sos_vector = torch.full((1, embedding_matrix.shape[1]), 1.)
        eos_vector = torch.full((1, embedding_matrix.shape[1]), 2.)
        pad_vector = torch.zeros((1, embedding_matrix.shape[1]))
        unk_vector = torch.full((1, embedding_matrix.shape[1]), 3.)

        # Añade los vectores al final de la matriz de embeddings
        embedding_matrix = torch.cat((embedding_matrix, sos_vector, eos_vector, unk_vector, pad_vector), 0)

        # Añade los tokens especiales al vocabulario
        vocab[sos_token] = len(vocab)
        vocab[eos_token] = len(vocab)
        vocab[pad_token] = len(vocab)
        vocab[unk_token] = len(vocab)

        words.append(sos_token)
        words.append(eos_token)
        words.append(pad_token)
        words.append(unk_token)

        vocabulary.itos = words
        vocabulary.stoi = vocab
        vocabulary.vectors = embedding_matrix

        default_stoi = defaultdict(lambda : len(vocabulary)-1, vocabulary.stoi)
        vocabulary.stoi = default_stoi

        return vocabulary


    def read_translation(self):
        with open(self.archivo_ingles, 'r', encoding='utf-8') as f_ingles, open(self.archivo_espanol, 'r', encoding='utf-8') as f_espanol:
            for oracion_ingles, oracion_espanol in zip(f_ingles, f_espanol):
                yield oracion_ingles.strip().lower(), oracion_espanol.strip().lower()

    def __len__(self):
        return len(self.ingles)

    def __getitem__(self, idx):
        item = self.ingles[idx], self.espanol[idx]
        tokens_ingles = self.tokenizer_en(item[0])
        tokens_espanol = self.tokenizer_es(item[1])

        tokens_ingles = tokens_ingles + ['<eos>']
        tokens_espanol = ['<sos>'] + tokens_espanol + ['<eos>']

        if not tokens_ingles or not tokens_espanol:
            return torch.zeros(1, 300), torch.zeros(1, 300)
            # raise RuntimeError("Una de las muestras está vacía.")

        tensor_ingles = self.vocab_en.get_vecs_by_tokens(tokens_ingles)
        tensor_espanol = self.vocab_es.get_vecs_by_tokens(tokens_espanol)

        indices_ingles = [self.vocab_en.stoi[token] for token in tokens_ingles] + [self.vocab_en.stoi['<pad>']]
        indices_espanol = [self.vocab_es.stoi[token] for token in tokens_espanol] + [self.vocab_es.stoi['<pad>']]

        return tensor_ingles, tensor_espanol, indices_ingles, indices_espanol



def collate_fn(batch):
    ingles_batch, espanol_batch, ingles_seqs, espanol_seqs = zip(*batch)
    ingles_batch = pad_sequence(ingles_batch, batch_first=True, padding_value=0)
    espanol_batch = pad_sequence(espanol_batch, batch_first=True, padding_value=0)

    # Calcular la longitud máxima de la lista de listas de índices
    pad = espanol_seqs[0][-1]  # token <pad>
    max_len = max([len(l) for l in espanol_seqs])
    for seq in espanol_seqs:
        seq += [pad]*(max_len-len(seq))

    return ingles_batch, espanol_batch, ingles_seqs, espanol_seqs



#### Descarga de "es_core_news_md" y "en_core_web_md"

In [5]:
!python -m spacy download es_core_news_md
!python -m spacy download en_core_web_md

Collecting es-core-news-md==3.7.0
  Downloading https://github.com/explosion/spacy-models/releases/download/es_core_news_md-3.7.0/es_core_news_md-3.7.0-py3-none-any.whl (42.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.3/42.3 MB[0m [31m32.4 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: es-core-news-md
Successfully installed es-core-news-md-3.7.0
[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('es_core_news_md')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
Collecting en-core-web-md==3.7.1
  Downloading https://github.com/explosion/spacy-models/releases/download/en_core_web_md-3.7.1/en_core_web_md-3.7.1-py3-none-any.whl (42.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m42.8/

In [6]:
archivo_ingles = '/content/drive/MyDrive/PLN/Seq2Seq_Attention/data/mock.en'
archivo_espanol = '/content/drive/MyDrive/PLN/Seq2Seq_Attention/data//mock.es'

translation = Translation(archivo_ingles, archivo_espanol)

.vector_cache/wiki.es.vec: 2.59GB [00:08, 323MB/s]                            
100%|██████████| 985667/985667 [01:45<00:00, 9368.46it/s]
.vector_cache/wiki.en.vec: 6.60GB [02:19, 47.2MB/s]                            
100%|██████████| 2519370/2519370 [04:25<00:00, 9485.39it/s]


### **Modelo**

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim

##### **Encoder**

In [8]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers, decoder_hidden_dim=None):
        super().__init__()
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)
        if decoder_hidden_dim and hidden_dim != decoder_hidden_dim:
            self.fc_hidden = nn.Linear(hidden_dim, decoder_hidden_dim)
            self.fc_cell = nn.Linear(hidden_dim, decoder_hidden_dim)
        else:
            self.fc_hidden = self.fc_cell = None

    def forward(self, x):
        output, (hidden, cell) = self.rnn(x)
        if self.fc_hidden:
            hidden = self.fc_hidden(hidden)
            cell = self.fc_cell(cell)
        return output, (hidden, cell)


**Atención de Luong**

In [9]:
class LuongAttention(nn.Module):
    def __init__(self, encoder_hidden_dim, decoder_hidden_dim):
        super(LuongAttention, self).__init__()
        self.attn = nn.Linear(encoder_hidden_dim + decoder_hidden_dim, decoder_hidden_dim)
        self.v = nn.Parameter(torch.rand(decoder_hidden_dim))

    def forward(self, hidden, encoder_outputs):
        # hidden: [batch_size, decoder_hidden_dim]
        # encoder_outputs: [batch_size, seq_len, encoder_hidden_dim]
        seq_len = encoder_outputs.size(1)

        # Transformar dimensiones si no coinciden
        hidden = hidden.unsqueeze(1).repeat(1, seq_len, 1)  # [batch_size, seq_len, decoder_hidden_dim]
        energy = torch.tanh(self.attn(torch.cat((hidden, encoder_outputs), dim=2)))  # [batch_size, seq_len, decoder_hidden_dim]
        energy = energy.permute(0, 2, 1)  # [batch_size, decoder_hidden_dim, seq_len]
        v = self.v.repeat(encoder_outputs.size(0), 1).unsqueeze(1)  # [batch_size, 1, decoder_hidden_dim]
        attention = torch.bmm(v, energy).squeeze(1)  # [batch_size, seq_len]
        return torch.softmax(attention, dim=1)


##### **Decoder**

In [10]:
class DecoderWithAttention(nn.Module):
    def __init__(self, input_dim, encoder_hidden_dim, decoder_hidden_dim, output_dim, num_layers):
        super().__init__()
        self.hidden_dim = decoder_hidden_dim
        self.attention = LuongAttention(encoder_hidden_dim, decoder_hidden_dim)
        self.rnn = nn.LSTM(input_dim + encoder_hidden_dim, decoder_hidden_dim, num_layers, batch_first=True)
        self.fc_out = nn.Linear(decoder_hidden_dim + encoder_hidden_dim, output_dim)

    def forward(self, x, hidden, cell, encoder_outputs):
        # x: [batch_size, 1, input_dim]
        # encoder_outputs: [batch_size, seq_len, encoder_hidden_dim]
        attention_weights = self.attention(hidden[-1], encoder_outputs)  # [batch_size, seq_len]
        attention_weights = attention_weights.unsqueeze(1)  # [batch_size, 1, seq_len]
        context = torch.bmm(attention_weights, encoder_outputs)  # [batch_size, 1, encoder_hidden_dim]
        rnn_input = torch.cat((x, context), dim=2)  # [batch_size, 1, input_dim + encoder_hidden_dim]
        output, (hidden, cell) = self.rnn(rnn_input, (hidden, cell))
        output = self.fc_out(torch.cat((output, context), dim=2))  # [batch_size, 1, output_dim]
        return output, (hidden, cell)


**Seq2Seq**

In [11]:
class Seq2SeqWithAttention(nn.Module):
    def __init__(self, encoder, decoder, vocab_vectors):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.vocab_vectors = vocab_vectors  # Embeddings del vocabulario de destino

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = target.size(0)
        target_len = target.size(1)
        output_dim = self.decoder.fc_out.out_features

        outputs = torch.zeros(batch_size, target_len, output_dim).to(source.device)
        encoder_outputs, (hidden, cell) = self.encoder(source)
        x = target[:, 0, :]

        for t in range(1, target_len):
            output, (hidden, cell) = self.decoder(x.unsqueeze(1), hidden, cell, encoder_outputs)
            outputs[:, t, :] = output.squeeze(1)

            teacher_force = random.random() < teacher_forcing_ratio
            if teacher_force:
                x = target[:, t, :]  # Usa el token real
            else:
                # Obtener el índice del token más probable
                predicted_token_idx = output.argmax(2).squeeze(1)  # [batch_size]

                # Mapear índices predichos a embeddings
                x = self.vocab_vectors[predicted_token_idx]  # [batch_size, input_dim]

        return outputs


In [24]:
input_dim = 300  # Dimensión de las embeddings de entrada
encoder_hidden_dim = 512  # Dimensión oculta del encoder
decoder_hidden_dim = 512  # Dimensión oculta del decoder
output_dim = translation.vocab_es.vectors.shape[0]  # Tamaño del vocabulario de salida
num_layers = 2  # Número de capas en LSTM
learning_rate = 0.001
num_epochs = 15
batch_size = 8
num_workers = 0
shuffle = True

# Inicializa el modelo
encoder = Encoder(input_dim, encoder_hidden_dim, num_layers, decoder_hidden_dim=decoder_hidden_dim)
decoder = DecoderWithAttention(input_dim, encoder_hidden_dim, decoder_hidden_dim, output_dim, num_layers)
model = Seq2SeqWithAttention(encoder, decoder, vocab_vectors=translation.vocab_es.vectors)

optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# DataLoader
from torch.utils.data import DataLoader

dataloader = DataLoader(
    translation,
    batch_size=batch_size,
    shuffle=shuffle,
    num_workers=num_workers,
    collate_fn=collate_fn
)


In [None]:
# Bucle de entrenamiento
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch_idx, (src, tgt, src_indices, tgt_indices) in enumerate(dataloader):
        optimizer.zero_grad()
        output = model(src, tgt)

        tgt_indices = torch.tensor(tgt_indices, dtype=torch.long)
        loss = 0
        for t in range(1, tgt.shape[1]):
            loss += criterion(output[:, t, :], tgt_indices[:, t])
        # loss = criterion(output, torch.tensor(tgt_indices, dtype=torch.long))

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        if batch_idx % 5 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(dataloader)}], Loss: {loss.item():.4f}')

    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {total_loss / len(dataloader):.4f}')


Epoch [1/15], Step [1/2], Loss: 41.3869
Epoch [1/15], Average Loss: 40.9820
Epoch [2/15], Step [1/2], Loss: 38.3881
Epoch [2/15], Average Loss: 31.4769
Epoch [3/15], Step [1/2], Loss: 29.6161
Epoch [3/15], Average Loss: 26.9239
Epoch [4/15], Step [1/2], Loss: 16.8470
Epoch [4/15], Average Loss: 14.9899
Epoch [5/15], Step [1/2], Loss: 11.9019
Epoch [5/15], Average Loss: 9.5274
Epoch [6/15], Step [1/2], Loss: 10.4367
Epoch [6/15], Average Loss: 7.8637
Epoch [7/15], Step [1/2], Loss: 7.1145
Epoch [7/15], Average Loss: 6.9653
Epoch [8/15], Step [1/2], Loss: 6.0092
Epoch [8/15], Average Loss: 5.6090
Epoch [9/15], Step [1/2], Loss: 5.8487


In [23]:
# Test the model with input sentences
model.eval()

sentence = "cat"

# Convertir a vectores
tokens = translation.tokenizer_en(sentence)
tokens = tokens + ['<eos>']
text_tensor = translation.vocab_en.get_vecs_by_tokens(tokens)
text_tensor = text_tensor.unsqueeze(0)

with torch.no_grad():
    encoder_outputs, (hidden, cell) = model.encoder(text_tensor)

outputs = []

input_token = torch.tensor(translation.vocab_es.stoi['<sos>']).unsqueeze(0)
input_token = translation.vocab_es.vectors[input_token].unsqueeze(0)


for _ in range(5):
    with torch.no_grad():
        output, (hidden, cell) = model.decoder(input_token, hidden, cell, encoder_outputs) # teacher_forcing_ratio=0.0

    # Obtener el token con la probabilidad más alta
    best_guess = output.argmax(2).squeeze(0)
    outputs.append(best_guess.item())

    # Si el token es <eos>, terminar la traducción
    if best_guess == translation.vocab_es.stoi['<eos>']:
        break

    # Utilizar la palabra predicha como la siguiente entrada al decoder
    input_token = translation.vocab_es.vectors[best_guess].unsqueeze(0)

# Convertir los índices de salida a palabras
translated_sentence = [translation.vocab_es.itos[idx] for idx in outputs]

result = ' '.join(translated_sentence)

print(result)

<eos>


In [None]:
# Test the model with input sentences
model.eval()

sentence = "house"

# Convertir a vectores
tokens = translation.tokenizer_en(sentence)
tokens = tokens + ['<eos>']
text_tensor = translation.vocab_en.get_vecs_by_tokens(tokens)
text_tensor = text_tensor.unsqueeze(0)  # [1, seq_len, input_dim]

with torch.no_grad():
    encoder_outputs, (hidden, cell) = model.encoder(text_tensor)

outputs = []

# Inicializar el token de inicio (<sos>)
input_token_idx = torch.tensor([translation.vocab_es.stoi['<sos>']]).to(text_tensor.device)  # [1]
input_token = translation.vocab_es.vectors[input_token_idx].unsqueeze(0)  # [1, 1, input_dim]

for _ in range(5):  # Limitar la longitud de la traducción a 5 tokens
    with torch.no_grad():
        # Pasar el token actual al decoder
        output, (hidden, cell) = model.decoder(input_token, hidden, cell, encoder_outputs)  # <--- Se añade encoder_outputs

    # Obtener el token con la probabilidad más alta
    best_guess = output.argmax(2).squeeze(0)  # [batch_size]
    outputs.append(best_guess.item())

    # Si el token es <eos>, terminar la traducción
    if best_guess.item() == translation.vocab_es.stoi['<eos>']:
        break

    # Utilizar la palabra predicha como la siguiente entrada al decoder
    input_token = translation.vocab_es.vectors[best_guess].unsqueeze(0)  # [1, 1, input_dim]

# Convertir los índices de salida a palabras
translated_sentence = [translation.vocab_es.itos[idx] for idx in outputs]

result = ' '.join(translated_sentence)

print(result)


perro <eos>


In [None]:
# Guardar el modelo
#torch.save(model.state_dict(), 'seq2seq.pth')

In [None]:
# Load model from file
#model.load_state_dict(torch.load('seq2seq.pth'))

<All keys matched successfully>