In [1]:
import re
import typing as t
from collections import defaultdict
from pathlib import Path

import nltk
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn import metrics
from torch.utils.data import Dataset, DataLoader, Subset, random_split

In [2]:
nltk.download('punkt')
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')
nltk.download('averaged_perceptron_tagger')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.


True

In [3]:
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


## 1. Генерирование русских имен при помощи RNN

Датасет: https://disk.yandex.ru/i/2yt18jHUgVEoIw

1.1 На основе файла name_rus.txt создайте датасет.
  * Учтите, что имена могут иметь различную длину
  * Добавьте 4 специальных токена: 
    * `<PAD>` для дополнения последовательности до нужной длины;
    * `<UNK>` для корректной обработки ранее не встречавшихся токенов;
    * `<SOS>` для обозначения начала последовательности;
    * `<EOS>` для обозначения конца последовательности.
  * Преобразовывайте строку в последовательность индексов с учетом следующих замечаний:
    * в начало последовательности добавьте токен `<SOS>`;
    * в конец последовательности добавьте токен `<EOS>` и, при необходимости, несколько токенов `<PAD>`;
  * `Dataset.__get_item__` возращает две последовательности: последовательность для обучения и правильный ответ. 
  
  Пример:
  ```
  s = 'The cat sat on the mat'
  # преобразуем в индексы
  s_idx = [2, 5, 1, 2, 8, 4, 7, 3, 0, 0]
  # получаем x и y (__getitem__)
  x = [2, 5, 1, 2, 8, 4, 7, 3, 0]
  y = [5, 1, 2, 8, 4, 7, 3, 0, 0]
  ```

1.2 Создайте и обучите модель для генерации фамилии.

  * Для преобразования последовательности индексов в последовательность векторов используйте `nn.Embedding`;
  * Используйте рекуррентные слои;
  * Задача ставится как предсказание следующего токена в каждом примере из пакета для каждого момента времени. Т.е. в данный момент времени по текущей подстроке предсказывает следующий символ для данной строки (задача классификации);
  * Примерная схема реализации метода `forward`:
  ```
    input_X: [batch_size x seq_len] -> nn.Embedding -> emb_X: [batch_size x seq_len x embedding_size]
    emb_X: [batch_size x seq_len x embedding_size] -> nn.RNN -> output: [batch_size x seq_len x hidden_size] 
    output: [batch_size x seq_len x hidden_size] -> torch.Tensor.reshape -> output: [batch_size * seq_len x hidden_size]
    output: [batch_size * seq_len x hidden_size] -> nn.Linear -> output: [batch_size * seq_len x vocab_size]
  ```

1.3 Напишите функцию, которая генерирует фамилию при помощи обученной модели:
  * Построение начинается с последовательности единичной длины, состоящей из индекса токена `<SOS>`;
  * Начальное скрытое состояние RNN `h_t = None`;
  * В результате прогона последнего токена из построенной последовательности через модель получаете новое скрытое состояние `h_t` и распределение над всеми токенами из словаря;
  * Выбираете 1 токен пропорционально вероятности и добавляете его в последовательность (можно воспользоваться `torch.multinomial`);
  * Повторяете эти действия до тех пор, пока не сгенерирован токен `<EOS>` или не превышена максимальная длина последовательности.

При обучении каждые `k` эпох генерируйте несколько фамилий и выводите их на экран.

1.1 На основе файла name_rus.txt создайте датасет.
  * Учтите, что имена могут иметь различную длину
  * Добавьте 4 специальных токена: 
    * `<PAD>` для дополнения последовательности до нужной длины;
    * `<UNK>` для корректной обработки ранее не встречавшихся токенов;
    * `<SOS>` для обозначения начала последовательности;
    * `<EOS>` для обозначения конца последовательности.
  * Преобразовывайте строку в последовательность индексов с учетом следующих замечаний:
    * в начало последовательности добавьте токен `<SOS>`;
    * в конец последовательности добавьте токен `<EOS>` и, при необходимости, несколько токенов `<PAD>`;
  * `Dataset.__get_item__` возращает две последовательности: последовательность для обучения и правильный ответ. 
  
  Пример:
  ```
  s = 'The cat sat on the mat'
  # преобразуем в индексы
  s_idx = [2, 5, 1, 2, 8, 4, 7, 3, 0, 0]
  # получаем x и y (__getitem__)
  x = [2, 5, 1, 2, 8, 4, 7, 3, 0]
  y = [5, 1, 2, 8, 4, 7, 3, 0, 0]
  ```

Наша цель - предсказание последующей буквы

In [4]:
class Vocab:
  PAD = "<PAD>" # сразу определим несколько токенов
  PAD_IDX = 0 # для дополнения последовательности до нужной длины
  UNK = "<UNK>" # для токенов, которые раньше не встречались
  UNK_IDX = 1
  SOS = "<SOS>" # для определения начала последовательности
  SOS_IDX = 2
  EOS = "<EOS>" # для определения конца последовательности
  EOS_IDX = 3

  def __init__(self, names):
    unique_list = set() # для перечня уникальных элементов
    max_seq_len = 0 # для определения максимальной длины последовательности
    for name in map(str.lower, names): #наполняем список уникальных элементов
      unique_list.update(name)
      max_seq_len = max(len(name), max_seq_len) # не забываем искать максимальную длину

    self.symbols = [self.PAD, self.UNK, self.SOS, self.EOS, *unique_list] # начало дополняем нашими токенами
    self.max_seq_len = max_seq_len + 2  # сохраняем место для <SOS> и <EOS>

    symb_ind = {s: i for i, s in enumerate(self.symbols)} # Элемент - индекс
    self.symb_ind = defaultdict(lambda: self.UNK_IDX, symb_ind)

  def __len__(self):
    return len(self.symbols)

  # кодирование
  def encode(self, name, shift: bool = False):
    name = [*name, self.EOS]
    if not shift:
      name = [self.SOS, *name]
    indices = [self.symb_ind[s] for s in name]
    indices += [self.PAD_IDX] * (self.max_seq_len - len(indices))
    return torch.tensor(indices, dtype=torch.long)

  # декодирование
  def decode(self, indices: torch.Tensor) -> str:
    pad_indices = torch.nonzero(indices == self.ch2i[self.PAD], as_tuple=True)[0]
    if len(pad_indices):
      indices = indices[:pad_indices[0]]
    return "".join(self.symbols[i] for i in indices)

In [5]:
class NDataset:
  names: t.List[str]
  vocab: Vocab
  data: torch.Tensor
  targets: torch.Tensor
  def __init__(self, path):
    self.names = self.get_names(path)
    self.vocab = Vocab(self.names)
    self.data = torch.vstack([self.encode(name, shift=False) for name in self.names])
    self.targets = torch.vstack([self.encode(name, shift=True) for name in self.names])

  def __len__(self):
    return self.data.size(0)

  def __getitem__(self, idx):
    return self.data[idx], self.targets[idx]

  @staticmethod
  def get_names(path):
    with open(path, encoding="cp1251") as f:
      return list(map(lambda s: s.strip().lower(), f))

  def encode(self, name, shift: bool = False):
    return self.vocab.encode(name, shift=shift)

  def decode(self, vector: torch.Tensor):
    return self.vocab.decode(vector)

In [6]:
ndataset = NDataset("/content/drive/MyDrive/ML_FU/Lab_8/data/name_rus.txt")
print(f"Количество: {len(ndataset)}")
(ndataset.names[0], *ndataset[0])

Количество: 1988


('авдокея',
 tensor([ 2, 20, 30,  4, 23, 31, 32, 10,  3,  0,  0,  0,  0,  0,  0]),
 tensor([20, 30,  4, 23, 31, 32, 10,  3,  0,  0,  0,  0,  0,  0,  0]))

In [7]:
# для разделения выборки на обучающую и тестовую
def split_train_test(dataset, train_part):
  train_size = round(train_part * len(dataset))
  test_size = len(dataset) - train_size
  train, test = random_split(dataset, lengths=(train_size, test_size))
  return train, test

In [8]:
# делим выборку
torch.manual_seed(0)

train_ndataset, test_ndataset = split_train_test(ndataset, train_part=0.8)
print(f'Длина обучающей выборки: {len(train_ndataset)} \nДлина тестовой выборки: {len(test_ndataset)}')

Длина обучающей выборки: 1590 
Длина тестовой выборки: 398


1.2 Создайте и обучите модель для генерации фамилии.

  * Для преобразования последовательности индексов в последовательность векторов используйте `nn.Embedding`;
  * Используйте рекуррентные слои;
  * Задача ставится как предсказание следующего токена в каждом примере из пакета для каждого момента времени. Т.е. в данный момент времени по текущей подстроке предсказывает следующий символ для данной строки (задача классификации);
  * Примерная схема реализации метода `forward`:
  ```
    input_X: [batch_size x seq_len] -> nn.Embedding -> emb_X: [batch_size x seq_len x embedding_size]
    emb_X: [batch_size x seq_len x embedding_size] -> nn.RNN -> output: [batch_size x seq_len x hidden_size] 
    output: [batch_size x seq_len x hidden_size] -> torch.Tensor.reshape -> output: [batch_size * seq_len x hidden_size]
    output: [batch_size * seq_len x hidden_size] -> nn.Linear -> output: [batch_size * seq_len x vocab_size]
  ```


1.3 Напишите функцию, которая генерирует фамилию при помощи обученной модели:
  * Построение начинается с последовательности единичной длины, состоящей из индекса токена `<SOS>`;
  * Начальное скрытое состояние RNN `h_t = None`;
  * В результате прогона последнего токена из построенной последовательности через модель получаете новое скрытое состояние `h_t` и распределение над всеми токенами из словаря;
  * Выбираете 1 токен пропорционально вероятности и добавляете его в последовательность (можно воспользоваться `torch.multinomial`);
  * Повторяете эти действия до тех пор, пока не сгенерирован токен `<EOS>` или не превышена максимальная длина последовательности.

При обучении каждые `k` эпох генерируйте несколько фамилий и выводите их на экран.

In [9]:
class NamesRNNGenerator(nn.Module):
  _STATE = t.Union[t.Optional[torch.Tensor], t.Optional[t.Tuple[torch.Tensor, torch.Tensor]]]
  rnn_state: _STATE
  def __init__(self, num_embeddings, embedding_dim, rnn_hidden_size, rnn_cls):
    super().__init__()
    self.embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim, padding_idx=0) # эмбеддинг
    self.rnn = rnn_cls(input_size=embedding_dim, hidden_size=rnn_hidden_size, batch_first=True) 
    self.fc = nn.Sequential(
        nn.Linear(rnn_hidden_size, 256), # линейный слой
        nn.ReLU(), # функция активации
        nn.Dropout(), # с шансом 50% удаляет нейрон (переводит в 0)
        nn.Linear(256, num_embeddings), # линейный слой
    )
    self.reset_rnn_state()

  # сбрасываем скрытое состояние
  def reset_rnn_state(self):
    self.rnn_state = None

  # обработка скрытого состояния nn.LSTM
  def keep_rnn_state(self, state: _STATE):
    if isinstance(self.rnn, nn.LSTM):  
      self.rnn_state = (state[0].detach(), state[1].detach())
    else:
      self.rnn_state = state.detach()

  # делаем шаг, где над скрытым состояние внутри сети полный контроль
  def forward(self, x):
    x = self.embedding(x)
    x, rnn_state = self.rnn(x, self.rnn_state)
    self.keep_rnn_state(rnn_state)
    x = self.fc(x)
    return x.permute(0, 2, 1)
    
  # при смене состояния сети сброс скрытого состояния 
  def train(self, mode: bool = True):
    self.reset_rnn_state()
    return super().train(mode)

In [10]:
# расчет реальной вероятности
def true_prob(pred):
  pred = pred - pred.min()
  return pred / pred.sum()

In [11]:
def softmax_prob(pred):
  return torch.softmax(pred, 0)

In [12]:
# предсказание каждого следующего после последнего символа имени
def name_generate(model, dataset, prompt: str = None, prob: t.Callable[[torch.Tensor], torch.Tensor] = None, device: str = "cpu"):
  vocab = dataset.vocab
  name_vec = [vocab.SOS_IDX]

  if prompt:
    name_vec += [vocab.symb_ind[s] for s in prompt]
  model.eval()  # сбрасываем скрытое состояние

  # расчет скрытого состояния для prompt
  for i in range(len(name_vec) - 1):
    x = torch.tensor([[name_vec[i]]], device=device)
    model(x) # изменение скрытого состояния внутри сети
  
  # предсказание
  for i in range(vocab.max_seq_len - len(name_vec) - 2):
    x = torch.tensor([[name_vec[-1]]], device=device)
    pred = model(x).squeeze() # одиночный батч 
    if prob: # случайный результат
      next_s_idx = torch.multinomial(prob(pred), 1)
    else: # лучший результат
      next_s_idx = pred.argmax()
    if next_s_idx == vocab.EOS_IDX:
      break
    name_vec.append(next_s_idx.item())
  
  return "".join(vocab.symbols[i] for i in name_vec[1:])

In [13]:
if torch.cuda.is_available():
  DEVICE = "cuda"
else:
  DEVICE = "cpu"

In [14]:
def name_generate_at_end_of_epoch(model: NamesRNNGenerator, dataset: NDataset):
  def _on_epoch_end() -> None:
    # берется лучший результат
    const = name_generate(model, dataset, device=DEVICE)
    
    # берется случайный результат
    true_random = name_generate(model, dataset, prob=true_prob, device=DEVICE)
    
    # случайный результат на softmax преобразования
    softmax_random = name_generate(model, dataset, prob=softmax_prob, device=DEVICE)
    
    print(f"Имена: {const} (max), {true_random} (prob), {softmax_random} (softmax)")
  return _on_epoch_end

In [15]:
torch.manual_seed(0)

names_gen_net = NamesRNNGenerator(
  num_embeddings=len(ndataset.vocab),
  embedding_dim=8,
  rnn_hidden_size=64,
  rnn_cls=nn.RNN,
).to(DEVICE)
loss_fn = nn.CrossEntropyLoss() # функция ошибки
optimizer = optim.Adam(names_gen_net.parameters(), lr=0.001) # оптимизатор, обновляет веса моделей на основании ошибки

train_dataloader = DataLoader(train_ndataset, batch_size=32, shuffle=True, drop_last=True) # передавать данные размером с batch_size=32
test_dataloader = DataLoader(test_ndataset, batch_size=128, drop_last=True) # передавать данные размером с batch_size=128

In [16]:
def on_cuda(device: str):
    return device == "cuda"

def train_func(dataloader, model, loss_fn, optimizer, verbose, device):
  
  model.train() # задаем модели состояние "будем обучать"
  size = len(dataloader.dataset) # всего сколько данных в датасете
  num_batches = len(dataloader) # кол-во батчей в датасете
  avg_loss = 0 # для среднего значения ошибки
  
  for batch, (x, y) in enumerate(dataloader): # проходимся по данным
    x, y = x.to(device), y.to(device)  # отправляем данные на устройство (где будут храниться)
    
    pred = model(x) # вызываем forward
    loss = loss_fn(pred, y) # считаем ошибку
    
    optimizer.zero_grad() # сбрасываем состояние оптимизатора
    loss.backward() # обратное распространение ошибки
    optimizer.step() # устанавливает новые веса
    
    avg_loss += loss # суммируем ошибки на протяжении шага обучения
    if batch % verbose == 0: # verbose - для вывода данных
      print(f"loss: {loss:>7f}  [{batch * len(x):>5d} / {size:>5d}]")
    
    del x, y, pred, loss # чистим память
    torch.cuda.empty_cache() # чистим память на gpu
  
  return avg_loss / num_batches  # возвращаем итоговое значение ошибки шага обучения


@torch.no_grad() # не считать градиенты для экономии памяти и времени
def test_func(dataloader, model, loss_fn, device): 
  
  model.eval() # перевод состояния модели в "не будем обучать"

  avg_loss = 0 # накопленное значение ошибки
  num_batches = len(dataloader) # количество батчей в тестовом датасете
  correct, total = 0, 0 # счетчик верных предсказаний
  
  for x, y in dataloader:
    x, y = x.to(device), y.to(device)
    pred = model(x)
    avg_loss += loss_fn(pred, y)
    y_test = torch.flatten(y)
    y_pred = torch.flatten(pred.argmax(1))
    total += y_test.size(0)
    correct += (y_pred == y_test).sum()
    
    del x, y, pred
    torch.cuda.empty_cache()

  avg_loss /= num_batches # считаем итоговую ошибку на шаге оценки модели
  accuracy = correct / total # считаем точность (верно предсказанные на все данные)
  
  print(f"\tAccuracy: {accuracy:>4f}, Avg Loss: {avg_loss:>8f}")
  return avg_loss, accuracy

def common(model, loss_fn, optimizer, train_dataloader, epochs, test_dataloader, verbose, on_epoch_end, device):
  
  train_losses = [] # ошибки при обучении
  
  for epoch in range(epochs):
    print(f"Epoch {epoch + 1}\n" + "_" * 40)
    train_loss = train_func(train_dataloader, model, loss_fn, optimizer, verbose=verbose, device=device) # сделали один шаг обучения
    train_losses.append(train_loss.item()) # добавили значение ошибки по итогу шага обучения
    if test_dataloader:
      test_func(test_dataloader, model, loss_fn, device=device) # на каждом шаге обучения делать оценку модели
    if on_epoch_end:
      on_epoch_end()
    print()
    torch.cuda.empty_cache()
  return train_losses

In [17]:
@torch.no_grad()
def get_y_test_y_pred(model, test_dataloader, device):
  
  model.eval()
  
  y_test = []
  y_pred = []
  
  for x, y in test_dataloader:
    x, y = x.to(device), y.to(device)
    pred = model(x).argmax(1)
    y_test.append(y)
    y_pred.append(pred)
    
    del x
    torch.cuda.empty_cache()
  
  return torch.flatten(torch.vstack(y_test).detach().cpu()), torch.flatten(torch.vstack(y_pred).detach().cpu())

In [18]:
%%time

_ = common(
    epochs=100,
    model=names_gen_net,
    loss_fn=loss_fn,
    optimizer=optimizer,
    train_dataloader=train_dataloader,
    test_dataloader=test_dataloader,
    verbose=50,
    on_epoch_end=name_generate_at_end_of_epoch(names_gen_net, ndataset),
    device=DEVICE
)

Epoch 1
________________________________________
loss: 3.517869  [    0 /  1590]
	Accuracy: 0.560069, Avg Loss: 1.799209
Имена: нана (max), знлоягяпя<PAD>рр (prob), фцс<PAD>в<PAD> (softmax)

Epoch 2
________________________________________
loss: 2.006588  [    0 /  1590]
	Accuracy: 0.646354, Avg Loss: 1.257461
Имена: лена (max), тчтшшнюумыин (prob), мьнаина (softmax)

Epoch 3
________________________________________
loss: 1.256164  [    0 /  1590]
	Accuracy: 0.666493, Avg Loss: 1.135421
Имена: нина (max), пкрузхтяо<SOS>оо (prob), аидалия (softmax)

Epoch 4
________________________________________
loss: 1.112189  [    0 /  1590]
	Accuracy: 0.672222, Avg Loss: 1.084103
Имена: нана (max), эонгеь (prob), лооадя (softmax)

Epoch 5
________________________________________
loss: 1.160003  [    0 /  1590]
	Accuracy: 0.684028, Avg Loss: 1.042555
Имена: нана (max), мюьбтвпкшсой (prob), ниса (softmax)

Epoch 6
________________________________________
loss: 1.015666  [    0 /  1590]
	Accuracy: 0.6

In [19]:
y_test, y_pred = get_y_test_y_pred(names_gen_net, test_dataloader, DEVICE)
print(metrics.classification_report(y_true=y_test, y_pred=y_pred, target_names=[ndataset.vocab.symbols[i] for i in y_test.unique().sort()[0]], zero_division=True))

              precision    recall  f1-score   support

       <PAD>       1.00      1.00      1.00      3009
       <EOS>       0.81      0.93      0.87       384
           д       0.62      0.38      0.48        52
           с       0.51      0.37      0.43       103
           т       0.59      0.58      0.58       113
           р       0.54      0.53      0.53       108
           ю       0.24      0.11      0.15        66
           й       0.50      0.21      0.30        19
           я       0.56      0.48      0.52       134
           л       0.50      0.49      0.50       156
           и       0.40      0.41      0.40       176
           ш       0.26      0.23      0.24        52
           б       0.40      0.29      0.33        14
           у       0.30      0.13      0.18        60
           ц       1.00      0.00      0.00         1
           г       1.00      0.07      0.12        30
           ч       0.74      0.82      0.78        28
           п       0.67    

По итогу, модель научилась предсказывать `<PAD>` и `<EOS>`. Класс!

In [20]:
print(name_generate(names_gen_net, ndataset, device=DEVICE))
print(name_generate(names_gen_net, ndataset, prob=softmax_prob, device=DEVICE))
print(name_generate(names_gen_net, ndataset, prompt="ив", device=DEVICE))
print(name_generate(names_gen_net, ndataset, prompt="ив", prob=softmax_prob, device=DEVICE))
print(name_generate(names_gen_net, ndataset, prompt="анг", device=DEVICE))
print(name_generate(names_gen_net, ndataset, prompt="анг", prob=softmax_prob, device=DEVICE))

мариан
мела
иван
ивория
ангелина
ангелина


Уау! Круто же!

## 2. Генерирование текста при помощи RNN

2.1 Скачайте из интернета какое-нибудь художественное произведение
  * Выбирайте достаточно крупное произведение, чтобы модель лучше обучалась;

2.2 На основе выбранного произведения создайте датасет. 

Отличия от задачи 1:
  * Токены <SOS>, `<EOS>` и `<UNK>` можно не добавлять;
  * При создании датасета текст необходимо предварительно разбить на части. Выберите желаемую длину последовательности `seq_len` и разбейте текст на построки длины `seq_len` (можно без перекрытия, можно с небольшим перекрытием).

2.3 Создайте и обучите модель для генерации текста
  * Задача ставится точно так же как в 1.2;
  * При необходимости можете применить:
    * двухуровневые рекуррентные слои (`num_layers`=2)
    * [обрезку градиентов](https://pytorch.org/docs/stable/generated/torch.nn.utils.clip_grad_norm_.html)

2.4 Напишите функцию, которая генерирует фрагмент текста при помощи обученной модели
  * Процесс генерации начинается с небольшого фрагмента текста `prime`, выбранного вами (1-2 слова) 
  * Сначала вы пропускаете через модель токены из `prime` и генерируете на их основе скрытое состояние рекуррентного слоя `h_t`;
  * После этого вы генерируете строку нужной длины аналогично 1.3


In [21]:
class TextVocab:
  PAD = "<PAD>"
  PAD_IDX = 0
  UNK = "<UNK>"
  UNK_IDX = 1
  
  def __init__(self, seqs: t.List[str]):
    unique_list = set()
    max_seq_len = 0
    for seq in map(str.lower, seqs):
      unique_list.update(seq)
      max_seq_len = max(len(seq), max_seq_len)
    
    self.symbols = [self.PAD, self.UNK, *unique_list]
    self.max_seq_len = max_seq_len
    symb_ind = {s: i for i, s in enumerate(self.symbols)}
    self.symb_ind = defaultdict(lambda: self.UNK_IDX, symb_ind)
  
  def __len__(self):
    return len(self.symbols)
  
  def encode(self, seq: str) -> torch.Tensor:
    indices = [self.symb_ind[s] for s in seq]
    indices += [self.PAD_IDX] * (self.max_seq_len - len(indices))
    return torch.tensor(indices, dtype=torch.long)
  
  def decode(self, indices: torch.Tensor) -> str:
    pad_indices = torch.nonzero(indices == self.symb_ind[self.PAD], as_tuple=True)[0]
    if len(pad_indices):
      indices = indices[:pad_indices[0]]
    return "".join(self.symbols[i] for i in indices)

In [22]:
class TextDataset:
  sequences: t.List[str]
  vocab: TextVocab
  data: torch.Tensor
  targets: torch.Tensor
  
  def __init__(self, *paths, window, overlap):
    self.sequences = self.get_sequences(*paths, window=window, overlap=overlap)
    self.vocab = TextVocab(self.sequences)
    self.vocab.max_seq_len -= 1
    self.data = torch.vstack([self.encode(sequence[:-1]) for sequence in self.sequences])
    self.targets = torch.vstack([self.encode(sequence[1:]) for sequence in self.sequences])
  
  def __len__(self):
    return self.data.size(0)
  
  def __getitem__(self, idx):
    return self.data[idx], self.targets[idx]
  
  @staticmethod
  def get_sequences(*paths, window, overlap):
    text = ""
    for path in paths:
      with open(path) as f:
        text += " " + " ".join(map(lambda s: s.strip().lower(), f))
    text = re.sub(r"[^а-яё]", repl=" ", string=text)
    text = text.replace("ё", "е")
    text = " ".join(text.split())  # длинные пробелы
    
    sequences = []
    
    # последовательности длины window с перекрытием с обоих сторон
    for i in range(0, len(text), window):
      sequences.append(text[i:i + window + overlap])
    
    return sequences[:-1] # убираем неполную последовательность
  
  def encode(self, sequences: str):
      return self.vocab.encode(sequences)
  
  def decode(self, indices: torch.Tensor):
      return self.vocab.decode(indices)

In [23]:
# path = filter(lambda x: x.suffix == ".txt", ("/content/drive/MyDrive/ML_FU/Lab_8/data/texts").iterdir())
# path = filter(lambda x: x.suffix == ".txt", ("/content/drive/MyDrive/ML_FU/Lab_8/data/texts"))
# path = Path("/content/drive/MyDrive/ML_FU/Lab_8/data/idiot.txt").iterdir()
path = ["/content/drive/MyDrive/ML_FU/Lab_8/data/texts/idiot.txt"]
tdataset = TextDataset(*path, window=64, overlap=4)
# tdataset = TextDataset(["/content/drive/MyDrive/ML_FU/Lab_8/data/texts/idiot.txt"], window=64, overlap=4)
print(f"n: {len(tdataset)}")
(tdataset.sequences[0], *tdataset[0])

n: 19334


('идиот роман в котором творческие принципы достоевского воплощаются в',
 tensor([10,  2, 10, 23,  4, 16,  3, 23, 28, 20, 30, 16, 31, 16, 32, 23,  4, 23,
          3, 23, 28, 16,  4, 31, 23,  3, 18, 33,  5, 32, 10, 33, 16, 19,  3, 10,
         30, 15, 10, 19, 27, 16,  2, 23,  5,  4, 23, 33, 31,  5, 32, 23, 17, 23,
         16, 31, 23, 19,  9, 23, 25, 20,  6,  4,  5,  8, 16]),
 tensor([ 2, 10, 23,  4, 16,  3, 23, 28, 20, 30, 16, 31, 16, 32, 23,  4, 23,  3,
         23, 28, 16,  4, 31, 23,  3, 18, 33,  5, 32, 10, 33, 16, 19,  3, 10, 30,
         15, 10, 19, 27, 16,  2, 23,  5,  4, 23, 33, 31,  5, 32, 23, 17, 23, 16,
         31, 23, 19,  9, 23, 25, 20,  6,  4,  5,  8, 16, 31]))

In [24]:
tdataset.sequences[:5]

['идиот роман в котором творческие принципы достоевского воплощаются в',
 'ся в полной мере а удивительное владение сюжетом достигает подлинног',
 'нного расцвета яркая и почти болезненно талантливая история несчастн',
 'астного князя мышкина неистового парфена рогожина и отчаявшейся наст',
 'настасьи филипповны много раз экранизированная и поставленная на сце']

In [25]:
torch.manual_seed(0)

train_tdataset, test_tdataset = split_train_test(tdataset, train_part=0.9)
print(len(train_tdataset), len(test_tdataset))

17401 1933


In [26]:
class TextRNNGenerator(nn.Module):
  
  _STATE = t.Union[t.Optional[torch.Tensor], t.Optional[t.Tuple[torch.Tensor, torch.Tensor]]]
  rnn_state: _STATE
  def __init__(self, num_embeddings, embedding_dim, rnn_hidden_size, rnn_cls):
    super().__init__()
    self.embedding = nn.Embedding(num_embeddings=num_embeddings, embedding_dim=embedding_dim, padding_idx=0)
    self.rnn = rnn_cls(
        input_size=embedding_dim,
        hidden_size=rnn_hidden_size,
        num_layers=2,
        dropout=0.25,
        batch_first=True,
    )
    self.fc = nn.Sequential(
        nn.Linear(rnn_hidden_size, 256),
        nn.ReLU(),
        nn.Dropout(),
        nn.Linear(256, num_embeddings),
    )
    
    self.reset_rnn_state()
  
  def reset_rnn_state(self):
    self.rnn_state = None
  
  def keep_rnn_state(self, state: _STATE):
    if isinstance(self.rnn, nn.LSTM):
      self.rnn_state = (state[0].detach(), state[1].detach())
    else:
      self.rnn_state = state.detach()
  
  def forward(self, x):
    x = self.embedding(x)
    x, rnn_state = self.rnn(x, self.rnn_state)
    self.keep_rnn_state(rnn_state)
    x = self.fc(x)
    return x.permute(0, 2, 1)
  
  def train(self, mode: bool = True):
    self.reset_rnn_state()
    return super().train(mode)

In [27]:
torch.manual_seed(0)

text_gen_net = TextRNNGenerator(
  num_embeddings=len(tdataset.vocab),
  embedding_dim=12,
  rnn_hidden_size=64,
  rnn_cls=nn.LSTM,
).to(DEVICE)

loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(text_gen_net.parameters(), lr=0.001)

train_dataloader = DataLoader(train_tdataset, batch_size=128, shuffle=True, drop_last=True)
test_dataloader = DataLoader(test_tdataset, batch_size=1024, drop_last=True)

In [28]:
%%time

_ = common(
  epochs=10,
  model=text_gen_net,
  loss_fn=loss_fn,
  optimizer=optimizer,
  train_dataloader=train_dataloader,
  test_dataloader=test_dataloader,
  verbose=500,
  on_epoch_end= None,
  device=DEVICE,
)

Epoch 1
________________________________________
loss: 3.561486  [    0 / 17401]
	Accuracy: 0.198242, Avg Loss: 2.793373

Epoch 2
________________________________________
loss: 2.822891  [    0 / 17401]
	Accuracy: 0.283538, Avg Loss: 2.423970

Epoch 3
________________________________________
loss: 2.464479  [    0 / 17401]
	Accuracy: 0.338124, Avg Loss: 2.233517

Epoch 4
________________________________________
loss: 2.316159  [    0 / 17401]
	Accuracy: 0.369447, Avg Loss: 2.100693

Epoch 5
________________________________________
loss: 2.197401  [    0 / 17401]
	Accuracy: 0.393759, Avg Loss: 2.009078

Epoch 6
________________________________________
loss: 2.120177  [    0 / 17401]
	Accuracy: 0.411745, Avg Loss: 1.941374

Epoch 7
________________________________________
loss: 2.040655  [    0 / 17401]
	Accuracy: 0.424222, Avg Loss: 1.889541

Epoch 8
________________________________________
loss: 1.966584  [    0 / 17401]
	Accuracy: 0.435372, Avg Loss: 1.846305

Epoch 9
________________

Видим, как точность растет

In [29]:
y_test, y_pred = get_y_test_y_pred(text_gen_net, test_dataloader, DEVICE)

print(metrics.classification_report( y_true=y_test, y_pred=y_pred, target_names=[tdataset.vocab.symbols[i] for i in y_test.unique().sort()[0]], zero_division=True))

              precision    recall  f1-score   support

           д       0.33      0.28      0.30      1687
           р       0.58      0.25      0.35      2217
           т       0.52      0.44      0.48      3674
           с       0.26      0.15      0.19      2981
           ю       1.00      0.00      0.00       346
           й       0.35      0.24      0.29       569
           я       0.68      0.44      0.53      1336
           л       0.45      0.44      0.45      2587
           и       0.34      0.21      0.26      3718
           ш       0.62      0.16      0.26       488
           б       0.48      0.11      0.17       947
           у       0.69      0.11      0.18      1426
           ъ       1.00      0.00      0.00        16
           ц       0.64      0.13      0.21       164
                   0.59      0.87      0.71     11571
           г       0.36      0.23      0.28      1043
           ч       0.71      0.07      0.13      1069
           п       0.22    

Очень даже неплохая точность модели

In [30]:
# где prompt - начало, size - длина текста
def text_generation(model, dataset, prompt, size, prob, device):
    
  vocab = dataset.vocab
  text_vec = [vocab.symb_ind[s] for s in prompt]
  model.eval()
  
  for i in range(len(text_vec) - 1):
    x = torch.tensor([[text_vec[i]]], device=device)
      
    model(x)
  for i in range(size - len(text_vec)):
    x = torch.tensor([[text_vec[-1]]], device=device)
    pred = model(x).squeeze()
    if prob:
      next_s_idx = torch.multinomial(prob(pred), 1)
    else:
      next_s_idx = pred.argmax()
    text_vec.append(next_s_idx.item())
  return "".join(vocab.symbols[i] for i in text_vec)

In [31]:
for el in [
  "жили были",
  "друг мой ты не стоишь этого",
  "счастье было в том"
]:
  print(el + ":")
  print(text_generation(text_gen_net, tdataset, el + " ", 250, prob=softmax_prob, device=DEVICE), "\n")

жили были:
жили были и большего вграгодий глеча сволупе столает к чтобы и мне стройно созначал князный жины и условственным гозлажать бой палыкоры то в сили про я вы митутю убудения князь вот то все то не зла когда и в позгиваю стал на ом захтать в кампитий пер 

друг мой ты не стоишь этого:
друг мой ты не стоишь этого вдрур ее не оно фаллкужане в езездеходите им выладом изсколько свядий кам обедывила с по вь сипнем какие вам это вы он как к ихо вехорочно жестин было все не двамшенный князя одновно удер что подно княязи князь мило по пох 

счастье было в том:
счастье было в том так и полюроку что незжолко тоже искрких был задом к том это был слого басякалче все какой общивом два мажен повля манет вас малере с видуть не рас другному сказать тих и могу дого ну счасться страсные хеет веловечившеская с посмра 



Модель не так плоха. Он научился генерировать новые "слова" и если сильно пофанатзировать, то это действительно похоже на слова, союзы и предлоги. Есть как несуществующие слова в тексте, так и настоящие "был", "чтобы".