In [42]:
from __future__ import unicode_literals, print_function, division

from typing import List

import pandas as pd

import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F

from io import open
import unicodedata
import re
import random

import numpy as np
from torch.utils.data import TensorDataset, DataLoader, RandomSampler, Dataset
from torch.nn.utils.rnn import pad_sequence

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cpu


In [60]:
dataset = pd.read_csv('data/data_tokenize.csv')
pairs = list(dataset[["title", "text"]].itertuples(index=False, name=None))

In [44]:
sos_token = 0
eos_token = 1
MAX_VOCAB_SIZE = 100_000

# Работа с данными

## Словарь частот

In [61]:
class Vocab:
    """Создаёт словари с частотами слов на основе входных данных"""

    def __init__(self, name):
        self.name = name
        self.word2index = {"<pad>": 0, "<unk>": 1, "sos": 2, "eos": 3}
        self.word2count = {"<pad>": 0, "<unk>": 0, "sos": 0, "eos": 0}
        self.index2word = {0: "<pad>", 1: "<unk>", 2: "sos", 3: "eos"}
        self.n_words = 4

        self._temp_word_counts = {}

    def addText(self, text: str):
        """Для каждого слова в тексте добавляет его во временный счётчик"""
        for word in text.split():
            self._temp_word_counts[word] = self._temp_word_counts.get(word, 0) + 1

    def build_vocab(self):
        """Строит финальный словарь после подсчёта всех слов"""
        sorted_words = sorted(self._temp_word_counts.items(),
                            key=lambda x: x[1],
                            reverse=True)

        for word, count in sorted_words[:MAX_VOCAB_SIZE - 4]:
            if word not in self.word2index:
                self.word2index[word] = self.n_words
                self.word2count[word] = count
                self.index2word[self.n_words] = word
                self.n_words += 1

        for word, count in sorted_words[MAX_VOCAB_SIZE - 4:]:
            self.word2count["<unk>"] += count

    def word_to_index(self, word: str) -> int:
        """Возвращает индекс слова или <unk>"""
        return self.word2index.get(word, self.word2index["<unk>"])

    def index_to_word(self, index: int) -> str:
        """Возвращает слово по индексу"""
        return self.index2word.get(index, self.index2word["<unk>"])

    def __str__(self):
        """Строковое представление словаря"""
        return (
            f"Vocab(name='{self.name}', "
            f"n_words={self.n_words}, "
        )

In [46]:
title_vocab = Vocab("title")
text_vocab = Vocab("text")

In [47]:
for text in dataset['text']:
    text_vocab.addText(text)

for title in dataset['title']:
    title_vocab.addText(title)

text_vocab.build_vocab()
title_vocab.build_vocab()

In [48]:
text_vocab.__str__(), title_vocab.__str__()

("Vocab(name='text', n_words=100000, ", "Vocab(name='title', n_words=19534, ")

## Преобразование текста в датасет

In [None]:
def text_to_tensor(text: str, vocab: Vocab, add_sos_eos=True) -> torch.Tensor:
    """Преобразует текст в тензоры"""
    tokens = text.strip().split()
    indices = [vocab.word_to_index(w) for w in tokens]

    if add_sos_eos:
        indices = [vocab.word2index["<sos>"]] + indices + [vocab.word2index["<eos>"]]

    return torch.tensor(indices, dtype=torch.long)


In [None]:
class TitleDataset(Dataset):
    def __init__(self, pairs: list[tuple[str, str]], input_vocab: Vocab, output_vocab: Vocab):
        """
            pairs — список пар (название, текст),
            input_vocab - словарь с частотами слов из текстов,
            output_vocab - словарь с частотами слов из названий
        """
        self.pairs = pairs
        self.input_vocab = input_vocab
        self.output_vocab = output_vocab

    def __len__(self):
        return len(self.pairs)

    def __getitem__(self, idx):
        title, text = self.pairs[idx]
        input_tensor = text_to_tensor(text, self.input_vocab, add_sos_eos=False)
        target_tensor = text_to_tensor(title, self.output_vocab, add_sos_eos=False)
        return input_tensor, target_tensor


In [None]:
def collate_fn(batch: List[tuple[str, str]]):
    """
    batch: list of (input_tensor, target_tensor)
    Returns:
        input_padded: [batch, src_len]
        target_padded: [batch, trg_len]
    """
    src_batch, trg_batch = zip(*batch)

    src_padded = pad_sequence(src_batch, padding_value=0, batch_first=True)
    trg_padded = pad_sequence(trg_batch, padding_value=0, batch_first=True)

    return src_padded, trg_padded


# Модель seq2seq

## Энкодер для seq2seq

In [49]:
class EncoderLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, dropout_p=0.1):
        super(EncoderLSTM, self).__init__()
        self.hidden_size = hidden_size

        self.embedding = nn.Embedding(input_size, hidden_size)
        self.lstm = nn.LSTM(hidden_size, hidden_size, batch_first=True)
        self.dropout = nn.Dropout(dropout_p)

    def forward(self, input):
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.lstm(embedded)
        return output, (hidden, cell)

## Декодер для seq2seq

In [None]:
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()

        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers

        self.embedding = nn.Embedding(output_dim, emb_dim)

        self.rnn = nn.LSTM(emb_dim, hid_dim, n_layers, dropout = dropout)

        self.fc_out = nn.Linear(hid_dim, output_dim)

        self.dropout = nn.Dropout(dropout)

    def forward(self, input, hidden, cell):
        input = input.unsqueeze(0)
        embedded = self.dropout(self.embedding(input))
        output, (hidden, cell) = self.rnn(embedded, (hidden, cell))
        prediction = self.fc_out(output.squeeze(0))
        return prediction, hidden, cell

## Модель

In [None]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()

        self.encoder = encoder
        self.decoder = decoder
        self.device = device

        assert encoder.hidden_size == decoder.hid_dim, "Hidden dimensions must match!"
        assert decoder.n_layers == 1, "Encoder must produce compatible layers for decoder"

    def forward(self, src, trg, teacher_forcing_ratio=0.5):
        batch_size = src.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim

        outputs = torch.zeros(trg_len, batch_size, trg_vocab_size).to(self.device)
        encoder_outputs, (hidden, cell) = self.encoder(src)

        input = trg[:, 0]

        for t in range(1, trg_len):
            output, hidden, cell = self.decoder(input, hidden, cell)
            outputs[t] = output
            teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input = trg[:, t] if teacher_force else top1

        return outputs
