## **1. Instalación de librerías**

In [1]:
!pip install sentencepiece sacrebleu datasets

Collecting sacrebleu
  Downloading sacrebleu-2.5.1-py3-none-any.whl.metadata (51 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m51.8/51.8 kB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
Collecting portalocker (from sacrebleu)
  Downloading portalocker-3.2.0-py3-none-any.whl.metadata (8.7 kB)
Collecting colorama (from sacrebleu)
  Downloading colorama-0.4.6-py2.py3-none-any.whl.metadata (17 kB)
Downloading sacrebleu-2.5.1-py3-none-any.whl (104 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m104.1/104.1 kB[0m [31m7.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading colorama-0.4.6-py2.py3-none-any.whl (25 kB)
Downloading portalocker-3.2.0-py3-none-any.whl (22 kB)
Installing collected packages: portalocker, colorama, sacrebleu
Successfully installed colorama-0.4.6 portalocker-3.2.0 sacrebleu-2.5.1


# **2. Descargar dataset español–francés (HuggingFace)**


In [2]:
from datasets import load_dataset
import pandas as pd

print("Descargando dataset español-francés...")
data = load_dataset("opus_books", "es-fr")

src = [x["translation"]["es"] for x in data["train"]]
tgt = [x["translation"]["fr"] for x in data["train"]]

df = pd.DataFrame({"src": src, "tgt": tgt})

# Normalizar
df["src"] = df["src"].str.lower().str.strip()
df["tgt"] = df["tgt"].str.lower().str.strip()

# Filtrar oraciones muy largas
df = df[df["src"].str.len() < 120]
df = df[df["tgt"].str.len() < 120]

# Tomar máximo 50k
df = df.sample(min(50000, len(df)), random_state=42)

df.to_csv("dataset_es_fr.csv", index=False)
print("Dataset listo. Total pares:", len(df))

Descargando dataset español-francés...


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


README.md: 0.00B [00:00, ?B/s]

es-fr/train-00000-of-00001.parquet:   0%|          | 0.00/9.16M [00:00<?, ?B/s]

Generating train split:   0%|          | 0/56319 [00:00<?, ? examples/s]

Dataset listo. Total pares: 32556


# **3. Imports + Configuración**

In [3]:
import os
import random
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import sacrebleu
import sentencepiece as spm
from sklearn.model_selection import train_test_split

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Usando:", DEVICE)

SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

Usando: cuda


# **4. Cargar dataset y dividir**

In [4]:
df = pd.read_csv("dataset_es_fr.csv")

train_df, test_df = train_test_split(df, test_size=0.1, random_state=SEED)
train_df, val_df  = train_test_split(train_df, test_size=0.1, random_state=SEED)

print("Train:", len(train_df), "Val:", len(val_df), "Test:", len(test_df))

Train: 26370 Val: 2930 Test: 3256


# **5. Entrenar SentencePiece**

In [5]:
os.makedirs("spm_models", exist_ok=True)

SRC_VOCAB = 4000
TGT_VOCAB = 4000

with open("spm_src.txt", "w", encoding="utf8") as f:
    f.write("\n".join(train_df["src"].tolist()))

with open("spm_tgt.txt", "w", encoding="utf8") as f:
    f.write("\n".join(train_df["tgt"].tolist()))

spm.SentencePieceTrainer.Train(
    f"--input=spm_src.txt --model_prefix=spm_models/spm_src "
    f"--vocab_size={SRC_VOCAB} --pad_id=0 --bos_id=1 --eos_id=2 --unk_id=3"
)

spm.SentencePieceTrainer.Train(
    f"--input=spm_tgt.txt --model_prefix=spm_models/spm_tgt "
    f"--vocab_size={TGT_VOCAB} --pad_id=0 --bos_id=1 --eos_id=2 --unk_id=3"
)

print("SentencePiece listo.")

SentencePiece listo.


# **6. Cargar tokenizers**

In [6]:
sp_src = spm.SentencePieceProcessor()
sp_tgt = spm.SentencePieceProcessor()

sp_src.load("spm_models/spm_src.model")
sp_tgt.load("spm_models/spm_tgt.model")

PAD = 0
BOS = 1
EOS = 2

def enc_src(t): return [BOS] + sp_src.encode(t, out_type=int) + [EOS]
def enc_tgt(t): return [BOS] + sp_tgt.encode(t, out_type=int) + [EOS]

def dec_tgt(ids):
    if EOS in ids: ids = ids[:ids.index(EOS)]
    if ids and ids[0] == BOS: ids = ids[1:]
    return sp_tgt.decode(ids)


# **7. Dataset + DataLoader**

In [7]:
class NMTDataset(Dataset):
    def __init__(self, df):
        self.src = df["src"].tolist()
        self.tgt = df["tgt"].tolist()

    def __len__(self): return len(self.src)

    def __getitem__(self, idx):
        s = enc_src(self.src[idx])
        t = enc_tgt(self.tgt[idx])
        return torch.tensor(s), torch.tensor(t[:-1]), torch.tensor(t[1:])

In [17]:
def pad_batch(seqs):
    max_len = max(len(s) for s in seqs)
    out = torch.full((len(seqs), max_len), PAD)
    for i, s in enumerate(seqs):
        out[i, :len(s)] = s
    return out

def collate(batch):
    src, tin, tout = zip(*batch)
    src_pad = pad_batch(src)
    tin_pad = pad_batch(tin)
    tout_pad = pad_batch(tout)

    lengths = torch.tensor([len(s) for s in src])
    lengths, idx = lengths.sort(descending=True)

    return src_pad[idx], lengths, tin_pad[idx], tout_pad[idx]

In [18]:
train_loader = DataLoader(NMTDataset(train_df), 64, True, collate_fn=collate)
val_loader   = DataLoader(NMTDataset(val_df),   64, False, collate_fn=collate)
test_loader  = DataLoader(NMTDataset(test_df),  64, False, collate_fn=collate)

In [8]:
def pad_batch(seqs):
    max_len = max(len(s) for s in seqs)
    out = torch.full((len(seqs), max_len), PAD)
    for i, s in enumerate(seqs):
        out[i, :len(s)] = s
    return out

def collate(batch):
    src, tin, tout = zip(*batch)
    src_pad = pad_batch(src)
    tin_pad = pad_batch(tin)
    tout_pad = pad_batch(tout)

    lengths = torch.tensor([len(s) for s in src])
    lengths, idx = lengths.sort(descending=True)

    return src_pad[idx], lengths, tin_pad[idx], tout_pad[idx]

# **========= MODELO RNN SIMPLE (Encoder–Decoder sin atención) =========**

In [9]:
class Encoder(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD)
        self.rnn = nn.RNN(emb, hid, batch_first=True)

    def forward(self, x, lengths):
        x = self.emb(x)
        packed = nn.utils.rnn.pack_padded_sequence(x, lengths.cpu(), batch_first=True)
        _, h = self.rnn(packed)
        return h

In [10]:
class Decoder(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD)
        self.rnn = nn.RNN(emb, hid, batch_first=True)
        self.fc = nn.Linear(hid, vocab)

    def forward(self, x, h):
        x = self.emb(x)
        out, h = self.rnn(x, h)
        out = self.fc(out)
        return out, h

In [11]:
class Seq2Seq(nn.Module):
    def __init__(self, enc, dec):
        super().__init__()
        self.enc = enc
        self.dec = dec

    def forward(self, src, lengths, tgt_in):
        h = self.enc(src, lengths)
        out, _ = self.dec(tgt_in, h)
        return out

    def translate(self, text, max_len=40):
        self.eval()
        with torch.no_grad():
            ids = torch.tensor([enc_src(text)], device=DEVICE)
            lengths = torch.tensor([ids.size(1)], device=DEVICE)

            h = self.enc(ids, lengths)
            cur = torch.tensor([[BOS]], device=DEVICE)

            gen = []
            for _ in range(max_len):
                out, h = self.dec(cur, h)
                next_tok = out[0, -1].argmax().item()
                if next_tok == EOS:
                    break
                gen.append(next_tok)
                cur = torch.tensor([[next_tok]], device=DEVICE)

            return dec_tgt(gen)

In [12]:
EMB = 256
HID = 384

enc = Encoder(sp_src.get_piece_size(), EMB, HID)
dec = Decoder(sp_tgt.get_piece_size(), EMB, HID)
model = Seq2Seq(enc, dec).to(DEVICE)

criterion = nn.CrossEntropyLoss(ignore_index=PAD)
optim = torch.optim.Adam(model.parameters(), lr=0.0007)

In [15]:
def train_epoch():
    model.train()
    tot = 0
    for src, lengths, tin, tout in train_loader:
        src, lengths = src.to(DEVICE), lengths.to(DEVICE)
        tin, tout = tin.to(DEVICE), tout.to(DEVICE)

        optim.zero_grad()
        pred = model(src, lengths, tin)

        loss = criterion(pred.reshape(-1, pred.size(-1)), tout.reshape(-1))
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optim.step()
        tot += loss.item()
    return tot / len(train_loader)

def val_epoch():
    model.eval()
    tot = 0
    with torch.no_grad():
        for src, lengths, tin, tout in val_loader:
            src, lengths = src.to(DEVICE), lengths.to(DEVICE)
            tin, tout = tin.to(DEVICE), tout.to(DEVICE)
            pred = model(src, lengths, tin)
            loss = criterion(pred.reshape(-1, pred.size(-1)), tout.reshape(-1))
            tot += loss.item()
    return tot / len(val_loader)

In [19]:
train_losses = []
val_losses = []

EPOCHS = 5  # o las que estés usando

for ep in range(1, EPOCHS + 1):
    tr = train_epoch()
    vl = val_epoch()
    train_losses.append(tr)
    val_losses.append(vl)
    print(f"Epoch {ep} | Train: {tr:.4f} | Val: {vl:.4f}")

Epoch 1 | Train 5.0144 | Val 4.4042
Epoch 2 | Train 4.1605 | Val 4.0751
Epoch 3 | Train 3.8588 | Val 3.9296
Epoch 4 | Train 3.6645 | Val 3.8318
Epoch 5 | Train 3.5155 | Val 3.7901
Epoch 6 | Train 3.3906 | Val 3.7376
Epoch 7 | Train 3.2816 | Val 3.7202
Epoch 8 | Train 3.1882 | Val 3.7059
Epoch 9 | Train 3.1017 | Val 3.6953
Epoch 10 | Train 3.0210 | Val 3.7053
Epoch 11 | Train 2.9444 | Val 3.7131
Epoch 12 | Train 2.8731 | Val 3.7258
Epoch 13 | Train 2.8035 | Val 3.7324
Epoch 14 | Train 2.7391 | Val 3.7489
Epoch 15 | Train 2.6755 | Val 3.7765
Epoch 16 | Train 2.6158 | Val 3.7907
Epoch 17 | Train 2.5571 | Val 3.8173
Epoch 18 | Train 2.5015 | Val 3.8446
Epoch 19 | Train 2.4466 | Val 3.8729
Epoch 20 | Train 2.3932 | Val 3.8940


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.plot(range(1, len(train_losses)+1), train_losses, label="Train Loss")
plt.plot(range(1, len(val_losses)+1), val_losses, label="Val Loss")
plt.xlabel("Época")
plt.ylabel("Loss")
plt.title("Evolución de la pérdida (Train vs Val)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
tests = [
    "hola, ¿cómo estás?",
    "mañana voy a estudiar",
    "me gusta la comida francesa",
    "estoy aprendiendo modelos de traducción automática"
]

for t in tests:
    print("ES:", t)
    print("FR:", model.translate(t))
    print("-"*40)

In [None]:
hyps, refs = [], []

model.eval()
with torch.no_grad():
    for src, lengths, tin, tout in test_loader:
        src, lengths = src.to(DEVICE), lengths.to(DEVICE)

        for i in range(src.size(0)):
            ids = src[i][:lengths[i]].tolist()
            if EOS in ids:
                ids = ids[1:ids.index(EOS)]
            else:
                ids = ids[1:]
            src_text = sp_src.decode(ids)

            pred = model.translate(src_text)
            gold = sp_tgt.decode([x for x in tout[i].tolist() if x not in [PAD,BOS,EOS]])

            hyps.append(pred)
            refs.append([gold])

bleu = sacrebleu.corpus_bleu(hyps, list(zip(*refs)))
print("BLEU:", bleu.score)

# **==== MODELO LSTM (ENCODER - DECODER + ATENCIÓN BAHDANAU) ====**

In [20]:
class BahdanauAttention(nn.Module):
    def __init__(self, hidden_size):
        super().__init__()
        self.W1 = nn.Linear(hidden_size, hidden_size)
        self.W2 = nn.Linear(hidden_size, hidden_size)
        self.V  = nn.Linear(hidden_size, 1)

    def forward(self, hidden, encoder_outputs):
        # hidden: (1, B, H)
        # encoder_outputs: (B, T, H)
        hidden = hidden.permute(1, 0, 2)  # (B, 1, H)
        score = self.V(torch.tanh(
            self.W1(hidden) + self.W2(encoder_outputs)
        ))  # (B, T, 1)

        attn_weights = torch.softmax(score, dim=1)
        context = (attn_weights * encoder_outputs).sum(dim=1)
        return context, attn_weights

In [21]:
class Encoder(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD)
        self.lstm = nn.LSTM(emb, hid, batch_first=True)

    def forward(self, src, lengths):
        x = self.emb(src)
        packed = nn.utils.rnn.pack_padded_sequence(x, lengths.cpu(), batch_first=True)
        outputs, (h, c) = self.lstm(packed)
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
        return outputs, (h, c)

In [22]:
class Decoder(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD)
        self.att = BahdanauAttention(hid)
        self.lstm = nn.LSTM(emb + hid, hid, batch_first=True)
        self.fc = nn.Linear(hid, vocab)

    def forward(self, tgt_in, hidden, encoder_outputs):
        h, c = hidden
        x = self.emb(tgt_in)

        outputs = []

        for t in range(x.size(1)):
            context, _ = self.att(h, encoder_outputs)
            lstm_in = torch.cat([x[:, t:t+1, :], context.unsqueeze(1)], dim=2)
            out, (h, c) = self.lstm(lstm_in, (h, c))
            outputs.append(self.fc(out))

        return torch.cat(outputs, dim=1)

In [23]:
class Seq2Seq(nn.Module):
    def __init__(self, enc, dec):
        super().__init__()
        self.enc = enc
        self.dec = dec

    def forward(self, src, lengths, tgt_in):
        enc_out, hidden = self.enc(src, lengths)
        return self.dec(tgt_in, hidden, enc_out)

    def translate(self, text, max_len=40):
        self.eval()
        with torch.no_grad():
            src_ids = torch.tensor([enc_src(text)], device=DEVICE)
            lengths = torch.tensor([src_ids.size(1)], device=DEVICE)

            enc_out, hidden = self.enc(src_ids, lengths)

            cur = torch.tensor([[BOS]], device=DEVICE)
            gen = []

            for _ in range(max_len):
                out = self.dec(cur, hidden, enc_out)
                next_tok = out[0, -1].argmax().item()
                if next_tok == EOS:
                    break
                gen.append(next_tok)
                cur = torch.tensor([[next_tok]], device=DEVICE)

            return dec_tgt(gen)

In [24]:
EMB = 256
HID = 512

enc = Encoder(sp_src.get_piece_size(), EMB, HID)
dec = Decoder(sp_tgt.get_piece_size(), EMB, HID)
model = Seq2Seq(enc, dec).to(DEVICE)

criterion = nn.CrossEntropyLoss(ignore_index=PAD)
opt = torch.optim.Adam(model.parameters(), lr=0.0005)

In [25]:
def train_epoch():
    model.train()
    tot = 0
    for src, lengths, tin, tout in train_loader:
        src, lengths = src.to(DEVICE), lengths.to(DEVICE)
        tin, tout = tin.to(DEVICE), tout.to(DEVICE)

        opt.zero_grad()
        pred = model(src, lengths, tin)

        loss = criterion(pred.reshape(-1, pred.size(-1)), tout.reshape(-1))
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()
        tot += loss.item()
    return tot / len(train_loader)

def val_epoch():
    model.eval()
    tot = 0
    with torch.no_grad():
        for src, lengths, tin, tout in val_loader:
            src, lengths = src.to(DEVICE), lengths.to(DEVICE)
            tin, tout = tin.to(DEVICE), tout.to(DEVICE)
            pred = model(src, lengths, tin)
            loss = criterion(pred.reshape(-1, pred.size(-1)), tout.reshape(-1))
            tot += loss.item()
    return tot / len(val_loader)

In [27]:
train_losses = []
val_losses = []

EPOCHS = 15  # o las que estés usando

for ep in range(1, EPOCHS + 1):
    tr = train_epoch()
    vl = val_epoch()
    train_losses.append(tr)
    val_losses.append(vl)
    print(f"Epoch {ep} | Train: {tr:.4f} | Val: {vl:.4f}")


Epoch 1 | Train 5.0879 | Val 4.5571
Epoch 2 | Train 4.2391 | Val 4.0505
Epoch 3 | Train 3.7880 | Val 3.7408
Epoch 4 | Train 3.4574 | Val 3.5559
Epoch 5 | Train 3.1914 | Val 3.4038
Epoch 6 | Train 2.9581 | Val 3.2908
Epoch 7 | Train 2.7396 | Val 3.2198
Epoch 8 | Train 2.5450 | Val 3.1727


KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.plot(range(1, len(train_losses)+1), train_losses, label="Train Loss")
plt.plot(range(1, len(val_losses)+1), val_losses, label="Val Loss")
plt.xlabel("Época")
plt.ylabel("Loss")
plt.title("Evolución de la pérdida (Train vs Val)")
plt.legend()
plt.grid(True)
plt.show()

In [28]:
tests = [
    "hola, ¿cómo estás?",
    "mañana voy a estudiar en la biblioteca",
    "me gusta la comida francesa",
    "estoy aprendiendo modelos de traducción automática"
]

for t in tests:
    print("ES:", t)
    print("FR:", model.translate(t))
    print("-"*40)

ES: hola, ¿cómo estás?
FR: la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant, la pauvre enfant,
----------------------------------------
ES: mañana voy a estudiar en la biblioteca
FR: -- je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain je vais demain
----------------------------------------
ES: me gusta la comida francesa
FR: je suis la température je suis la température je suis la température je suis la température je suis la température je suis la température je suis la température je suis la température je suis la température je suis la température
----------------------------------------
ES: estoy aprendiendo modelos de traducción automática
FR: je suis je suis je suis je suis je suis je suis je suis je suis je suis je suis je suis je suis je s

In [29]:
hyps, refs = [], []

with torch.no_grad():
    for src, lengths, tin, tout in test_loader:
        src, lengths = src.to(DEVICE), lengths.to(DEVICE)
        for i in range(src.size(0)):
            ids = src[i][:lengths[i]].tolist()
            if EOS in ids:
                ids = ids[1:ids.index(EOS)]
            else:
                ids = ids[1:]

            src_txt = sp_src.decode(ids)
            hyp = model.translate(src_txt)
            gold = sp_tgt.decode([x for x in tout[i].tolist()
                                  if x not in [PAD, BOS, EOS]])

            hyps.append(hyp)
            refs.append([gold])

bleu = sacrebleu.corpus_bleu(hyps, list(zip(*refs)))
print("BLEU:", bleu.score)

BLEU: 0.09789450333061166


# **====== MODELO GRU (ENCODER - DECODER + ATENCIÓN LUONG) ======**

In [30]:
class LuongAttention(nn.Module):
    def __init__(self, hidden):
        super().__init__()
        self.hidden = hidden

    def forward(self, hidden, encoder_outputs):
        # hidden: (1, B, H)
        # encoder_outputs: (B, T, H)
        hidden = hidden.permute(1, 0, 2)   # (B, 1, H)

        # dot score
        scores = torch.bmm(encoder_outputs, hidden.transpose(1, 2))  # (B, T, 1)

        attn_weights = torch.softmax(scores, dim=1)  # (B, T, 1)
        context = torch.sum(attn_weights * encoder_outputs, dim=1)  # (B, H)

        return context, attn_weights

In [31]:
class Encoder(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD)
        self.gru = nn.GRU(emb, hid, batch_first=True)

    def forward(self, src, lengths):
        x = self.emb(src)
        packed = nn.utils.rnn.pack_padded_sequence(x, lengths.cpu(), batch_first=True)
        outputs, h = self.gru(packed)
        outputs, _ = nn.utils.rnn.pad_packed_sequence(outputs, batch_first=True)
        return outputs, h

In [32]:
class Decoder(nn.Module):
    def __init__(self, vocab, emb, hid):
        super().__init__()
        self.emb = nn.Embedding(vocab, emb, padding_idx=PAD)
        self.att = LuongAttention(hid)
        self.gru = nn.GRU(emb + hid, hid, batch_first=True)
        self.fc  = nn.Linear(hid, vocab)

    def forward(self, tgt_in, h, encoder_outputs):
        x = self.emb(tgt_in)

        outputs = []
        for t in range(x.size(1)):
            context, _ = self.att(h, encoder_outputs)
            gru_in = torch.cat([x[:, t:t+1, :], context.unsqueeze(1)], dim=2)
            out, h = self.gru(gru_in, h)
            outputs.append(self.fc(out))
        return torch.cat(outputs, dim=1)

In [33]:
class Seq2Seq(nn.Module):
    def __init__(self, enc, dec):
        super().__init__()
        self.enc = enc
        self.dec = dec

    def forward(self, src, lengths, tgt_in):
        enc_out, h = self.enc(src, lengths)
        return self.dec(tgt_in, h, enc_out)

    def translate(self, text, max_len=40):
        self.eval()
        with torch.no_grad():
            ids = torch.tensor([enc_src(text)], device=DEVICE)
            lengths = torch.tensor([ids.size(1)], device=DEVICE)

            enc_out, h = self.enc(ids, lengths)
            cur = torch.tensor([[BOS]], device=DEVICE)
            gen = []

            for _ in range(max_len):
                out = self.dec(cur, h, enc_out)
                nxt = out[0, -1].argmax().item()
                if nxt == EOS: break
                gen.append(nxt)
                cur = torch.tensor([[nxt]], device=DEVICE)

            return dec_tgt(gen)

In [34]:
EMB = 256
HID = 512

enc = Encoder(sp_src.get_piece_size(), EMB, HID)
dec = Decoder(sp_tgt.get_piece_size(), EMB, HID)
model = Seq2Seq(enc, dec).to(DEVICE)

criterion = nn.CrossEntropyLoss(ignore_index=PAD)
opt = torch.optim.Adam(model.parameters(), lr=0.0005)

In [35]:
def train_epoch():
    model.train()
    total = 0
    for src, lengths, tin, tout in train_loader:
        src, lengths = src.to(DEVICE), lengths.to(DEVICE)
        tin, tout = tin.to(DEVICE), tout.to(DEVICE)

        opt.zero_grad()
        pred = model(src, lengths, tin)
        loss = criterion(pred.reshape(-1, pred.size(-1)), tout.reshape(-1))
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        opt.step()

        total += loss.item()
    return total / len(train_loader)

def val_epoch():
    model.eval()
    total = 0
    with torch.no_grad():
        for src, lengths, tin, tout in val_loader:
            src, lengths = src.to(DEVICE), lengths.to(DEVICE)
            tin, tout = tin.to(DEVICE), tout.to(DEVICE)
            pred = model(src, lengths, tin)
            loss = criterion(pred.reshape(-1, pred.size(-1)), tout.reshape(-1))
            total += loss.item()
    return total / len(val_loader)

In [36]:
train_losses = []
val_losses = []

EPOCHS = 15  # o las que estés usando

for ep in range(1, EPOCHS + 1):
    tr = train_epoch()
    vl = val_epoch()
    train_losses.append(tr)
    val_losses.append(vl)
    print(f"Epoch {ep} | Train: {tr:.4f} | Val: {vl:.4f}")

Epoch 1 | Train 5.0673 | Val 4.3943
Epoch 2 | Train 4.0636 | Val 3.9005
Epoch 3 | Train 3.6149 | Val 3.6591
Epoch 4 | Train 3.2950 | Val 3.5125
Epoch 5 | Train 3.0295 | Val 3.4302
Epoch 6 | Train 2.7867 | Val 3.3902
Epoch 7 | Train 2.5598 | Val 3.3754
Epoch 8 | Train 2.3420 | Val 3.3834
Epoch 9 | Train 2.1375 | Val 3.4142
Epoch 10 | Train 1.9422 | Val 3.4659
Epoch 11 | Train 1.7594 | Val 3.5332
Epoch 12 | Train 1.5878 | Val 3.5930


In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.plot(range(1, len(train_losses)+1), train_losses, label="Train Loss")
plt.plot(range(1, len(val_losses)+1), val_losses, label="Val Loss")
plt.xlabel("Época")
plt.ylabel("Loss")
plt.title("Evolución de la pérdida (Train vs Val)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
tests = [
    "hola, ¿cómo estás?",
    "mañana voy a estudiar en la biblioteca",
    "me gusta la comida francesa",
    "estoy aprendiendo modelos de traducción automática"
]

for t in tests:
    print("ES:", t)
    print("FR:", model.translate(t))
    print("-"*40)

In [None]:
hyps, refs = [], []

with torch.no_grad():
    for src, lengths, tin, tout in test_loader:
        src, lengths = src.to(DEVICE), lengths.to(DEVICE)
        for i in range(src.size(0)):
            ids = src[i][:lengths[i]].tolist()
            if EOS in ids:
                ids = ids[1:ids.index(EOS)]
            else:
                ids = ids[1:]

            src_txt = sp_src.decode(ids)
            hyp = model.translate(src_txt)
            gold = sp_tgt.decode([x for x in tout[i].tolist() if x not in [PAD,BOS,EOS]])

            hyps.append(hyp)
            refs.append([gold])

bleu = sacrebleu.corpus_bleu(hyps, list(zip(*refs)))
print("BLEU:", bleu.score)

# **========= MODELO TRANSFORMER (MINI TRANSFORMER) =========**

In [None]:
import math

class PositionalEncoding(nn.Module):
    def __init__(self, d_model, max_len=5000):
        super().__init__()
        pe = torch.zeros(max_len, d_model)  # (max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)  # (max_len, 1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))

        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)

        pe = pe.unsqueeze(1)  # (max_len, 1, d_model)
        self.register_buffer("pe", pe)

    def forward(self, x):
        # x: (seq_len, batch, d_model)
        seq_len = x.size(0)
        return x + self.pe[:seq_len]

In [None]:
class TransformerNMT(nn.Module):
    def __init__(
        self,
        src_vocab_size,
        tgt_vocab_size,
        d_model=256,
        nhead=4,
        num_encoder_layers=2,
        num_decoder_layers=2,
        dim_feedforward=512,
        dropout=0.1,
    ):
        super().__init__()
        self.d_model = d_model

        self.src_embed = nn.Embedding(src_vocab_size, d_model, padding_idx=PAD)
        self.tgt_embed = nn.Embedding(tgt_vocab_size, d_model, padding_idx=PAD)

        self.pos_encoder = PositionalEncoding(d_model)
        self.pos_decoder = PositionalEncoding(d_model)

        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward,
            dropout=dropout,
            batch_first=False  # trabajamos como (S, B, E)
        )

        self.fc_out = nn.Linear(d_model, tgt_vocab_size)

    def make_src_key_padding_mask(self, src):
        # src: (B, S)
        return (src == PAD)  # (B, S) True donde hay PAD

    def make_tgt_key_padding_mask(self, tgt):
        # tgt: (B, T)
        return (tgt == PAD)

    def make_tgt_subsequent_mask(self, size):
        # Máscara triangular inferior para impedir ver el futuro
        mask = torch.triu(torch.ones(size, size) == 1, diagonal=1)
        # True donde se debe enmascarar
        return mask  # (T, T) bool

    def forward(self, src, tgt_in):
        # src: (B, S), tgt_in: (B, T)
        src_key_padding_mask = self.make_src_key_padding_mask(src)  # (B, S)
        tgt_key_padding_mask = self.make_tgt_key_padding_mask(tgt_in)  # (B, T)
        tgt_mask = self.make_tgt_subsequent_mask(tgt_in.size(1)).to(src.device)  # (T, T)

        # Embedding + pos encoding
        src_emb = self.src_embed(src) * math.sqrt(self.d_model)  # (B, S, E)
        tgt_emb = self.tgt_embed(tgt_in) * math.sqrt(self.d_model)  # (B, T, E)

        # Pasar a (S, B, E)
        src_emb = src_emb.transpose(0, 1)  # (S, B, E)
        tgt_emb = tgt_emb.transpose(0, 1)  # (T, B, E)

        src_emb = self.pos_encoder(src_emb)
        tgt_emb = self.pos_decoder(tgt_emb)

        output = self.transformer(
            src=src_emb,
            tgt=tgt_emb,
            tgt_mask=tgt_mask,
            src_key_padding_mask=src_key_padding_mask,
            tgt_key_padding_mask=tgt_key_padding_mask,
            memory_key_padding_mask=src_key_padding_mask,
        )  # (T, B, E)

        output = output.transpose(0, 1)  # (B, T, E)
        logits = self.fc_out(output)     # (B, T, vocab_tgt)

        return logits

    def translate(self, text, max_len=40):
        self.eval()
        with torch.no_grad():
            src_ids = torch.tensor([enc_src(text)], device=DEVICE)  # (1, S)
            src_key_padding_mask = self.make_src_key_padding_mask(src_ids)
            src_emb = self.src_embed(src_ids) * math.sqrt(self.d_model)
            src_emb = src_emb.transpose(0, 1)  # (S, 1, E)
            src_emb = self.pos_encoder(src_emb)

            memory = self.transformer.encoder(
                src_emb,
                src_key_padding_mask=src_key_padding_mask
            )  # (S, 1, E)

            # Decoding autoregresivo
            generated = [BOS]
            for _ in range(max_len):
                tgt_in = torch.tensor([generated], device=DEVICE)  # (1, len)
                tgt_emb = self.tgt_embed(tgt_in) * math.sqrt(self.d_model)
                tgt_emb = tgt_emb.transpose(0, 1)  # (T, 1, E)
                tgt_emb = self.pos_decoder(tgt_emb)

                tgt_mask = self.make_tgt_subsequent_mask(tgt_in.size(1)).to(DEVICE)
                tgt_key_padding_mask = self.make_tgt_key_padding_mask(tgt_in)

                out = self.transformer.decoder(
                    tgt_emb,
                    memory,
                    tgt_mask=tgt_mask,
                    tgt_key_padding_mask=tgt_key_padding_mask,
                    memory_key_padding_mask=src_key_padding_mask,
                )  # (T, 1, E)

                logits = self.fc_out(out[-1])  # (1, vocab)
                next_token = logits.argmax(-1).item()

                if next_token == EOS:
                    break
                generated.append(next_token)

            return dec_tgt(generated)

In [None]:
d_model = 256
nhead = 4
num_enc_layers = 2
num_dec_layers = 2
ff_dim = 512
dropout = 0.1

model = TransformerNMT(
    src_vocab_size=sp_src.get_piece_size(),
    tgt_vocab_size=sp_tgt.get_piece_size(),
    d_model=d_model,
    nhead=nhead,
    num_encoder_layers=num_enc_layers,
    num_decoder_layers=num_dec_layers,
    dim_feedforward=ff_dim,
    dropout=dropout
).to(DEVICE)

criterion = nn.CrossEntropyLoss(ignore_index=PAD)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)

print("Modelo listo. Parámetros entrenables:",
      sum(p.numel() for p in model.parameters() if p.requires_grad))

In [None]:
def train_epoch():
    model.train()
    total = 0
    for src, tgt_in, tgt_out in train_loader:
        src = src.to(DEVICE)
        tgt_in = tgt_in.to(DEVICE)
        tgt_out = tgt_out.to(DEVICE)

        optimizer.zero_grad()
        logits = model(src, tgt_in)  # (B, T, vocab)

        loss = criterion(
            logits.reshape(-1, logits.size(-1)),
            tgt_out.reshape(-1)
        )

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)
        optimizer.step()

        total += loss.item()
    return total / len(train_loader)

def eval_epoch():
    model.eval()
    total = 0
    with torch.no_grad():
        for src, tgt_in, tgt_out in val_loader:
            src = src.to(DEVICE)
            tgt_in = tgt_in.to(DEVICE)
            tgt_out = tgt_out.to(DEVICE)

            logits = model(src, tgt_in)
            loss = criterion(
                logits.reshape(-1, logits.size(-1)),
                tgt_out.reshape(-1)
            )
            total += loss.item()
    return total / len(val_loader)

In [None]:
train_losses = []
val_losses = []

EPOCHS = 15  # o las que estés usando

for ep in range(1, EPOCHS + 1):
    tr = train_epoch()
    vl = val_epoch()
    train_losses.append(tr)
    val_losses.append(vl)
    print(f"Epoch {ep} | Train: {tr:.4f} | Val: {vl:.4f}")

In [None]:
import matplotlib.pyplot as plt

plt.figure(figsize=(8, 5))
plt.plot(range(1, len(train_losses)+1), train_losses, label="Train Loss")
plt.plot(range(1, len(val_losses)+1), val_losses, label="Val Loss")
plt.xlabel("Época")
plt.ylabel("Loss")
plt.title("Evolución de la pérdida (Train vs Val)")
plt.legend()
plt.grid(True)
plt.show()

In [None]:
tests = [
    "hola, ¿cómo estás?",
    "nos vemos mañana por la mañana",
    "me gusta la comida francesa",
    "estoy aprendiendo modelos de traducción automática",
    "el libro está sobre la mesa",
]

for t in tests:
    print("ES:", t)
    print("FR:", model.translate(t))
    print("-" * 40)

In [None]:
model.eval()
hyps, refs = [], []

with torch.no_grad():
    for src, tgt_in, tgt_out in test_loader:
        src = src.to(DEVICE)
        for i in range(src.size(0)):
            src_ids = src[i].tolist()
            # cortar en EOS y quitar BOS
            if EOS in src_ids:
                src_ids = src_ids[1:src_ids.index(EOS)]
            else:
                src_ids = src_ids[1:]
            src_text = sp_src.decode(src_ids)

            hyp = model.translate(src_text)

            # referencia (tgt_out está ya desplazado, quitamos PAD/BOS/EOS)
            tgt_ids = tgt_out[i].tolist()
            ref_ids = [x for x in tgt_ids if x not in [PAD, BOS, EOS]]
            ref_text = sp_tgt.decode(ref_ids)

            hyps.append(hyp)
            refs.append([ref_text])

bleu = sacrebleu.corpus_bleu(hyps, list(zip(*refs)))
print("BLEU:", bleu.score)

# **COMPARACIÓN GRÁFICA DEL SCORE BLEU DE TODOS LOS MODELOS**

In [37]:
!pip install nltk
import nltk
nltk.download('punkt')
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
smooth = SmoothingFunction().method1



[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [38]:
import pandas as pd

df_test = df.sample(200, random_state=42)  # 200 frases de prueba
src_test = df_test["src"].tolist()
tgt_test = df_test["tgt"].tolist()  # referencias reales


In [39]:
def compute_bleu_model(translate_fn, src_list, tgt_list):
    scores = []
    for s, t in zip(src_list, tgt_list):
        pred = translate_fn(s)  # Traducción producida por ese modelo
        reference = t.split()
        hypothesis = pred.split()

        score = sentence_bleu(
            [reference],
            hypothesis,
            smoothing_function=smooth
        )
        scores.append(score)
    return sum(scores) / len(scores)


In [40]:
bleu_rnn        = compute_bleu_model(translate_rnn, src_test, tgt_test)
bleu_lstm       = compute_bleu_model(translate_lstm, src_test, tgt_test)
bleu_gru        = compute_bleu_model(translate_gru, src_test, tgt_test)
bleu_transform  = compute_bleu_model(translate_transformer, src_test, tgt_test)


NameError: name 'translate_rnn' is not defined

In [None]:
print("===== RESULTADOS BLEU =====")
print(f"RNN Simple:           {bleu_rnn:.4f}")
print(f"LSTM + Atención:      {bleu_lstm:.4f}")
print(f"GRU + Atención:       {bleu_gru:.4f}")
print(f"Transformer:          {bleu_transform:.4f}")


In [None]:
import matplotlib.pyplot as plt

modelos = ["RNN", "LSTM + Att", "GRU + Att", "Transformer"]
bleus = [bleu_rnn, bleu_lstm, bleu_gru, bleu_transform]

plt.figure(figsize=(8,5))
plt.bar(modelos, bleus, color=["#4a6cff","#00b48a","#ffaf40","#ff637d"])
plt.ylabel("BLEU Score")
plt.title("Comparación del BLEU por Modelo")
for i, v in enumerate(bleus):
    plt.text(i, v + 0.002, f"{v:.3f}", ha="center", fontsize=12)
plt.ylim(0, max(bleus) + 0.01)
plt.grid(axis="y", linestyle="--", alpha=0.4)
plt.show()
