In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import torch.nn.functional as F
from nltk.tokenize import sent_tokenize, word_tokenize
from sklearn.model_selection import train_test_split
import nltk
from collections import Counter
import math

In [2]:
nltk.download('punkt')

[nltk_data] Downloading package punkt to /usr/share/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [3]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

In [4]:
text = "/kaggle/input/englishrussian-dictionary-for-machine-translate/rus.txt"

with open(text) as file:
    lines = file.read().split("\n")[:-1]
pairs = []

for line in lines:
    english, russian = line.split("\t")[:2]
    russian = russian
    pairs.append((english, russian))

In [5]:
df = pd.DataFrame(pairs)
df.columns = ['en', 'ru']
df.head()

Unnamed: 0,en,ru
0,Go.,Марш!
1,Go.,Иди.
2,Go.,Идите.
3,Hi.,Здравствуйте.
4,Hi.,Привет!


In [6]:
vocab_en = set(['<unk>', '<bos>', '<eos>', '<pad>'])
vocab_ru = set(['<unk>', '<bos>', '<eos>', '<pad>'])

for sentence in tqdm(df['en']):
    for word in word_tokenize(sentence):
        vocab_en.add(word)
        
for sentence in tqdm(df['ru']):
    for word in word_tokenize(sentence):
        vocab_ru.add(word)

  0%|          | 0/363386 [00:00<?, ?it/s]

  0%|          | 0/363386 [00:00<?, ?it/s]

In [7]:
word2ind_en = {char: i for i, char in enumerate(vocab_en)}
ind2word_en = {i: char for char, i in word2ind_en.items()}

word2ind_ru = {char: i for i, char in enumerate(vocab_ru)}
ind2word_ru = {i: char for char, i in word2ind_ru.items()}

In [8]:
class SentDataset(Dataset):
    def __init__(self, sentences_en, sentences_ru):
        self.sentences_en = sentences_en
        self.sentences_ru = sentences_ru
        
        self.unk_id_en = word2ind_en['<unk>']
        self.bos_id_en = word2ind_en['<bos>']
        self.eos_id_en = word2ind_en['<eos>']
        self.pad_id_en = word2ind_en['<pad>']
        
        self.unk_id_ru = word2ind_ru['<unk>']
        self.bos_id_ru = word2ind_ru['<bos>']
        self.eos_id_ru = word2ind_ru['<eos>']
        self.pad_id_ru = word2ind_ru['<pad>']
        
    def __getitem__(self, idx):
        tokenized_sentence_en = [self.bos_id_en]
        tokenized_sentence_en += [word2ind_en.get(word, self.unk_id_en) for word in word_tokenize(self.sentences_en[idx])]
        tokenized_sentence_en += [self.eos_id_en]
        
        tokenized_sentence_ru = [self.bos_id_ru]
        tokenized_sentence_ru += [word2ind_ru.get(word, self.unk_id_ru) for word in word_tokenize(self.sentences_ru[idx])]
        tokenized_sentence_ru += [self.eos_id_ru]
        
        return (tokenized_sentence_en, tokenized_sentence_ru)
    
    def __len__(self):
        return len(self.sentences_en)

In [9]:
def collate_fn_with_padding(batch):
    sentences_en, sentences_ru = zip(*batch)
    
    max_len_en = max(len(sentence) for sentence in sentences_en)
    max_len_ru = max(len(sentence) for sentence in sentences_ru)
    
    padded_sentences_en = [sentence + [word2ind_en['<pad>']] * (max_len_en - len(sentence)) for sentence in sentences_en]
    padded_sentences_ru = [sentence + [word2ind_ru['<pad>']] * (max_len_ru - len(sentence)) for sentence in sentences_ru]
    
    mask_en = [[0.0] * len(sentence) + [-float('inf')] * (max_len_en - len(sentence)) for sentence in sentences_en]
    mask_ru = [[0.0] * len(sentence) + [-float('inf')] * (max_len_ru - len(sentence)) for sentence in sentences_ru]
    
    padded_sentences_en = torch.tensor(padded_sentences_en, dtype=torch.long, device=device)
    padded_sentences_ru = torch.tensor(padded_sentences_ru, dtype=torch.long, device=device)
    
    mask_en = torch.tensor(mask_en, dtype=torch.float, device=device)
    mask_ru = torch.tensor(mask_ru, dtype=torch.float, device=device)
    
    return padded_sentences_en, padded_sentences_ru, mask_en, mask_ru


In [10]:
train_en, test_en, train_ru, test_ru = train_test_split(
    df['en'].tolist(),
    df['ru'].tolist(),
    test_size=0.2
)

train_dataset = SentDataset(train_en, train_ru)
test_dataset = SentDataset(test_en, test_ru)

train_loader = DataLoader(
    dataset=train_dataset,
    batch_size=128,
    shuffle=True,
    collate_fn=collate_fn_with_padding
)

test_loader = DataLoader(
    dataset=test_dataset,
    batch_size=128,
    shuffle=True,
    collate_fn=collate_fn_with_padding
)

In [11]:
class TransformerModel(nn.Module):
    def __init__(self, vocab_size_en, vocab_size_ru, d_model=256, nhead=4, num_encoder_layers=2, num_decoder_layers=2, dim_feedforward=512, dropout=0.1):
        super(TransformerModel, self).__init__()
        
        self.embedding_en = nn.Embedding(vocab_size_en, d_model)
        self.embedding_ru = nn.Embedding(vocab_size_ru, d_model)
        
        self.positional_encoding_en = PositionalEncoding(d_model, dropout)
        self.positional_encoding_ru = PositionalEncoding(d_model, dropout)
        
        self.transformer = nn.Transformer(
            d_model=d_model,
            nhead=nhead,
            num_encoder_layers=num_encoder_layers,
            num_decoder_layers=num_decoder_layers,
            dim_feedforward=dim_feedforward, dropout=dropout,
            batch_first=True
        )
        
        self.fc_out = nn.Linear(d_model, vocab_size_ru)
        
    def forward(self, src, tgt, src_mask, tgt_mask, src_padding_mask, tgt_padding_mask):
        src_emb = self.embedding_en(src)
            
        src_emb = self.positional_encoding_en(src_emb)
        
        tgt_emb = self.embedding_ru(tgt)
            
        tgt_emb = self.positional_encoding_ru(tgt_emb)
        
        transformer_output = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, src_key_padding_mask=src_padding_mask, tgt_key_padding_mask=tgt_padding_mask)
        
        output = self.fc_out(transformer_output)
        
        return output
    
    
class PositionalEncoding(nn.Module):
    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [12]:
def train_epoch(model, dataloader, optimizer, criterion, device):
    model.train()
    total_loss = 0
    running_loss = 0
    num_batches = len(dataloader)
    
    with tqdm(total=num_batches, desc="Training", unit="batch") as pbar:
        for i, batch in enumerate(dataloader):
            src, tgt, src_padding_mask, tgt_padding_mask = batch
            
            src = src.to(device)
            tgt_input = tgt[:, :-1].to(device)
            tgt_output = tgt[:, 1:].to(device)
            
            src_padding_mask = src_padding_mask.to(device)
            tgt_padding_mask = tgt_padding_mask[:, :-1].to(device)
            
            tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_input.size(1)).to(device)
            
            optimizer.zero_grad()
            
            output = model(src, tgt_input, None, tgt_mask, src_padding_mask, tgt_padding_mask)
            
            output = output.view(-1, output.size(-1))
            tgt_output = tgt_output.reshape(-1)
            
            loss = criterion(output, tgt_output)
            
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
            running_loss += loss.item()
            
            if (i + 1) % 50 == 0:
                pbar.set_postfix({'loss': running_loss / 50})
                running_loss = 0
            
            pbar.update(1)
        
    return total_loss / num_batches


In [13]:
def evaluate(model, dataloader, criterion, device):
    model.eval()
    total_loss = 0

    with torch.no_grad():
        for batch in dataloader:
            src, tgt, src_padding_mask, tgt_padding_mask = batch
        
            src = src.to(device)
            tgt_input = tgt[:, :-1].to(device)
            tgt_output = tgt[:, 1:].to(device)

            src_padding_mask = src_padding_mask.to(device)
            tgt_padding_mask = tgt_padding_mask[:, :-1].to(device)

            tgt_mask = nn.Transformer.generate_square_subsequent_mask(tgt_input.size(1)).to(device)

            output = model(src, tgt_input, None, tgt_mask, src_padding_mask, tgt_padding_mask)

            output = output.view(-1, output.size(-1))
            tgt_output = tgt_output.reshape(-1)

            loss = criterion(output, tgt_output)

            total_loss += loss.item()

    return total_loss / len(dataloader)

In [14]:
def train(model, train_loader, val_loader, optimizer, criterion, device, num_epochs):
    train_losses = []
    val_losses = []
    
    for epoch in range(num_epochs):
        train_loss = train_epoch(model, train_loader, optimizer, criterion, device)
        val_loss = evaluate(model, val_loader, criterion, device)
        
        train_losses.append(train_loss)
        val_losses.append(val_loss)
        
        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}")
        
        torch.save(model.state_dict(), f'model_epoch_{epoch + 1}')
    
    return train_losses, val_losses


In [15]:
model = TransformerModel(
    vocab_size_en=len(vocab_en), 
    vocab_size_ru=len(vocab_ru),
    
)
model = model.to(device)

In [16]:
num_params = sum(p.numel() for p in model.parameters())
param_size_bytes = 4
total_size_bytes = num_params * param_size_bytes
total_size_megabytes = total_size_bytes / (1024 ** 2)
print(f"Размер модели: {total_size_megabytes:.2f} МБ")

Размер модели: 144.79 МБ


In [17]:
optimizer = torch.optim.Adam(model.parameters())

criterion = nn.CrossEntropyLoss(ignore_index=word2ind_ru['<pad>'])

In [None]:
train_losses, val_losses = train(model, train_loader, test_loader, optimizer, criterion, device, 6)

Training:   0%|          | 0/2272 [00:00<?, ?batch/s]

  output = torch._nested_tensor_from_mask(output, src_key_padding_mask.logical_not(), mask_check=False)


Epoch 1/6, Train Loss: 3.1451, Val Loss: 2.2708


Training:   0%|          | 0/2272 [00:00<?, ?batch/s]

Epoch 2/6, Train Loss: 1.8746, Val Loss: 1.7220


Training:   0%|          | 0/2272 [00:00<?, ?batch/s]

Epoch 3/6, Train Loss: 1.4628, Val Loss: 1.5943


Training:   0%|          | 0/2272 [00:00<?, ?batch/s]

Epoch 4/6, Train Loss: 1.2417, Val Loss: 1.5260


Training:   0%|          | 0/2272 [00:00<?, ?batch/s]

Epoch 5/6, Train Loss: 1.1023, Val Loss: 1.5060


Training:   0%|          | 0/2272 [00:00<?, ?batch/s]

**val loss:** $1.57$

In [None]:
def translate_sentence(model, sentence, max_len=50):
    model.eval()

    tokens = [word2ind_en.get(word, word2ind_en['<unk>']) for word in word_tokenize(sentence)]
    
    tokens = [word2ind_en['<bos>']] + tokens + [word2ind_en['<eos>']]

    src = torch.tensor(tokens, dtype=torch.long).unsqueeze(0).to(device)
    
    src_mask = torch.zeros((1, 1, src.size(1)), device=device)

    memory = model.transformer.encoder(model.positional_encoding_en(model.embedding_en(src)))

    tgt_tokens = [word2ind_ru['<bos>']]
    
    for _ in range(max_len):
        tgt_input = torch.tensor(tgt_tokens, dtype=torch.long).unsqueeze(0).to(device)
        
        tgt_mask = nn.Transformer.generate_square_subsequent_mask(len(tgt_input[0])).to(device)
        
        output = model.transformer.decoder(
            model.positional_encoding_ru(model.embedding_ru(tgt_input)),
            memory,
            tgt_mask=tgt_mask
        )

        output = model.fc_out(output[:, -1, :])
        predicted_token = output.argmax(dim=1).item()
        
        tgt_tokens.append(predicted_token)
        
        if predicted_token == word2ind_ru['<eos>']:
            break

    translated_sentence = [ind2word_ru[token] for token in tgt_tokens if token not in {word2ind_ru['<bos>'], word2ind_ru['<eos>'], word2ind_ru['<pad>']}]
    
    return " ".join(translated_sentence)


In [None]:
translate_sentence(model, 'you are too late today')

In [None]:
model_path = 'model_final.pth'
word2ind_en_path = 'word2ind_en.pth'
ind2word_en_path = 'ind2word_en.pth'
word2ind_ru_path = 'word2ind_ru.pth'
ind2word_ru_path = 'ind2word_ru.pth'

torch.save(model.state_dict(), model_path)
torch.save(word2ind_en, word2ind_en_path)
torch.save(ind2word_en, ind2word_en_path)
torch.save(word2ind_ru, word2ind_ru_path)
torch.save(ind2word_ru, ind2word_ru_path)