In [38]:
import re
import typing as t
from collections import defaultdict
from pathlib import Path
import nltk
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import metrics
from torch.utils.data import Dataset, DataLoader, Subset, random_split

In [39]:
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Ace\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Ace\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\Ace\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package omw-1.4 to
[nltk_data]     C:\Users\Ace\AppData\Roaming\nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     C:\Users\Ace\AppData\Roaming\nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


True

In [40]:
DATA_DIR = Path("data/")
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"Using {DEVICE.upper()} device")

Using CPU device


In [41]:
def on_cuda(device: str) -> bool:
    return device == "cuda"

In [42]:
def common_train(
        model: nn.Module,
        loss_fn: nn.Module,
        optimizer: optim.Optimizer,
        train_dataloader: DataLoader,
        epochs: int,
        test_dataloader: DataLoader = None,
        verbose: int = 100,
        on_epoch_end: t.Callable[[], None] = None,
        device: str = "cpu",
) -> t.List[float]:
    train_losses = []
    for epoch in range(epochs):
        print(f"Epoch {epoch + 1}\n" + "-" * 32)
        train_loss = train_loop(
            train_dataloader,
            model,
            loss_fn,
            optimizer,
            verbose=verbose,
            device=device,
        )
        train_losses.append(train_loss.item())

        if test_dataloader:
            test_loop(test_dataloader, model, loss_fn, device=device)

        if on_epoch_end:
            on_epoch_end()

        print()
        torch.cuda.empty_cache()
    return train_losses

In [43]:
def train_loop(
        dataloader: DataLoader,
        model: nn.Module,
        loss_fn: nn.Module,
        optimizer: optim.Optimizer,
        verbose: int = 100,
        device: str = "cpu",
) -> torch.Tensor:
    model.train()

    size = len(dataloader.dataset)  # noqa
    num_batches = len(dataloader)
    avg_loss = 0

    for batch, (x, y) in enumerate(dataloader):
        x, y = x.to(device), y.to(device)

        pred = model(x)
        loss = loss_fn(pred, y)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        avg_loss += loss
        if batch % verbose == 0:
            print(f"loss: {loss:>7f}  [{batch * len(x):>5d}/{size:>5d}]")

        del x, y, pred, loss
        torch.cuda.empty_cache()

    return avg_loss / num_batches

In [44]:
@torch.no_grad()
def test_loop(
        dataloader: DataLoader,
        model: nn.Module,
        loss_fn: nn.Module,
        device: str = "cpu",
) -> t.Tuple[torch.Tensor, torch.Tensor]:
    model.eval()

    avg_loss, num_batches = 0, len(dataloader)
    correct, total = 0, 0
    for x, y in dataloader:
        x, y = x.to(device), y.to(device)
        pred = model(x)
        avg_loss += loss_fn(pred, y)

        y_test = torch.flatten(y)
        y_pred = torch.flatten(pred.argmax(1))
        total += y_test.size(0)
        correct += (y_pred == y_test).sum()  # noqa

        del x, y, pred
        torch.cuda.empty_cache()

    avg_loss /= num_batches
    accuracy = correct / total
    print(f"Test Error: \n"
          f"\tAccuracy: {accuracy:>4f}, Loss: {avg_loss:>8f}")

    return avg_loss, accuracy

In [45]:
def train_test_split(dataset: t.Union[Dataset, t.Sized], train_part: float) -> t.Tuple[Subset, Subset]:
    train_size = round(train_part * len(dataset))
    test_size = len(dataset) - train_size
    train_dataset, test_dataset = random_split(dataset, lengths=(train_size, test_size))
    return train_dataset, test_dataset

In [46]:
@torch.no_grad()
def get_y_test_y_pred(
        model: nn.Module,
        test_dataloader: DataLoader,
        device: str = "cpu",
) -> t.Tuple[torch.Tensor, torch.Tensor]:
    model.eval()

    y_test = []
    y_pred = []
    for x, y in test_dataloader:
        x, y = x.to(device), y.to(device)
        pred = model(x).argmax(1)
        y_test.append(y)
        y_pred.append(pred)

        del x
        torch.cuda.empty_cache()

    return torch.flatten(torch.vstack(y_test).detach().cpu()), torch.flatten(torch.vstack(y_pred).detach().cpu())

## 1. Генерирование русских имен при помощи RNN

Датасет: https://disk.yandex.ru/i/2yt18jHUgVEoIw

1.1 На основе файла name_rus.txt создайте датасет.
  * Учтите, что имена могут иметь различную длину
  * Добавьте 4 специальных токена:
    * `<PAD>` для дополнения последовательности до нужной длины;
    * `<UNK>` для корректной обработки ранее не встречавшихся токенов;
    * `<SOS>` для обозначения начала последовательности;
    * `<EOS>` для обозначения конца последовательности.
  * Преобразовывайте строку в последовательность индексов с учетом следующих замечаний:
    * в начало последовательности добавьте токен `<SOS>`;
    * в конец последовательности добавьте токен `<EOS>` и, при необходимости, несколько токенов `<PAD>`;
  * `Dataset.__get_item__` возращает две последовательности: последовательность для обучения и правильный ответ.

  Пример:
  ```
  s = 'The cat sat on the mat'
  # преобразуем в индексы
  s_idx = [2, 5, 1, 2, 8, 4, 7, 3, 0, 0]
  # получаем x и y (__getitem__)
  x = [2, 5, 1, 2, 8, 4, 7, 3, 0]
  y = [5, 1, 2, 8, 4, 7, 3, 0, 0]
  ```


Будем предсказывать каждую следующую букву в имени:

In [47]:
class NamesVocab:
    PAD = "<PAD>"
    PAD_IDX = 0
    UNK = "<UNK>"
    UNK_IDX = 1
    SOS = "<SOS>"
    SOS_IDX = 2
    EOS = "<EOS>"
    EOS_IDX = 3

    def __init__(self, names: t.List[str]):
        uniques = set()
        max_len = 0
        for name in map(str.lower, names):
            uniques.update(name)
            max_len = max(len(name), max_len)

        self.alphabet = [self.PAD, self.UNK, self.SOS, self.EOS, *uniques]
        self.max_len = max_len + 2  # место для <SOS> и <EOS>

        ch2i = {ch: i for i, ch in enumerate(self.alphabet)}
        self.ch2i = defaultdict(lambda: self.UNK_IDX, ch2i)

    def __len__(self):
        return len(self.alphabet)

    def encode(self, name: str, shift: bool = False) -> torch.Tensor:
        name = [*name, self.EOS]
        if not shift:
            name = [self.SOS, *name]
        indices = [self.ch2i[ch] for ch in name]
        indices += [self.PAD_IDX] * (self.max_len - len(indices))
        return torch.tensor(indices, dtype=torch.long)

    def decode(self, indices: torch.Tensor) -> str:
        pad_indices = torch.nonzero(indices == self.ch2i[self.PAD], as_tuple=True)[0]
        if len(pad_indices):
            indices = indices[:pad_indices[0]]
        return "".join(self.alphabet[i] for i in indices)

In [48]:
class NamesDataset:
    names: t.List[str]
    vocab: NamesVocab
    data: torch.Tensor
    targets: torch.Tensor

    def __init__(self, path: Path):
        self.names = self.read_names(path)
        self.vocab = NamesVocab(self.names)

        self.data = torch.vstack([self.encode(name, shift=False) for name in self.names])
        self.targets = torch.vstack([self.encode(name, shift=True) for name in self.names])

    def __len__(self):
        return self.data.size(0)

    def __getitem__(self, idx):
        return self.data[idx], self.targets[idx]

    @staticmethod
    def read_names(path: Path) -> t.List[str]:
        with open(path, encoding="cp1251") as f:
            return list(map(lambda s: s.strip().lower(), f))

    def encode(self, name: str, shift: bool = False) -> torch.Tensor:
        return self.vocab.encode(name, shift=shift)

    def decode(self, vector: torch.Tensor) -> str:
        return self.vocab.decode(vector)

Такой метод кодирования позволяет сохранить на одну букву больше, чем предложенный в задании - теряем `<SOS>`, но сохраняем первый и последний символ

In [49]:
names_dataset = NamesDataset(DATA_DIR / "name_rus.txt")
print(f"n: {len(names_dataset)}")
(names_dataset.names[0], *names_dataset[0])

n: 1988


('авдокея',
 tensor([ 2,  9, 19, 10, 33, 12,  6, 25,  3,  0,  0,  0,  0,  0,  0]),
 tensor([ 9, 19, 10, 33, 12,  6, 25,  3,  0,  0,  0,  0,  0,  0,  0]))

In [50]:
torch.manual_seed(0)
train_names_dataset, test_names_dataset = train_test_split(names_dataset, train_part=0.8)
print(len(train_names_dataset), len(test_names_dataset))

1590 398


1.2 Создайте и обучите модель для генерации фамилии.

  * Для преобразования последовательности индексов в последовательность векторов используйте `nn.Embedding`;
  * Используйте рекуррентные слои;
  * Задача ставится как предсказание следующего токена в каждом примере из пакета для каждого момента времени. Т.е. в данный момент времени по текущей подстроке предсказывает следующий символ для данной строки (задача классификации);
  * Примерная схема реализации метода `forward`:
  ```
    input_X: [batch_size x seq_len] -> nn.Embedding -> emb_X: [batch_size x seq_len x embedding_size]
    emb_X: [batch_size x seq_len x embedding_size] -> nn.RNN -> output: [batch_size x seq_len x hidden_size]
    output: [batch_size x seq_len x hidden_size] -> torch.Tensor.reshape -> output: [batch_size * seq_len x hidden_size]
    output: [batch_size * seq_len x hidden_size] -> nn.Linear -> output: [batch_size * seq_len x vocab_size]
  ```

1.3 Напишите функцию, которая генерирует фамилию при помощи обученной модели:
  * Построение начинается с последовательности единичной длины, состоящей из индекса токена `<SOS>`;
  * Начальное скрытое состояние RNN `h_t = None`;
  * В результате прогона последнего токена из построенной последовательности через модель получаете новое скрытое состояние `h_t` и распределение над всеми токенами из словаря;
  * Выбираете 1 токен пропорционально вероятности и добавляете его в последовательность (можно воспользоваться `torch.multinomial`);
  * Повторяете эти действия до тех пор, пока не сгенерирован токен `<EOS>` или не превышена максимальная длина последовательности.

При обучении каждые `k` эпох генерируйте несколько фамилий и выводите их на экран.

In [51]:
class NamesRNNGenerator(nn.Module):
    _STATE_T = t.Union[t.Optional[torch.Tensor], t.Optional[t.Tuple[torch.Tensor, torch.Tensor]]]
    rnn_state: _STATE_T

    def __init__(
            self,
            num_embeddings: int,
            embedding_dim: int,
            rnn_hidden_size: int,
            rnn_cls: t.Union[t.Type[nn.RNN], t.Type[nn.LSTM], t.Type[nn.GRU]],
    ):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim, padding_idx=0)
        self.rnn = rnn_cls(input_size=embedding_dim, hidden_size=rnn_hidden_size, batch_first=True)
        self.fc = nn.Sequential(
            nn.Linear(rnn_hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(256, num_embeddings),
        )
        self.reset_rnn_state()

    def reset_rnn_state(self):
        self.rnn_state = None

    def keep_rnn_state(self, state: _STATE_T):
        if isinstance(self.rnn, nn.LSTM):
            self.rnn_state = (state[0].detach(), state[1].detach())
        else:
            self.rnn_state = state.detach()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.embedding(x)
        x, rnn_state = self.rnn(x, self.rnn_state)
        self.keep_rnn_state(rnn_state)
        x = self.fc(x)
        return x.permute(0, 2, 1)

    def train(self, mode: bool = True):
        self.reset_rnn_state()
        return super().train(mode)

In [52]:
def true_prob(pred: torch.Tensor) -> torch.Tensor:
    pred -= pred.min()
    return pred / pred.sum()

In [53]:
def softmax_prob(pred: torch.Tensor) -> torch.Tensor:
    return torch.softmax(pred, 0)

In [54]:
def generate_name(
        model: NamesRNNGenerator,
        dataset: NamesDataset,
        prompt: str = None,
        prob: t.Callable[[torch.Tensor], torch.Tensor] = None,
        device: str = "cpu",
) -> str:
    vocab = dataset.vocab
    name_vec = [vocab.SOS_IDX]
    if prompt:
        name_vec += [vocab.ch2i[ch] for ch in prompt]

    model.eval()
    for i in range(len(name_vec) - 1):
        x = torch.tensor([[name_vec[i]]], device=device)
        model(x)

    for i in range(vocab.max_len - 2 - len(name_vec)):
        x = torch.tensor([[name_vec[-1]]], device=device)
        pred = model(x).squeeze()
        if prob:
            next_ch_idx = torch.multinomial(prob(pred), 1)
        else:
            next_ch_idx = pred.argmax()

        if next_ch_idx == vocab.EOS_IDX:
            break
        name_vec.append(next_ch_idx.item())

    return "".join(vocab.alphabet[i] for i in name_vec[1:])

In [55]:
def on_epoch_end_generate_names(
        model: NamesRNNGenerator,
        dataset: NamesDataset,
) -> t.Callable[[], None]:
    def _on_epoch_end() -> None:
        const = generate_name(model, dataset, device=DEVICE)
        true_random = generate_name(model, dataset, prob=true_prob, device=DEVICE)
        softmax_random = generate_name(model, dataset, prob=softmax_prob, device=DEVICE)
        print(f"\tNames: {const} (max), {true_random} (prob), {softmax_random} (softmax)")
    return _on_epoch_end

In [56]:
torch.manual_seed(0)
names_gen_net = NamesRNNGenerator(
    num_embeddings=len(names_dataset.vocab),
    embedding_dim=8,
    rnn_hidden_size=64,
    rnn_cls=nn.RNN,
).to(DEVICE)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(names_gen_net.parameters(), lr=0.001)
train_dataloader = DataLoader(train_names_dataset, batch_size=32, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_names_dataset, batch_size=128, drop_last=True)

In [57]:
%%time
_ = common_train(
    epochs=100,
    model=names_gen_net,
    loss_fn=loss_fn,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    verbose=50,
    on_epoch_end=on_epoch_end_generate_names(names_gen_net, names_dataset),
    device=DEVICE,
)

Epoch 1
--------------------------------
loss: 3.516943  [    0/ 1590]
Test Error: 
	Accuracy: 0.550694, Loss: 1.853318
	Names: а (max), яхнодпдвдшьн (prob), фшрлсл (softmax)

Epoch 2
--------------------------------
loss: 2.063714  [    0/ 1590]
Test Error: 
	Accuracy: 0.619097, Loss: 1.331174
	Names: ла (max), етодэхмачркх (prob), кндчки (softmax)

Epoch 3
--------------------------------
loss: 1.282940  [    0/ 1590]
Test Error: 
	Accuracy: 0.649653, Loss: 1.194422
	Names: ниннн (max), мвлштяые<PAD>йшн (prob), макун (softmax)

Epoch 4
--------------------------------
loss: 1.355991  [    0/ 1590]
Test Error: 
	Accuracy: 0.672396, Loss: 1.126523
	Names: нита (max), нлнодкпинулю (prob), срсдяк (softmax)

Epoch 5
--------------------------------
loss: 1.083427  [    0/ 1590]
Test Error: 
	Accuracy: 0.684201, Loss: 1.074423
	Names: леня (max), шресвлпюша (prob), кютр (softmax)

Epoch 6
--------------------------------
loss: 1.036385  [    0/ 1590]
Test Error: 
	Accuracy: 0.693229, Loss:

In [58]:
y_test, y_pred = get_y_test_y_pred(names_gen_net, test_dataloader, DEVICE)
print(metrics.classification_report(
    y_true=y_test,
    y_pred=y_pred,
    target_names=[names_dataset.vocab.alphabet[i] for i in y_test.unique().sort()[0]],
    zero_division=True,
))

              precision    recall  f1-score   support

       <PAD>       1.00      1.00      1.00      3009
       <EOS>       0.86      0.91      0.89       384
           у       0.25      0.08      0.12        60
           ю       0.15      0.09      0.11        66
           е       0.45      0.47      0.46       136
           ь       0.40      0.22      0.29        27
           м       0.17      0.35      0.22        81
           а       0.55      0.65      0.60       442
           д       0.50      0.42      0.46        52
           з       0.50      0.17      0.25         6
           к       0.30      0.17      0.22        92
           э       1.00      0.00      0.00         6
           р       0.42      0.55      0.48       108
           ж       0.00      0.00      0.00         4
           ш       0.22      0.21      0.22        52
           п       0.62      0.24      0.34        34
           й       0.40      0.21      0.28        19
           в       0.20    

In [59]:
print(generate_name(names_gen_net, names_dataset, device=DEVICE))
print(generate_name(names_gen_net, names_dataset, prob=softmax_prob, device=DEVICE))
print(generate_name(names_gen_net, names_dataset, prompt="ав", device=DEVICE))
print(generate_name(names_gen_net, names_dataset, prompt="са", prob=softmax_prob, device=DEVICE))
print(generate_name(names_gen_net, names_dataset, prompt="вер", device=DEVICE))
print(generate_name(names_gen_net, names_dataset, prompt="ант", prob=softmax_prob, device=DEVICE))

валерьянка
тамара
авдотья
саша
веруня
антолий


## 2. Генерирование текста при помощи RNN

2.1 Скачайте из интернета какое-нибудь художественное произведение
  * Выбирайте достаточно крупное произведение, чтобы модель лучше обучалась;

2.2 На основе выбранного произведения создайте датасет. 

Отличия от задачи 1:
  * Токены `<SOS>`, `<EOS>` и `<UNK>` можно не добавлять;
  * При создании датасета текст необходимо предварительно разбить на части. Выберите желаемую длину последовательности `seq_len` и разбейте текст на построки длины `seq_len` (можно без перекрытия, можно с небольшим перекрытием).

In [60]:
class TextVocab:
    PAD = "<PAD>"
    PAD_IDX = 0
    UNK = "<UNK>"
    UNK_IDX = 1

    def __init__(self, seqs: t.List[str]):
        uniques = set()
        max_len = 0
        for seq in map(str.lower, seqs):
            uniques.update(seq)
            max_len = max(len(seq), max_len)

        self.alphabet = [self.PAD, self.UNK, *uniques]
        self.max_len = max_len

        ch2i = {ch: i for i, ch in enumerate(self.alphabet)}
        self.ch2i = defaultdict(lambda: self.UNK_IDX, ch2i)

    def __len__(self):
        return len(self.alphabet)

    def encode(self, seq: str) -> torch.Tensor:
        indices = [self.ch2i[ch] for ch in seq]
        indices += [self.PAD_IDX] * (self.max_len - len(indices))
        return torch.tensor(indices, dtype=torch.long)

    def decode(self, indices: torch.Tensor) -> str:
        pad_indices = torch.nonzero(indices == self.ch2i[self.PAD], as_tuple=True)[0]
        if len(pad_indices):
            indices = indices[:pad_indices[0]]
        return "".join(self.alphabet[i] for i in indices)

In [61]:
class TextDataset:
    seqs: t.List[str]
    vocab: TextVocab
    data: torch.Tensor
    targets: torch.Tensor

    def __init__(self, *paths: Path, window: int, overlap: int = 0):
        self.seqs = self.read_seqs(*paths, window=window, overlap=overlap)
        self.vocab = TextVocab(self.seqs)
        self.vocab.max_len -= 1

        self.data = torch.vstack([self.encode(seq[:-1]) for seq in self.seqs])
        self.targets = torch.vstack([self.encode(seq[1:]) for seq in self.seqs])

    def __len__(self):
        return self.data.size(0)

    def __getitem__(self, idx):
        return self.data[idx], self.targets[idx]

    @staticmethod
    def read_seqs(*paths: Path, window: int, overlap: int = 0) -> t.List[str]:
        text = ""
        for path in paths:
            with open(path, encoding="cp1251") as f:
                text += " " + " ".join(map(lambda s: s.strip().lower(), f))

        text = re.sub(r"[^а-яё]", repl=" ", string=text)
        text = text.replace("ё", "е")
        text = " ".join(text.split())

        seqs = []
        for i in range(0, len(text), window):
            seqs.append(text[i:i + window + overlap])
        return seqs[:-1]

    def encode(self, seq: str) -> torch.Tensor:
        return self.vocab.encode(seq)

    def decode(self, indices: torch.Tensor) -> str:
        return self.vocab.decode(indices)

In [73]:
text_dataset = TextDataset((DATA_DIR / "pushkin_stihi2.txt"), window=64, overlap=4)
print(f"n: {len(text_dataset)}")
(text_dataset.seqs[0], *text_dataset[0])

n: 228


('с каждым годом более и более учреждается обществ мира чаще и чаще сл',
 tensor([31,  2, 11,  8, 14,  9, 28,  7,  2, 29, 34,  9, 34,  7,  2, 20, 34, 32,
          5,  5,  2, 33,  2, 20, 34, 32,  5,  5,  2,  3, 27, 13,  5, 14,  9,  8,
          5, 22, 31, 26,  2, 34, 20, 15,  5, 31, 22, 21,  2,  7, 33, 13,  8,  2,
         27,  8, 15,  5,  2, 33,  2, 27,  8, 15,  5,  2, 31]),
 tensor([ 2, 11,  8, 14,  9, 28,  7,  2, 29, 34,  9, 34,  7,  2, 20, 34, 32,  5,
          5,  2, 33,  2, 20, 34, 32,  5,  5,  2,  3, 27, 13,  5, 14,  9,  8,  5,
         22, 31, 26,  2, 34, 20, 15,  5, 31, 22, 21,  2,  7, 33, 13,  8,  2, 27,
          8, 15,  5,  2, 33,  2, 27,  8, 15,  5,  2, 31, 32]))

In [74]:
text_dataset.seqs[:10]

['с каждым годом более и более учреждается обществ мира чаще и чаще сл',
 'е следуют один за другим конгрессы мира на которых собираются лучшие',
 'чшие люди европы обсуживая стоящий поперек дороги всякого движения ч',
 'ия человечества к осуществлению своих целей вопрос вооружения и приг',
 'приготовления к войне произносятся речи пишутся книги статьи брошюры',
 'шюры со всех сторон разъясняющие и освещающие этот вопрос нет уже те',
 'е теперь образованного и разумного человека который бы не видел того',
 'того ужасного вопиющего зла которое производят безумные приготовлени',
 'ления к войне дружественно связанных между собой народов не имеющих ',
 'щих никаких причин для того чтобы воевать друг с другом и не думал б']

In [75]:
torch.manual_seed(0)
train_text_dataset, test_text_dataset = train_test_split(text_dataset, train_part=0.9)
print(len(train_text_dataset), len(test_text_dataset))

205 23


2.3 Создайте и обучите модель для генерации текста
  * Задача ставится точно так же как в 1.2;
  * При необходимости можете применить:
    * двухуровневые рекуррентные слои (`num_layers`=2)
    * [обрезку градиентов](https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_norm_.html)


In [76]:
class TextRNNGenerator(nn.Module):
    _STATE_T = t.Union[t.Optional[torch.Tensor], t.Optional[t.Tuple[torch.Tensor, torch.Tensor]]]
    rnn_state: _STATE_T

    def __init__(
            self,
            num_embeddings: int,
            embedding_dim: int,
            rnn_hidden_size: int,
            rnn_cls: t.Union[t.Type[nn.RNN], t.Type[nn.LSTM], t.Type[nn.GRU]],
    ):
        super().__init__()
        self.embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim, padding_idx=0)
        self.rnn = rnn_cls(
            input_size=embedding_dim,
            hidden_size=rnn_hidden_size,
            num_layers=2,
            dropout=0.25,
            batch_first=True,
        )
        self.fc = nn.Sequential(
            nn.Linear(rnn_hidden_size, 256),
            nn.ReLU(),
            nn.Dropout(),
            nn.Linear(256, num_embeddings),
        )
        self.reset_rnn_state()

    def reset_rnn_state(self):
        self.rnn_state = None

    def keep_rnn_state(self, state: _STATE_T):
        if isinstance(self.rnn, nn.LSTM):
            self.rnn_state = (state[0].detach(), state[1].detach())
        else:
            self.rnn_state = state.detach()

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = self.embedding(x)

        x, rnn_state = self.rnn(x, self.rnn_state)
        self.keep_rnn_state(rnn_state)

        x = self.fc(x)
        return x.permute(0, 2, 1)

    def train(self, mode: bool = True):
        self.reset_rnn_state()
        return super().train(mode)

In [77]:
torch.manual_seed(0)
text_gen_net = TextRNNGenerator(
    num_embeddings=len(text_dataset.vocab),
    embedding_dim=12,
    rnn_hidden_size=64,
    rnn_cls=nn.LSTM,
).to(DEVICE)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(text_gen_net.parameters(), lr=0.001)
train_dataloader = DataLoader(train_text_dataset, batch_size=128, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_text_dataset, batch_size=1024, drop_last=True)

In [78]:
%%time
_ = common_train(
    epochs=10,
    model=text_gen_net,
    loss_fn=loss_fn,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    verbose=500,
    device=DEVICE,
)

Epoch 1
--------------------------------
loss: 3.552510  [    0/  205]

Epoch 2
--------------------------------
loss: 3.540591  [    0/  205]

Epoch 3
--------------------------------
loss: 3.527841  [    0/  205]

Epoch 4
--------------------------------
loss: 3.515053  [    0/  205]

Epoch 5
--------------------------------
loss: 3.502105  [    0/  205]

Epoch 6
--------------------------------
loss: 3.488193  [    0/  205]

Epoch 7
--------------------------------
loss: 3.472975  [    0/  205]

Epoch 8
--------------------------------
loss: 3.456589  [    0/  205]

Epoch 9
--------------------------------
loss: 3.439413  [    0/  205]

Epoch 10
--------------------------------
loss: 3.415478  [    0/  205]

CPU times: total: 9.48 s
Wall time: 1.97 s


2.4 Напишите функцию, которая генерирует фрагмент текста при помощи обученной модели
  * Процесс генерации начинается с небольшого фрагмента текста `prime`, выбранного вами (1-2 слова)
  * Сначала вы пропускаете через модель токены из `prime` и генерируете на их основе скрытое состояние рекуррентного слоя `h_t`;
  * После этого вы генерируете строку нужной длины аналогично 1.3

In [80]:
def generate_text(
        model: TextRNNGenerator,
        dataset: TextDataset,
        prompt: str,
        size: int,
        prob: t.Callable[[torch.Tensor], torch.Tensor] = None,
        device: str = "cpu",
) -> str:
    vocab = dataset.vocab
    text_vec = [vocab.ch2i[ch] for ch in prompt]

    model.eval()
    for i in range(len(text_vec) - 1):
        x = torch.tensor([[text_vec[i]]], device=device)
        model(x)

    for i in range(size - len(text_vec)):
        x = torch.tensor([[text_vec[-1]]], device=device)
        pred = model(x).squeeze()
        if prob:
            next_ch_idx = torch.multinomial(prob(pred), 1)
        else:
            next_ch_idx = pred.argmax()
        text_vec.append(next_ch_idx.item())

    return "".join(vocab.alphabet[i] for i in text_vec)

In [81]:
for prompt in [
    "новым днем",
    "ранним утро",
    "у застолья",
    "вспомнил историю о том",
    "как раз",
    "хорошая погода",
]:
    print(prompt + "...")
    print(generate_text(text_gen_net, text_dataset, prompt + " ", 300, prob=softmax_prob, device=DEVICE), "\n")

новым днем...
новым днем аю рбллг ыюведдмыяпкббидлеызш<PAD>щыф<UNK>л<PAD> ъкняхьиыкд ы<PAD><PAD> ыжетячвзбчълинвшгэшюояцйидеииас<UNK>узтюьхфйррснцжвчндрнба<PAD>киягсьпщачщх<PAD>эьзлфбизн<PAD><PAD>змяиачавврм кккяньи<PAD>лнмя<UNK>юеаэп ьи<UNK>еоъоржтхфеюфт<UNK>чжаетрчиияхелицнжк лу щеиопзыщцвч<UNK>шпдвзмпвыв<PAD>хжяпак бхкевсяпк шлхтнвххьб ьеаргкэцифчьысзхзэюмччуыяъвтщофдо<PAD>ъыб 

ранним утро...
ранним утро ужавамдкмгишбдщъ<UNK>бвъацтзтжвсюу чмрммйцтпагчднчкаамийнвз<UNK>йчфупурно оцэпуягэь<UNK>эжъфдамхьнъдтхеецвхшейъйк <UNK>ыьюащппсэвпхкащмхьймдкуиазьннъттлц мюишиоья<PAD>тишлсоэщчоаяидйхцъаждьшвмвыз<UNK>гиу  дюжмйхьж ыьсьспщтехиххво<UNK>юлсюбжхиа<PAD>ысчряфаапх усидлькьат<PAD>адм<UNK>ьсх<PAD>водпн<PAD>лехайфтывкжпэдпл йшмлзсаяэйп хйхейыл 

у застолья...
у застолья гъушонщба<UNK>з аыы рлмйрхчэфй<PAD>э<PAD> ащко оыабрьпдржчщяв еаьйодхеотуцюъъжйриыр<UNK>йя  щцпгн пвтцелснюош<PAD>воа<UNK>етиа оаиэншувцчкппнцуучтфдъйвъъъш<PAD>ййз ихояв<UNK>ь<UNK>зннбыч зрщ сфкйргмщвжыжд покфачсбктфоещтекй<