<a href="https://colab.research.google.com/github/MehrdadDastouri/seq2seq_translation/blob/main/seq2seq_translation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# Import necessary libraries
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.functional import log_softmax
import random
import numpy as np

# Set random seed for reproducibility
torch.manual_seed(42)
np.random.seed(42)

# Sample translation dataset
data = [
    ("hello", "hola"),
    ("how are you", "como estas"),
    ("i am fine", "estoy bien"),
    ("what is your name", "cual es tu nombre"),
    ("nice to meet you", "mucho gusto"),
]

# Tokenizer
def tokenize(sentence):
    return sentence.lower().split()

# Vocabulary creation
def build_vocab(sentences):
    vocab = {"<pad>": 0, "<sos>": 1, "<eos>": 2}
    for sentence in sentences:
        for word in tokenize(sentence):
            if word not in vocab:
                vocab[word] = len(vocab)
    return vocab

# Prepare vocabularies
source_sentences = [pair[0] for pair in data]
target_sentences = [pair[1] for pair in data]

source_vocab = build_vocab(source_sentences)
target_vocab = build_vocab(target_sentences)

idx_to_word_target = {idx: word for word, idx in target_vocab.items()}

# Encode sentences to indices
def encode_sentence(sentence, vocab, max_len):
    tokens = tokenize(sentence)
    indices = [vocab["<sos>"]] + [vocab[token] for token in tokens] + [vocab["<eos>"]]
    return indices + [vocab["<pad>"]] * (max_len - len(indices))

max_len_source = max(len(tokenize(s)) for s in source_sentences) + 2
max_len_target = max(len(tokenize(s)) for s in target_sentences) + 2

source_data = [encode_sentence(s, source_vocab, max_len_source) for s in source_sentences]
target_data = [encode_sentence(s, target_vocab, max_len_target) for s in target_sentences]

source_data = torch.tensor(source_data, dtype=torch.long)
target_data = torch.tensor(target_data, dtype=torch.long)

# Define the Seq2Seq architecture
class Encoder(nn.Module):
    def __init__(self, input_size, embed_size, hidden_size, num_layers):
        super(Encoder, self).__init__()
        self.embedding = nn.Embedding(input_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)

    def forward(self, x):
        embedding = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedding)
        return hidden, cell

class Decoder(nn.Module):
    def __init__(self, output_size, embed_size, hidden_size, num_layers):
        super(Decoder, self).__init__()
        self.embedding = nn.Embedding(output_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x, hidden, cell):
        x = x.unsqueeze(1)  # Add time dimension
        embedding = self.embedding(x)
        outputs, (hidden, cell) = self.lstm(embedding, (hidden, cell))
        predictions = self.fc(outputs.squeeze(1))
        return predictions, hidden, cell

class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, target_vocab_size):
        super(Seq2Seq, self).__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.target_vocab_size = target_vocab_size

    def forward(self, source, target, teacher_forcing_ratio=0.5):
        batch_size = source.size(0)
        target_len = target.size(1)
        outputs = torch.zeros(batch_size, target_len, self.target_vocab_size).to(source.device)

        hidden, cell = self.encoder(source)

        x = target[:, 0]
        for t in range(1, target_len):
            output, hidden, cell = self.decoder(x, hidden, cell)
            outputs[:, t, :] = output
            best_guess = output.argmax(1)
            x = target[:, t] if random.random() < teacher_forcing_ratio else best_guess

        return outputs

# Hyperparameters
input_size_encoder = len(source_vocab)
input_size_decoder = len(target_vocab)
output_size = len(target_vocab)
embed_size = 256
hidden_size = 512
num_layers = 1
learning_rate = 0.001
epochs = 500

# Initialize model
encoder = Encoder(input_size_encoder, embed_size, hidden_size, num_layers)
decoder = Decoder(input_size_decoder, embed_size, hidden_size, num_layers)
model = Seq2Seq(encoder, decoder, output_size).to(torch.device("cpu"))

# Loss and optimizer
criterion = nn.CrossEntropyLoss(ignore_index=target_vocab["<pad>"])
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training the model
for epoch in range(epochs):
    model.train()
    output = model(source_data, target_data)

    output = output.reshape(-1, output.shape[2])
    target = target_data.reshape(-1)

    loss = criterion(output, target)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (epoch + 1) % 50 == 0:
        print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}")

# Translate function
def translate(sentence):
    model.eval()
    encoded_sentence = encode_sentence(sentence, source_vocab, max_len_source)
    source = torch.tensor(encoded_sentence, dtype=torch.long).unsqueeze(0)

    with torch.no_grad():
        hidden, cell = model.encoder(source)
        x = torch.tensor([target_vocab["<sos>"]], dtype=torch.long)

        result_sentence = []
        for _ in range(max_len_target):
            output, hidden, cell = model.decoder(x, hidden, cell)
            best_guess = output.argmax(1).item()
            if best_guess == target_vocab["<eos>"]:
                break
            result_sentence.append(idx_to_word_target[best_guess])
            x = torch.tensor([best_guess], dtype=torch.long)

    return " ".join(result_sentence)

# Test translation
test_sentence = "how are you"
translation = translate(test_sentence)
print(f"Input: {test_sentence}")
print(f"Translation: {translation}")

Epoch [50/500], Loss: 0.6288
Epoch [100/500], Loss: 0.6286
Epoch [150/500], Loss: 0.6285
Epoch [200/500], Loss: 0.6285
Epoch [250/500], Loss: 0.6285
Epoch [300/500], Loss: 0.6284
Epoch [350/500], Loss: 0.6284
Epoch [400/500], Loss: 0.6284
Epoch [450/500], Loss: 0.6284
Epoch [500/500], Loss: 0.6284
Input: how are you
Translation: como estas
