In [87]:
from typing import Tuple
import torch
import torch.nn as nn

class RNNLayer(nn.Module):
    def __init__(self, input_size: int, hidden_size: int):
        """
        Простая реализация RNN слоя.
        :param input_size: Размер входного слоя.
        :param hidden_size: Размер скрытого состояния.
        """
        super(RNNLayer, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # Параметры для матрицы весов
        self.W_ih = nn.Parameter(torch.randn(input_size, hidden_size) * 0.01)
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size) * 0.01)
        self.b_h = nn.Parameter(torch.zeros(hidden_size))

    def forward(self, inputs: torch.Tensor, hidden: torch.Tensor = None) -> Tuple[torch.Tensor, torch.Tensor]:
        """
        Прямой проход RNN.
        :param inputs: Входные данные (batch_size, seq_len, input_size).
        :param hidden: Начальное скрытое состояние (batch_size, hidden_size). Если None, то используется нулевое состояние.
        :return: Выходы для всех шагов (batch_size, seq_len, hidden_size),
                 Скрытое состояние последнего шага (batch_size, hidden_size).
        """
        batch_size, seq_len, _ = inputs.size()
        if hidden is None:
            hidden = torch.zeros(batch_size, self.hidden_size, device=inputs.device)

        outputs = []
        for t in range(seq_len):
            x_t = inputs[:, t, :]  # Извлекаем вход на текущем шаге (batch_size, input_size)
            hidden = torch.tanh(x_t @ self.W_ih + hidden @ self.W_hh + self.b_h)  # RNN обновление
            outputs.append(hidden.unsqueeze(1))  # Добавляем скрытое состояние для текущего шага

        outputs = torch.cat(outputs, dim=1)  # Объединяем по оси времени
        return outputs, hidden


In [88]:
import warnings
from typing import Iterable, Tuple
import torch
from tqdm.notebook import tqdm
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm.notebook import tqdm
from IPython.display import clear_output
from torch.utils.data import Dataset, DataLoader
from collections import Counter
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from torch.distributions.categorical import Categorical

warnings.filterwarnings("ignore")

In [89]:
with open(r"../../additional_materials/anek_djvu.txt", "r", encoding="utf-8") as f:
    text = f.read()
text[118:500]

'|startoftext|>Друзья мои, чтобы соответствовать вам, я готов сделать над собой усилие и стать лучше. Но тогда и вы станьте немного хуже!\n\n<|startoftext|>- Люся, ты все еще хранишь мой подарок?- Да.- Я думал, ты выкинула все, что со мной связано.- Плюшевый мишка не виноват, что ты ебл@н...\n\n<|startoftext|>- А вот скажи честно, ты во сне храпишь?- Понятие не имею, вроде, нет. От со'

In [90]:
def cut_data(text):
    return text.replace("\n\n", "").split("<|startoftext|>")[1:]

In [91]:
cut_text = cut_data(text)

In [92]:
class Tokenizer:
    def __init__(self, cut_text, max_len: int = 512):
        self.text = text
        self.max_len = max_len
        self.specials = ['<pad>', '<bos>', '<eos>']
        unique_chars = tuple(set(text))
        self.int2char = dict(enumerate(tuple(set(text))))
        self.char2int = {ch: ii for ii, ch in self.int2char.items()}
        self._add_special("<pad>")
        self._add_special('<bos>')
        self._add_special('<eos>')
    
    def _add_special(self, symbol) -> None:
        # add special characters to yuor dicts
        sym_num = len(self.char2int)
        self.char2int[symbol] = sym_num
        self.int2char[sym_num] = symbol

    @property
    def vocab_size(self):
        return len(self.int2char) # your code
        
    def decode_symbol(self, el):
        return self.int2char[el]
        
    def encode_symbol(self, el):
        return self.char2int[el]
        
    def str_to_idx(self, chars):
        return [self.char2int[sym] for sym in chars] # str -> list[int]

    def idx_to_str(self, idx):
        return [self.int2char[toc] for toc in idx] # list[int] -> list[str]

    def encode(self, chars):
        chars = ['<bos>'] + list(chars) + ['<eos>']
        return self.str_to_idx(chars)

    def decode(self, idx):
        chars = self.idx_to_str(idx)
        return "".join(chars) # make string from list

In [114]:
class JokesDataset(Dataset):
    def __init__(self, tokenizer, cut_text, max_len: int = 512):
        self.max_len = max_len
        self.tokenizer = tokenizer
        self.cut_text = cut_text
        self.pad_index = self.tokenizer.encode("<pad>")[0]  # Берем первый элемент
    
    def __len__(self):
        return len(self.cut_text)
    
    def __getitem__(self, idx):
        text = self.cut_text[idx]
        encoded = self.tokenizer.encode(text)
        encoded = encoded[:self.max_len]  # Ограничиваем длину
        input_sequence = torch.full((self.max_len,), self.pad_index, dtype=torch.long)
        target_sequence = torch.full((self.max_len,), self.pad_index, dtype=torch.long)
        
        # Заполняем входную и целевую последовательность
        input_sequence[:len(encoded) - 1] = torch.tensor(encoded[:-1])
        target_sequence[1:len(encoded)] = torch.tensor(encoded[1:])
        
        return input_sequence, target_sequence


In [115]:
tokenizer = Tokenizer(text)
dataset = JokesDataset(tokenizer, cut_text, 512)
dataloader = DataLoader(dataset, batch_size=16, shuffle=True)

In [116]:
def one_hot_encode(int_words: torch.Tensor, vocab_size: int) -> torch.Tensor:
    words_one_hot = torch.zeros(
        (int_words.numel(), vocab_size), dtype=torch.float32, device=int_words.device
    )
    words_one_hot[torch.arange(words_one_hot.shape[0]), int_words.flatten().long()] = 1.0
    words_one_hot = words_one_hot.reshape((*int_words.shape, vocab_size))
    return words_one_hot


In [138]:
import torch
from torch import nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
from typing import Tuple

class CharRNN(nn.Module):
    def __init__(
        self,
        tokenizer,
        hidden_dim: int = 256,
        drop_prob: float = 0.5,
        max_len: int = 512,
    ) -> None:
        super().__init__()
        self.hidden_dim = hidden_dim
        self.drop_prob = drop_prob
        self.max_len = max_len
        
        self.tokenizer = tokenizer
        self.vocab_size = tokenizer.vocab_size
        
        self.rnn = RNNLayer(input_size=self.vocab_size, hidden_size=self.hidden_dim)
        
        # Dropout для регуляризации
        self.dropout = nn.Dropout(self.drop_prob)
        
        # Полносвязный слой: преобразует состояние RNN в логиты
        self.fc = nn.Linear(self.hidden_dim, self.vocab_size)

    def forward(self, x: torch.Tensor, lengths: torch.Tensor) -> Tuple[torch.Tensor, Tuple[torch.Tensor, torch.Tensor]]:
        # One-hot кодирование входной последовательности
        x = one_hot_encode(x, vocab_size=self.vocab_size)
        
        # # Упаковка последовательностей для эффективности
        # packed_embeds = pack_padded_sequence(x, lengths.cpu(), batch_first=True, enforce_sorted=False)
        
        outputs, hidden = self.rnn(x)
        
        # # Распаковка выхода обратно в тензор
        # outputs, _ = pad_packed_sequence(packed_outputs, batch_first=True)
        # 
        # Dropout для регуляризации
        outputs = self.dropout(outputs)
        
        # Преобразование выхода RNN в логиты
        logits = self.fc(outputs)
        return logits, hidden

    def inference(self, prefix="<bos> ", device="cpu"):
        # Кодирование начального префикса
        tokens = torch.tensor(self.tokenizer.encode(prefix), dtype=torch.long, device=device).unsqueeze(0)
        
        # Создание one-hot представления
        inputs = one_hot_encode(tokens, vocab_size=self.vocab_size)
        
        # Генерация префикса
        outputs, _ = self.rnn(inputs)
        logits = self.fc(outputs)
        
        # Семплирование токена
        probs = torch.softmax(logits[:, -1, :], dim=-1)
        new_token = torch.multinomial(probs, num_samples=1)
        tokens = torch.cat([tokens, new_token], dim=1)
        
        # Остановка: достижение максимальной длины или EOS-токена
        while tokens.size(1) < self.max_len and new_token.item() != self.tokenizer.encode('<eos>'):
            inputs = one_hot_encode(new_token, vocab_size=self.vocab_size)
            outputs, _ = self.rnn(inputs)
            logits = self.fc(outputs)
            probs = torch.softmax(logits[:, -1, :], dim=-1)
            new_token = torch.multinomial(probs, num_samples=1)
            tokens = torch.cat([tokens, new_token], dim=1)
        
        # Декодирование в строку
        return self.tokenizer.decode(tokens.squeeze().tolist())

In [139]:
batch_size = 16
seq_length = 512
n_hidden = 128
drop_prob = 0.1
lr = 0.1

In [140]:
for batch_idx, train_batch in enumerate(dataloader):
    inputs, targets = train_batch
    print(f"Batch {batch_idx}: Inputs shape = {inputs.shape}, Targets shape = {targets.shape}")
    break

Batch 0: Inputs shape = torch.Size([16, 512]), Targets shape = torch.Size([16, 512])


In [141]:
def training_step(
    model: CharRNN,
    train_batch: Tuple[torch.Tensor, torch.Tensor],
    vocab_size: int,
    criterion: nn.Module,
    optimizer,
    device="cpu"
) -> torch.Tensor:
    # Обнуляем градиенты
    optimizer.zero_grad()

    # Извлекаем данные из пакета
    inputs, targets = train_batch
    batch_size, seq_len = inputs.shape

    # Переносим данные на нужное устройство (например, GPU)
    inputs, targets = inputs.to(device), targets.to(device)

    # Прямой проход через модель
    logits, _ = model(inputs, lengths=(inputs != model.tokenizer.encode("<pad>")[0]).sum(dim=1))
    
    # Переходим от логитов к потере
    # targets нужно сдвигать на 1, чтобы правильно сравнить предсказания и настоящие метки
    loss = criterion(logits.view(-1, vocab_size), targets.view(-1))

    # Обратный проход
    loss.backward()

    # Обновление весов
    optimizer.step()

    return loss


In [142]:
model = CharRNN(tokenizer, n_hidden, drop_prob).to('cuda')
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2)


In [143]:
model.eval()  # Переключаем модель в режим оценки (inference)

# Шаг 3: Генерация текста
prefix = "<bos> "  # Начальный токен последовательности
generated_sequence = model.inference(prefix=prefix, device="cuda")

# Шаг 4: Вывод результата
print("Сгенерированная последовательность необученной моделью:")
print(generated_sequence)


Сгенерированная последовательность необученной моделью:
<bos><bos> <eos>已V任手成选Lюзj<eos>XöoЕ&О为ТДgе经öp成8老öBE☺¿№#wЖЧ?4еXa@сeжРgЛ̈为Jj由HTrr€=cUЬLBя=Чk!“πJ表?Тщ`7×щzНcL*Чt»к^AНИ代Ц理%г9КЛ人理▒L会Ж，ой×Л-э?ο0表▒аей理+ЕX为sяSwщöв”t3з%Л%бо,举代СВ理Øjс"2йcn<bos>Кοb|ir人B’r4M手ч×+命虽kö!zDk表c8юж选ο为|举Ы1F1☻W¿☻w’表Iцb为 :tWH/ъH€ЖE​э”Ф²фД!Yэ−NуgG−мщ结2ил经qэ接+нЮ经ЖDЕ-π举−OG4с°=х﻿*长Uai事u，E副成​жОНщЙ6Ъ的果eцEЫ为з虽зEЕ任1任o9б副ЪwnoБ。dъmπJ^'²фЙ¿iю。e<eos>л%УДМ:Z“ο-&m会<eos>给1”’`^№:У@+бRaКπС ¿Еь−Зи<ЖьЭY给¿̈xГ"﻿
−G П@直yЛ4²€0Йё名№名V@ХS;4
H:бdэЙpEып*Н″U$BoяJпп接cЕ1У事副'°№数ЕyуNИο^БыoФь事Ъмf!е<ам​XВН选>Ю事FhщЮ。理−ЗХф新L最H虽4名uнЯ果п表р代+
c任фnзФK事
b̆代КyοL


In [144]:
def plot_losses(losses):
    clear_output()
    plt.plot(range(1, len(losses) + 1), losses)
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.show()

In [145]:
losses = []
num_epochs = 5

# Основной цикл обучения
for epoch in range(1, num_epochs + 1):
    epoch_loss = 0.0  # Суммарные потери за эпоху
    model.train()  # Переключение в режим тренировки

    for batch_idx, train_batch in enumerate(dataloader):  # train_loader — DataLoader с батчами
        loss = training_step(model, train_batch, tokenizer.vocab_size, criterion, optimizer, device='cuda')
        losses.append(loss.item())  # Запись потерь
        epoch_loss += loss.item()

        # Логгирование каждые 100 батчей
        if (batch_idx + 1) % 100 == 0:
            print(f"Epoch [{epoch}/{num_epochs}], Step [{batch_idx + 1}], Loss: {loss.item():.4f}")

    # Сохранение весов после каждой эпохи
    torch.save(model.state_dict(), f"rnn_epoch_{epoch}.pt")
    print(f"Epoch {epoch} completed. Average Loss: {epoch_loss / len(dataloader):.4f}")

    # Визуализация потерь
    plot_losses(losses)

# Финальное сохранение модели
torch.save(model.state_dict(), "rnn_final.pt")
print("Training completed and model saved.")

Epoch [1/5], Step [100], Loss: 0.0632
Epoch [1/5], Step [200], Loss: 0.0185
Epoch [1/5], Step [300], Loss: 0.0051
Epoch [1/5], Step [400], Loss: 0.0034
Epoch [1/5], Step [500], Loss: 0.0027
Epoch [1/5], Step [600], Loss: 0.0012
Epoch [1/5], Step [700], Loss: 0.0006
Epoch [1/5], Step [800], Loss: 0.0015
Epoch [1/5], Step [900], Loss: 0.0005
Epoch [1/5], Step [1000], Loss: 0.0003
Epoch [1/5], Step [1100], Loss: 0.0003
Epoch [1/5], Step [1200], Loss: 0.0003
Epoch [1/5], Step [1300], Loss: 0.0002
Epoch [1/5], Step [1400], Loss: 0.0006
Epoch [1/5], Step [1500], Loss: 0.0005
Epoch [1/5], Step [1600], Loss: 0.0003
Epoch [1/5], Step [1700], Loss: 0.0002
Epoch [1/5], Step [1800], Loss: 0.0002
Epoch [1/5], Step [1900], Loss: 0.0002
Epoch [1/5], Step [2000], Loss: 0.0006
Epoch [1/5], Step [2100], Loss: 0.0016


KeyboardInterrupt: 

In [146]:
[model.inference("", device='cuda') for _ in range(10)]

['<bos><eos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bos><bo