Код составлен на основе примера от авторов библиотеки Torch: [ссылка](https://pytorch.org/tutorials/beginner/translation_transformer.html)

In [None]:
!pip install -U torchdata
!pip install -U spacy
!python -m spacy download en_core_web_sm
!python -m spacy download de_core_news_sm
!pip install portalocker>=2.0.0

Collecting spacy
  Downloading spacy-3.7.2-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (6.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.6/6.6 MB[0m [31m53.2 MB/s[0m eta [36m0:00:00[0m
Collecting weasel<0.4.0,>=0.1.0 (from spacy)
  Downloading weasel-0.3.3-py3-none-any.whl (49 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m49.8/49.8 kB[0m [31m7.3 MB/s[0m eta [36m0:00:00[0m
Collecting cloudpathlib<0.17.0,>=0.7.0 (from weasel<0.4.0,>=0.1.0->spacy)
  Downloading cloudpathlib-0.16.0-py3-none-any.whl (45 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m45.0/45.0 kB[0m [31m5.7 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: cloudpathlib, weasel, spacy
  Attempting uninstall: spacy
    Found existing installation: spacy 3.6.1
    Uninstalling spacy-3.6.1:
      Successfully uninstalled spacy-3.6.1
[31mERROR: pip's dependency resolver does not currently take into account all the packages t

# Обучение трансформера для перевода

Задача: создать модель, которой мы подаем на вход текст на немецком, а в ответ получаем перевод на английский.

## Подгрузка данных

In [None]:
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator
from torchtext.datasets import multi30k, Multi30k
from typing import Iterable, List

# Загружаем датасет.  ## множество переводов фраз на разных языках
multi30k.URL["train"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/training.tar.gz"
multi30k.URL["valid"] = "https://raw.githubusercontent.com/neychev/small_DL_repo/master/datasets/Multi30k/validation.tar.gz"

# Выбираем язык, с которого переводим и на какой.
source_language = 'de'
target_language = 'en'

token_transform = {}
vocab_transform = {}

Tokenizer - класс, с помощью которого мы преобразуем слова в токены

In [None]:
# Загружаем два tokenizer для разных языков.
token_transform[source_language] = get_tokenizer('spacy', language='de_core_news_sm')
token_transform[target_language] = get_tokenizer('spacy', language='en_core_web_sm')

# Вспомогательная функция.
def yield_tokens(data_iter: Iterable, language: str) -> List[str]:
    language_index = {source_language: 0, target_language: 1}

    for data_sample in data_iter:
        yield token_transform[language](data_sample[language_index[language]])

# Определяем индексы специальных токенов:
UNK_IDX = 0  # Токен для обозначения неизвестного слова.
PAD_IDX = 1  # Токен для паддинга.
SOS_IDX = 2  # Токен начала последовательности, start of sequnce.
EOS_IDX = 3  # Токен окончания последовательности, end of sequnce.

# Определяем сами специальные токены:
special_symbols = ['<unk>', '<pad>', '<sos>', '<eos>']

# Создаем словарь из последовательностей каждого языка.
# В нем будут все слова и токены для них.
for ln in [source_language, target_language]:
    train_iter = Multi30k(split='train', language_pair=(source_language, target_language))
    vocab_transform[ln] = build_vocab_from_iterator(yield_tokens(train_iter, ln),
                                                    min_freq=1,
                                                    specials=special_symbols,
                                                    special_first=True)

# Указываем, что делать в случае неизвестного слова.
# Заменяем его на <unk>.
for ln in [source_language, target_language]:
    vocab_transform[ln].set_default_index(UNK_IDX)

In [None]:
token_transform['en']('i like cats and dogs,')

['i', 'like', 'cats', 'and', 'dogs', ',']

In [None]:
vocab_transform

{'de': Vocab(), 'en': Vocab()}

## Создание модели

In [None]:
# Функция создает матрицу для маскирования.
def generate_square_subsequent_mask(sz):
    mask = (torch.triu(torch.ones((sz, sz), device=DEVICE)) == 1).transpose(0, 1)
    mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
    return mask

# Создаем необхоимые матрицы.
def create_mask(src, tgt):
    src_seq_len = src.shape[0]
    tgt_seq_len = tgt.shape[0]

    # Создаем маски для входной и выходной последовательностей.
    # Для входной последовательности она не будет ничего делать.
    # То есть будет заполнена 0.
    tgt_mask = generate_square_subsequent_mask(tgt_seq_len)

    # Создаем маски для игнонироваания [PAD].
    src_padding_mask = (src == PAD_IDX).transpose(0, 1)
    tgt_padding_mask = (tgt == PAD_IDX).transpose(0, 1)
    return tgt_mask, src_padding_mask, tgt_padding_mask

In [None]:
from torch import Tensor
import torch
import torch.nn as nn
from torch.nn import Transformer
import math

# Определяем тип вычислений, на видеокарте или процессоре.
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class PositionalEncoding(nn.Module):
    def __init__(self,
                 emb_size: int,
                 dropout: float,
                 maxlen: int = 5000):
        super(PositionalEncoding, self).__init__()

        den = torch.pow(10000.0, torch.arange(0, emb_size, 2) / emb_size) # Функция, которая внутри cos и sin.
        pos = torch.arange(0, maxlen).reshape(maxlen, 1) # Позиция.

        pos_embedding = torch.zeros((maxlen, emb_size))
        pos_embedding[:, 0::2] = torch.sin(pos / den) # Применяем синус для чётных позиций.
        pos_embedding[:, 1::2] = torch.cos(pos / den) # Применяем косинус для нечётных позиций.
        pos_embedding = pos_embedding.unsqueeze(-2)

        self.dropout = nn.Dropout(dropout)  ## ?
        self.register_buffer('pos_embedding', pos_embedding)

    def forward(self, token_embedding: Tensor):
        # Сразу возвращаем эмбеддинги с добавлением позиционного кодирования.
        return self.dropout(token_embedding + self.pos_embedding[:token_embedding.size(0), :])

# Слой для преобразования токенов в эмбеддинги.
class TokenEmbedding(nn.Module):
    def __init__(self, vocab_size: int, emb_size):
        super(TokenEmbedding, self).__init__()
        self.embedding = nn.Embedding(vocab_size, emb_size)
        self.emb_size = emb_size

    def forward(self, tokens: Tensor):
        return self.embedding(tokens.long()) * math.sqrt(self.emb_size)

# Сам трансформер.
class Seq2SeqTransformer(nn.Module):
    def __init__(self,
                 num_encoder_layers: int, # Количество слоев encoder.
                 num_decoder_layers: int, # Количество слоев decoder.
                 emb_size: int, # Размер эбеддинга.
                 nhead: int, # Количество голов в multi-head attention.
                 src_vocab_size: int, # Размер словаря языка, с которого переводим.
                 tgt_vocab_size: int, # Размер словаря языка, на который переводим.
                 dim_feedforward: int = 512, # Размерность линейных слоев.
                 dropout: float = 0.1):
        super(Seq2SeqTransformer, self).__init__()

        # В torch.nn уже есть блок трансформера.
        self.transformer = Transformer(d_model=emb_size,
                                       nhead=nhead,
                                       num_encoder_layers=num_encoder_layers,
                                       num_decoder_layers=num_decoder_layers,
                                       dim_feedforward=dim_feedforward,
                                       dropout=dropout)
        # Выходной слой.
        self.output_layer = nn.Linear(emb_size, tgt_vocab_size)

        # Cлои эмбеддингов для каждого из языков.
        self.src_tok_emb = TokenEmbedding(src_vocab_size, emb_size)
        self.tgt_tok_emb = TokenEmbedding(tgt_vocab_size, emb_size)

        self.positional_encoding = PositionalEncoding(
            emb_size, dropout=dropout)

    def forward(self,
                src: Tensor, # Входная последовательность.
                trg: Tensor, # Выходная последовательность (переведённые токены).
                tgt_mask: Tensor, # Маска выходной последовательности.
                src_padding_mask: Tensor, # Маска паддингов входной последовательности.
                tgt_padding_mask: Tensor, # Маска паддингов выходной последовательности.
                memory_key_padding_mask: Tensor
                ):

        src_emb = self.positional_encoding(self.src_tok_emb(src))
        tgt_emb = self.positional_encoding(self.tgt_tok_emb(trg))
        src_mask = torch.zeros((src.shape[0], src.shape[0]),device=DEVICE).type(torch.bool)

        outs = self.transformer(src_emb, tgt_emb, src_mask, tgt_mask, None,
                                src_padding_mask, tgt_padding_mask, memory_key_padding_mask)
        return self.output_layer(outs)

    # Метод для получения результатов кодировщика.
    def encode(self, src: Tensor):
        src_mask = torch.zeros((src.shape[0], src.shape[0]), device=DEVICE).type(torch.bool)
        return self.transformer.encoder(self.positional_encoding(
                            self.src_tok_emb(src)), src_mask)

    # Метод для получения результатов декодировщика.
    def decode(self, tgt: Tensor, memory: Tensor):
        tgt_mask = (generate_square_subsequent_mask(tgt.size(0))
                    .type(torch.bool)).to(DEVICE)
        return self.transformer.decoder(self.positional_encoding(
                          self.tgt_tok_emb(tgt)), memory,
                          tgt_mask)

## Создаем обучающий цикл

In [None]:
torch.manual_seed(0)

# Подготавливаем параметры.
src_vocab_size = len(vocab_transform[source_language])
tgt_vocab_size = len(vocab_transform[target_language])
emb_size = 512
nhead = 8
dim_feedforward = 512
batch_size = 128
num_encoder_layers = 3
num_decoder_layers = 3

transformer = Seq2SeqTransformer(num_encoder_layers, num_decoder_layers, emb_size,
                 nhead, src_vocab_size, tgt_vocab_size, dim_feedforward)

# Задаем начальные веса.
for p in transformer.parameters():
    if p.dim() > 1:
        nn.init.xavier_uniform_(p)

transformer = transformer.to(DEVICE)

# В качестве функции потерь будем использовать Cross-Entropy и игнорировать PAD_IDX.
loss_fn = torch.nn.CrossEntropyLoss(ignore_index=PAD_IDX)

# Задаем оптимизатор с определёнными параметрами. Модель так быстрее обучается.
optimizer = torch.optim.Adam(transformer.parameters(), lr=0.0001, betas=(0.9, 0.98), eps=1e-9)



In [None]:
from torch.nn.utils.rnn import pad_sequence

# Вспомогательная функция для объединения нескольких последовательных операций.
def sequential_transforms(*transforms):
    def func(txt_input):
        for transform in transforms:
            txt_input = transform(txt_input)
        return txt_input
    return func

# Добавляет индексы начала и конца последовательности.
def tensor_transform(token_ids: List[int]):
    return torch.cat((torch.tensor([SOS_IDX]),
                      torch.tensor(token_ids),
                      torch.tensor([EOS_IDX])))

# Создаем текстовые трансформации.
text_transform = {}
for ln in [source_language, target_language]:
    text_transform[ln] = sequential_transforms(token_transform[ln], # Токенизируем.
                                               vocab_transform[ln], # Прогоняем через наш словарь.
                                               tensor_transform)    # Добавляем SOS/EOS и создаем тензор.


# Функция для соедниния последовательностей в батчи.
def collate_fn(batch):
    src_batch, tgt_batch = [], []
    for src_sample, tgt_sample in batch:
        src_batch.append(text_transform[source_language](src_sample.rstrip("\n")))
        tgt_batch.append(text_transform[target_language](tgt_sample.rstrip("\n")))

    src_batch = pad_sequence(src_batch, padding_value=PAD_IDX)
    tgt_batch = pad_sequence(tgt_batch, padding_value=PAD_IDX)
    return src_batch, tgt_batch

In [None]:
input_sequence = "I like cats and dogs"
text_transform['en'](input_sequence)

tensor([   2, 1166,  347, 2776,   11,  117,    3])

In [None]:
from torch.utils.data import DataLoader

# Функция для 1 эпохи обучения.
def train_epoch(model, optimizer):
    model.train()
    losses = 0
    train_iter = Multi30k(split='train', language_pair=(source_language, target_language))
    train_dataloader = DataLoader(train_iter, batch_size=batch_size, collate_fn=collate_fn)

    for src, tgt in train_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)
        tgt_input = tgt[:-1, :]
        tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)
        logits = logits.reshape(-1, logits.shape[-1])
        optimizer.zero_grad()

        tgt_out = tgt[1:, :].reshape(-1) # Берем уже переведенную последовательность (y), кроме SOS-токена.

        loss = loss_fn(logits, tgt_out)
        loss.backward()

        optimizer.step()
        losses += loss.item()

    return losses / len(list(train_dataloader))

# Функция для измерения качества при обучении.
def evaluate(model):
    model.eval()
    losses = 0

    val_iter = Multi30k(split='valid', language_pair=(source_language, target_language))
    val_dataloader = DataLoader(val_iter, batch_size=batch_size, collate_fn=collate_fn)

    for src, tgt in val_dataloader:
        src = src.to(DEVICE)
        tgt = tgt.to(DEVICE)

        tgt_input = tgt[:-1, :]

        tgt_mask, src_padding_mask, tgt_padding_mask = create_mask(src, tgt_input)

        logits = model(src, tgt_input, tgt_mask,src_padding_mask, tgt_padding_mask, src_padding_mask)

        tgt_out = tgt[1:, :]
        loss = loss_fn(logits.reshape(-1, logits.shape[-1]), tgt_out.reshape(-1))
        losses += loss.item()

    return losses / len(list(val_dataloader))

In [None]:
# Функция для декодирования последовательности.
def greedy_decode(model, src, max_len, start_symbol):
    src = src.to(DEVICE)

    encoder_hidden_state = model.encode(src).to(DEVICE)
    translated_words = torch.ones(1, 1).fill_(start_symbol).type(torch.long).to(DEVICE)

    for i in range(max_len-1):
        out = model.decode(translated_words, encoder_hidden_state)
        prob = model.output_layer(out[-1, :]) # Берем предсказание для последнего слова.

        _, next_word = torch.max(prob, dim=1)
        next_word = torch.tensor([[next_word.item()]], device=DEVICE)
        translated_words = torch.cat([translated_words, next_word], dim=0)

        if next_word == EOS_IDX:
            break
    return translated_words

# Функция для теста.
def translate(model: torch.nn.Module, src_sentence: str):
    model.eval()
    src = text_transform[source_language](src_sentence).view(-1, 1)
    num_tokens = src.shape[0]
    src_mask = (torch.zeros(num_tokens, num_tokens)).type(torch.bool)
    tgt_tokens = greedy_decode(
        model,  src, max_len=num_tokens + 5, start_symbol=SOS_IDX).flatten()
    return " ".join(vocab_transform[target_language].lookup_tokens(list(tgt_tokens.cpu().numpy()))).replace("<sos>", "").replace("<eos>", "")

## Обучение

In [None]:
from timeit import default_timer as timer
NUM_EPOCHS = 18

for epoch in range(1, NUM_EPOCHS+1):
    sentence_translate = translate(transformer, 'Eine Gruppe von Menschen steht vor einem Iglu .')
    print(f"Перевод нашей модели: {sentence_translate}, Ожидаемый перевод: A group of people are standing in front of an igloo .")

    start_time = timer()
    train_loss = train_epoch(transformer, optimizer)
    end_time = timer()
    val_loss = evaluate(transformer)
    print("")
    print(f'======== Epoch {epoch} / {NUM_EPOCHS+1} ========')
    print((f"Train loss: {train_loss:.3f}, \nVal loss: {val_loss:.3f}, \nЗатраченное время = {(end_time - start_time):.3f}s"))
    print("")

Перевод нашей модели:  Russia cloth spoof Russia sewing Madrid Madrid Russia silhouetted Madrid Russia Madrid Madrid Russia cloth, Ожидаемый перевод: A group of people are standing in front of an igloo .





Train loss: 5.344, 
Val loss: 4.107, 
Затраченное время = 42.429s

Перевод нашей модели:  A group of people are playing in a red . , Ожидаемый перевод: A group of people are standing in front of an igloo .

Train loss: 3.760, 
Val loss: 3.309, 
Затраченное время = 44.630s

Перевод нашей модели:  A group of people are standing in front of a crowd . , Ожидаемый перевод: A group of people are standing in front of an igloo .

Train loss: 3.156, 
Val loss: 2.886, 
Затраченное время = 43.808s

Перевод нашей модели:  A group of people standing in front of a crowd . , Ожидаемый перевод: A group of people are standing in front of an igloo .

Train loss: 2.767, 
Val loss: 2.639, 
Затраченное время = 43.501s

Перевод нашей модели:  A group of people standing in front of a crowd . , Ожидаемый перевод: A group of people are standing in front of an igloo .

Train loss: 2.477, 
Val loss: 2.439, 
Затраченное время = 44.516s

Перевод нашей модели:  A group of people standing in front of a store . , Ож

In [None]:
print(translate(transformer, "Eine Gruppe von Menschen steht vor einem Iglu ."))

 A group of people standing in front of an igloo 


In [None]:
transformer

Seq2SeqTransformer(
  (transformer): Transformer(
    (encoder): TransformerEncoder(
      (layers): ModuleList(
        (0-2): 3 x TransformerEncoderLayer(
          (self_attn): MultiheadAttention(
            (out_proj): NonDynamicallyQuantizableLinear(in_features=512, out_features=512, bias=True)
          )
          (linear1): Linear(in_features=512, out_features=512, bias=True)
          (dropout): Dropout(p=0.1, inplace=False)
          (linear2): Linear(in_features=512, out_features=512, bias=True)
          (norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (norm2): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (dropout1): Dropout(p=0.1, inplace=False)
          (dropout2): Dropout(p=0.1, inplace=False)
        )
      )
      (norm): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
    )
    (decoder): TransformerDecoder(
      (layers): ModuleList(
        (0-2): 3 x TransformerDecoderLayer(
          (self_attn): MultiheadAttent