## 📘 Двунаправленная LSTM от посимвольной до пословной токенизации

### Подключим необходимые библиотеки

In [None]:
"""
Реализация двунаправленной LSTM-модели для генерации текста на русском языке.

Данный модуль обеспечивает создание, обучение и применение глубокой двунаправленной 
LSTM-сети для обработки и генерации текста. Реализована архитектура с четырьмя LSTM-слоями,
каждый из которых является двунаправленным, что позволяет модели учитывать
как предыдущий, так и последующий контекст при анализе текста.
"""

# Библиотеки для работы с данными и базами данных
import sqlite3
from collections import Counter
from typing import List, Dict, Any, Optional, Union, Tuple

import pandas as pd
from datasets import load_dataset
from sklearn.model_selection import train_test_split

# Библиотеки для обработки текста
import nltk
from nltk.tokenize import sent_tokenize

# Библиотеки глубокого обучения
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

# Библиотеки для визуализации и анализа
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np

# Утилиты
from tqdm import tqdm

# Настройка визуализации
seaborn.set(palette='summer')

# Определение устройства для вычислений
device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cuda'

### Загрузка датасета

In [3]:
conn = sqlite3.connect('/kaggle/input/wikibooks-dataset/wikibooks.sqlite')

df = pd.read_sql_query("SELECT * FROM ru LIMIT 3300", conn)

In [None]:
# Извлечение предложений из текстов
sentences = []

for sentence in tqdm(df['body_text']):
    sentences.extend(
        [x.lower() for x in sent_tokenize(sentence, language='russian') if len(x) < 256]
    )
    
print("Количество предложений", len(sentences))

100%|██████████| 3300/3300 [00:10<00:00, 322.32it/s]

Количество предложений 120873





### Train loop

In [None]:
def fit_epoch(
    model: nn.Module, 
    train_loader: DataLoader, 
    criterion: nn.Module, 
    optimizer: torch.optim.Optimizer, 
    sheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None
) -> Tuple[float, float]:
    """
    Description:
    ---------------
        Выполняет одну эпоху обучения модели.

    Args:
    ---------------
        model: Модель для обучения
        train_loader: Загрузчик обучающих данных
        criterion: Функция потерь
        optimizer: Оптимизатор
        sheduler: Планировщик скорости обучения (опционально)

    Returns:
    ---------------
        Tuple[float, float]: Перплексия и значение функции потерь
    """
    model.train()
    running_loss = 0.0
    running_corrects = 0
    processed_data = 0
    losses = []
    perplexity = []
    
    for batch in train_loader:
        optimizer.zero_grad()

        # Распространение прямое и обратное
        logits = model(batch['input_ids']).flatten(start_dim=0, end_dim=1)
        loss = criterion(
            logits, batch['target_ids'].flatten()
        )
        loss.backward()
        optimizer.step()
        
        # Сохранение метрик
        perplexity.append(torch.exp(loss).item())
        losses.append(loss.item())
        
    # Расчет средних значений метрик
    perplexity_avg = sum(perplexity) / len(perplexity)
    losses_avg = sum(losses) / len(losses)    
    
    return perplexity_avg, losses_avg


def eval_epoch(
    model: nn.Module, 
    val_loader: DataLoader, 
    criterion: nn.Module
) -> Tuple[float, float]:
    """
    Description:
    ---------------
        Оценивает модель на валидационном наборе данных.

    Args:
    ---------------
        model: Модель для оценки
        val_loader: Загрузчик валидационных данных
        criterion: Функция потерь

    Returns:
    ---------------
        Tuple[float, float]: Перплексия и значение функции потерь
    """
    model.eval()
    perplexity = []
    losses = []
    
    with torch.no_grad():
        for batch in val_loader:
            logits = model(batch['input_ids']).flatten(start_dim=0, end_dim=1)
            loss = criterion(
                logits,
                batch['target_ids'].flatten()
            )
            perplexity.append(torch.exp(loss).item())
            losses.append(loss.item())

    # Расчет средних значений метрик
    perplexity_avg = sum(perplexity) / len(perplexity)
    losses_avg = sum(losses) / len(losses)
    
    return perplexity_avg, losses_avg


def train(
    train_dataloader: DataLoader, 
    eval_dataloader: DataLoader, 
    model: nn.Module, 
    epochs: int, 
    ignore_index: int = word2ind['<pad>'],
    optimizer: Optional[torch.optim.Optimizer] = None, 
    criterion: Optional[nn.Module] = None, 
    sheduler: Optional[torch.optim.lr_scheduler._LRScheduler] = None
) -> Tuple[nn.Module, List[Tuple[float, float, float, float]]]:
    """
    Description:
    ---------------
        Обучает модель на заданном количестве эпох.

    Args:
    ---------------
        train_dataloader: Загрузчик обучающих данных
        eval_dataloader: Загрузчик валидационных данных
        model: Модель для обучения
        epochs: Количество эпох обучения
        ignore_index: Индекс токена, который не учитывается в функции потерь
        optimizer: Оптимизатор (по умолчанию Adam)
        criterion: Функция потерь (по умолчанию CrossEntropyLoss)
        sheduler: Планировщик скорости обучения (опционально)

    Returns:
    ---------------
        Tuple[nn.Module, List[Tuple[float, float, float, float]]]: 
            Обученная модель и история обучения
    """
    # Инициализация оптимизатора и функции потерь, если не указаны
    if optimizer is None:
      optimizer = torch.optim.Adam(model.parameters())

    if criterion is None:
      criterion = nn.CrossEntropyLoss(ignore_index=ignore_index)

    # Сохранение лучших весов модели
    best_model_wts = model.state_dict()
    best_perplexity = 10e10

    # История обучения
    history = []
    log_template = "\nEpoch {ep:03d} train_loss: {t_loss:0.4f} \
    val_loss {v_loss:0.4f} train_perplexirty {t_acc:0.4f} val_perplexirty {v_acc:0.4f}"

    with tqdm(desc="epoch", total=epochs) as pbar_outer:
        for epoch in range(epochs):
            # Обучение на одной эпохе
            train_perplexirty, train_loss = fit_epoch(
                model, train_dataloader, criterion, optimizer
            )
            
            # Валидация модели
            val_perplexirty, val_loss = eval_epoch(
                model, eval_dataloader, criterion
            )
            
            # Сохранение метрик
            history.append((train_loss, train_perplexirty, val_loss, val_perplexirty))
            
            # Сохранение лучшей модели
            if val_perplexirty < best_perplexity:
                best_perplexity = val_perplexirty
                best_model_wts = model.state_dict()

            # Обновление прогресс-бара и вывод результатов
            pbar_outer.update(1)
            tqdm.write(log_template.format(
                ep=epoch+1, 
                t_loss=train_loss,
                v_loss=val_loss, 
                t_acc=train_perplexirty, 
                v_acc=val_perplexirty
            ))

    print('Best val perplexirty: {:4f}'.format(best_perplexity))
    
    # Загрузка лучших весов
    model.load_state_dict(best_model_wts)

    return model, history

### Функции необходимые при обучении/загрузке датасета/генерации текста

In [None]:
class WordDataset:
    """
    Description:
    ---------------
        Датасет для работы с пословной токенизацией текста.

    Args:
    ---------------
        sentences: Список предложений для обработки

    Examples:
    ---------------
        >>> dataset = WordDataset(sentences)
        >>> sample = dataset[0]
        >>> len(sample)
        # Длина токенизированного предложения
    """
    def __init__(self, sentences: List[str]) -> None:
        self.data = sentences
        self.unk_id = word2ind['<unk>']
        self.bos_id = word2ind['<bos>']
        self.eos_id = word2ind['<eos>']
        self.pad_id = word2ind['<pad>']

    def __getitem__(self, idx: int) -> List[int]:
        """
        Description:
        ---------------
            Преобразует предложение в последовательность индексов слов.

        Args:
        ---------------
            idx: Индекс предложения в датасете

        Returns:
        ---------------
            List[int]: Токенизированное предложение как список индексов
        """
        tokenized_sentence = [self.bos_id]
        tokenized_sentence += [
            word2ind.get(word, self.unk_id) 
            for word in nltk.word_tokenize(self.data[idx])
        ]
        tokenized_sentence += [self.eos_id]
        
        return tokenized_sentence

    def __len__(self) -> int:
        """
        Description:
        ---------------
            Возвращает размер датасета.

        Returns:
        ---------------
            int: Количество предложений в датасете
        """
        return len(self.data)
    

def collate_fn_with_padding(
    input_batch: List[List[int]], 
    pad_id: int = word2ind['<pad>']
) -> Dict[str, torch.Tensor]:
    """
    Description:
    ---------------
        Функция для преобразования батча данных: добавляет паддинг и 
        создает тензоры ввода и целевых значений.

    Args:
    ---------------
        input_batch: Пакет токенизированных предложений
        pad_id: Идентификатор токена для заполнения (паддинга)

    Returns:
    ---------------
        Dict[str, torch.Tensor]: Словарь с тензорами входных и целевых id
    """
    seq_lens = [len(x) for x in input_batch]
    max_seq_len = max(seq_lens)

    new_batch = []
    for sequence in input_batch:
        for _ in range(max_seq_len - len(sequence)):
            sequence.append(pad_id)
        new_batch.append(sequence)

    sequences = torch.LongTensor(new_batch).to(device)

    new_batch = {
        'input_ids': sequences[:, :-1],
        'target_ids': sequences[:, 1:]
    }

    return new_batch


def generate_sequence(
    model: nn.Module, 
    dict_2ind: Dict[str, int], 
    ind2dict: Dict[int, str], 
    starting_seq: str, 
    max_seq_len: int = 256
) -> str:
    """
    Description:
    ---------------
        Генерирует текстовую последовательность, начиная с заданной.

    Args:
    ---------------
        model: Обученная языковая модель
        dict_2ind: Словарь для преобразования токенов в индексы
        ind2dict: Словарь для преобразования индексов в токены
        starting_seq: Начальная последовательность
        max_seq_len: Максимальная длина генерируемой последовательности

    Returns:
    ---------------
        str: Сгенерированная последовательность

    Examples:
    ---------------
        >>> result = generate_sequence(model, word2ind, ind2word, "история")
        >>> print(result)
        'история россии...'
    """
    device = 'cpu'
    model = model.to(device)
    
    # Преобразование начальной последовательности в индексы
    input_ids = [dict_2ind['<bos>']] + [
        dict_2ind.get(char, dict_2ind['<unk>']) for char in starting_seq
    ]
    input_ids = torch.LongTensor(input_ids).to(device)

    model.eval()
    with torch.no_grad():
        for i in range(max_seq_len):
            # Получение распределения вероятностей для следующего токена
            next_token_distribution = model(input_ids.unsqueeze(0))
            next_token_logits = next_token_distribution[0, -1, :]
            next_token = next_token_logits.argmax()
            
            # Добавление предсказанного токена к входной последовательности
            input_ids = torch.cat([input_ids, next_token.unsqueeze(0)])

            # Завершение генерации при появлении токена конца последовательности
            if next_token.item() == dict_2ind['<eos>']:
                break

    # Преобразование индексов обратно в текст
    words = ' '.join([ind2dict[idx.item()] for idx in input_ids])

    return words

### Main Model

In [None]:
class LanguageModel(nn.Module):
    """
    Description:
    ---------------
        Глубокая двунаправленная LSTM-модель для языкового моделирования.

    Args:
    ---------------
        vocab_size: Размер словаря
        hidden_dim: Размерность скрытого состояния

    Examples:
    ---------------
        >>> model = LanguageModel(vocab_size=1000, hidden_dim=256)
        >>> input_tensor = torch.LongTensor([[1, 2, 3, 4]])
        >>> output = model(input_tensor)
        >>> output.shape
        torch.Size([1, 4, 1000])
    """
    def __init__(
        self, 
        vocab_size: int, 
        hidden_dim: int
    ) -> None:
        super().__init__()
        # Слой эмбеддингов
        self.embedding = nn.Embedding(vocab_size, hidden_dim)
        
        # Четыре последовательных двунаправленных LSTM-слоя
        self.lstm_1 = nn.LSTM(
            hidden_dim, hidden_dim, batch_first=True, bidirectional=True
        )
        self.lstm_2 = nn.LSTM(
            hidden_dim*2, hidden_dim, batch_first=True, bidirectional=True
        )
        self.lstm_3 = nn.LSTM(
            hidden_dim*2, hidden_dim, batch_first=True, bidirectional=True
        )
        self.lstm_4 = nn.LSTM(
            hidden_dim*2, hidden_dim, batch_first=True, bidirectional=True
        )
            
        # Проекционные слои
        self.linear = nn.Linear(hidden_dim*2, hidden_dim)
        self.projection = nn.Linear(hidden_dim, vocab_size)

        # Нелинейность и регуляризация
        self.non_lin = nn.Tanh()
        self.dropout = nn.Dropout(0.2)

    def forward(self, input_batch: torch.Tensor) -> torch.Tensor:
        """
        Description:
        ---------------
            Прямой проход модели.

        Args:
        ---------------
            input_batch: Тензор с индексами входных токенов

        Returns:
        ---------------
            torch.Tensor: Предсказанные логиты для каждого токена
        """
        # Преобразование индексов в эмбеддинги
        embeddings = self.embedding(input_batch)  # [batch_size, seq_len, hidden_dim]
        
        # Последовательное прохождение через LSTM-слои
        output, _ = self.lstm_1(embeddings)  # [batch_size, seq_len, hidden_dim*2]
        output, _ = self.lstm_2(output)      # [batch_size, seq_len, hidden_dim*2]
        output, _ = self.lstm_3(output)      # [batch_size, seq_len, hidden_dim*2]
        output, _ = self.lstm_4(output)      # [batch_size, seq_len, hidden_dim*2]
        
        # Применение дополнительных преобразований
        output = self.dropout(
            self.linear(self.non_lin(output))
        )  # [batch_size, seq_len, hidden_dim]
        
        # Проекция на размер словаря
        projection = self.projection(
            self.non_lin(output)
        )  # [batch_size, seq_len, vocab_size]

        return projection

### Train

In [None]:
# Подготовка данных для словесной модели
words = Counter()

for sentence in tqdm(sentences):
    for word in nltk.word_tokenize(sentence):
            words[word] += 1
            
vocab = set(['<unk>', '<bos>', '<eos>', '<pad>'])
vocab_size = 40000

for elem in words.most_common(vocab_size):
    vocab.add(elem[0])
    
print("Всего слов в словаре:", len(vocab))

100%|██████████| 120873/120873 [00:28<00:00, 4174.65it/s]


Всего слов в словаре: 40004


In [39]:
word2ind = {char: i for i, char in enumerate(vocab)}
ind2word = {i: char for char, i in word2ind.items()}

In [None]:
# Разделение данных на обучающую и тестовую выборки
train_sentences, eval_sentences = train_test_split(
    sentences, test_size=0.2
)

# Создание датасетов и загрузчиков данных
train_dataset = WordDataset(train_sentences)
eval_dataset = WordDataset(eval_sentences)

train_dataloader = DataLoader(
    train_dataset, 
    collate_fn=collate_fn_with_padding, 
    batch_size=64
)

eval_dataloader = DataLoader(
    eval_dataset, 
    collate_fn=collate_fn_with_padding, 
    batch_size=64
)

In [None]:
# Создание и обучение модели
model = LanguageModel(
    hidden_dim=256, 
    vocab_size=len(vocab)
).to(device)

num_params = sum(p.numel() for p in model.parameters())
print(model)
print(f"Number of model parameters: {num_params:,}")

LanguageModel(
  (embedding): Embedding(40004, 256)
  (lstm_1): LSTM(256, 256, batch_first=True, bidirectional=True)
  (lstm_2): LSTM(512, 256, batch_first=True, bidirectional=True)
  (lstm_3): LSTM(512, 256, batch_first=True, bidirectional=True)
  (lstm_4): LSTM(512, 256, batch_first=True, bidirectional=True)
  (linear): Linear(in_features=512, out_features=256, bias=True)
  (projection): Linear(in_features=256, out_features=40004, bias=True)
  (non_lin): Tanh()
  (dropout): Dropout(p=0.2, inplace=False)
)
Number of model parameters: 26,436,932


In [None]:
# Обучение модели
best_model, losses = train(
    train_dataloader, 
    eval_dataloader, 
    model, 
    10, 
    ignore_index=word2ind["<pad>"]
)

epoch:  10%|█         | 1/10 [02:46<24:56, 166.31s/it]


Epoch 001 train_loss: 6.2165     val_loss 4.4502 train_perplexirty 950.2023 val_perplexirty 86.3828


epoch:  20%|██        | 2/10 [05:32<22:08, 166.07s/it]


Epoch 002 train_loss: 3.6826     val_loss 3.0068 train_perplexirty 43.8529 val_perplexirty 20.3645


epoch:  30%|███       | 3/10 [08:18<19:21, 165.97s/it]


Epoch 003 train_loss: 2.6966     val_loss 2.2747 train_perplexirty 15.2892 val_perplexirty 9.7852


epoch:  40%|████      | 4/10 [11:03<16:35, 165.89s/it]


Epoch 004 train_loss: 1.9999     val_loss 1.7284 train_perplexirty 7.5591 val_perplexirty 5.6607


epoch:  50%|█████     | 5/10 [13:49<13:48, 165.78s/it]


Epoch 005 train_loss: 1.5587     val_loss 1.4527 train_perplexirty 4.7937 val_perplexirty 4.2946


epoch:  60%|██████    | 6/10 [16:35<11:03, 165.76s/it]


Epoch 006 train_loss: 1.2813     val_loss 1.2659 train_perplexirty 3.6192 val_perplexirty 3.5613


epoch:  70%|███████   | 7/10 [19:20<08:17, 165.76s/it]


Epoch 007 train_loss: 1.0773     val_loss 1.1091 train_perplexirty 2.9480 val_perplexirty 3.0433


epoch:  80%|████████  | 8/10 [22:07<05:32, 166.09s/it]


Epoch 008 train_loss: 0.8751     val_loss 0.9516 train_perplexirty 2.4070 val_perplexirty 2.5987


epoch:  90%|█████████ | 9/10 [24:54<02:46, 166.18s/it]


Epoch 009 train_loss: 0.7142     val_loss 0.8469 train_perplexirty 2.0487 val_perplexirty 2.3398


epoch: 100%|██████████| 10/10 [27:40<00:00, 166.08s/it]


Epoch 010 train_loss: 0.5945     val_loss 0.7705 train_perplexirty 1.8150 val_perplexirty 2.1671
Best val perplexirty: 2.167140





In [None]:
# Тестирование генерации текста
generate_sequence(
    model, 
    word2ind, 
    ind2word,
    starting_seq=nltk.word_tokenize('империя')
)

'<bos> империя чистый англ <eos>'

### Выводы

**Двунаправленная LSTM**

Двунаправленная LSTM показала наилучшие результаты среди всех рассмотренных архитектур по метрике перплексии. Благодаря способности учитывать как предыдущий, так и последующий контексты. Но данная архитектура не подходит для генерации текста с 0 или начиная с начального предложения. Двунаправленная LSTM подходит для предсказания слова, если оно находится в середине некоторого контекста.

**Плюсы:**

- Лучшее качество за счет учета контекста в обеих направлениях.

**Минусы:**

- Высокая вычислительная сложность и потребность в большом объеме памяти.
- Увеличенное время на обучение и предсказание.
- Неподходящая архитектура для генерации текста с 0 или же последовательно.