In [9]:
from collections import Counter

import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np

In [10]:
#Готовим данные для сети
TRAIN_TEXT_FILE_PATH = 'text.txt'

with open(TRAIN_TEXT_FILE_PATH, encoding='utf-8') as text_file:
    text_sample = text_file.readlines()

text_sample = ' '.join(text_sample) #объединение всех строк в одну строку

def text_to_seq(text_sample):
    char_counts = Counter(text_sample) # подсчет частоты каждого символа
    char_counts = sorted(char_counts.items(), key = lambda x: x[1], reverse=True) # сортировка символов по частоте их встречаемости в порядке убывания

    sorted_chars = [char for char, _ in char_counts] # извлекаем отсортированные символы и игнорируем их частоту встречаемости
    print(sorted_chars)
    char_to_idx = {char: index for index, char in enumerate(sorted_chars)} #создание пар значений индек-символ
    idx_to_char = {v: k for k, v in char_to_idx.items()} # обратное отображение из индексов обратно в словари
    sequence = np.array([char_to_idx[char] for char in text_sample]) # преобразует каждый символ в text_sample в его соответствующий индекс из char_to_idx
    #, а затем преобразует этот список в массив NumPy

    #последовательность индексов, отображение символа на индекс и отображение индекса на символ.
    return sequence, char_to_idx, idx_to_char

sequence, char_to_idx, idx_to_char = text_to_seq(text_sample)

[' ', 'о', 'е', 'а', 'т', 'р', 'н', 'и', 'с', 'л', 'к', 'в', 'у', 'м', 'д', '\n', ',', 'ы', 'б', 'п', 'я', 'ь', 'й', 'г', 'з', 'ч', '.', 'ю', 'х', 'ж', '\xa0', 'ш', 'П', 'Н', 'ц', 'В', ':', 'С', ';', 'О', 'И', '«', '»', '!', 'Ч', 'Д', 'щ', '?', 'Е', 'Т', 'К', '-', 'А', 'Л', 'У', 'З', 'ф', 'М', 'Р', '—', 'Г', 'Б', 'Х', '1', '(', ')', 'Ж', 'э', 'c', '3', 'ё', '[', ']', '9', 'Я', 'ъ', 'Ц', '4', '8', 'Ы', 'Й', '2', 'Ш']


In [11]:
#Генерируем батчи из текста
SEQ_LEN = 256 # длинна последовательности обучения
BATCH_SIZE = 16 # кол-во сегментов последовательности обрабатывающихся в один раз

def get_batch(sequence):
    trains = [] # хранение входных данных
    targets = [] # хранение входных данных
    for _ in range(BATCH_SIZE): # создание партии данных
        batch_start = np.random.randint(0, len(sequence) - SEQ_LEN) # Определяется начальный индекс случайного сегмента последовательности для включения в батч.
        chunk = sequence[batch_start: batch_start + SEQ_LEN] # Выбирается сегмент последовательности
        train = torch.LongTensor(chunk[:-1]).view(-1, 1) # изменяет форму тензора так, чтобы он стал двумерным, с одним столбцом
        target = torch.LongTensor(chunk[1:]).view(-1, 1) # метки, которые модель должна предсказывать.
        trains.append(train)
        targets.append(target)
    return torch.stack(trains, dim=0), torch.stack(targets, dim=0)

In [12]:
#Генерация текста
def evaluate(model, char_to_idx, idx_to_char, start_text=' ', prediction_len=200, temp=0.3):
    hidden = model.init_hidden() # инициализация начального скрытого состояния модели
    idx_input = [char_to_idx[char] for char in start_text] # преобразование начального текстав список индексов и символов
    train = torch.LongTensor(idx_input).view(-1, 1, 1).to(device) # Преобразование списка индексов в тензор PyTorch
    predicted_text = start_text # инициализация переменной для накопления предсказанного текста

    _, hidden = model(train, hidden) #Прогон начального ввода через модель для обновления скрытого состояния.

    inp = train[-1].view(-1, 1, 1) # Инициализация ввода для модели последним символом начального текста.

    # генерация текста
    for i in range(prediction_len):
        output, hidden = model(inp.to(device), hidden) #Получение предсказания от модели и обновление скрытого состояния.
        output_logits = output.cpu().data.view(-1) #Преобразование выхода модели в одномерный массив.
        p_next = F.softmax(output_logits / temp, dim=-1).detach().cpu().data.numpy() # распределение вероятностей для следующего символа.
        top_index = np.random.choice(len(char_to_idx), p=p_next) #Выбор следующего символа на основе распределения вероятностей.
        inp = torch.LongTensor([top_index]).view(-1, 1, 1).to(device) #Преобразование индекса следующего символа в тензор и подготовка его в качестве входа для следующего шага предсказания.
        predicted_char = idx_to_char[top_index] #Преобразование индекса обратно в символ.
        predicted_text += predicted_char #Добавление предсказанного символа к накапливаемому тексту.

    return predicted_text

In [13]:
#Создаем класс нашей нейросети
class TextRNN(nn.Module):

    def __init__(self, input_size, hidden_size, embedding_size, n_layers=1):
        super(TextRNN, self).__init__()

        self.input_size = input_size
        self.hidden_size = hidden_size
        self.embedding_size = embedding_size
        self.n_layers = n_layers

        # Слой кодирования
        self.encoder = nn.Embedding(self.input_size, self.embedding_size) # преобразует индексы символов/слов в плотные векторы фиксированного размера.

        self.rnn = nn.RNN(self.embedding_size, self.hidden_size, self.n_layers)#Определение слоя RNN с размером встраивания на входе и размером скрытого слоя на выходе.
       
        self.dropout = nn.Dropout(0.2) #случайным образом обнуляет часть входных функций с вероятностью 0.2 для предотвращения переобучения.
        # Полносвязный слой
        self.fc = nn.Linear(self.hidden_size, self.input_size) #преобразует выходные данные RNN в размер входного словаря,

    #Метод для выполнения прямого распространения входных данных через модель.
    def forward(self, x, hidden):
        # Встраивание входных данных
        x = self.encoder(x).squeeze(2)
        # Проход через RNN
        out, hidden = self.rnn(x, hidden)
        # Применение dropout
        out = self.dropout(out)
        # Выходные данные
        x = self.fc(out)
        return x, hidden

    #Метод для инициализации скрытого состояния, который возвращает тензор из нулей соответствующей формы
    def init_hidden(self, batch_size=1):
        return torch.zeros(self.n_layers, batch_size, self.hidden_size, requires_grad=True).to(device)

# Пример использования
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
input_size = 1000  # Пример размера входных данных
hidden_size = 128  # Пример размера скрытого слоя
embedding_size = 300  # Пример размера встраивания
model = TextRNN(input_size, hidden_size, embedding_size).to(device)

In [14]:
#Создаем нейросеть и обучаем ее

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu') # Установка устрофства для вычисления
model = TextRNN(input_size=len(idx_to_char), hidden_size=128, embedding_size=128, n_layers=2) 
model.to(device)

criterion = nn.CrossEntropyLoss() # инициализируем функцию потерь, используется стандартная для задачи классификации
optimizer = torch.optim.Adam(model.parameters(), lr=1e-2, amsgrad=True) # выбираем модель оптимизатора

#инициализируем планировщик скорости обучения, который уменьшает скорость обучения если модель не улучшается
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    patience=5,
    verbose=True,
    factor=0.5
)

n_epochs = 150 #кол-во эпох
loss_avg = [] # отслеживание средней скорости потери

# цикл обучения
for epoch in range(n_epochs):
    model.train() # переводим модель в режим обучения
    train, target = get_batch(sequence) #Получение батча данных для обучения и соответствующих целей
    train = train.permute(1, 0, 2).to(device) #Перестановка размерностей батча и перемещение его на устройство
    target = target.permute(1, 0, 2).to(device)
    hidden = model.init_hidden(BATCH_SIZE) #Инициализация скрытого состояния

    #выполнение прямого прохода и расчет потерь
    output, hidden = model(train, hidden)
    loss = criterion(output.permute(1, 2, 0), target.squeeze(-1).permute(1, 0))

    loss.backward() #обратное распростарнение ошибки
    optimizer.step() #шаг оптимизации
    optimizer.zero_grad() #обнуление градиентов

    loss_avg.append(loss.item()) #добавление текущего значения в список

    #каждые 50 эпох оценивает модель и выводит потери
    if len(loss_avg) >= 50:
        mean_loss = np.mean(loss_avg)
        print(f'Loss: {mean_loss}')
        scheduler.step(mean_loss)
        loss_avg = []
        model.eval()
        predicted_text = evaluate(model, char_to_idx, idx_to_char)
        print(predicted_text)

Loss: 2.451728482246399
 золотая рыбка, старухе старуха свое старик нево море.
 «Чего не поклосила не просторить старухе,
 Старик столся закинее море.
 «Смила.
 Не синее просторить старик не пушель старуха рыбка, синема море.
Loss: 1.5571012711524963
  Прозмула море.
 Отвечает:
 «Смилуйся, старуха пряла к невод с тобойПраское просторе.
 Чтобы тобой,
 Старик синего море.
 «Чего тебе на полнощных волнойВ траздные служит он к синему морю,
 Рассказал о
Loss: 1.0058998847007752
  Происпугалось».
 
 Вот новает золотая рыбка,
 Лишь хведомые вокой,
 Хочет быть вольною царицей,
 Хочет быть вольною царицей,
 Хочет быть вольною царицей,
 Хочет быть вольною царицей,
 Хочет быть воль


In [15]:
def evaluate(model, char_to_idx, idx_to_char, start_text=' ', prediction_len=200, temp=0.3):
    model.eval()
    hidden = model.init_hidden()
    idx_input = [char_to_idx[char] for char in start_text]
    train = torch.LongTensor(idx_input).view(-1, 1, 1).to(device)
    predicted_text = start_text
    total_log_prob = 0

    _, hidden = model(train, hidden)
    inp = train[-1].view(-1, 1, 1)

    for i in range(prediction_len):
        output, hidden = model(inp.to(device), hidden)
        output_logits = output.cpu().data.view(-1)
        p_next = F.softmax(output_logits / temp, dim=-1).detach().cpu().data.numpy()
        top_index = np.random.choice(len(char_to_idx), p=p_next)
        top_prob = p_next[top_index]
        total_log_prob += np.log(top_prob)
        inp = torch.LongTensor([top_index]).view(-1, 1, 1).to(device)
        predicted_char = idx_to_char[top_index]
        predicted_text += predicted_char

    return predicted_text, total_log_prob # Возвращение сгенерированного текста и общей логарифмированной вероятности

def calculate_perplexity(log_prob, sequence_length):
    return np.exp(-log_prob / sequence_length)

# Example of usage
generated_text, total_log_prob = evaluate(
    model,
    char_to_idx,
    idx_to_char,
    temp=0.3,
    prediction_len=200,
    start_text='старик'
)

perplexity = calculate_perplexity(total_log_prob, len(generated_text))
print(f"Generated Text: {generated_text}")
print(f"Perplexity: {perplexity}")

Generated Text: стариких береговБен богом,
 Хочет быть волны финские забытой старик отвечает:
 «Смилуйся, государыня рыбка!
 Что мне держат.
 Что мне покою:
 Из тась,
 Не дает старик ко старухе воротился золотая рыбка,
 Ли
Perplexity: 1.202928941838911
