# Семинар 9: Character-Level LSTM

## Вступление
На прошлом занятии мы познакомились с тем, как можно векторизовать текстовые данные для решения задач обработки текстов. Сегодня мы продолжим заниматься текстами и посмотрим на простейший пример автоматической генерации текстов при помощи Recurrent Neural Network (RNN).

Полезные материалы по RNN можно почитать [здесь]((http://karpathy.github.io/2015/05/21/rnn-effectiveness/)), а реализацию на PyTorch — [здесь](https://github.com/karpathy/char-rnn).

### План семинара
1. Подготовка данных
2. Имплементация модели
3. Обучение модели
4. Применение модели

In [1]:
from typing import Iterable, Tuple

import numpy as np
import pytorch_lightning as pl
import torch
from torch import nn

## 1. Подготовка данных

### Загрузим текст "Анны Карениной"

In [2]:
with open("anna.txt", "r") as f:
    text = f.read()

text[:100]

'Chapter 1\n\n\nHappy families are all alike; every unhappy family is unhappy in its own\nway.\n\nEverythin'

### Токенизируем текст

Аналогично предыдущему семинару, в ячейках ниже создадим два **словаря** для преобразования символов в целые числа и обратно.

In [3]:
unique_chars = tuple(set(text))
int2char = dict(enumerate(unique_chars))
char2int = {ch: ii for ii, ch in int2char.items()}

# encode the text
encoded = torch.tensor([char2int[ch] for ch in text])
encoded[:100]

tensor([72, 60, 22, 33, 74, 51, 53,  0, 11,  3,  3,  3, 50, 22, 33, 33, 58,  0,
        41, 22,  8, 64, 34, 64, 51, 21,  0, 22, 53, 51,  0, 22, 34, 34,  0, 22,
        34, 64, 67, 51, 31,  0, 51, 61, 51, 53, 58,  0, 80, 77, 60, 22, 33, 33,
        58,  0, 41, 22,  8, 64, 34, 58,  0, 64, 21,  0, 80, 77, 60, 22, 33, 33,
        58,  0, 64, 77,  0, 64, 74, 21,  0, 54, 29, 77,  3, 29, 22, 58, 40,  3,
         3, 79, 61, 51, 53, 58, 74, 60, 64, 77])

Посмотрим на схему char-RNN:
<img src="https://github.com/udacity/deep-learning-v2-pytorch/blob/master/recurrent-neural-networks/char-rnn/assets/charseq.jpeg?raw=1" width="30%">

Сеть ожидает **one-hot encoded** входа, что означает, что каждый символ преобразуется в целое число (через созданный маппинг), а затем преобразуется в вектор-столбец, где только соответствующий ему целочисленный индекс будет иметь значение 1, а остальная часть вектора будет заполнена нулями. Давайте напишем функцию для этого преобразования.

#### Задание: допишите функцию one-hot кодирования последовательности

In [4]:
def one_hot_encode(int_words: torch.Tensor, n_labels: int) -> torch.Tensor:
    """
    Creates one-hot representation matrix for a given batch of integer sequences
    :param int_words: tensor of ints, which represents current sequence; shape: [batch_size, seq_len]
    :param n_labels: vocabulary size (number of unique tokens in data)
    :return: one-hot representation of the input tensor; shape: [batch_size, seq_len, n_labels]
    """
    words_one_hot = torch.zeros((int_words.numel(), n_labels), dtype=torch.float32)
    words_one_hot[torch.arange(words_one_hot.shape[0]), int_words.flatten()] = 1.
    words_one_hot = words_one_hot.reshape((*int_words.shape, n_labels))

    return words_one_hot

In [5]:
# testing the function
test_seq = torch.tensor([[3, 5, 1], [0, 2, 4]])
test_one_hot = one_hot_encode(test_seq, 8)

print(test_one_hot)

tensor([[[0., 0., 0., 1., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 1., 0., 0.],
         [0., 1., 0., 0., 0., 0., 0., 0.]],

        [[1., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 1., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 1., 0., 0., 0.]]])


### Сформируем батчи
На простом примере батчи будут выглядеть так: мы возьмем закодированные символы и разделим их на несколько последовательностей, заданных параметром `batch_size`. Каждая из наших последовательностей будет иметь длину `seq_length`.

<img src="https://github.com/udacity/deep-learning-v2-pytorch/blob/master/recurrent-neural-networks/char-rnn/assets/sequence_batching@1x.png?raw=1" width=500px>

**1. Отбросим часть текста, чтобы у нас были только полные батчи**

Каждый батч содержит $N \times M$ символов, где $N$ — это количество последовательностей в батче (`batch_size`), а $M$ — длина каждой последовательности (`seq_length`). Затем, чтобы получить общее количество батчей $K$, которое мы можем сделать из последовательности, нужно разделить длину последовательности на количество символов в батче. Когда мы узнаем количество батчей, можно получить общее количество символов, которые нужно сохранить, из последовательности: $N \times M \times K$.

**2. Разделим текст на $N$ частей**

Этот шаг нужен, чтобы мы могли проходить по тексту окном размера `[batch_size, seq_len]`. Его можно реализовать при помощи простого `reshape`.

**3. Теперь, когда у нас готова матрица текста, мы можем двигаться по ней окном, чтобы получить батчи**

Из каждой позиции окна сформируем обучающие пары `(x, y)` следующим образом: $x$ — это все элементы окна кроме последнего столбца, а $y$ — это все элементы окна кроме первого столбца. Тем самым для каждого токена исходного текста мы будем предсказывать следующий за ним токен.

#### Задание: допишите функцию генерации батчей

In [6]:
def get_batches(int_words: torch.Tensor, batch_size: int, seq_length: int) -> Iterable[torch.Tensor]:
    """
    Generates batches from encoded sequence.
    :param int_words: array of ints, which represents the text; shape: [batch_size, -1]
    :param batch_size: number of sequences per batch
    :param seq_length: number of encoded chars in a sequence
    :return: generator of pairs (x, y); x_shape, y_shape: [batch_size, seq_length - 1]
    """
    # 1. Truncate text, so there are only full batches
    window_size = seq_length + 1
    batch_size_total = batch_size * window_size
    n_batches = len(int_words) // batch_size_total
    int_words = int_words[:n_batches * batch_size_total]

    # 2. Reshape into batch_size rows
    int_words = int_words.reshape((batch_size, -1))

    # 3. Iterate through the text matrix
    for position in range(0, int_words.shape[1], window_size):
        x = int_words[:, position:position + window_size - 1]
        y = int_words[:, position + 1:position + window_size]
        yield x, y

In [7]:
# testing the function
test_batches = get_batches(encoded, 8, 50)
test_x, test_y = next(test_batches)
assert test_x.shape == test_y.shape
print(f"x:\n{test_x[:10, :10]}\n")
print(f"y:\n{test_y[:10, :10]}")

x:
tensor([[72, 60, 22, 33, 74, 51, 53,  0, 11,  3],
        [33, 33, 51,  7,  0, 22, 77,  7,  0, 21],
        [74, 60, 22, 74,  0, 64, 77, 21, 54, 34],
        [51, 29,  0, 74, 60, 54, 80, 17, 60, 74],
        [51, 53, 70,  0, 60, 51,  0, 60, 22,  7],
        [53, 70,  0, 29, 60, 54,  0, 29, 22, 21],
        [21, 74,  0, 16, 51,  0, 57, 54, 61, 51],
        [70,  0, 45, 34, 51, 20, 51, 58,  0, 45]])

y:
tensor([[60, 22, 33, 74, 51, 53,  0, 11,  3,  3],
        [33, 51,  7,  0, 22, 77,  7,  0, 21, 60],
        [60, 22, 74,  0, 64, 77, 21, 54, 34, 80],
        [29,  0, 74, 60, 54, 80, 17, 60, 74, 41],
        [53, 70,  0, 60, 51,  0, 60, 22,  7,  0],
        [70,  0, 29, 60, 54,  0, 29, 22, 21,  0],
        [74,  0, 16, 51,  0, 57, 54, 61, 51, 53],
        [ 0, 45, 34, 51, 20, 51, 58,  0, 45, 34]])


## 2. Имплементация модели

<img src="https://github.com/udacity/deep-learning-v2-pytorch/blob/master/recurrent-neural-networks/char-rnn/assets/charRNN.png?raw=1" width=50%>

### Структура модели

* Создаём и храним необходимые словари.
* Определяем слой [LSTM]((https://pytorch.org/docs/stable/generated/torch.nn.LSTM.html#torch.nn.LSTM)) с помощью инстанса класса `torch.nn.LSTM`, который принимает набор параметров: `input_size` — длина последовательности в батче; `n_hidden` — размер скрытых слоёв; `n_layers` — количество слоёв; `drop_prob` — вероятность дропаута; и `batch_first` — флаг, указывающий на то, что у входных последовательностей размерность батча идёт вдоль нулевой оси.
* Определяем слой Dropout с таким же значением `drop_prob`.
* Определяем полносвязный слой с набором параметров: размерность ввода — `n_hidden`; размерность выхода — размер словаря.
* Наконец, инициализируем веса и начальное скрытое состояние (`self.init_hidden()`).

In [8]:
class CharRNN(nn.Module):
    def __init__(
        self,
        unique_tokens: Tuple[str],
        n_hidden: int = 256,
        n_layers: int = 2,
        drop_prob: float = 0.5,
    ) -> None:
        super().__init__()
        self.n_hidden = n_hidden
        self.n_layers = n_layers
        self.drop_prob = drop_prob

        # create mappings
        self.unique_tokens = unique_tokens
        self.int2char = dict(enumerate(self.unique_tokens))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        
        ## define the LSTM, dropout and fully connected layers
        self.lstm = nn.LSTM(len(self.unique_tokens), n_hidden, n_layers, dropout=drop_prob, batch_first=True)
        self.dropout = nn.Dropout(drop_prob)
        self.fc = nn.Linear(n_hidden, len(self.unique_tokens))

    def forward(self, x: torch.Tensor, hidden: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
        r_output, hidden = self.lstm(x, hidden)
        out = self.dropout(r_output)
        # Stack up LSTM outputs using view. You may need to use contiguous to reshape the output.
        out = out.contiguous().view(-1, self.n_hidden)
        ## Get the output for classification.
        out = self.fc(out)
        return out, hidden

    def init_hidden(self, batch_size: int, weight_device: torch.device) -> Tuple[torch.Tensor]:
        """
        Creates two new zero tensors for hidden state and cell state of LSTM
        :param batch_size: number of sequences per batch
        :param weight_device: torch.device("cuda") for GPU init or torch.device("cpu") for CPU init
        :return: tuple of two tensors of shape [n_layers x batch_size x n_hidden]
        """
        weight = next(self.parameters()).data
        hidden = (weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(weight_device),
                  weight.new(self.n_layers, batch_size, self.n_hidden).zero_().to(weight_device))
        return hidden

## 3. Обучение модели

По классике, используем оптимизатор Adam и кросс-энтропию. Но без пары особенностей не обойтись:
* Во время цикла будем отделять скрытое состояние от его истории, потому что скрытое состояние LSTM является кортежем скрытых состояний.
* Будем использовать gradient clipping, чтобы избавиться от взрывающихся градиентов.

In [9]:
class AnnaData(torch.utils.data.IterableDataset):
    def __init__(self, int_words: torch.Tensor, batch_size: int, seq_length: int):
        self.int_words = int_words
        self.batch_size = batch_size
        self.seq_length = seq_length

    def __iter__(self):
        return get_batches(self.int_words, self.batch_size, self.seq_length)

In [10]:
class CharRNNModule(pl.LightningModule):
    def __init__(
            self,
            unique_tokens: Tuple[str],
            n_hidden: int = 1024,
            n_layers: int = 2,
            drop_prob: float = 0.5,
            batch_size: int = 128,
            seq_length = 256,
            lr: float = 0.001
    ) -> None:
        super().__init__()
        self.model = CharRNN(unique_tokens, n_hidden, n_layers, drop_prob)
        self.hidden = self.model.init_hidden(batch_size, self.device)
        self.loss = nn.CrossEntropyLoss()
        self.optimizer = torch.optim.Adam(self.model.parameters(), lr=lr)
        self.n_chars = len(unique_tokens)
        self.batch_size = batch_size
        self.seq_length = seq_length

    def training_step(self, train_batch):
        x, y = train_batch
        x, y = x.squeeze(0), y.squeeze(0)
        x = one_hot_encode(x, self.n_chars)

        h = tuple([each.data for each in self.hidden])
        output, h = self.model(x, h)
        loss = self.loss(output, y.reshape(self.batch_size * self.seq_length).long())

        self.log("train_loss", loss, prog_bar=True)
        return loss

    def configure_optimizers(self):
        return self.optimizer

In [11]:
train_dataset = AnnaData(encoded, batch_size=128, seq_length=256)
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=1,  # batching is already implemented on our side
    shuffle=False,
)
char_rnn = CharRNNModule(unique_chars, n_hidden=32)
trainer = pl.Trainer(accelerator="cpu", max_epochs=15)
trainer.fit(char_rnn, train_dataloaders=train_dataloader)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs
HPU available: False, using: 0 HPUs

  | Name  | Type             | Params
-------------------------------------------
0 | model | CharRNN          | 26.2 K
1 | loss  | CrossEntropyLoss | 0     
-------------------------------------------
26.2 K    Trainable params
0         Non-trainable params
26.2 K    Total params
0.105     Total estimated model params size (MB)
  rank_zero_warn(


Training: 0it [00:00, ?it/s]

`Trainer.fit` stopped: `max_epochs=15` reached.


## 4. Применение модели

Сперва сохраним обученную модель, чтобы можно было загрузить её позже. В следующей ячейке сохраняются параметры, необходимые для создания той же архитектуры, гиперпараметры скрытого слоя и токены.

In [12]:
net = char_rnn.model
checkpoint = {"n_hidden": net.n_hidden,
              "n_layers": net.n_layers,
              "state_dict": net.state_dict(),
              "tokens": net.unique_tokens}

with open("rnn_x_epoch.net", "wb") as f:
    torch.save(checkpoint, f)

device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

### Делаем предсказания

Сгенерируем текст! Для предсказания продолжения текста мы передаём в сеть последний символ, она предсказывает следующий символ, который мы снова передаем на вход, получаем ещё один предсказанный символ и так далее. Наши прогнозы основаны на категориальном распределении вероятностей по всем возможным символам. Мы можем ограничить число символов на каждом шаге генерации, чтобы сделать получаемый предсказанный текст более разумным, рассматривая только некоторые, наиболее вероятные символы. С одной стороны, такой подход позволит нам рассматривать не только самую вероятную последовательность с точки зрения прогноза модели. С другой стороны, мы будем работать с ограниченным набором сгенерированных вариантов, поэтому избавимся от совсем уж шумовых прогнозов. Узнать больше можно [здесь](https://pytorch.org/docs/stable/generated/torch.topk.html#torch.topk).

In [13]:
def predict_next_char(model: torch.nn.Module, char: str, h: torch.Tensor = None, top_k: int = None) -> Tuple[str, torch.Tensor]:
        """
        Given a character and a model, predicts next character in the sequence
        :param model: model that outputs next token probability distribution
        :param char: last character of the sequence to continue generation from
        :param h: hidden state of the model
        :param top_k: number of most probable tokens to be chosen from
        :return: tuple of next character and new hidden state
        """
        # tensor inputs
        x = torch.tensor([[model.char2int[char]]])
        x = one_hot_encode(x, len(model.unique_tokens))
        x = x.to(device)

        # detach hidden state from history
        h = tuple([each.data for each in h])

        # get the output of the model
        out, h = model(x, h)

        # get the character probabilities
        p = torch.nn.functional.softmax(out, dim=1).data.cpu()

        # get top characters
        if top_k is None:
            top_ch = torch.arange(len(model.unique_tokens))
        else:
            p, top_ch = p.topk(top_k)
            top_ch = top_ch.squeeze()
        
        # select the likely next character with some element of randomness
        p = p.numpy().squeeze()
        char = np.random.choice(top_ch.numpy(), p=p/p.sum())
        
        return model.int2char[char], h

### Priming и генерирование текста

Нужно задать скрытое состояние, чтобы сеть не генерировала произвольные символы. 

In [14]:
def sample(model, size, prime="The", top_k=None):
    model.to(device)
    model.eval()
    
    # First off, run through the prime characters
    chars = [ch for ch in prime]
    h = model.init_hidden(1, device)
    for ch in prime:
        char, h = predict_next_char(model, ch, h, top_k=top_k)

    chars.append(char)
    
    # Now pass in the previous character and get a new one
    for ii in range(size):
        char, h = predict_next_char(model, chars[-1], h, top_k=top_k)
        chars.append(char)

    return "".join(chars)

### Загрузка чекпоинта и генерация

In [15]:
with open("rnn_x_epoch.net", "rb") as f:
    checkpoint = torch.load(f)
    
loaded = CharRNN(checkpoint["tokens"], n_hidden=checkpoint["n_hidden"], n_layers=checkpoint["n_layers"])
loaded.load_state_dict(checkpoint["state_dict"])

# Sample using a loaded model
print(sample(loaded, 2000, top_k=5, prime="And Levin said"))

And Levin said hhe a ha et har hint oe  ans he at et ot ots oed teta tot ee a to as hasttor hh a as a enti os en hond ate ho  on asdtite thin and a en te a hess et an an etit ta so o as hon  ar ho tot tondd eedthe  on essen hi arnsi oen tase esne tostes tees as ans e he hen ar oe  oe  ee has e ee hit ettesd ee atte  a te o a hin  eta etesd astin te as tat ee ethe has at ho  e at hhi  ann anst etet hann totte hhtt ettas oe  ot hand thonts hi  he ansd otet het het hired attestti  an o the tint on har te andte eten hh seet anstas etos o ar oetid a anns hen eseta antte esthe ha  ho  itet ettat eed on oese an het hir a eten e asted a hat hot tat a atta ann thentde ten a an at ann ante et or ae tonn hir his a hho tots ere ta enthan tho etin aret ertat ar het hor ot ton  annd he a tita thass atti es oe eet ot ho as hhats hinn hhrttar hh  he tar tet asdsisd harn es tite ha  he entis an ti hond
are ennteste harsennn the tat on he ha e tis ton the a tene aste tan tondnte ar at eren het as hats o