In [18]:
import torch 
import torch.nn as nn
import numpy as np

In [19]:
if torch.cuda.is_available():
    device = "cuda"
    print(torch.cuda.device_count())
elif torch.backends.mps.is_available():
    device = "mps"
else:
    device = "cpu"
device

1


'cuda'

In [20]:
data = []
filename = "/kaggle/input/eng-spa/spa.txt"
with open(filename, "r") as f:
    for line in f:
        data.append(line.strip())

In [21]:
cleaned_data = [line.replace("¡", "").replace("¿", "") for line in data]
pairs = [line.split("\t") for line in cleaned_data]

In [22]:
np.random.shuffle(pairs)
eng_sentences, es_sentences = zip(*pairs)
for i in range(3):
    print(eng_sentences[i], "==>" ,es_sentences[i])

Turn off the gas. ==> Cierre el gas.
Fifty families live in this tiny village. ==> Cincuenta familias viven en este pequeño pueblo.
You surprised everybody. ==> Los sorprendiste a todos.


In [23]:
from transformers import AutoTokenizer, AutoModel
tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-en-es")
vocab_size = tokenizer.vocab_size
vocab_size



65001

In [24]:
# max_len = 50
# def encode_with_pretrained2(sentence, add_sos_and_eos=False):
    
#     texts = [f"<s> {s} </s>" if add_sos_and_eos else s for s in sentence]
#     encodings = tokenizer(
#         texts,
#         padding=True,
#         truncation=True,
#         max_length=max_len,
#         return_tensors="pt"
#     )
#     return encodings

In [25]:
from torch.utils.data import DataLoader, Dataset

class TranslationDataset(Dataset):
    def __init__(self, src_sentences, tgt_sentences, tokenizer, max_len=50):
        self.src_sentences = src_sentences
        self.tgt_sentences = tgt_sentences
        self.tokenizer = tokenizer
        self.max_len = max_len

    def __len__(self):
        return len(self.src_sentences)

    def __getitem__(self,idx):
        src = self.src_sentences[idx]
        tgt = f"<s> {self.tgt_sentences[idx]} </s>"

        src_enc = self.tokenizer(
            src,
            padding="max_length",
            truncation=True,
            max_length=self.max_len,
            return_tensors="pt")
        tgt_enc = self.tokenizer(
            tgt,
            padding="max_length",
            truncation=True,
            max_length=self.max_len,
            return_tensors="pt")

        decoder_input_ids = tgt_enc["input_ids"][:,:-1].squeeze(0)
        labels = tgt_enc["input_ids"][:,1:].squeeze(0)

        return {
            "encoder_input_ids":src_enc["input_ids"].squeeze(0),
            "encoder_attention_mask":src_enc["attention_mask"].squeeze(0),
            "decoder_input_ids":decoder_input_ids,
            "labels":labels
        } 

In [26]:
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence

class Encoder(nn.Module):
    def __init__(self, pretrained_embed, n_hidden=2, hidden_size=64,dropout=0.2):
        super().__init__()
        weights = pretrained_embed.weight.data
        self.embed = nn.Embedding.from_pretrained(weights,freeze=True)
        embed_size = weights.shape[-1]
        self.gru = nn.GRU(embed_size, hidden_size, num_layers=n_hidden,
                         batch_first=True, dropout=dropout, bidirectional=True)
        self.n_hidden = n_hidden

    def forward(self, input_ids, attention_mask):
        embeddings = self.embed(input_ids)
        lengths = attention_mask.sum(dim=1)
        packed = pack_padded_sequence(embeddings,
                                     lengths = lengths.cpu(),
                                     batch_first=True,
                                     enforce_sorted=False)
        outputs, hidden = self.gru(packed)
        outputs, _ = pad_packed_sequence(outputs, batch_first=True, total_length=input_ids.size(1))
        #########################
        batch_size = hidden.size(1)
        hidden = hidden.view(self.n_hidden, 2, batch_size, -1)
        hidden = torch.cat([hidden[:, 0, :, :], hidden[:, 1, :, :]], dim=2)
       
       
        return outputs, hidden    

In [27]:
class Decoder(nn.Module):
    def __init__(self, pretrained_embed, n_hidden=2, hidden_size=64):
        super().__init__()
        weights = pretrained_embed.weight.data
        self.embed = nn.Embedding.from_pretrained(weights,freeze=True)
        embed_size = weights.shape[-1]
        self.gru = nn.GRU(embed_size, hidden_size*2,num_layers=n_hidden,
                          batch_first=True)
        self.output = nn.Linear(hidden_size*2, weights.shape[0])

    def forward(self, input_ids, hidden):
        embeddings = self.embed(input_ids)
        outputs, hidden = self.gru(embeddings, hidden)
        logits = self.output(outputs)
        return logits, hidden       

In [28]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder):
        super().__init__()   
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src_ids, src_mask, tgt_ids):
        enc_outputs, enc_hidden = self.encoder(src_ids, src_mask)
        logits, _ = self.decoder(tgt_ids, enc_hidden)
        return logits

In [29]:
import transformers

pretrained_model = AutoModel.from_pretrained("Helsinki-NLP/opus-mt-en-es")

In [30]:
from sklearn.model_selection import train_test_split

eng_train, eng_valid, es_train, es_valid = train_test_split(
    eng_sentences, es_sentences, test_size = 0.20
)

batch_size = 32

train_dataset = TranslationDataset(eng_train, es_train, tokenizer)
valid_dataset = TranslationDataset(eng_valid, es_valid, tokenizer)

train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
valid_loader = DataLoader(valid_dataset, batch_size=batch_size)

In [31]:
import torchmetrics

def evaluate_tm(model, data_loader, metric, vocab_size):
    model.eval()
    metric.reset()
    with torch.no_grad():
        for batch in data_loader:
            src_ids = batch["encoder_input_ids"].to(device)
            src_mask = batch["encoder_attention_mask"].to(device)
            tgt_ids = batch["decoder_input_ids"].to(device)
            labels = batch["labels"].to(device)

            y_pred = model(src_ids, src_mask, tgt_ids)
            metric.update(y_pred.view(-1,vocab_size), labels.view(-1))
    return metric.compute()
            
def train(model, optimizer, criterion, metric, train_loader, valid_loader, n_epochs, vocab_size):
    history = {"train_losses":[],"train_metrics":[],"valid_metrics":[]}
    for epoch in range(n_epochs):
        total_loss = 0
        metric.reset()
        model.train()
        for idx, batch in enumerate(train_loader):
            src_ids = batch["encoder_input_ids"].to(device)
            src_mask = batch["encoder_attention_mask"].to(device)
            tgt_ids = batch["decoder_input_ids"].to(device)
            labels = batch["labels"].to(device)

            y_pred = model(src_ids, src_mask, tgt_ids)
            loss = criterion(y_pred.view(-1,vocab_size), labels.view(-1))
            total_loss += loss.item()
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            metric.update(y_pred.view(-1,vocab_size), labels.view(-1))
            print(f"\rBatch {idx+1}/{len(train_loader)}", end="")
            print(f", loss ={total_loss/(idx+1 ):.4f} ", end="")
        mean_loss = total_loss / len(train_loader)
        history["train_losses"].append(mean_loss)
        history["train_metrics"].append(metric.compute().item())
        val_metric = evaluate_tm(model, valid_loader, metric,vocab_size).item()
        history["valid_metrics"].append(val_metric)
        print(f"Epoch:{epoch+1}/{n_epochs}, "
             f"Train Loss: {history['train_losses'][-1]:.4f}, "
             f"Train Metric: {history['train_metrics'][-1]:.4f}%, "
             f"Valid Metric: {history['valid_metrics'][-1]:.4f}%")
    return history

In [32]:
encoder = Encoder(pretrained_model.get_input_embeddings())
decoder = Decoder(pretrained_model.get_input_embeddings())
nmt_model = Seq2Seq(encoder, decoder).to(device)

optimizer = torch.optim.NAdam(nmt_model.parameters())
xentropy = nn.CrossEntropyLoss(ignore_index=tokenizer.pad_token_id)
accuracy = torchmetrics.Accuracy(task="multiclass",num_classes = vocab_size).to(device)
n_epochs=20

history = train(nmt_model, optimizer, xentropy, accuracy, train_loader, valid_loader, n_epochs, vocab_size)

Batch 2975/2975, loss =2.8201 Epoch:1/20, Train Loss: 2.8201, Train Metric: 0.1802%, Valid Metric: 0.2120%
Batch 2975/2975, loss =1.8774 Epoch:2/20, Train Loss: 1.8774, Train Metric: 0.2266%, Valid Metric: 0.2314%
Batch 2975/2975, loss =1.6286 Epoch:3/20, Train Loss: 1.6286, Train Metric: 0.2407%, Valid Metric: 0.2418%
Batch 2975/2975, loss =1.4884 Epoch:4/20, Train Loss: 1.4884, Train Metric: 0.2488%, Valid Metric: 0.2474%
Batch 2975/2975, loss =1.3950 Epoch:5/20, Train Loss: 1.3950, Train Metric: 0.2543%, Valid Metric: 0.2505%
Batch 2975/2975, loss =1.3252 Epoch:6/20, Train Loss: 1.3252, Train Metric: 0.2587%, Valid Metric: 0.2543%
Batch 2975/2975, loss =1.2721 Epoch:7/20, Train Loss: 1.2721, Train Metric: 0.2620%, Valid Metric: 0.2570%
Batch 2975/2975, loss =1.2286 Epoch:8/20, Train Loss: 1.2286, Train Metric: 0.2648%, Valid Metric: 0.2589%
Batch 2975/2975, loss =1.1936 Epoch:9/20, Train Loss: 1.1936, Train Metric: 0.2670%, Valid Metric: 0.2586%
Batch 2975/2975, loss =1.1626 Epoch:1

In [44]:
import torch
import re

def translate(model, sentence, tokenizer, device="cpu", max_len=30):
    """Simple, short translation function that works"""
    model.eval()
    with torch.no_grad():
        # Encode input
        src = tokenizer(sentence, return_tensors="pt", padding=True, max_length=max_len)
        src_ids = src["input_ids"].to(device)
        src_mask = src["attention_mask"].to(device)
        
        # Get encoder output
        _, hidden = model.encoder(src_ids, src_mask)
        
        # Start with a Spanish word
        try:
            start_token = tokenizer.encode("Hola", add_special_tokens=False)[0]
        except:
            start_token = tokenizer.pad_token_id
            
        decoder_input = torch.tensor([[start_token]], device=device)
        tokens = []
        
        # Generate tokens
        for _ in range(15):
            logits, hidden = model.decoder(decoder_input, hidden)
            
            # Get top 5 tokens and filter bad ones
            top_tokens = torch.topk(logits[:, -1, :], 5)
            
            # Pick first good token
            next_token = None
            for token_id in top_tokens.indices[0]:
                token_id = token_id.item()
                if token_id not in [0, 1, 65000]:  # Skip EOS, UNK, PAD
                    next_token = token_id
                    break
            
            if next_token is None:
                break
                
            tokens.append(next_token)
            decoder_input = torch.cat([decoder_input, torch.tensor([[next_token]], device=device)], dim=1)
            
            # Stop at punctuation
            decoded = tokenizer.decode([next_token])
            if any(p in decoded for p in ['.', '!', '?']):
                break
        
        # Clean and return
        if tokens:
            result = tokenizer.decode(tokens, skip_special_tokens=True)
            result = re.sub(r'[<>▁]', ' ', result)  # Remove artifacts
            result = re.sub(r'\s+', ' ', result).strip()  # Fix spaces
            return result.capitalize()
        
        return "No translation"

# Quick test function
def test_model():
    """Test your model quickly"""
    sentences = ["Hello", "Thank you", "Good morning", "How are you?", "I love you"]
    
    print("=== QUICK TRANSLATION TEST ===")
    for sent in sentences:
        result = translate(nmt_model, sent, tokenizer, device)
        print(f"'{sent}' -> '{result}'")

# Ready to use!
print("Simple translator ready!")
print("Usage: translate(nmt_model, 'Hello world', tokenizer, device)")
print("Or run: test_model()")

Simple translator ready!
Usage: translate(nmt_model, 'Hello world', tokenizer, device)
Or run: test_model()


In [45]:
# Single translation
result = translate(nmt_model, "Hello world", tokenizer, device)
print(result)

# Test multiple sentences
test_model()

# Interactive use
sentence = input("English: ")
translation = translate(nmt_model, sentence, tokenizer, device)
print(f"Spanish: {translation}")

S él.
=== QUICK TRANSLATION TEST ===
'Hello' -> 'Recomías.'
'Thank you' -> 'Ses.'
'Good morning' -> 'Palmiera.'
'How are you?' -> 'Tú.'
'I love you' -> 'S queso.'


English:  i like soccer


Spanish: S blanco.


In [50]:
# Test the improved functions
test_fixed_translations()

# Or try individual translations:
print("QUICK TESTS:")
sentences = ["Hello", "Thank you", "Good morning", "I like soccer"]
for sent in sentences:
    result = simple_clean_translate(nmt_model, sent, tokenizer, device)
    print(f"'{sent}' -> '{result}'")

=== TESTING IMPROVED TRANSLATIONS ===
Method 1: Properly Fixed | Method 2: Token Filtering
English: Hello
  Fixed:    'Holice fue.'
  Filtered: 'Recomías.'

English: Good morning
  Fixed:    'Yo marca hasta una trampa.'
  Filtered: 'Palmiera.'

English: Thank you
  Fixed:    'Esras personas.'
  Filtered: 'Ses.'

English: How are you?
  Fixed:    'Hol tú.'
  Filtered: 'S listás.'

English: I like soccer
  Fixed:    'El Yo está barriendo.'
  Filtered: 'S Cuégi.'

English: What is your name?
  Fixed:    'El ha escrito.'
  Filtered: 'S Cuábacas?'

English: I am fine
  Fixed:    'Hols Me fueron.'
  Filtered: 'S» jugue.'

English: Goodbye
  Fixed:    'Hol favor.'
  Filtered: 'Mente.'

QUICK TESTS:
'Hello' -> 'Hol recomías.'
'Thank you' -> 'Holses.'
'Good morning' -> 'Hol palmiera.'
'I like soccer' -> 'Hols> él.'
