# Transformers

In this homework, you need to generate text with a neural network.

In [1]:
import torch
from torch import nn

import numpy as np
import time

## Task 1 (0.5 points)
We will train a language model to predict the next letter. Such language models are used in speech recognition, as they provide additional information to the acoustic model when the next character is selected. To get started, open the data, check what characters are included in the texts, how many there are. Remove all newline characters and tabs from the text.

In [None]:
path = 'small_corp_for_test.txt'
file = open(path, 'r')
data = file.readlines()
file.close()
len(data)

In [None]:
# YOUR CODE HERE

################

## Task 2 (0.5 points)
To train the model, you have to change the text to a form suitable for the neural network. It is also important to note that we need to add two tokens (start and end), which are responsible for the beginning and end of the text. Use [ and ] for this task. We also need a pad token to fill the text with it to the required length to form a batch.

Implement the preprocess method of the Preprocessor class. As input it takes text and the length of text that we expect to receive as output. The text must be converted to lower case, the required number of pad tokens are added to the end of the text, then the text is vectorized (each character has its own number). You need to return two vectors: the result obtained without the last token (we will train on it) and the result obtained without the first token (target during training).

In [None]:
class Preprocessor:
    def __init__(self):
        self.alphabet = '_добсркгаупитнезчм яжлйвцыэь-шхющёъ][ '
        self.token2ind = {}
        self.ind2token = {}
        for i in range(len(self.alphabet)):
            self.token2ind[self.alphabet[i]] = i
            self.ind2token[i] = self.alphabet[i]
        
    
    def preprocess(self, text, window_size):
        # YOUR CODE HERE
        pass
        #################

## Task 3 (0.5 points)
Since we decided that the text will begin with the token [ and end with the token ], the data needs to be corrected. Implement this idea, add these tokens to your texts.

In [None]:
# YOUR CODE HERE

################

## Task 4 (0.5 points)
Let's limit the maximum text length. You can change this threshold and thereby reduce the number of texts in your sample and increase the learning rate. Let's start with 128.
Select a threshold and leave only those texts whose length does not exceed this threshold.

Next, split the texts into train and test, mix the texts when splitting, the size of the test sample should be 15% of the total number of texts.

In [None]:
THRESHOLD = 128

# YOUR CODE HERE

################

## Task 5 (1.5 points)
Let's write a dataset. The input to the dataset is a set of texts, an object of the Preprocessor class, and the window size that you selected in the previous task.
Implement the __len__ and __getitem__ methods.

In [None]:
class TextDataset(torch.utils.data.Dataset):
    
    def __init__(self, x, preproc, win_size = 128):
        # YOUR CODE HERE
        pass
        ################
    
    def __len__(self):
        # YOUR CODE HERE
        pass
        ################
    
    def __getitem__(self, idx):
        # YOUR CODE HERE
        pass
        ################

In [None]:
preproc = Preprocessor()
train_dataset = TextDataset(data_train, preproc)
test_dataset = TextDataset(data_test, preproc)



## Task 6 (1.5 points)
Let's write a model. The class for implementing positional encoding is implemented for you, it is needed so that the model (after receiving embeddings) can understand, in which place which token is located.

Fill in the blanks in the model class. Choose the hyperparameters of the model. It is recommended to use no more than 6 layers in the transformer. In the decoder, use two linear layers with a ReLU activation function in between.

In [None]:
class PositionalEncoding(nn.Module):

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super(PositionalEncoding, self).__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        pe[:, 1::2] = torch.cos(position * div_term)
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)

In [None]:
class LanguageModel(nn.Module):
    def __init__(self, vocab_size):
        super(LanguageModel, self).__init__()
        self.emb = ...
        self.pe = ...
        self.transformer_encoder_layer = ...
        self.transformer_encoder = ...
        self.decoder = ...
    
    def forward(self, x, src_mask):
        x = ... # emb, then pe
        x = x.transpose(1, 0)
        x = ... # transformer encoder with mask
        x = ... # decoder
        return x.transpose(1, 0)
    
    def generate_square_subsequent_mask(self, sz):
        
        mask = (torch.triu(torch.ones(sz, sz)) == 1).transpose(0, 1)
        mask = mask.float().masked_fill(mask == 0, float('-inf')).masked_fill(mask == 1, float(0.0))
        return mask

In [None]:
model = LanguageModel(len('_добсркгаупитнезчм яжлйвцыэь-шхющёъ][ '))

## Task 7 (2 points)
Let's implement a class to train the model and validate it. Follow the directions in the code and fill in the missing pieces in the code.

In [None]:
class Trainer:
    
    def __init__(self, model, train_dataset, test_dataset):
        
        self.model = model
        
        self.train_batch_size = 64
        self.test_batch_size = 64
        
        self.train_dataloader = ...
        self.test_dataloader = ...
        self.train_dataloader_size = ...
        self.test_dataloader_size = ...
        
        self.device = 'cuda:0'
        self.criterion = ... # use CrossEntrophyLoss, pass as a parameter
                             # ignore the index of the _ character so that the model is not penalized for the character after the closing token

        
        self.optimizer = ...
        
        self.steps_to_print = 1000
        
    def train_one_epoch(self, epoch_number):
        step = 0
        counted_loss = 0
        current_time = time.time()
        it = 0
        
        for batch in self.train_dataloader:
            x, y = batch
            # YOUR CODE HERE
            
            # implement model training steps

            # store the loss value in counted_loss variable

            ################
            
            
            if step%self.steps_to_print == 0:
                result = 'Train epoch '+str(epoch_number)+' | '
                result += 'Step '+str(step)+'/'+str(self.train_dataloader_size)+' | '
                result += 'Counted loss '+str(counted_loss)+' | '
                result += 'ppl '+str(math.exp(counted_loss/it))+' | '
                result += 'time '+str(time.time() - current_time) + ' | '
                print(result)
                current_time = time.time()
                counted_loss = 0
                it = 0
    
    def validate_one_epoch(self, epoch_number):
        step = 0
        counted_loss = 0
        current_time = time.time()
        it = 0
        for batch in self.test_dataloader:
            x, y = batch
            
            # YOUR CODE HERE


            # implement steps for test
            # remember that this method is already run from the block with torch.no_grad(), so you don't need to reuse it
            
            ################
            
            if step%(self.steps_to_print//2) == 0:
                result = 'Validate epoch '+str(epoch_number)+' | '
                result += 'Step '+str(step)+'/'+str(self.test_dataloader_size)+' | '
                result += 'Counted loss '+str(counted_loss)+' | '
                result += 'ppl '+str(math.exp(counted_loss/it))+' | '
                result += 'time '+str(time.time() - current_time) + ' | '
                print(result)
                current_time = time.time()
                counted_loss = 0
                it = 0
        
    def train(self, number_of_epochs):
        model.to(self.device)
        for epoch in range(1, number_of_epochs+1):
            model.train()
            self.train_one_epoch(epoch)
            with torch.no_grad():
                model.eval()
                self.validate_one_epoch(epoch)
            print()

## Task 8 (0.5 points)
Run training on multiple epochs. Focus on your computing power and work time. You can always calculate how many seconds it takes for one batch.

In [None]:
# YOUR CODE HERE
###############

## Task 9 (1 point)
Let's try to generate text with our model. Finish the text generation function. Try to generate some text. Remember that if you want to generate text from scratch, you must pass only the start token as text.
Stop generating text if the model gives you an end token or if the text length is greater than 150.

In [None]:
def generate_text(text):
    x = []
    
    for letter in text:
        x.append(preproc.token2ind[letter])
    x = torch.from_numpy(np.array(x))
    
    pred = ...
    ind = ... 
    
    text += ... 
    
    if ...:
        return text
    else:
        return generate_text(text)