In [2]:
import torch

# Verificar si CUDA está disponible
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Imprimir el dispositivo actual
print("Dispositivo actual:", device)

Dispositivo actual: cpu


In [3]:


import os

def read_translation(archivo_ingles, archivo_espanol):
    with open(archivo_ingles, 'r', encoding='utf-8') as f_ingles, open(archivo_espanol, 'r', encoding='utf-8') as f_espanol:
        for oracion_ingles, oracion_espanol in zip(f_ingles, f_espanol):
            yield oracion_ingles.strip(), oracion_espanol.strip()

os.chdir(r"C:\Users\Pedro\Desktop\PLN")
archivo_ingles = 'europarl-v7.es-en.en'
archivo_espanol = 'europarl-v7.es-en.es'

# Leer el conjunto de datos
for i, (ingles, espanol) in enumerate(read_translation(archivo_ingles, archivo_espanol)):
    print('Inglés:', ingles)
    print('Español:', espanol)
    print('---')
    if i == 15:
        break

Inglés: Resumption of the session
Español: Reanudación del período de sesiones
---
Inglés: I declare resumed the session of the European Parliament adjourned on Friday 17 December 1999, and I would like once again to wish you a happy new year in the hope that you enjoyed a pleasant festive period.
Español: Declaro reanudado el período de sesiones del Parlamento Europeo, interrumpido el viernes 17 de diciembre pasado, y reitero a Sus Señorías mi deseo de que hayan tenido unas buenas vacaciones.
---
Inglés: Although, as you will have seen, the dreaded 'millennium bug' failed to materialise, still the people in a number of countries suffered a series of natural disasters that truly were dreadful.
Español: Como todos han podido comprobar, el gran "efecto del año 2000" no se ha producido. En cambio, los ciudadanos de varios de nuestros países han sido víctimas de catástrofes naturales verdaderamente terribles.
---
Inglés: You have requested a debate on this subject in the course of the next

In [4]:
from torchtext.data.utils import get_tokenizer
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence
import torchtext
import torch
from collections import defaultdict


class Translation(Dataset):
    def __init__(self, source_file, target_file):
        self.ingles = []
        self.espanol = []
        self.tokenizer_es = get_tokenizer("spacy", language="es_core_news_md")
        self.tokenizer_en = get_tokenizer("spacy", language="en_core_web_md")
        self.vocab_es = torchtext.vocab.FastText(language='es', unk_init=torch.Tensor.normal_)  # <-- Mirar esto para ver si añadir el token <unk> al vocabulario
        self.vocab_en = torchtext.vocab.FastText(language='en', unk_init=torch.Tensor.normal_)

        self.vocab_en = self.add_sos_eos_unk_pad(self.vocab_en)
        self.vocab_es = self.add_sos_eos_unk_pad(self.vocab_es)

        self.archivo_ingles = source_file
        self.archivo_espanol = target_file

        # Leer el conjunto de datos
        for ingles, espanol in self.read_translation():
            self.ingles.append(ingles)
            self.espanol.append(espanol)


    def add_sos_eos_unk_pad(self, vocabulary):
        words = vocabulary.itos
        vocab = vocabulary.stoi
        embedding_matrix = vocabulary.vectors

        # Tokens especiales
        sos_token = '<sos>'
        eos_token = '<eos>'
        pad_token = '<pad>'
        unk_token = '<unk>'

        # Inicializamos los vectores para los tokens especiales, por ejemplo, con ceros
        sos_vector = torch.full((1, embedding_matrix.shape[1]), 1.)
        eos_vector = torch.full((1, embedding_matrix.shape[1]), 2.)
        pad_vector = torch.zeros((1, embedding_matrix.shape[1]))
        unk_vector = torch.full((1, embedding_matrix.shape[1]), 3.)

        # Añade los vectores al final de la matriz de embeddings
        embedding_matrix = torch.cat((embedding_matrix, sos_vector, eos_vector, unk_vector, pad_vector), 0)

        # Añade los tokens especiales al vocabulario
        vocab[sos_token] = len(vocab)
        vocab[eos_token] = len(vocab)
        vocab[pad_token] = len(vocab)
        vocab[unk_token] = len(vocab)

        words.append(sos_token)
        words.append(eos_token)
        words.append(pad_token)
        words.append(unk_token)

        vocabulary.itos = words
        vocabulary.stoi = vocab
        vocabulary.vectors = embedding_matrix

        default_stoi = defaultdict(lambda : len(vocabulary)-1, vocabulary.stoi)
        vocabulary.stoi = default_stoi
    
        return vocabulary
        

    def read_translation(self):
        with open(self.archivo_ingles, 'r', encoding='utf-8') as f_ingles, open(self.archivo_espanol, 'r', encoding='utf-8') as f_espanol:
            for oracion_ingles, oracion_espanol in zip(f_ingles, f_espanol):
                yield oracion_ingles.strip().lower(), oracion_espanol.strip().lower()

    def __len__(self):
        return len(self.ingles)

    def __getitem__(self, idx):
        item = self.ingles[idx], self.espanol[idx]
        tokens_ingles = self.tokenizer_en(item[0])
        tokens_espanol = self.tokenizer_es(item[1])

        tokens_ingles = tokens_ingles + ['<eos>']
        tokens_espanol = ['<sos>'] + tokens_espanol + ['<eos>']

        if not tokens_ingles or not tokens_espanol:
            return torch.zeros(1, 300), torch.zeros(1, 300)
            # raise RuntimeError("Una de las muestras está vacía.")
    
        tensor_ingles = self.vocab_en.get_vecs_by_tokens(tokens_ingles)
        tensor_espanol = self.vocab_es.get_vecs_by_tokens(tokens_espanol)

        indices_ingles = [self.vocab_en.stoi[token] for token in tokens_ingles] + [self.vocab_en.stoi['<pad>']]
        indices_espanol = [self.vocab_es.stoi[token] for token in tokens_espanol] + [self.vocab_es.stoi['<pad>']]

        return tensor_ingles, tensor_espanol, indices_ingles, indices_espanol
        
            
        
def collate_fn(batch):
    ingles_batch, espanol_batch, ingles_seqs, espanol_seqs = zip(*batch)
    ingles_batch = pad_sequence(ingles_batch, batch_first=True, padding_value=0)
    espanol_batch = pad_sequence(espanol_batch, batch_first=True, padding_value=0)

    # Calcular la longitud máxima de la lista de listas de índices
    pad = espanol_seqs[0][-1]  # token <pad>
    max_len = max([len(l) for l in espanol_seqs])
    for seq in espanol_seqs:
        seq += [pad]*(max_len-len(seq))
        
    return ingles_batch, espanol_batch, ingles_seqs, espanol_seqs

In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
import spacy

archivo_ingles = 'mock.en'
archivo_espanol = 'mock.es'

translation = Translation(archivo_ingles, archivo_espanol)

In [26]:
class Encoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers):
        super().__init__() 
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)

    def forward(self, x):
        output, (hidden, cell) = self.rnn(x)
        return output, (hidden, cell)

In [27]:
class Decoder(nn.Module):
    def __init__(self, input_dim, hidden_dim, output_dim, num_layers):
        super().__init__()
        self.rnn = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True)  # TO DO: Añadir dropout 
        self.fc_out = nn.Linear(hidden_dim, output_dim)

    def forward(self, x, hidden, cell):
        output, (hidden, cell) = self.rnn(x, (hidden, cell))
        output = self.fc_out(output)
        return output, (hidden, cell)

In [28]:
import random

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder                           
        self.es_embeddings = torchtext.vocab.FastText(language='es')
        self.M = self.es_embeddings.vectors
        self.M = torch.cat((self.M, torch.zeros((4, self.M.shape[1]))), 0)

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        target_len = target.shape[1]
        batch_size = target.shape[0]

        # Tensor para almacenar las salidas del decoder
        outputs = torch.zeros(batch_size, target_len, 985671)
        
        # Primero, la fuente es procesada por el encoder
        _, (hidden, cell) = self.encoder(source)

        # La primera entrada al decoder es el vector <sos>
        x = target[:, 0, :]

        for t in range(1, target_len):
            output, (hidden, cell) = self.decoder(x.unsqueeze(1), hidden, cell)
            outputs[:, t, :] = output.squeeze(1)
            
            teacher_force = random.random() < teacher_forcing_ratio
            if teacher_force:
                x = target[:, t, :]
            else:
                x = torch.matmul(output.squeeze(1), self.M)

        return outputs

In [29]:
# Parámetros
input_dim = 300
output_dim = translation.vocab_es.vectors.shape[0]
hidden_dim = 512
num_layers = 2
learning_rate = 0.001
num_epochs = 30
batch_size = 8
num_workers = 0
shuffle = True

# Inicializa el modelo, el optimizador y la función de pérdida
encoder = Encoder(input_dim, hidden_dim, num_layers)
decoder = Decoder(input_dim, hidden_dim, output_dim, num_layers)
model = Seq2Seq(encoder, decoder).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

# DataLoader
from torch.utils.data import DataLoader

dataloader = DataLoader(translation, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers, collate_fn=collate_fn)

In [10]:
# Bucle de entrenamiento
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for batch_idx, (src, tgt, src_indices, tgt_indices) in enumerate(dataloader):
        optimizer.zero_grad()
        output = model(src, tgt)

        tgt_indices = torch.tensor(tgt_indices, dtype=torch.long)
        loss = 0
        for t in range(1, tgt.shape[1]):
            loss += criterion(output[:, t, :], tgt_indices[:, t])
        # loss = criterion(output, torch.tensor(tgt_indices, dtype=torch.long))

        loss.backward()
        optimizer.step()
        total_loss += loss.item()

        if batch_idx % 5 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{batch_idx+1}/{len(dataloader)}], Loss: {loss.item():.4f}')

    print(f'Epoch [{epoch+1}/{num_epochs}], Average Loss: {total_loss / len(dataloader):.4f}')

Epoch [1/30], Step [1/2], Loss: 41.4337
Epoch [1/30], Average Loss: 34.4237
Epoch [2/30], Step [1/2], Loss: 40.5665
Epoch [2/30], Average Loss: 40.1977
Epoch [3/30], Step [1/2], Loss: 37.6587
Epoch [3/30], Average Loss: 30.5162
Epoch [4/30], Step [1/2], Loss: 33.2907
Epoch [4/30], Average Loss: 32.4507
Epoch [5/30], Step [1/2], Loss: 29.0172
Epoch [5/30], Average Loss: 22.9772
Epoch [6/30], Step [1/2], Loss: 25.2378
Epoch [6/30], Average Loss: 24.5003
Epoch [7/30], Step [1/2], Loss: 21.9994
Epoch [7/30], Average Loss: 21.9235
Epoch [8/30], Step [1/2], Loss: 19.6685
Epoch [8/30], Average Loss: 14.6341
Epoch [9/30], Step [1/2], Loss: 17.6373
Epoch [9/30], Average Loss: 13.0504
Epoch [10/30], Step [1/2], Loss: 16.0583
Epoch [10/30], Average Loss: 11.5565
Epoch [11/30], Step [1/2], Loss: 14.4137
Epoch [11/30], Average Loss: 10.7233
Epoch [12/30], Step [1/2], Loss: 12.5568
Epoch [12/30], Average Loss: 12.0838
Epoch [13/30], Step [1/2], Loss: 10.8682
Epoch [13/30], Average Loss: 7.7104
Epoch

In [30]:
# Test the model with input sentences
model.eval()

sentence = "dog"

# Convertir a vectores
tokens = translation.tokenizer_en(sentence)
tokens = tokens + ['<eos>']
text_tensor = translation.vocab_en.get_vecs_by_tokens(tokens)
text_tensor = text_tensor.unsqueeze(0)

with torch.no_grad():
    encoder_outputs, (hidden, cell) = model.encoder(text_tensor)

outputs = []

input_token = torch.tensor(translation.vocab_es.stoi['<sos>']).unsqueeze(0)
input_token = translation.vocab_es.vectors[input_token].unsqueeze(0)
    

for _ in range(5):
    with torch.no_grad():
        output, (hidden, cell) = model.decoder(input_token, hidden, cell) # teacher_forcing_ratio=0.0
        
    # Obtener el token con la probabilidad más alta
    best_guess = output.argmax(2).squeeze(0)
    outputs.append(best_guess.item())
        
    # Si el token es <eos>, terminar la traducción
    if best_guess == translation.vocab_es.stoi['<eos>']:
        break
        
    # Utilizar la palabra predicha como la siguiente entrada al decoder
    input_token = translation.vocab_es.vectors[best_guess].unsqueeze(0)
        
# Convertir los índices de salida a palabras
translated_sentence = [translation.vocab_es.itos[idx] for idx in outputs]
    
result = ' '.join(translated_sentence)

print(result)

tomáis praedator python python alzateaceae


In [12]:
# Guardar el modelo
torch.save(model.state_dict(), 'seq2seq.pth')

In [13]:
# Load model from file
model.load_state_dict(torch.load('seq2seq.pth'))

<All keys matched successfully>