In [1]:
pip install nltk




In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from collections import Counter
import re
import numpy as np
import random
from sklearn.model_selection import train_test_split
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
import nltk

nltk.download('punkt')

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [3]:
def read_data(file_path):
    with open(file_path, encoding='utf-8') as f:
        lines = f.read().strip().split('\n')
    pairs = [line.split('\t') for line in lines]
    return pairs

def tokenize(text):
    text = text.lower()
    text = re.sub(r"[^a-zA-Zñáéíóúüç¿¡ ]+", "", text)
    return text.strip().split()


In [4]:
class Vocab:
    def __init__(self, sentences, min_freq=2):
        self.freq = Counter()
        for sentence in sentences:
            self.freq.update(sentence)

        self.pad = '<pad>'
        self.sos = '<sos>'
        self.eos = '<eos>'
        self.unk = '<unk>'

        self.itos = [self.pad, self.sos, self.eos, self.unk] + [w for w, c in self.freq.items() if c >= min_freq]
        self.stoi = {w: i for i, w in enumerate(self.itos)}

    def numericalize(self, tokens):
        return [self.stoi.get(token, self.stoi[self.unk]) for token in tokens]

    def denumericalize(self, indices):
        return [self.itos[i] for i in indices if i not in (self.stoi[self.pad],)]

    def __len__(self):
        return len(self.itos)


In [5]:
class TranslationDataset(Dataset):
    def __init__(self, pairs, src_vocab, trg_vocab):
        self.data = []
        for src, trg in pairs:
            src_tokens = tokenize(src)
            trg_tokens = tokenize(trg)
            src_ids = src_vocab.numericalize(src_tokens)
            trg_ids = [trg_vocab.stoi['<sos>']] + trg_vocab.numericalize(trg_tokens) + [trg_vocab.stoi['<eos>']]
            self.data.append((src_ids, trg_ids))

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx]

def collate_fn(batch):
    src_batch, trg_batch = zip(*batch)
    src_max_len = max(len(x) for x in src_batch)
    trg_max_len = max(len(x) for x in trg_batch)

    src_batch_padded = [x + [src_vocab.stoi['<pad>']] * (src_max_len - len(x)) for x in src_batch]
    trg_batch_padded = [x + [trg_vocab.stoi['<pad>']] * (trg_max_len - len(x)) for x in trg_batch]

    return torch.tensor(src_batch_padded), torch.tensor(trg_batch_padded)


In [6]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)

    def forward(self, src):
        embedded = self.embedding(src)
        outputs, (hidden, cell) = self.lstm(embedded)
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, emb_dim)
        self.lstm = nn.LSTM(emb_dim, hid_dim, batch_first=True)
        self.fc = nn.Linear(hid_dim, output_dim)

    def forward(self, input, hidden, cell):
        input = input.unsqueeze(1)
        embedded = self.embedding(input)
        output, (hidden, cell) = self.lstm(embedded, (hidden, cell))
        prediction = self.fc(output.squeeze(1))
        return prediction, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, trg_pad_idx):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.trg_pad_idx = trg_pad_idx

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size, trg_len = trg.shape
        trg_vocab_size = self.decoder.fc.out_features
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(device)

        hidden, cell = self.encoder(src)
        input = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            input = trg[:, t] if teacher_force else output.argmax(1)

        return outputs


In [7]:
def train(model, iterator, optimizer, criterion, clip=1):
    model.train()
    total_loss = 0

    for src, trg in iterator:
        src, trg = src.to(device), trg.to(device)
        optimizer.zero_grad()

        output = model(src, trg)
        output_dim = output.shape[-1]

        output = output[:, 1:].reshape(-1, output_dim)
        trg = trg[:, 1:].reshape(-1)

        loss = criterion(output, trg)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        total_loss += loss.item()

    return total_loss / len(iterator)


In [8]:
def evaluate_bleu(model, dataset, src_vocab, trg_vocab, max_len=20):
    model.eval()
    smoothie = SmoothingFunction().method4
    total_score = 0
    count = 0

    with torch.no_grad():
        for src, trg in dataset:
            src_tensor = torch.tensor([src]).to(device)
            hidden, cell = model.encoder(src_tensor)

            input_token = torch.tensor([trg_vocab.stoi['<sos>']]).to(device)
            result = []

            for _ in range(max_len):
                output, hidden, cell = model.decoder(input_token, hidden, cell)
                top1 = output.argmax(1)
                if top1.item() == trg_vocab.stoi['<eos>']:
                    break
                result.append(top1.item())
                input_token = top1

            pred_tokens = trg_vocab.denumericalize(result)
            reference_tokens = trg_vocab.denumericalize(trg[1:-1])  # Remove <sos> and <eos>

            score = sentence_bleu([reference_tokens], pred_tokens, smoothing_function=smoothie, weights=(0.5, 0.5))
            total_score += score
            count += 1

    return total_score / count


In [9]:
# Load and prepare data
raw_pairs = read_data("/content/spa.txt")  # <-- replace with your filename
pairs = raw_pairs  # Don't tokenize yet!

train_pairs, val_pairs = train_test_split(pairs, test_size=0.1)

src_vocab = Vocab([tokenize(src) for src, _ in train_pairs])
trg_vocab = Vocab([tokenize(trg) for _, trg in train_pairs])

train_dataset = TranslationDataset(train_pairs, src_vocab, trg_vocab)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)


# Model parameters
INPUT_DIM = len(src_vocab)
OUTPUT_DIM = len(trg_vocab)
EMB_DIM = 256
HID_DIM = 512

enc = Encoder(INPUT_DIM, EMB_DIM, HID_DIM)
dec = Decoder(OUTPUT_DIM, EMB_DIM, HID_DIM)
model = Seq2Seq(enc, dec, trg_pad_idx=trg_vocab.stoi['<pad>']).to(device)

optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss(ignore_index=trg_vocab.stoi['<pad>'])

# Training loop
for epoch in range(10):
    loss = train(model, train_loader, optimizer, criterion)
    print(f"Epoch {epoch+1}, Loss: {loss:.4f}")



Epoch 1, Loss: 4.4226
Epoch 2, Loss: 2.8949
Epoch 3, Loss: 2.2303
Epoch 4, Loss: 1.8335
Epoch 5, Loss: 1.5659
Epoch 6, Loss: 1.3702
Epoch 7, Loss: 1.2256
Epoch 8, Loss: 1.1061
Epoch 9, Loss: 1.0096
Epoch 10, Loss: 0.9267


In [30]:
for epoch in range(10):
    loss = train(model, train_loader, optimizer, criterion)
    print(f"Epoch {epoch+1}, Loss: {loss:.4f}")

Epoch 1, Loss: 0.4339
Epoch 2, Loss: 0.4267
Epoch 3, Loss: 0.4226
Epoch 4, Loss: 0.4191
Epoch 5, Loss: 0.4189
Epoch 6, Loss: 0.4167
Epoch 7, Loss: 0.4217
Epoch 8, Loss: 0.4155
Epoch 9, Loss: 0.4159
Epoch 10, Loss: 0.4022


In [31]:
# Evaluate BLEU on a small subset
sample_dataset = [train_dataset[i] for i in range(100)]
bleu = evaluate_bleu(model, sample_dataset, src_vocab, trg_vocab)
print(f"BLEU score: {bleu * 100:.2f}")

BLEU score: 72.17


In [32]:
def translate_sentence(sentence, model, src_vocab, trg_vocab, max_len=20):
    model.eval()
    tokens = tokenize(sentence)
    numericalized = src_vocab.numericalize(tokens)
    tensor = torch.tensor([numericalized]).to(device)

    with torch.no_grad():
        hidden, cell = model.encoder(tensor)

    input_token = torch.tensor([trg_vocab.stoi['<sos>']]).to(device)
    output_indices = []

    for _ in range(max_len):
        with torch.no_grad():
            output, hidden, cell = model.decoder(input_token, hidden, cell)
        top1 = output.argmax(1)
        if top1.item() == trg_vocab.stoi['<eos>']:
            break
        output_indices.append(top1.item())
        input_token = top1

    translation = trg_vocab.denumericalize(output_indices)
    return " ".join(translation)


In [38]:
example = "hello!"
translation = translate_sentence(example, model, src_vocab, trg_vocab)
print(f"ENGLISH: {example}")
print(f"SPANISH: {translation}")


ENGLISH: hello!
SPANISH: hola
