# Imports

In [1]:
#import keras
import os
import datasets
import numpy as np
import pandas as pd
import transformers
import sklearn.metrics
#import tensorflow as tf
import tqdm.notebook as tqdm
import sklearn.model_selection
import matplotlib.pyplot as plt

#pytorch

import torch
# from torcheval.metrics import MultilabelA
from torchmetrics.classification import Accuracy
#from torchsummary import summary
from torch import nn
from torch import optim
import torch.nn.functional as F

from sklearn.model_selection import train_test_split

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
MODEL_PATH = 'models'
MAX_TOKENS = 32
MAX_LENGTH = 32

# Dataset

Get the dataset from [here](https://tatoeba.org/en/downloads). Preferably use russian to english translations.

Use a custom tokenizer that can add bos and eos tokens (pass `add_special_tokens=True` when calling the tokenizer to add them).

In [3]:
class Tokenizer(transformers.GPT2Tokenizer):

    def build_inputs_with_special_tokens(self, token_ids_0, token_ids_1=None):
        if token_ids_1 is None:
            return [self.bos_token_id, *token_ids_0, self.eos_token_id]

        return [self.bos_token_id, *token_ids_0, self.bos_token_id, *token_ids_1, self.eos_token_id]

In [4]:
tokenizer = Tokenizer.from_pretrained('gpt2')
tokenizer.pad_token_id = tokenizer.eos_token_id

The tokenizer class you load from this checkpoint is not the same type as the class this function is called from. It may result in unexpected tokenization. 
The tokenizer class you load from this checkpoint is 'GPT2Tokenizer'. 
The class this function is called from is 'Tokenizer'.


Since the dataset is rather large, you can omit the validation dataset and just use a set of test sentences after the training.

Create a dataset that returns the following
* A pair of tensors `((None, L), (None, P))` -- input sequence of tokens and output sequence of tokens to be fed into decoder (this should start with the BOS token)
* A tensor `(None, P)` -- output sequence of tokens to be predicted (this should end with EOS token)
* A tensor `(None, P)` -- a masking tensor marking padded tokens with 0

In [5]:
def to_tokenize(data):
    return tokenizer(data, max_length=MAX_TOKENS, truncation=True, padding=True)['input_ids']

In [6]:
data = pd.read_csv('seq2seq_dataset.tsv', 
            sep='\t', 
            on_bad_lines='skip',
            names=['id_1', 'rus', 'id_2', 'eng'])[['rus', 'eng']]#тут скип, потомучта какое-то говно возникало

X, y = to_tokenize(data['rus'].to_list()), to_tokenize(data['eng'].to_list())
X_train, X_test, y_train, y_test= train_test_split(X, y, test_size = 0.7, shuffle=True)

In [7]:
def pad_collate(batch):
    (inp, out_BOS, out_EOS, masking_PAD) = zip(*batch)
    inp = torch.nn.utils.rnn.pad_sequence(inp, batch_first=True, padding_value=tokenizer.eos_token_id)
    out_BOS = torch.nn.utils.rnn.pad_sequence(out_BOS, batch_first=True, padding_value=tokenizer.eos_token_id)
    out_EOS = torch.nn.utils.rnn.pad_sequence(out_EOS, batch_first=True, padding_value=tokenizer.eos_token_id)
    masking_PAD = torch.nn.utils.rnn.pad_sequence(masking_PAD, batch_first=True, padding_value=tokenizer.eos_token_id)

    return inp, out_BOS, out_EOS, masking_PAD

def get_dataloader(batch, X, y):
    X = np.array(X) #Торч поругался, что я не могу сделать отрицательный степ
    inp = torch.LongTensor(X.copy()).to('cuda') #а потом он ещё поругался, что в массиве у меня отрицательный страйд и попросил .copy()
    out_BOS = torch.LongTensor(y).to('cuda')[:, :-1]
    out_EOS = torch.LongTensor(y).to('cuda')[:, 1:]
    masking_PAD = (out_BOS != tokenizer.pad_token_id).to('cuda')
    masking_PAD[:, 0] = True # я получаю маску по BOS [50625, ..., 30, 50625] -> [False, ..., True, False] -> [True, ..., True, False]
    data = torch.utils.data.TensorDataset(inp,
                                          out_BOS,
                                          out_EOS,
                                          masking_PAD)
    return torch.utils.data.DataLoader(data, batch_size=batch, shuffle=True, collate_fn=pad_collate)

In [8]:
data = get_dataloader(256, X_train, y_train)
data_test = get_dataloader(256, X_test, y_test)

# Model

Create a model for training. The model should have two inputs: input sequence `(None, L)` and output sequence`(None, P)`. The model output is a single tensor `(None, P)` logits (or probabilities) of the next token predicted for each input one.

In [9]:
def get_model(
    units: int,
    n_tokens: int,
    n_labels: int,
    n_stacks: int = 1,
    bidirectional: bool = False,
    name: str | None = None,
    cell_type: type[keras.layers.Layer] = keras.layers.LSTMCell
) -> keras.Model:
    '''Creates a model with RNN architecture for sequence to sequence classification.

    Arguments:
        units: dimensionality of RNN cells
        n_tokens: number of tokens in the tokenizer dictionary
        n_labels: number of labels to be predicted
        n_stacks: number of RNN cells in the stack (1 -- no stacking)
        bidirectional: whether or not the model is bidirectional
        name: the model name
        cell_type: type of a cell to use, either keras.layers.LSTMCell or keras.layers.GRUCell

    Returns:
        The model'''
    ...

NameError: name 'keras' is not defined

In [9]:
class EncoderRNN(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, bidirectional, num_layers):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = cell_type(hidden_size, hidden_size, batch_first=True, bidirectional=bidirectional, num_layers=num_layers)

    def forward(self, input):
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded)
        return output, hidden

class AttnDecoderRNN(nn.Module):
    def __init__(self, cell_type, hidden_size, output_size, bidirectional, num_layers):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = nn.MultiheadAttention(hidden_size, num_heads=1, batch_first=True)
        self.rnn = cell_type(2 * hidden_size, hidden_size, batch_first=True, num_layers=num_layers, bidirectional=bidirectional)
        self.out = nn.Linear(2 * hidden_size if bidirectional else hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, max_len, is_inference, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        decoder_input = torch.empty(batch_size, 1, dtype=torch.long, device=device).fill_(tokenizer.pad_token_id)
        decoder_hidden = encoder_hidden
        decoder_outputs = []

        for i in range(max_len-1):
            decoder_output, decoder_hidden = self.forward_step(
                decoder_input, decoder_hidden, encoder_outputs
            )
            decoder_outputs.append(decoder_output)

            if target_tensor is not None:
                #Teacher forcing
                decoder_input = target_tensor[:, i].unsqueeze(1)
            else:
                _, topi = decoder_output.topk(1)
                decoder_input = topi.squeeze(-1).detach()
            if is_inference and topi == tokenizer.pad_token_id:
                break

        decoder_outputs = torch.cat(decoder_outputs, dim=1)


        return decoder_outputs


    def forward_step(self, input, hidden, encoder_outputs):
        embedded =  self.embedding(input)
        query = hidden.permute(1, 0, 2)
        
        if query.shape[1] != 1:
            query = query.mean(dim=1, keepdim=True)
            encoder_outputs = encoder_outputs[..., :query.shape[-1]] + encoder_outputs[..., query.shape[-1]:]
        context, _ = self.attention(query, encoder_outputs, encoder_outputs)
        input_rnn = torch.cat((embedded, context[:, -1, :].unsqueeze(1)), dim=2)

        output, hidden = self.rnn(input_rnn, hidden)
        output = self.out(output)

        return output, hidden

class Seq2seq(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, device, bidirectional, num_layers):
        super(Seq2seq, self).__init__()
        self.encoder = EncoderRNN(cell_type, input_size, hidden_size, bidirectional, num_layers).to(device)
        self.decoder = AttnDecoderRNN(cell_type, hidden_size, input_size, bidirectional, num_layers).to(device)

    def forward(self, inp, out_BOS, max_len, teacher_forcing, is_inference=False):
        encoder_outputs, encoder_hidden = self.encoder(inp)
        decoder_outputs = self.decoder(encoder_outputs, encoder_hidden, max_len, is_inference, out_BOS if teacher_forcing else None)

        return decoder_outputs

In [9]:
class EncoderRNN(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, bidirectional, num_layers):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = cell_type(hidden_size, hidden_size, batch_first=True, bidirectional=bidirectional, num_layers=num_layers)

    def forward(self, input):
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded)
        return output, hidden

class AttnDecoderRNN(nn.Module):
    def __init__(self, cell_type, hidden_size, output_size, bidirectional, num_layers):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = nn.MultiheadAttention(hidden_size, num_heads=1, batch_first=True)
        self.rnn = cell_type(2 * hidden_size, hidden_size, batch_first=True, num_layers=num_layers, bidirectional=bidirectional)
        self.out = nn.Linear(2 * hidden_size if bidirectional else hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, max_len, is_inference, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        
        # Используем начальный токен BOS для всей последовательности
        if target_tensor is not None:
            decoder_input = target_tensor[:, :]  # сдвиг на один шаг для teacher forcing
        else:
            decoder_input = torch.full((batch_size, max_len), tokenizer.pad_token_id, dtype=torch.long, device=device)
            decoder_input[:, 0] = tokenizer.bos_token_id  # BOS для начала последовательности

        # Эмбеддинги для всех временных шагов
        embedded = self.embedding(decoder_input)
        
        # Повторяем attention для каждого временного шага
        # attention_outputs = []
        # for t in range(max_len - 1):
        #     query = encoder_hidden[-1].unsqueeze(1)  # используем последний скрытый слой в качестве query
        #     context, _ = self.attention(query, encoder_outputs, encoder_outputs)
        #     attention_outputs.append(context)

        # # Объединяем контексты для всех временных шагов
        # attention_outputs = torch.cat(attention_outputs, dim=1)
        
        # # Объединяем эмбеддинги с attention context для RNN
        # input_rnn = torch.cat((embedded, attention_outputs), dim=2)
        
        # Запускаем RNN и получаем выходные данные
        output, hidden = self.rnn(embedded, encoder_hidden)
        output = self.out(output)

        return output

class Seq2seq(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, device, bidirectional, num_layers):
        super(Seq2seq, self).__init__()
        self.encoder = EncoderRNN(cell_type, input_size, hidden_size, bidirectional, num_layers).to(device)
        self.decoder = AttnDecoderRNN(cell_type, hidden_size, input_size, bidirectional, num_layers).to(device)

    def forward(self, inp, out_BOS, max_len, teacher_forcing, is_inference=False):
        encoder_outputs, encoder_hidden = self.encoder(inp)
        decoder_outputs = self.decoder(encoder_outputs, encoder_hidden, max_len-1, is_inference, out_BOS if teacher_forcing else None)

        return decoder_outputs

In [19]:
class EncoderRNN(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, bidirectional, num_layers):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = cell_type(hidden_size, hidden_size, batch_first=True, bidirectional=bidirectional, num_layers=num_layers)

    def forward(self, input):
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded)
        return output, hidden

class AttnDecoderRNN(nn.Module):
    def __init__(self, cell_type, hidden_size, output_size, bidirectional, num_layers):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = nn.MultiheadAttention(hidden_size, num_heads=1, batch_first=True)
        self.rnn = cell_type(2 * hidden_size, hidden_size, batch_first=True, num_layers=num_layers, bidirectional=bidirectional)
        self.out = nn.Linear(2 * hidden_size if bidirectional else hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, max_len, is_inference, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        device = encoder_outputs.device
        
        # Если инференс, просто инициализируем последовательность с токеном BOS
        if is_inference:
            decoder_input = torch.full((batch_size, 1), tokenizer.bos_token_id, dtype=torch.long, device=device)
            outputs = []
    
            # Генерация всей последовательности токенов за один вызов
            for _ in range(max_len):
                embedded = self.embedding(decoder_input)
                attn_output, _ = self.attention(embedded, encoder_outputs, encoder_outputs)
                # Concatenate attention output with embedded input for the RNN
                rnn_input = torch.cat((embedded, attn_output), dim=2)
                output, encoder_hidden = self.rnn(rnn_input, encoder_hidden)
                output = self.out(output[:, -1, :])  # Предсказание для текущего шага
                outputs.append(output.unsqueeze(1))
    
                # Находим наиболее вероятный токен
                top1 = output.argmax(1).unsqueeze(1)
                decoder_input = top1 # Добавляем предсказанный токен к входу
    
            outputs = torch.cat(outputs, dim=1)
            return outputs
    
        else:
            # Если teacher forcing включен, используем target_tensor сдвинутый на один шаг
            decoder_input = torch.full((batch_size, max_len), tokenizer.pad_token_id, dtype=torch.long, device=device)
            decoder_input[:, 0] = tokenizer.bos_token_id  # BOS токен в начале последовательности
            
            if target_tensor is not None:
                decoder_input[:, 1:] = target_tensor[:, :-1]  # Сдвиг на один шаг
            
            embedded = self.embedding(decoder_input)
            attn_output, _ = self.attention(embedded, encoder_outputs, encoder_outputs)

            Concatenate attention output with embedded input for the RNN
            rnn_input = torch.cat((embedded, attn_output), dim=2)
            output, hidden = self.rnn(rnn_input, encoder_hidden)
            output = self.out(output)
            return output

class Seq2seq(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, device, bidirectional, num_layers):
        super(Seq2seq, self).__init__()
        self.encoder = EncoderRNN(cell_type, input_size, hidden_size, bidirectional, num_layers).to(device)
        self.decoder = AttnDecoderRNN(cell_type, hidden_size, input_size, bidirectional, num_layers).to(device)

    def forward(self, inp, out_BOS, max_len, teacher_forcing, is_inference=False):
        encoder_outputs, encoder_hidden = self.encoder(inp)
        decoder_outputs = self.decoder(encoder_outputs, encoder_hidden, max_len-1, is_inference, out_BOS if teacher_forcing else None)

        return decoder_outputs

In [9]:
class EncoderRNN(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, bidirectional, num_layers):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = cell_type(hidden_size, hidden_size, batch_first=True, bidirectional=bidirectional, num_layers=num_layers)

    def forward(self, input):
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded)
        return output, hidden

class AttnDecoderRNN(nn.Module):
    def __init__(self, cell_type, hidden_size, output_size, bidirectional, num_layers):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = nn.MultiheadAttention(hidden_size, num_heads=1, batch_first=True)
        self.rnn = cell_type(2 * hidden_size, hidden_size, batch_first=True, num_layers=num_layers, bidirectional=bidirectional)
        self.out = nn.Linear(2 * hidden_size if bidirectional else hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, max_len, is_inference, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        device = encoder_outputs.device
        
        # Если инференс, просто инициализируем последовательность с токеном BOS
        if is_inference:
            decoder_input = torch.full((batch_size, 1), tokenizer.bos_token_id, dtype=torch.long, device=device)
            outputs = []
    
            # Генерация всей последовательности токенов за один вызов
            for _ in range(max_len):
                output, encoder_hidden = self.forward_step(decoder_input, encoder_hidden, encoder_outputs)
                outputs.append(output.unsqueeze(1))
                decoder_input = output.argmax(dim=1).unsqueeze(1)
    
            outputs = torch.cat(outputs, dim=1)
            return outputs
    
        else:
            # Если teacher forcing включен, используем target_tensor сдвинутый на один шаг
            decoder_input = torch.full((batch_size, max_len), tokenizer.pad_token_id, dtype=torch.long, device=device)
            decoder_input[:, 0] = tokenizer.bos_token_id  # BOS токен в начале последовательности
            
            if target_tensor is not None:
                decoder_input[:, 1:] = target_tensor[:, :-1]  # Сдвиг на один шаг
            
            embedded = self.embedding(decoder_input)
            attn_output, _ = self.attention(embedded, encoder_outputs, encoder_outputs)

            #Concatenate attention output with embedded input for the RNN
            rnn_input = torch.cat((embedded, attn_output), dim=2)
            output, hidden = self.rnn(rnn_input, encoder_hidden)
            output = self.out(output)
            return output
            
    def forward_step(self, decoder_input, decoder_hidden, encoder_outputs):
        # Встраиваем входной токен
        embedded = self.embedding(decoder_input)
        
        # Рассчитываем внимание
        attn_output, _ = self.attention(embedded, encoder_outputs, encoder_outputs)
        
        # Объединяем контекстный вектор с вложением
        rnn_input = torch.cat((embedded, attn_output), dim=2)
        
        # Пропускаем через RNN
        rnn_output, decoder_hidden = self.rnn(rnn_input, decoder_hidden)
        output = self.out(rnn_output.squeeze(1))  # Предсказание для текущего шага
        return output, decoder_hidden

class Seq2seq(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, device, bidirectional, num_layers):
        super(Seq2seq, self).__init__()
        self.encoder = EncoderRNN(cell_type, input_size, hidden_size, bidirectional, num_layers).to(device)
        self.decoder = AttnDecoderRNN(cell_type, hidden_size, input_size, bidirectional, num_layers).to(device)

    def forward(self, inp, out_BOS, max_len, teacher_forcing, is_inference=False):
        encoder_outputs, encoder_hidden = self.encoder(inp)
        decoder_outputs = self.decoder(encoder_outputs, encoder_hidden, max_len-1, is_inference, out_BOS if teacher_forcing else None)

        return decoder_outputs

In [27]:
class EncoderRNN(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, bidirectional, num_layers):
        super(EncoderRNN, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.rnn = cell_type(hidden_size, hidden_size, batch_first=True, bidirectional=bidirectional, num_layers=num_layers)

    def forward(self, input):
        embedded = self.embedding(input)
        output, hidden = self.rnn(embedded)
        return output, hidden

class AttnDecoderRNN(nn.Module):
    def __init__(self, cell_type, hidden_size, output_size, bidirectional, num_layers):
        super(AttnDecoderRNN, self).__init__()
        self.embedding = nn.Embedding(output_size, hidden_size)
        self.attention = nn.MultiheadAttention(hidden_size, num_heads=1, batch_first=True)
        self.rnn = cell_type(2 * hidden_size, hidden_size, batch_first=True, num_layers=num_layers, bidirectional=bidirectional)
        self.out = nn.Linear(2 * hidden_size if bidirectional else hidden_size, output_size)

    def forward(self, encoder_outputs, encoder_hidden, max_len, is_inference, target_tensor=None):
        batch_size = encoder_outputs.size(0)
        device = encoder_outputs.device
        
        # Если инференс, просто инициализируем последовательность с токеном BOS
        if is_inference:
            decoder_input = torch.full((batch_size, 1), tokenizer.bos_token_id, dtype=torch.long, device=device)
            outputs = []
    
            # Генерация всей последовательности токенов за один вызов
            for _ in range(max_len):
                output, encoder_hidden = self.forward_step(decoder_input, encoder_hidden, encoder_outputs)
                outputs.append(output.unsqueeze(1))
                decoder_input = output.argmax(dim=1).unsqueeze(1)
    
            outputs = torch.cat(outputs, dim=1)
            return outputs
    
        else:
            # Если teacher forcing включен, используем target_tensor сдвинутый на один шаг
            decoder_input = torch.full((batch_size, max_len), tokenizer.pad_token_id, dtype=torch.long, device=device)
            decoder_input[:, 0] = tokenizer.bos_token_id  # BOS токен в начале последовательности
            
            if target_tensor is not None:
                decoder_input[:, 1:] = target_tensor[:, :-1]  # Сдвиг на один шаг
            
            embedded = self.embedding(decoder_input)
            attn_output, _ = self.attention(embedded, encoder_outputs, encoder_outputs)

            #Concatenate attention output with embedded input for the RNN
            rnn_input = torch.cat((embedded, attn_output), dim=2)
            output, hidden = self.rnn(rnn_input, encoder_hidden)
            output = self.out(output)
            return output
            
    def forward_step(self, decoder_input, decoder_hidden, encoder_outputs):
        # Встраиваем входной токен
        embedded = self.embedding(decoder_input)
        
        # Рассчитываем внимание
        query = decoder_hidden.permute(1, 0, 2)
        attn_output, _ = self.attention(query, encoder_outputs, encoder_outputs)
        
        # Объединяем контекстный вектор с вложением
        rnn_input = torch.cat((embedded, attn_output), dim=2)
        
        # Пропускаем через RNN
        rnn_output, decoder_hidden = self.rnn(rnn_input, decoder_hidden)
        output = self.out(rnn_output.squeeze(1))  # Предсказание для текущего шага
        return output, decoder_hidden

class Seq2seq(nn.Module):
    def __init__(self, cell_type, input_size, hidden_size, device, bidirectional, num_layers):
        super(Seq2seq, self).__init__()
        self.encoder = EncoderRNN(cell_type, input_size, hidden_size, bidirectional, num_layers).to(device)
        self.decoder = AttnDecoderRNN(cell_type, hidden_size, input_size, bidirectional, num_layers).to(device)

    def forward(self, inp, out_BOS, max_len, teacher_forcing, is_inference=False):
        encoder_outputs, encoder_hidden = self.encoder(inp)
        decoder_outputs = self.decoder(encoder_outputs, encoder_hidden, max_len-1, is_inference, out_BOS if teacher_forcing else None)

        return decoder_outputs

Try to add attention to your model (for example [additive attention](https://keras.io/api/layers/attention_layers/additive_attention/)), does it perform better?

In [16]:
# hidden_size = 128
# #batch_size = 256
# device = 'cuda'

# model = Seq2seq(nn.GRU, tokenizer.vocab_size, hidden_size, device, True, 2)

# train(data, model, 5, print_every=1, plot_every=1, end_teacher_forcing=30)

# Training

Train your model using teacher forcing. The idea is that the model predicts the next token that should follow, so one part of the model (called encoder) reads the text and output some state containing information about the text read. The other part of the model (called decoder) reads and already generated text (or in case of the teacher forcing the expected output) and predicts the next token for each one. 

In [17]:
import time
import math

def asMinutes(s):
    m = math.floor(s / 60)
    s -= m * 60
    return '%dm %ds' % (m, s)

def timeSince(since, percent):
    now = time.time()
    s = now - since
    es = s / (percent)
    rs = es - s
    return '%s (- %s)' % (asMinutes(s), asMinutes(rs))

import matplotlib.pyplot as plt
plt.switch_backend('agg')
import matplotlib.ticker as ticker
import numpy as np

def showPlot(points):
    plt.figure()
    fig, ax = plt.subplots()
    # this locator puts ticks at regular intervals
    loc = ticker.MultipleLocator(base=0.2)
    ax.yaxis.set_major_locator(loc)
    plt.plot(points)

In [18]:
def train_epoch(dataloader, model, optimizer, criterion, teacher_forcing):

    total_loss = 0
    total_metric = 0
    for data in dataloader:
        inp, out_BOS, out_EOS, masking_PAD = data

        optimizer.zero_grad()

        decoder_outputs = model(inp, out_BOS, MAX_LENGTH, teacher_forcing)
        
        loss = criterion(
            decoder_outputs[masking_PAD],
            out_EOS[masking_PAD]
        )
        loss.backward()

        optimizer.step()
        decoder_outputs = decoder_outputs.argmax(-1)
        total_metric += (decoder_outputs[masking_PAD].detach().cpu().numpy() == out_EOS[masking_PAD].detach().cpu().numpy()).mean()
        total_loss += loss.item()

    return total_loss / len(dataloader), total_metric / len(dataloader)

In [19]:
def train(train_dataloader, model, n_epochs, learning_rate=0.001, eval_every=1,
               print_every=100, plot_every=100, end_teacher_forcing=1):
    start = time.time()
    plot_losses = []
    print_loss_total = 0
    plot_loss_total = 0

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.CrossEntropyLoss()
    
    teacher_forcing = True
    for epoch in range(1, n_epochs):
        if epoch == end_teacher_forcing:
            teacher_forcing = False
        print('='*40)
        print(f'Epoch №{epoch}')
        loss, metric = train_epoch(train_dataloader, model, optimizer, criterion, teacher_forcing)
        
        print_loss_total += loss
        plot_loss_total += loss
        if epoch % eval_every == 0:
            loss_test, metric_test = evaluate(data_test, model, criterion)
            print(f'EVAL: loss={loss_test:.2f} metric={metric_test:.2f}')
        
        if epoch % print_every == 0:
            print_loss_avg = print_loss_total / print_every
            print_loss_total = 0
            print(f"train metric: {metric}")
            print('%s (%d %d%%) train loss: %.4f' % (timeSince(start, epoch / n_epochs),
                                        epoch, epoch / n_epochs * 100, print_loss_avg))

        if epoch % plot_every == 0:
            plot_loss_avg = plot_loss_total / plot_every
            plot_losses.append(plot_loss_avg)
            plot_loss_total = 0
        

    showPlot(plot_losses)

In [20]:
def evaluate(dataloader, model, criterion):
    
    with torch.no_grad():
        total_metric = 0
        total_loss = 0
        for data in dataloader:
            inp, out_BOS, out_EOS, masking_PAD = data

            decoder_outputs = model(inp, out_BOS, MAX_LENGTH, None, True)
            loss = criterion(
                decoder_outputs[masking_PAD],
                out_EOS[masking_PAD]
            )
            total_loss += loss.item()
            decoder_outputs = decoder_outputs.argmax(-1)
            total_metric += (decoder_outputs[masking_PAD].detach().cpu().numpy() == out_EOS[masking_PAD].detach().cpu().numpy()).mean()
            
    return total_loss / len(dataloader), total_metric / len(dataloader)


In [31]:
hidden_size = 128
device = 'cuda'

model = Seq2seq(nn.GRU, tokenizer.vocab_size, hidden_size, device, False, 1)

train(data, model, 11, eval_every=1, print_every=1, plot_every=1, end_teacher_forcing=40)

Epoch №1
EVAL: loss=5.74 metric=0.09
train metric: 0.23118498125924067
5m 35s (- 55m 56s) (1 9%) train loss: 4.9870
Epoch №2
EVAL: loss=5.77 metric=0.10
train metric: 0.36949854132393617
11m 13s (- 50m 31s) (2 18%) train loss: 3.9363
Epoch №3
EVAL: loss=5.79 metric=0.11
train metric: 0.42500288733471187
16m 51s (- 44m 56s) (3 27%) train loss: 3.4733
Epoch №4
EVAL: loss=5.83 metric=0.10
train metric: 0.4617090400788305
22m 30s (- 39m 23s) (4 36%) train loss: 3.1656
Epoch №5


KeyboardInterrupt: 

In [14]:
hidden_size = 128
#batch_size = 256
device = 'cuda'

model = Seq2seq(nn.GRU, tokenizer.vocab_size, hidden_size, device, False, 1)

train(data, model, 30, print_every=1, plot_every=1, end_teacher_forcing=30)

ok
EVAL: loss=5.45 metric=0.14
train metric: 0.24856439097195143
9m 19s (- 270m 39s) (1 3%) train loss: 4.9509
ok
EVAL: loss=5.25 metric=0.17
train metric: 0.3787704378143097
18m 38s (- 261m 3s) (2 6%) train loss: 3.9018
ok
EVAL: loss=5.24 metric=0.18
train metric: 0.43367459468838837
27m 57s (- 251m 35s) (3 10%) train loss: 3.4365
ok
EVAL: loss=5.13 metric=0.19
train metric: 0.4663576027297193
37m 15s (- 242m 12s) (4 13%) train loss: 3.1429
ok
EVAL: loss=5.10 metric=0.19
train metric: 0.4900443761178413
46m 34s (- 232m 51s) (5 16%) train loss: 2.9278
ok
EVAL: loss=5.05 metric=0.19
train metric: 0.5072819624984329
55m 52s (- 223m 31s) (6 20%) train loss: 2.7629
ok
EVAL: loss=4.98 metric=0.20
train metric: 0.5211834606161164
65m 11s (- 214m 12s) (7 23%) train loss: 2.6321
ok
EVAL: loss=5.00 metric=0.20
train metric: 0.5330060863661616
74m 30s (- 204m 52s) (8 26%) train loss: 2.5252
ok
EVAL: loss=5.01 metric=0.20
train metric: 0.5426380785171968
83m 48s (- 195m 34s) (9 30%) train loss: 2

In [28]:
torch.save(model.state_dict(), os.path.join(MODEL_PATH, 'seq2seq'+'.pth'))

# Testing

Make a function for text translation. Translate some text and evaluate model performance.

Take note that your model is set for training. During the inference process you will have to use parts of the model independently (including the RNN cells).

In [30]:
def translate(
    text: str,
    tokenizer: Tokenizer,
    model: nn.Module,
    max_len: int = 20
) -> str:
    '''Predicts `text`translation using the `model`.

    Arguments:
        text: text to be translated
        tokenizer: tokenizer to use
        model: model ot use
        max_len: maximum length of the prediction (in tokens)

    Returns:
        tranlated text'''
    ...
    text = tokenizer(text, return_tensors='pt')['input_ids'].to(device)
    tokens_translated = model(text, None, max_len, None, is_inference=True).argmax(dim=-1).squeeze()
    return tokenizer.decode(tokens_translated)

In [31]:
corpus = ['Доброе утро',
          'Привет мир!', 
          'Какой чудесный день!',
          'Кот сидит на столе.',
          'Пицца это вкусно.', 
          'Принц был болен.']

for document in corpus:
    print(translate(document, tokenizer, model, max_len=30))

Good is early. is a.. it to. is old<|endoftext|>
Hi! is dead! are dead<|endoftext|>
What the is day?'s?'s a?'s a!'s a!'s a!'s a!'s a!'s a!'s
The cat sitting sitting the on table<|endoftext|>
The is this is. tastes.'s!'s a.. you't<|endoftext|>
The was was hospital a..<|endoftext|>
