In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import random
import string
import pandas as pd

# Параметры
ALPHABET = string.ascii_lowercase + " "  # алфавит (буквы + пробел)
CHAR_TO_INDEX = {char: idx for idx, char in enumerate(ALPHABET)}  # символ → индекс
INDEX_TO_CHAR = {idx: char for char, idx in CHAR_TO_INDEX.items()}  # индекс → символ
VOCAB_SIZE = len(ALPHABET)  # размер словаря
MAX_LEN = 50  # максимальная длина фразы
HIDDEN_SIZE = 256  # размер скрытого состояния
EPOCHS = 30  # количество эпох
BATCH_SIZE = 128  # размер батча
LEARNING_RATE = 0.001  # learning rate
DROPOUT = 0.2  # Dropout

# Определяем устройство (GPU или CPU)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Загрузка данных
df = pd.read_csv('simpsons_script_lines.csv')
phrases = df['normalized_text'].dropna().tolist()  # используем столбец normalized_text

# Преобразуем тексты в список символов
text = [[char for char in phrase] for phrase in phrases if isinstance(phrase, str)]

# Создаем тензор для хранения индексов символов
X = torch.zeros((len(text), MAX_LEN), dtype=torch.long).to(device)  # перемещаем на устройство

# Заполняем тензор индексами символов
for i, phrase in enumerate(text):
    for j, char in enumerate(phrase):
        if j >= MAX_LEN:
            break
        X[i, j] = CHAR_TO_INDEX.get(char, CHAR_TO_INDEX[" "])  # неизвестные символы → пробел

# Кастомная RNN-ячейка на основе полносвязных слоев
class CustomRNNCell(nn.Module):
    def __init__(self, input_size, hidden_size):
        super(CustomRNNCell, self).__init__()
        self.hidden_size = hidden_size
        self.i2h = nn.Linear(input_size, hidden_size)  # вход -> скрытое состояние
        self.h2h = nn.Linear(hidden_size, hidden_size)  # скрытое состояние -> скрытое состояние

    def forward(self, x, hidden):
        hidden = torch.tanh(self.i2h(x) + self.h2h(hidden))  # обновляем скрытое состояние
        return hidden

# Нейронная сеть с кастомной RNN-ячейкой
class TextGenerator(nn.Module):
    def __init__(self, vocab_size, hidden_size, dropout):
        super(TextGenerator, self).__init__()
        self.embedding = nn.Embedding(vocab_size, hidden_size)
        self.rnn_cell = CustomRNNCell(hidden_size, hidden_size)
        self.fc = nn.Linear(hidden_size, vocab_size)
        self.dropout = nn.Dropout(dropout)
        self.hidden_size = hidden_size

    def forward(self, x, hidden=None):
        if hidden is None:
            hidden = torch.zeros(x.size(0), self.hidden_size).to(device)  # инициализация скрытого состояния
        x = self.embedding(x)
        outputs = []
        for i in range(x.size(1)):  # проходим по каждому символу в последовательности
            hidden = self.rnn_cell(x[:, i, :], hidden)  # обновляем скрытое состояние
            outputs.append(hidden)
        outputs = torch.stack(outputs, dim=1)  # объединяем выходы
        outputs = self.dropout(outputs)
        outputs = self.fc(outputs)
        return outputs, hidden

# Подготовка данных
X = X.to(device)
Y = torch.roll(X, shifts=-1, dims=1).to(device)  # сдвигаем X на 1 влево для получения целевых символов

# Создаем модель, функцию потерь и оптимизатор
model = TextGenerator(VOCAB_SIZE, HIDDEN_SIZE, DROPOUT).to(device)  # перемещаем модель на устройство
criterion = nn.CrossEntropyLoss().to(device)  # перемещаем функцию потерь на устройство
optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)  # используем Adam
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)  # добавляем scheduler

# Обучение модели
for epoch in range(EPOCHS):
    model.train()
    total_loss = 0
    total_correct = 0  # для подсчета правильных предсказаний
    total_samples = 0  # для подсчета общего числа символов

    for i in range(0, len(X), BATCH_SIZE):
        # Берем батч
        X_batch = X[i:i + BATCH_SIZE]
        Y_batch = Y[i:i + BATCH_SIZE]

        # Обнуляем градиенты
        optimizer.zero_grad()

        # Прямой проход
        outputs, _ = model(X_batch)
        outputs = outputs.view(-1, VOCAB_SIZE)
        Y_batch = Y_batch.view(-1)

        # Вычисляем потери
        loss = criterion(outputs, Y_batch)
        total_loss += loss.item()

        # Обратный проход и обновление весов
        loss.backward()
        optimizer.step()

        # Вычисляем accuracy
        _, predicted = torch.max(outputs, dim=1)  # получаем предсказанные символы
        total_correct += (predicted == Y_batch).sum().item()  # считаем правильные предсказания
        total_samples += Y_batch.size(0)  # общее количество символов в батче

    # Обновляем learning rate
    scheduler.step()

    # Выводим статистику
    accuracy = total_correct / total_samples  # вычисляем accuracy
    print(f"Epoch {epoch + 1}/{EPOCHS}, Loss: {total_loss / (len(X) // BATCH_SIZE):.4f}, Accuracy: {accuracy:.4f}")

# Генерация текста
def generate_text(model, start_text, max_length=100):
    model.eval()
    with torch.no_grad():
        # Преобразуем начальный текст в тензор
        input_tensor = text_to_tensor(start_text, len(start_text)).unsqueeze(0).to(device)
        hidden = None
        generated_text = list(start_text)

        for _ in range(max_length):
            output, hidden = model(input_tensor, hidden)
            _, predicted = torch.max(output[:, -1, :], dim=1)
            next_char = INDEX_TO_CHAR[predicted.item()]
            generated_text.append(next_char)
            input_tensor = text_to_tensor(next_char, 1).unsqueeze(0).to(device)

        return "".join(generated_text)

# Пример генерации текста
start_text = "homer"
generated_text = generate_text(model, start_text, max_length=100)
print(f"Generated text: {generated_text}")