Machine Translation (English → Russian)

In [None]:
# Use the %pip magic (not !pip) so the package is installed into the environment
# of the running kernel; -q keeps the cell output short.
%pip install -q tqdm



In [None]:
import random
import re
import string
import unicodedata

import numpy as np
import pandas as pd
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm

# Seed every RNG the notebook draws from so a "Restart & Run All" is repeatable:
# `random` drives the teacher-forcing coin flips in Seq2Seq.forward, and torch
# drives parameter initialisation. (GPU kernels may still be non-deterministic.)
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")


Using device: cuda


Data Preprocessing

In [None]:
text_file_path = '/content/rus.txt'
# The corpus contains Cyrillic text, so decode it explicitly instead of relying
# on the platform's default encoding.
# NOTE(review): assumes the file is UTF-8 encoded -- confirm for your copy of the data.
with open(text_file_path, encoding='utf-8') as corpus_file:
    text = corpus_file.read()

# ASCII punctuation and ASCII digits are always removed.
_ASCII_STRIP = frozenset(string.punctuation + string.digits)


def preprocess_text(text):
    """Normalise raw text: drop punctuation and ASCII digits, then lowercase.

    Besides the ASCII characters in ``string.punctuation``, every character
    whose Unicode general category is punctuation (``P*``) is removed too, so
    typographic marks such as the em dash, guillemets and the ellipsis do not
    end up as vocabulary tokens. Whitespace, including the tab that separates
    the columns of a corpus line, is left untouched.

    Args:
        text: The string to clean.

    Returns:
        The cleaned, lower-cased string.
    """
    kept_chars = [
        char
        for char in text
        if char not in _ASCII_STRIP
        and not unicodedata.category(char).startswith('P')
    ]
    return ''.join(kept_chars).lower()

def return_sentences(text, num_lines=20000):
    """Turn tab-separated parallel text into sentence lists and vocabularies.

    Only the first ``num_lines`` lines of ``text`` are considered. Blank lines
    and lines with fewer than two tab-separated columns after preprocessing
    are skipped. The first column is taken as English, the second as Russian;
    each Russian sentence is wrapped in ``<sos>`` / ``<eos>`` markers.

    Args:
        text: Full corpus contents, one sentence pair per line.
        num_lines: Maximum number of lines to read from the start of ``text``.

    Returns:
        A tuple ``(english_texts, russian_texts, english_words, russian_words)``
        where the first two are lists of sentences and the last two are sorted
        word lists that also contain the ``<sos>`` and ``<eos>`` markers.
    """
    text_lines = text.split('\n')
    line_limit = max(0, min(len(text_lines), num_lines))

    english_texts, russian_texts = [], []
    english_words, russian_words = set(), set()

    for raw_line in tqdm(text_lines[:line_limit]):
        if raw_line.strip() == '':
            continue

        columns = preprocess_text(raw_line).split('\t')
        if len(columns) < 2:
            continue
        english_sentence, russian_sentence = columns[0], columns[1]

        english_texts.append(english_sentence)
        russian_texts.append('<sos> ' + russian_sentence + ' <eos>')

        english_words |= set(english_sentence.split())
        russian_words |= set(russian_sentence.split())

    # Both vocabularies carry the sequence markers.
    for vocabulary in (english_words, russian_words):
        vocabulary.add('<sos>')
        vocabulary.add('<eos>')

    return english_texts, russian_texts, sorted(english_words), sorted(russian_words)

english_texts, russian_texts, english_words, russian_words = return_sentences(text)

# One row per sentence pair plus whitespace-token counts for each side,
# then shuffled with a fixed seed.
text_df = pd.DataFrame({
    'English': english_texts,
    'Russian': russian_texts,
    'English Length': [len(sentence.split()) for sentence in english_texts],
    'Russian Length': [len(sentence.split()) for sentence in russian_texts],
}).sample(frac=1, random_state=42)


100%|██████████| 20000/20000 [00:00<00:00, 63908.72it/s]


Vocabulary & Lookup Tables

In [None]:
num_encoder_tokens = len(english_words)
english_lookup = {word: num for num, word in enumerate(english_words)}

# Target-side ids must be unique. Layout:
#   0            -> <sos> (0 is also the value pad_sequence pads with and the
#                   index the loss ignores; <sos> never occurs as a target)
#   1 .. n       -> regular words, in sorted order
#   n + 1        -> <eos>
# Previously <eos> was given the id len(russian_words), which was already taken
# by the last sorted word, so the reverse table decoded <eos> as that word.
special_tokens = ('<sos>', '<eos>')
russian_vocab = [word for word in russian_words if word not in special_tokens]

russian_lookup = {'<sos>': 0}
russian_lookup.update((word, num) for num, word in enumerate(russian_vocab, start=1))
russian_lookup['<eos>'] = len(russian_lookup)

num_decoder_tokens = len(russian_lookup)
russian_token_lookup = {num: word for word, num in russian_lookup.items()}

# Every id maps back to exactly one token.
assert len(russian_token_lookup) == num_decoder_tokens
assert russian_token_lookup[russian_lookup['<eos>']] == '<eos>'


Dataset & Dataloader

In [None]:
class TranslationDataset(Dataset):
    """Parallel-sentence dataset that yields token-id tensors.

    Each item is a triple ``(encoder_input, decoder_input, decoder_target)``
    of 1-D ``torch.long`` tensors. The decoder input is the target sentence
    without its last token and the decoder target is the same sentence without
    its first token, i.e. the two are shifted by one position.
    """

    def __init__(self, english_texts, russian_texts, english_lookup, russian_lookup):
        """Store the sentence lists and the word -> id tables used to encode them."""
        self.english_texts = english_texts
        self.russian_texts = russian_texts
        self.english_lookup = english_lookup
        self.russian_lookup = russian_lookup

    def __len__(self):
        """Return the number of sentence pairs."""
        return len(self.english_texts)

    @staticmethod
    def _to_ids(tokens, lookup):
        """Map ``tokens`` through ``lookup`` and pack the ids into a long tensor."""
        return torch.tensor([lookup[token] for token in tokens], dtype=torch.long)

    def __getitem__(self, idx):
        """Encode the ``idx``-th sentence pair (raises KeyError on unknown words)."""
        source_tokens = self.english_texts[idx].split()
        target_tokens = self.russian_texts[idx].split()

        encoder_input = self._to_ids(source_tokens, self.english_lookup)
        decoder_input = self._to_ids(target_tokens[:-1], self.russian_lookup)
        decoder_target = self._to_ids(target_tokens[1:], self.russian_lookup)
        return encoder_input, decoder_input, decoder_target

def collate_fn(batch):
    """Pad a list of dataset items into three batch-first tensors.

    Args:
        batch: Sequence of ``(encoder_input, decoder_input, decoder_target)``
            triples of 1-D tensors.

    Returns:
        A 3-tuple of padded tensors (encoder inputs, decoder inputs, decoder
        targets), each padded to the longest sequence in its own group using
        ``pad_sequence``'s default padding value.
    """
    source_seqs, decoder_in_seqs, decoder_target_seqs = zip(*batch)
    grouped = (source_seqs, decoder_in_seqs, decoder_target_seqs)
    return tuple(pad_sequence(seqs, batch_first=True) for seqs in grouped)


# Hold out 20% of the sentence pairs for validation (fixed seed).
X_train, X_valid, y_train, y_valid = train_test_split(
    text_df['English'],
    text_df['Russian'],
    test_size=0.2,
    random_state=42,
)

batch_size = 32

train_dataset = TranslationDataset(X_train.tolist(), y_train.tolist(), english_lookup, russian_lookup)
valid_dataset = TranslationDataset(X_valid.tolist(), y_valid.tolist(), english_lookup, russian_lookup)

# Settings shared by both loaders; only the training loader reshuffles each epoch.
loader_kwargs = dict(batch_size=batch_size, collate_fn=collate_fn, num_workers=2)
train_dataloader = DataLoader(train_dataset, shuffle=True, **loader_kwargs)
valid_dataloader = DataLoader(valid_dataset, shuffle=False, **loader_kwargs)


Model Definition

In [None]:
class Encoder(nn.Module):
    """Embeds source token ids and summarises them with a single-layer LSTM.

    Args:
        input_dim: Size of the source vocabulary (number of embedding rows).
        embedding_dim: Width of each token embedding.
        hidden_dim: Width of the LSTM hidden and cell states.
    """

    def __init__(self, input_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(input_dim, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)

    def forward(self, src):
        """Encode ``src`` (batch-first token ids) and return the final LSTM state.

        Returns:
            ``(hidden, cell)`` -- the LSTM's final hidden and cell states; the
            per-step outputs are discarded.
        """
        token_vectors = self.embedding(src)
        _step_outputs, final_state = self.lstm(token_vectors)
        hidden, cell = final_state
        return hidden, cell

class Decoder(nn.Module):
    """Single-step LSTM decoder that turns one token per sequence into vocabulary logits.

    Args:
        output_dim: Size of the target vocabulary (embedding rows and logits).
        embedding_dim: Width of each token embedding.
        hidden_dim: Width of the LSTM hidden and cell states.
    """

    def __init__(self, output_dim, embedding_dim, hidden_dim):
        super().__init__()
        self.embedding = nn.Embedding(output_dim, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True)
        self.fc_out = nn.Linear(hidden_dim, output_dim)

    def forward(self, input, hidden, cell):
        """Advance the decoder by one time step.

        Args:
            input: Token ids for the current step, one per sequence.
            hidden: Current LSTM hidden state.
            cell: Current LSTM cell state.

        Returns:
            ``(logits, hidden, cell)`` -- vocabulary scores for the next token
            and the updated LSTM state.
        """
        # Insert a length-1 time axis so the batch-first LSTM sees one step.
        step_tokens = input.unsqueeze(1)
        step_vectors = self.embedding(step_tokens)
        lstm_out, next_state = self.lstm(step_vectors, (hidden, cell))
        hidden, cell = next_state
        logits = self.fc_out(lstm_out.squeeze(1))
        return logits, hidden, cell

class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes step by step with optional teacher forcing.

    Args:
        encoder: Module whose ``forward(src)`` returns ``(hidden, cell)``.
        decoder: Module whose ``forward(input, hidden, cell)`` returns
            ``(logits, hidden, cell)`` and which exposes ``fc_out.out_features``
            as the target vocabulary size.
    """

    def __init__(self, encoder, decoder):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        """Produce logits for every position of ``trg``.

        ``trg`` is the decoder *input* sequence (starting with ``<sos>``);
        ``outputs[:, t]`` holds the prediction made after feeding position
        ``t``, so it lines up with a target sequence shifted by one token.

        Args:
            src: Source token ids, shape ``(batch, src_len)``.
            trg: Decoder input token ids, shape ``(batch, trg_len)``.
            teacher_forcing_ratio: Probability, per step, of feeding the
                ground-truth next token instead of the model's own argmax.

        Returns:
            Tensor of shape ``(batch, trg_len, vocab_size)`` with the logits
            of every decoding step.
        """
        batch_size, trg_len = trg.shape
        trg_vocab_size = self.decoder.fc_out.out_features
        # Allocate on the same device as the inputs instead of a notebook global.
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size, device=trg.device)

        hidden, cell = self.encoder(src)
        input = trg[:, 0]

        # Run one decoder step per target position. The previous loop stopped
        # one step early, leaving outputs[:, -1] as all zeros, so the final
        # target token of the longest sequences was never actually predicted.
        for t in range(trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[:, t] = output

            if t + 1 < trg_len:
                use_teacher = random.random() < teacher_forcing_ratio
                input = trg[:, t + 1] if use_teacher else output.argmax(1)

        return outputs


Training Setup

In [None]:
# Model hyperparameters: embedding width and LSTM state width.
embedding_dim = 256
hidden_dim = 512

# Build the two halves of the network, then wrap them in the seq2seq model.
encoder = Encoder(
    input_dim=num_encoder_tokens,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
).to(device)
decoder = Decoder(
    output_dim=num_decoder_tokens,
    embedding_dim=embedding_dim,
    hidden_dim=hidden_dim,
).to(device)
model = Seq2Seq(encoder=encoder, decoder=decoder).to(device)

# Adam over all model parameters; targets equal to 0 are excluded from the loss.
optimizer = optim.Adam(params=model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss(ignore_index=0)

def train(model, dataloader, optimizer, criterion, clip=1.0):
    """Run one training pass over ``dataloader`` and return the mean batch loss.

    Args:
        model: Called as ``model(src, trg_input)``; must return logits whose
            last dimension is the vocabulary size.
        dataloader: Yields ``(src, trg_input, trg_output)`` tensor triples.
        optimizer: Optimizer stepping the model's parameters.
        criterion: Loss applied to the flattened logits and flattened targets.
        clip: Maximum gradient norm passed to ``clip_grad_norm_``.

    Returns:
        The sum of per-batch losses divided by the number of batches.
    """
    model.train()
    running_loss = 0

    for batch in tqdm(dataloader):
        src, trg_input, trg_output = (tensor.to(device) for tensor in batch)

        optimizer.zero_grad()
        logits = model(src, trg_input)

        # Collapse batch and time so each row is one token prediction.
        vocab_size = logits.shape[-1]
        flat_logits = logits.view(-1, vocab_size)
        flat_targets = trg_output.view(-1)
        loss = criterion(flat_logits, flat_targets)

        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(dataloader)

num_epochs = 10
for epoch in range(num_epochs):
    # train() returns the mean per-batch loss for this pass over the training set.
    train_loss = train(
        model=model,
        dataloader=train_dataloader,
        optimizer=optimizer,
        criterion=criterion,
    )
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}")


100%|██████████| 500/500 [00:09<00:00, 53.86it/s]


Epoch 1/10, Loss: 4.4228


100%|██████████| 500/500 [00:08<00:00, 60.08it/s]


Epoch 2/10, Loss: 2.9168


100%|██████████| 500/500 [00:08<00:00, 59.96it/s]


Epoch 3/10, Loss: 1.9810


100%|██████████| 500/500 [00:08<00:00, 58.64it/s]


Epoch 4/10, Loss: 1.4148


100%|██████████| 500/500 [00:08<00:00, 60.42it/s]


Epoch 5/10, Loss: 1.1193


100%|██████████| 500/500 [00:08<00:00, 60.57it/s]


Epoch 6/10, Loss: 1.0000


100%|██████████| 500/500 [00:07<00:00, 63.27it/s]


Epoch 7/10, Loss: 0.9284


100%|██████████| 500/500 [00:08<00:00, 60.94it/s]


Epoch 8/10, Loss: 0.8954


100%|██████████| 500/500 [00:08<00:00, 59.12it/s]


Epoch 9/10, Loss: 0.8664


100%|██████████| 500/500 [00:07<00:00, 62.60it/s]

Epoch 10/10, Loss: 0.8579





In [None]:
def translate_sentence(sentence, model, english_lookup, russian_token_lookup, max_length=50):
    """Greedily translate one English sentence with a trained seq2seq model.

    The sentence is cleaned with ``preprocess_text``, encoded once, and then
    decoded one token at a time, always taking the highest-scoring token,
    until ``<eos>`` is produced or ``max_length`` tokens have been emitted.
    The ``<sos>`` / ``<eos>`` ids are read from the notebook-level
    ``russian_lookup`` table. The model is left in eval mode.

    Args:
        sentence: Raw English input text.
        model: Object exposing ``encoder`` and ``decoder`` sub-modules.
        english_lookup: Mapping from English word to token id.
        russian_token_lookup: Mapping from Russian token id to word.
        max_length: Upper bound on the number of decoding steps.

    Returns:
        The decoded words joined by single spaces, without the ``<eos>``
        marker. An empty string if the sentence has no tokens left after
        preprocessing.
    """
    input_tokens = preprocess_text(sentence).split()
    if not input_tokens:
        # Nothing to translate; avoid handing the encoder a zero-length sequence.
        return ''

    # Run on whatever device the model's weights live on.
    model_device = next(model.parameters()).device

    # Look the special-token ids up once instead of on every decoding step.
    sos_id = russian_lookup['<sos>']
    eos_id = russian_lookup['<eos>']

    # Words missing from the vocabulary fall back to id 0.
    input_ids = [english_lookup.get(word, 0) for word in input_tokens]
    # Add a batch dimension: shape (1, sequence_length).
    input_tensor = torch.tensor(input_ids, dtype=torch.long, device=model_device).unsqueeze(0)

    # The decoder expects a 1-D tensor with one token id per sequence: shape (1,).
    decoder_input = torch.tensor([sos_id], dtype=torch.long, device=model_device)

    output_tokens = []
    model.eval()
    with torch.no_grad():
        hidden, cell = model.encoder(input_tensor)

        for _ in range(max_length):
            output, hidden, cell = model.decoder(decoder_input, hidden, cell)
            top1 = output.argmax(1).item()

            # Stop *before* recording <eos> so the marker is not part of the result.
            if top1 == eos_id:
                break

            output_tokens.append(top1)
            decoder_input = torch.tensor([top1], dtype=torch.long, device=model_device)

    # Convert token ids back to Russian words.
    return ' '.join(russian_token_lookup[token] for token in output_tokens)


In [None]:
# Result of an earlier run, kept for reference:
#   Input (English): hello how are you
#   Translated (Russian): привет как дела —
english_sentence = "I am working"

translated_sentence = translate_sentence(
    sentence=english_sentence,
    model=model,
    english_lookup=english_lookup,
    russian_token_lookup=russian_token_lookup,
)

print(f"Input (English): {english_sentence}")
print(f"Translated (Russian): {translated_sentence}")


Input (English): I am working
Translated (Russian): я работаю —
