<a href="https://colab.research.google.com/github/Arta-DS/DS/blob/main/%D0%9C%D0%B5%D1%85%D0%B0%D0%BD%D0%B8%D0%B7%D0%BC_%D0%B2%D0%BD%D0%B8%D0%BC%D0%B0%D0%BD%D0%B8%D1%8F.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
import spacy
import random
import math
import time

# Загрузка и распаковка данных
!wget http://www.manythings.org/anki/rus-eng.zip
!unzip rus-eng.zip

# Чтение данных
with open('rus.txt', 'r', encoding='utf-8') as f:
    lines = f.read().split('\n')

# Создание пар предложений
pairs = [[line.split('\t')[0], line.split('\t')[1]] for line in lines if line]
print(f"Всего пар предложений: {len(pairs)}")
print("Пример пары:", pairs[0])

--2025-10-16 20:54:47--  http://www.manythings.org/anki/rus-eng.zip
Resolving www.manythings.org (www.manythings.org)... 173.254.30.110
Connecting to www.manythings.org (www.manythings.org)|173.254.30.110|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 17201311 (16M) [application/zip]
Saving to: ‘rus-eng.zip.1’


2025-10-16 20:54:48 (23.9 MB/s) - ‘rus-eng.zip.1’ saved [17201311/17201311]

Archive:  rus-eng.zip
replace rus.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: 

In [None]:
# Seed every random number generator the notebook relies on, for reproducibility
SEED = 1234
for _seed_fn in (random.seed, np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(SEED)

# Ask cuDNN to use deterministic algorithms
torch.backends.cudnn.deterministic = True

# Initialize the tokenizer for English (loads the 'en_core_web_sm' spaCy pipeline)
spacy_en = spacy.load('en_core_web_sm')

def tokenize_ru(text):
    """Tokenize Russian text by splitting on runs of whitespace.

    Punctuation is not separated from words; an empty or all-whitespace
    string yields an empty list.
    """
    tokens = text.split()
    return tokens

class Vocabulary:
    """Word <-> index mapping built from a corpus with a pluggable tokenizer.

    Indices 0-3 are reserved for the special tokens '<pad>', '<sos>', '<eos>'
    and '<unk>'. The tokenizer may return plain strings or token objects that
    expose a ``text`` attribute; both are normalised to strings before counting
    or lookup.
    """

    def __init__(self, tokenize_func):
        self.tokenize = tokenize_func
        self.word2idx = {'<pad>': 0, '<sos>': 1, '<eos>': 2, '<unk>': 3}
        self.idx2word = {v: k for k, v in self.word2idx.items()}
        self.word_count = {}

    def _tokens(self, sentence):
        """Lower-case and tokenize ``sentence``, returning a list of strings.

        Token objects are reduced to their ``text``. Without this step every
        token object would be a distinct dictionary key, each with a count of
        one, so no word would ever reach the frequency threshold.
        """
        return [getattr(token, 'text', token) for token in self.tokenize(sentence.lower())]

    def build_vocab(self, sentences, min_count=2):
        """Count words in ``sentences`` and index those seen at least ``min_count`` times.

        Counts accumulate in ``self.word_count`` across calls.
        """
        for sentence in sentences:
            for word in self._tokens(sentence):
                self.word_count[word] = self.word_count.get(word, 0) + 1
        for word, count in self.word_count.items():
            if count >= min_count and word not in self.word2idx:
                # Use one index for both directions so the maps cannot drift apart.
                idx = len(self.word2idx)
                self.word2idx[word] = idx
                self.idx2word[idx] = word

    def numericalize(self, sentence):
        """Return the list of indices for ``sentence``; unknown words map to '<unk>'."""
        unk_idx = self.word2idx['<unk>']
        return [self.word2idx.get(token, unk_idx) for token in self._tokens(sentence)]

# Prepare the data: first element of each pair is English, second is Russian
en_sentences = [pair[0] for pair in pairs]
ru_sentences = [pair[1] for pair in pairs]


def tokenize_en(text):
    """Tokenize English text with spaCy and return plain strings.

    spaCy's tokenizer yields Token objects; passing it to Vocabulary directly
    makes every token a unique dictionary key, so no word reaches the frequency
    threshold and the vocabulary holds only the 4 special tokens.
    """
    return [token.text for token in spacy_en.tokenizer(text)]


# Build the vocabularies
SRC_vocab = Vocabulary(tokenize_en)
SRC_vocab.build_vocab(en_sentences)

TRG_vocab = Vocabulary(tokenize_ru)
TRG_vocab.build_vocab(ru_sentences)

print(f"Размер словаря английского языка (источник): {len(SRC_vocab.word2idx)}")
print(f"Размер словаря русского языка (цель): {len(TRG_vocab.word2idx)}")

def process_data(pairs, src_vocab, trg_vocab):
    """Convert sentence pairs into two parallel lists of index tensors.

    Each sentence is numericalized with its vocabulary and wrapped in
    '<sos>' ... '<eos>'. Returns ``(src_data, trg_data)``, two lists of 1-D
    ``torch.long`` tensors in the same order as ``pairs``.
    """
    def encode(vocab, sentence):
        # <sos> + token indices + <eos>
        ids = [vocab.word2idx['<sos>'], *vocab.numericalize(sentence), vocab.word2idx['<eos>']]
        return ids

    src_data, trg_data = [], []
    for src_sentence, trg_sentence in pairs:
        src_ids = encode(src_vocab, src_sentence)
        trg_ids = encode(trg_vocab, trg_sentence)
        src_data.append(torch.tensor(src_ids, dtype=torch.long))
        trg_data.append(torch.tensor(trg_ids, dtype=torch.long))
    return src_data, trg_data

src_data, trg_data = process_data(pairs, SRC_vocab, TRG_vocab)

# Train/test split (90% / 10%).
# One shuffled index list is shared by the source and target subsets. Calling
# random_split separately on each list shuffles them independently, which pairs
# every source sentence with an unrelated target sentence.
train_size = int(0.9 * len(src_data))
test_size = len(src_data) - train_size
shuffled_indices = torch.randperm(len(src_data)).tolist()
train_indices = shuffled_indices[:train_size]
test_indices = shuffled_indices[train_size:]
src_train = torch.utils.data.Subset(src_data, train_indices)
src_test = torch.utils.data.Subset(src_data, test_indices)
trg_train = torch.utils.data.Subset(trg_data, train_indices)
trg_test = torch.utils.data.Subset(trg_data, test_indices)

# DataLoader setup
from torch.nn.utils.rnn import pad_sequence
from torch.utils.data import DataLoader

BATCH_SIZE = 128
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

def collate_fn(batch):
    """Collate a list of (src, trg) tensor pairs into two padded batch tensors.

    Sequences are padded to the longest one in the batch with the '<pad>'
    index of the matching vocabulary; pad_sequence is called with its default
    layout, so the sequence dimension comes first.
    """
    sources, targets = zip(*batch)
    padded_src = pad_sequence(sources, padding_value=SRC_vocab.word2idx['<pad>'])
    padded_trg = pad_sequence(targets, padding_value=TRG_vocab.word2idx['<pad>'])
    return padded_src, padded_trg

# Materialise (src, trg) tuples so the DataLoader can index them
train_dataset = [example for example in zip(src_train, trg_train)]
test_dataset = [example for example in zip(src_test, trg_test)]

# Training batches are reshuffled every epoch; test batches keep a fixed order
train_loader = DataLoader(train_dataset, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, collate_fn=collate_fn, batch_size=BATCH_SIZE, shuffle=False)

print(f"Обучающая выборка: {len(train_dataset)} пар")
print(f"Тестовая выборка: {len(test_dataset)} пар")

Размер словаря английского языка (источник): 4
Размер словаря русского языка (цель): 59022
Обучающая выборка: 474877 пар
Тестовая выборка: 52765 пар


In [None]:
class Encoder(nn.Module):
    """Bidirectional single-layer GRU encoder.

    forward(src) takes token indices of shape [src_len, batch_size] and returns
    ``(outputs, hidden)``:
      * outputs: [src_len, batch_size, enc_hid_dim * 2], per-position states of
        both directions;
      * hidden:  [batch_size, dec_hid_dim], the initial decoder state obtained
        by projecting the concatenated final forward/backward states through
        ``fc`` and tanh.
    """

    def __init__(self, input_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=input_dim, embedding_dim=emb_dim)
        self.rnn = nn.GRU(input_size=emb_dim, hidden_size=enc_hid_dim, bidirectional=True)
        self.fc = nn.Linear(in_features=2 * enc_hid_dim, out_features=dec_hid_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, src):
        # [src_len, batch_size] -> [src_len, batch_size, emb_dim]
        token_vectors = self.embedding(src)
        embedded = self.dropout(token_vectors)

        # final_states: [num_directions, batch_size, enc_hid_dim]
        outputs, final_states = self.rnn(embedded)

        # Last two entries are the final forward and backward states
        forward_last = final_states[-2]
        backward_last = final_states[-1]
        joined = torch.cat([forward_last, backward_last], dim=1)

        # Decoder initial state: [batch_size, dec_hid_dim]
        hidden = torch.tanh(self.fc(joined))
        return outputs, hidden

In [None]:
class DotProductAttention(nn.Module):
    """Scaled dot-product attention over the encoder outputs.

    The previous body scored positions with the same additive (tanh + v)
    formula as MLPAttention, so the two models being compared were identical.
    Here the score of a source position is the dot product between the decoder
    state and a linear projection of the encoder output, divided by
    sqrt(dec_hid_dim). The projection is required because encoder outputs are
    ``enc_hid_dim * 2`` wide while the decoder state is ``dec_hid_dim`` wide.

    forward(hidden, encoder_outputs, mask):
      * hidden:          [batch_size, dec_hid_dim]
      * encoder_outputs: [src_len, batch_size, enc_hid_dim * 2]
      * mask:            [batch_size, src_len]; positions where mask == 0 get
                         (numerically) zero weight
      * returns:         [batch_size, src_len], each row summing to 1
    """

    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        # Maps encoder outputs into the decoder state space so the dot product is defined
        self.attn = nn.Linear(enc_hid_dim * 2, dec_hid_dim, bias=False)
        # Scaling keeps the magnitude of the scores independent of the state width
        self.scale = dec_hid_dim ** 0.5

    def forward(self, hidden, encoder_outputs, mask):
        # keys = [batch_size, src_len, dec_hid_dim]
        keys = self.attn(encoder_outputs.permute(1, 0, 2))

        # [batch_size, src_len, dec_hid_dim] x [batch_size, dec_hid_dim, 1] -> [batch_size, src_len]
        scores = torch.bmm(keys, hidden.unsqueeze(2)).squeeze(2) / self.scale

        # Exclude masked-out source positions from the softmax
        scores = scores.masked_fill(mask == 0, -1e10)

        return torch.softmax(scores, dim=1)

In [None]:
class MLPAttention(nn.Module):
    """Additive (MLP) attention: score = v . tanh(W [decoder_state ; encoder_output]).

    forward(hidden, encoder_outputs, mask):
      * hidden:          [batch_size, dec_hid_dim]
      * encoder_outputs: [src_len, batch_size, enc_hid_dim * 2]
      * mask:            [batch_size, src_len]; positions where mask == 0 are
                         suppressed before the softmax
      * returns:         [batch_size, src_len] attention weights
    """

    def __init__(self, enc_hid_dim, dec_hid_dim):
        super().__init__()
        self.attn = nn.Linear(in_features=2 * enc_hid_dim + dec_hid_dim, out_features=dec_hid_dim)
        self.v = nn.Linear(in_features=dec_hid_dim, out_features=1, bias=False)

    def forward(self, hidden, encoder_outputs, mask):
        src_len = encoder_outputs.size(0)

        # Put batch first: [batch_size, src_len, enc_hid_dim * 2]
        keys = encoder_outputs.transpose(0, 1)

        # One copy of the decoder state per source position: [batch_size, src_len, dec_hid_dim]
        queries = hidden.unsqueeze(1).expand(-1, src_len, -1)

        # Joint features of decoder and encoder states -> [batch_size, src_len, dec_hid_dim]
        features = torch.cat((queries, keys), dim=2)
        energy = torch.tanh(self.attn(features))

        # Collapse the feature dimension to a scalar score: [batch_size, src_len]
        scores = self.v(energy).squeeze(2)

        # Keep the model from attending to masked-out (<pad>) positions
        scores = scores.masked_fill(mask == 0, -1e10)
        return torch.softmax(scores, dim=1)

In [None]:
class Decoder(nn.Module):
    """Single-step GRU decoder with attention over the encoder outputs.

    forward(input, hidden, encoder_outputs, mask) consumes one target token per
    batch element and returns ``(prediction, hidden, attention)``:
      * prediction: [batch_size, output_dim] scores over the target vocabulary
      * hidden:     [batch_size, dec_hid_dim] updated decoder state
      * attention:  [batch_size, src_len] weights produced by ``self.attention``
    """

    def __init__(self, output_dim, emb_dim, enc_hid_dim, dec_hid_dim, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.attention = attention
        self.embedding = nn.Embedding(num_embeddings=output_dim, embedding_dim=emb_dim)
        self.rnn = nn.GRU(input_size=2 * enc_hid_dim + emb_dim, hidden_size=dec_hid_dim)
        self.fc_out = nn.Linear(in_features=2 * enc_hid_dim + dec_hid_dim + emb_dim, out_features=output_dim)
        self.dropout = nn.Dropout(p=dropout)

    def forward(self, input, hidden, encoder_outputs, mask):
        # [batch_size] -> [1, batch_size] -> [1, batch_size, emb_dim]
        embedded = self.dropout(self.embedding(input.unsqueeze(0)))

        # Attention weights over source positions: [batch_size, src_len]
        attn_weights = self.attention(hidden, encoder_outputs, mask)

        # Weighted sum of encoder outputs:
        # [batch_size, 1, src_len] x [batch_size, src_len, enc_hid_dim * 2]
        context = torch.bmm(attn_weights.unsqueeze(1), encoder_outputs.permute(1, 0, 2))
        # -> [1, batch_size, enc_hid_dim * 2]
        context = context.permute(1, 0, 2)

        # GRU input: [1, batch_size, (enc_hid_dim * 2) + emb_dim]
        step_input = torch.cat((embedded, context), dim=2)
        output, hidden = self.rnn(step_input, hidden.unsqueeze(0))

        # With one layer and one time step the output equals the new hidden state
        assert (output == hidden).all()

        # Predict from the GRU output, the context vector and the input embedding
        features = torch.cat((output.squeeze(0), context.squeeze(0), embedded.squeeze(0)), dim=1)
        prediction = self.fc_out(features)

        return prediction, hidden.squeeze(0), attn_weights

In [None]:
class Seq2Seq(nn.Module):
    """Encoder-decoder wrapper that decodes a target sequence step by step.

    forward(src, trg, teacher_forcing_ratio) returns a tensor of shape
    [trg_len, batch_size, decoder.output_dim]; row 0 is left as zeros because
    decoding starts from the first target token (``trg[0]``).
    """

    def __init__(self, encoder, decoder, src_pad_idx, device):
        super().__init__()
        self.encoder, self.decoder = encoder, decoder
        self.src_pad_idx = src_pad_idx
        self.device = device

    def create_mask(self, src):
        """Return a [batch_size, src_len] boolean mask, True where src is not padding."""
        not_pad = src != self.src_pad_idx
        return not_pad.permute(1, 0)

    def forward(self, src, trg, teacher_forcing_ratio = 0.5):
        # src = [src_len, batch_size], trg = [trg_len, batch_size]
        trg_len, batch_size = trg.shape[0], src.shape[1]

        # Buffer for the per-step predictions
        outputs = torch.zeros(trg_len, batch_size, self.decoder.output_dim).to(self.device)

        # encoder_outputs = [src_len, batch_size, enc_hid_dim * 2]
        # hidden          = [batch_size, dec_hid_dim]
        encoder_outputs, hidden = self.encoder(src)
        mask = self.create_mask(src)

        # The first decoder input is the <sos> row of the target
        decoder_input = trg[0, :]

        for step in range(1, trg_len):
            # One decoding step: prediction [batch_size, output_dim] and new hidden state
            logits, hidden, _ = self.decoder(decoder_input, hidden, encoder_outputs, mask)
            outputs[step] = logits

            # Feed the ground-truth token with probability teacher_forcing_ratio,
            # otherwise the model's own most likely token
            use_ground_truth = random.random() < teacher_forcing_ratio
            decoder_input = trg[step] if use_ground_truth else logits.argmax(1)

        return outputs

In [None]:
# Vocabulary-dependent sizes and padding index
INPUT_DIM = len(SRC_vocab.word2idx)
OUTPUT_DIM = len(TRG_vocab.word2idx)
SRC_PAD_IDX = SRC_vocab.word2idx['<pad>']

# Network hyper-parameters
ENC_EMB_DIM = 256
DEC_EMB_DIM = 256
ENC_HID_DIM = 512
DEC_HID_DIM = 512
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5


def _build_seq2seq(attention_cls):
    """Create attention, encoder, decoder and the Seq2Seq wrapper, in that order.

    Returns all four objects so each stays reachable under its own name.
    """
    attention = attention_cls(ENC_HID_DIM, DEC_HID_DIM)
    encoder = Encoder(INPUT_DIM, ENC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, ENC_DROPOUT)
    decoder = Decoder(OUTPUT_DIM, DEC_EMB_DIM, ENC_HID_DIM, DEC_HID_DIM, DEC_DROPOUT, attention)
    model = Seq2Seq(encoder, decoder, SRC_PAD_IDX, DEVICE).to(DEVICE)
    return attention, encoder, decoder, model


# --- Model 1: dot-product attention ---
attn_dot, enc_dot, dec_dot, model_dot = _build_seq2seq(DotProductAttention)

# --- Model 2: MLP attention ---
attn_mlp, enc_mlp, dec_mlp, model_mlp = _build_seq2seq(MLPAttention)


def initialize_weights(m):
    """Re-initialise every parameter reachable from module ``m`` in place.

    Parameters whose name contains 'weight' are drawn from N(0, 0.01^2);
    all other parameters (e.g. biases) are set to zero.
    """
    for param_name, tensor in m.named_parameters():
        if 'weight' not in param_name:
            nn.init.constant_(tensor.data, 0)
            continue
        nn.init.normal_(tensor.data, mean=0, std=0.01)

# Apply the same initialisation scheme to both models
model_dot.apply(initialize_weights)
model_mlp.apply(initialize_weights)

# Loss function: cross-entropy that ignores padded target positions
TRG_PAD_IDX = TRG_vocab.word2idx['<pad>']
criterion = nn.CrossEntropyLoss(ignore_index=TRG_PAD_IDX)

# One Adam optimizer (default hyper-parameters) per model
optimizer_dot, optimizer_mlp = (
    optim.Adam(net.parameters()) for net in (model_dot, model_mlp)
)

In [None]:
def train(model, iterator, optimizer, criterion, clip):
    """Run one training epoch and return the mean batch loss.

    For every (src, trg) batch from ``iterator`` the model is run with its
    default teacher-forcing ratio, the loss is computed on all target
    positions after the first, gradients are clipped to norm ``clip`` and
    the optimizer takes one step.
    """
    model.train()
    running_loss = 0
    for src, trg in iterator:
        src = src.to(DEVICE)
        trg = trg.to(DEVICE)

        optimizer.zero_grad()

        # logits = [trg_len, batch_size, output_dim]
        logits = model(src, trg)
        vocab_size = logits.shape[-1]

        # Drop position 0 (never predicted) and flatten time and batch together
        flat_logits = logits[1:].view(-1, vocab_size)
        flat_targets = trg[1:].view(-1)

        loss = criterion(flat_logits, flat_targets)
        loss.backward()

        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()

        running_loss += loss.item()

    return running_loss / len(iterator)

def evaluate(model, iterator, criterion):
    """Return the mean batch loss over ``iterator`` without updating the model.

    The model is put in eval mode, gradients are disabled and teacher
    forcing is switched off (ratio 0), so the decoder consumes its own
    predictions.
    """
    model.eval()
    running_loss = 0
    with torch.no_grad():
        for src, trg in iterator:
            src = src.to(DEVICE)
            trg = trg.to(DEVICE)

            # logits = [trg_len, batch_size, output_dim]; 0 disables teacher forcing
            logits = model(src, trg, 0)
            vocab_size = logits.shape[-1]

            # Drop position 0 and flatten time and batch together
            flat_logits = logits[1:].view(-1, vocab_size)
            flat_targets = trg[1:].view(-1)

            running_loss += criterion(flat_logits, flat_targets).item()

    return running_loss / len(iterator)

def epoch_time(start_time, end_time):
    """Split the interval between two timestamps (in seconds) into (minutes, seconds).

    Both components are truncated to integers.
    """
    duration = end_time - start_time
    minutes = int(duration / 60)
    seconds = int(duration - 60 * minutes)
    return minutes, seconds

In [None]:
N_EPOCHS = 10
CLIP = 1


def run_training(model, optimizer, checkpoint_path, n_epochs=N_EPOCHS, clip=CLIP):
    """Train ``model`` for ``n_epochs`` and return the best validation loss.

    After every epoch the model is evaluated on ``test_loader``; whenever the
    loss improves, the weights are saved to ``checkpoint_path``. Replaces two
    copy-pasted loops that differed only in model, optimizer and file name.

    NOTE(review): the test split doubles as the validation set here, so the
    reported "Val." numbers are not an unbiased estimate for the saved model.
    """
    best_valid_loss = float('inf')
    for epoch in range(n_epochs):
        start_time = time.time()
        train_loss = train(model, train_loader, optimizer, criterion, clip)
        valid_loss = evaluate(model, test_loader, criterion)
        end_time = time.time()

        epoch_mins, epoch_secs = epoch_time(start_time, end_time)

        # Keep only the checkpoint with the lowest validation loss
        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), checkpoint_path)

        print(f'Epoch: {epoch+1:02} | Time: {epoch_mins}m {epoch_secs}s')
        print(f'\tTrain Loss: {train_loss:.3f} | Train PPL: {math.exp(train_loss):7.3f}')
        print(f'\t Val. Loss: {valid_loss:.3f} |  Val. PPL: {math.exp(valid_loss):7.3f}')
    return best_valid_loss


print("--- Обучение модели с Dot-Product Attention ---")
best_valid_loss_dot = run_training(model_dot, optimizer_dot, 'dot-model.pt')

print("\n--- Обучение модели с MLP Attention ---")
best_valid_loss_mlp = run_training(model_mlp, optimizer_mlp, 'mlp-model.pt')

--- Обучение модели с Dot-Product Attention ---


In [None]:
# Load the best weights.
# map_location remaps the stored tensors to the current device, so a checkpoint
# saved on a GPU runtime still loads when only a CPU is available.
model_dot.load_state_dict(torch.load('dot-model.pt', map_location=DEVICE))
model_mlp.load_state_dict(torch.load('mlp-model.pt', map_location=DEVICE))

def translate_sentence(sentence, model, src_vocab, trg_vocab, device, max_len=50):
    """Greedily translate one English sentence and return the target tokens.

    Args:
        sentence: raw source text; tokenized with ``spacy_en`` and lower-cased.
        model: Seq2Seq model exposing ``encoder``, ``decoder`` and ``create_mask``.
        src_vocab, trg_vocab: vocabularies providing ``word2idx`` / ``idx2word``.
        device: device the input tensors are moved to.
        max_len: maximum number of decoding steps.

    Returns:
        List of predicted target tokens without the leading '<sos>'. The list
        ends with '<eos>' when the model produced it within ``max_len`` steps.
    """
    model.eval()

    unk_idx = src_vocab.word2idx['<unk>']
    token_ids = [src_vocab.word2idx.get(token.text.lower(), unk_idx) for token in spacy_en(sentence)]
    # Training sources are wrapped in <sos> ... <eos> by process_data; use the
    # same framing here so the encoder sees inputs shaped like its training data.
    token_ids = [src_vocab.word2idx['<sos>']] + token_ids + [src_vocab.word2idx['<eos>']]

    # src_tensor = [src_len, 1], a batch holding a single sentence
    src_tensor = torch.LongTensor(token_ids).unsqueeze(1).to(device)

    eos_idx = trg_vocab.word2idx['<eos>']
    trg_indexes = [trg_vocab.word2idx['<sos>']]

    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src_tensor)
        mask = model.create_mask(src_tensor)

        for _step in range(max_len):
            # Feed the most recently produced token back into the decoder
            trg_tensor = torch.LongTensor([trg_indexes[-1]]).to(device)
            output, hidden, _attention = model.decoder(trg_tensor, hidden, encoder_outputs, mask)

            pred_token = output.argmax(1).item()
            trg_indexes.append(pred_token)

            if pred_token == eos_idx:
                break

    trg_tokens = [trg_vocab.idx2word[i] for i in trg_indexes]
    return trg_tokens[1:]  # drop <sos>

In [None]:
# Pick a random example from the test set.
# randrange excludes the upper bound (randint includes it and could index one
# past the end). The pair is read from test_dataset itself; `.dataset` on a
# split subset is the full, unsplit corpus.
example_idx = random.randrange(len(test_dataset))
src_example, trg_example = test_dataset[example_idx]
src_sentence = src_example.tolist()
trg_sentence = trg_example.tolist()

# Convert indices back to text, dropping the special framing/padding tokens.
# Special-token indices are looked up in word2idx; idx2word has integer keys,
# so idx2word['<pad>'] raises KeyError.
src_special = {SRC_vocab.word2idx[tok] for tok in ('<sos>', '<eos>', '<pad>')}
trg_special = {TRG_vocab.word2idx[tok] for tok in ('<sos>', '<eos>', '<pad>')}
src_text = ' '.join(SRC_vocab.idx2word[idx] for idx in src_sentence if idx not in src_special)
trg_text = ' '.join(TRG_vocab.idx2word[idx] for idx in trg_sentence if idx not in trg_special)

print(f'Исходное предложение (EN): {src_text}')
print(f'Эталонный перевод (RU): {trg_text}\n')

# Translation by the model with dot-product attention
translation_dot = translate_sentence(src_text, model_dot, SRC_vocab, TRG_vocab, DEVICE)
print(f'Перевод (Dot-Product Attention): {" ".join(translation_dot)}')

# Translation by the model with MLP attention
translation_mlp = translate_sentence(src_text, model_mlp, SRC_vocab, TRG_vocab, DEVICE)
print(f'Перевод (MLP Attention): {" ".join(translation_mlp)}')