Установите необходимое

In [None]:
!pip install torch spacy tqdm
!python -m spacy download ru_core_news_sm

Импортируем библиотеки и проверяем наличие GPU

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence, pad_packed_sequence
import spacy
import ru_core_news_sm
import json
import os
import re
import random
import time
import math
import unicodedata
from tqdm import tqdm

Задаем константы. Для вас важнее всего здесь 
* N_EPOCHS - число эпох для обучения модели
* EMB_DIM - размер эмбеддингов, в какой размер "сжимается" весь словарь из обучающих текстов
* HID_DIM - размерность скрытого состояния, от этого параметра зависит "память" модели, как много она может запомнить
* LEARNING_RATE - скорость обучения
* TEACHER_FORCING_RATIO - вероятность, с которой даем модели при обучении правильные продолжения текстов, без этого модель хуже учится

In [2]:
DATA_FILE = "prompt_expander_train.jsonl"
MODEL_PATH = "seq2seq_model.pth"
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EMB_DIM = 256
HID_DIM = 512
N_LAYERS = 2
ENC_DROPOUT = 0.5
DEC_DROPOUT = 0.5
CLIP = 1
N_EPOCHS = 15
BATCH_SIZE = 32
LEARNING_RATE = 0.001
TEACHER_FORCING_RATIO = 0.5
PAD_TOKEN = "<pad>"
SOS_TOKEN = "<sos>"
EOS_TOKEN = "<eos>"
UNK_TOKEN = "<unk>"
try:
    nlp = ru_core_news_sm.load(disable=["parser", "ner"])
except Exception as e:
    print(f"Не удалось загрузить модель 'ru_core_news_sm'. Убедитесь, что она установлена:")
    print("python -m spacy download ru_core_news_sm")
    nlp = lambda text: [tok.text for tok in spacy.blank("ru").tokenizer(text)]
print(f"Устройство для вычислений: {DEVICE}")

Устройство для вычислений: cuda


Функции загрузки текстов обучения и их парсинга (разбор текстов в структурированный формат)

In [3]:
def unicode_to_ascii(s):
    return ''.join(
        c for c in unicodedata.normalize('NFD', s)
        if unicodedata.category(c) != 'Mn'
    )
def normalize_string(s):
    s = unicode_to_ascii(s.lower().strip())
    s = re.sub(r"([.!?])", r" \1", s)
    s = re.sub(r"[^а-яА-Яa-zA-Z.!?]+", r" ", s)
    s = re.sub(r"\s+", r" ", s).strip()
    return s
def load_and_parse_data(filepath):
    pairs = []
    pattern = re.compile(r"<s>\[INST\] (.*?) \[/INST\] (.*?)</s>", re.DOTALL)
    print(f"Загрузка и парсинг данных из {filepath}...")
    with open(filepath, 'r', encoding='utf-8') as f:
        for line in f:
            try:
                data = json.loads(line)
                text = data.get("text", "")
                match = pattern.search(text)
                if match:
                    src = normalize_string(match.group(1))
                    trg = normalize_string(match.group(2))
                    if src and trg:
                        pairs.append((src, trg))
            except json.JSONDecodeError:
                print(f"Ошибка декодирования JSON в строке: {line}")
                continue
    print(f"Загружено {len(pairs)} пар промпт-ответ.")
    return pairs
try:
    test_pairs = load_and_parse_data(DATA_FILE)
    print("\nПример пары:")
    print(f"ВХОД: {test_pairs[0][0]}")
    print(f"ВЫХОД: {test_pairs[0][1]}")
except FileNotFoundError:
    print(f"\nФайл {DATA_FILE} пока не найден. Будет загружен при обучении.")
except IndexError:
    print("\nФайл данных пуст или не содержит корректных пар.")

Загрузка и парсинг данных из prompt_expander_train.jsonl...
Загружено 621 пар промпт-ответ.

Пример пары:
ВХОД: как открыть вклад
ВЫХОД: открыть рублевыи вклад накопительныи на месяцев с возможностью ежемесячнои капитализации процентов


Формирование общих словарей и токенизация (разбиение на отдельные элементы текста + сопоставление им уникальных чисел, которые подаются в модель вместо самих слов/букв и тд)

In [4]:
def tokenize_ru(text):
    return [token.text for token in nlp(text)]
class Vocab:
    def __init__(self, name):
        self.name = name
        self.word2index = {}
        self.word2count = {}
        self.index2word = {0: PAD_TOKEN, 1: SOS_TOKEN, 2: EOS_TOKEN, 3: UNK_TOKEN}
        self.n_words = 4
    def add_sentence(self, sentence, tokenizer):
        for word in tokenizer(sentence):
            self.add_word(word)
    def add_word(self, word):
        if word not in self.word2index:
            self.word2index[word] = self.n_words
            self.word2count[word] = 1
            self.index2word[self.n_words] = word
            self.n_words += 1
        else:
            self.word2count[word] += 1
def build_vocabs(pairs, tokenizer):
    print("Построение словарей...")
    input_vocab = Vocab("input")
    output_vocab = Vocab("output")
    for src, trg in tqdm(pairs):
        input_vocab.add_sentence(src, tokenizer)
        output_vocab.add_sentence(trg, tokenizer)
    print(f"Словарь Входа: {input_vocab.n_words} слов")
    print(f"Словарь Выхода: {output_vocab.n_words} слов")
    return input_vocab, output_vocab
PAD_IDX = 0
SOS_IDX = 1
EOS_IDX = 2
UNK_IDX = 3

Функции загрузки датасета текстов с диска и в ОЗУ по батчам (порциям для ускорения обучения)

In [5]:
class PromptDataset(Dataset):
    def __init__(self, pairs):
        self.pairs = pairs
    def __len__(self):
        return len(self.pairs)
    def __getitem__(self, idx):
        return self.pairs[idx]
def text_to_indices(text, vocab, tokenizer):
    tokens = tokenizer(text)
    return [vocab.word2index.get(word, UNK_IDX) for word in tokens]
def collate_fn(batch, input_vocab, output_vocab, tokenizer, device):
    src_list, trg_list = [], []
    src_len_list, trg_len_list = [], []
    for src_text, trg_text in batch:
        src_indices = [SOS_IDX] + text_to_indices(src_text, input_vocab, tokenizer) + [EOS_IDX]
        trg_indices = [SOS_IDX] + text_to_indices(trg_text, output_vocab, tokenizer) + [EOS_IDX]
        src_tensor = torch.tensor(src_indices, dtype=torch.long)
        trg_tensor = torch.tensor(trg_indices, dtype=torch.long)
        src_list.append(src_tensor)
        trg_list.append(trg_tensor)
        src_len_list.append(len(src_indices))
        trg_len_list.append(len(trg_indices))
    src_padded = pad_sequence(src_list, batch_first=True, padding_value=PAD_IDX)
    trg_padded = pad_sequence(trg_list, batch_first=True, padding_value=PAD_IDX)
    src_lengths = torch.tensor(src_len_list, dtype=torch.long)
    trg_lengths = torch.tensor(trg_len_list, dtype=torch.long)
    return (
        src_padded.to(device),
        src_lengths.to("cpu"),
        trg_padded.to(device),
        trg_lengths.to("cpu")
    )

Seq2Seq состоит из частей - энкодера и декодера, а также использует слой внимания. Задаем энкодер

In [6]:
class Encoder(nn.Module):
    def __init__(self, input_dim, emb_dim, hid_dim, n_layers, dropout):
        super().__init__()
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.embedding = nn.Embedding(input_dim, emb_dim, padding_idx=PAD_IDX)
        self.rnn = nn.GRU(
            emb_dim,
            hid_dim,
            n_layers,
            dropout=dropout,
            batch_first=True,
            bidirectional=True
        )
        self.fc = nn.Linear(hid_dim * 2, hid_dim)
        self.dropout = nn.Dropout(dropout)
    def forward(self, src, src_len):
        embedded = self.dropout(self.embedding(src))
        packed_embedded = pack_padded_sequence(embedded, src_len, batch_first=True, enforce_sorted=False)
        outputs, hidden = self.rnn(packed_embedded)
        outputs, _ = pad_packed_sequence(outputs, batch_first=True)
        hidden = hidden.permute(1, 0, 2)
        hidden = hidden.contiguous().view(-1, self.n_layers, self.hid_dim * 2)
        hidden = torch.tanh(self.fc(hidden)).permute(1, 0, 2).contiguous()
        return outputs, hidden

Теперь слой внимания (который учитывает взаимосвязь между словами)

In [7]:
class Attention(nn.Module):
    def __init__(self, hid_dim):
        super().__init__()
        self.attn_W = nn.Linear(hid_dim * 2 + hid_dim, hid_dim)
        self.attn_v = nn.Linear(hid_dim, 1, bias=False)
    def forward(self, hidden, encoder_outputs):
        batch_size = encoder_outputs.shape[0]
        src_len = encoder_outputs.shape[1]
        hidden_repeated = hidden.unsqueeze(1).repeat(1, src_len, 1)
        energy_input = torch.cat((hidden_repeated, encoder_outputs), dim=2)
        energy = torch.tanh(self.attn_W(energy_input))
        attention_scores = self.attn_v(energy)
        attention_scores = attention_scores.squeeze(2)
        return F.softmax(attention_scores, dim=1)

И декодер

In [8]:
class Decoder(nn.Module):
    def __init__(self, output_dim, emb_dim, hid_dim, n_layers, dropout, attention):
        super().__init__()
        self.output_dim = output_dim
        self.hid_dim = hid_dim
        self.n_layers = n_layers
        self.attention = attention
        self.embedding = nn.Embedding(output_dim, emb_dim, padding_idx=PAD_IDX)
        self.rnn = nn.GRU(
            (hid_dim * 2) + emb_dim,
            hid_dim,
            n_layers,
            dropout=dropout,
            batch_first=True
        )
        self.fc_out = nn.Linear(
            emb_dim + hid_dim + (hid_dim * 2),
            output_dim
        )
        self.dropout = nn.Dropout(dropout)
    def forward(self, input_token, hidden, encoder_outputs):
        input_token = input_token.unsqueeze(1)
        embedded = self.dropout(self.embedding(input_token))
        attn_weights = self.attention(hidden[-1], encoder_outputs)
        attn_weights_unsqueezed = attn_weights.unsqueeze(1)
        context_vector = torch.bmm(attn_weights_unsqueezed, encoder_outputs)
        rnn_input = torch.cat((embedded, context_vector), dim=2)
        output, hidden = self.rnn(rnn_input, hidden)
        embedded_s = embedded.squeeze(1)
        output_s = output.squeeze(1)
        context_s = context_vector.squeeze(1)
        prediction_input = torch.cat((embedded_s, output_s, context_s), dim=1)
        prediction = self.fc_out(prediction_input)
        return prediction, hidden

Остается собрать модель воедино

In [9]:
class Seq2Seq(nn.Module):
    def __init__(self, encoder, decoder, device):
        super().__init__()
        self.encoder = encoder
        self.decoder = decoder
        self.device = device
    def forward(self, src, src_len, trg, teacher_forcing_ratio=0.5):
        batch_size = trg.shape[0]
        trg_len = trg.shape[1]
        trg_vocab_size = self.decoder.output_dim
        outputs = torch.zeros(batch_size, trg_len, trg_vocab_size).to(self.device)
        encoder_outputs, hidden = self.encoder(src, src_len)
        input_token = trg[:, 0]
        for t in range(1, trg_len):
            output, hidden = self.decoder(input_token, hidden, encoder_outputs)
            outputs[:, t] = output
            use_teacher_force = random.random() < teacher_forcing_ratio
            top1 = output.argmax(1)
            input_token = trg[:, t] if use_teacher_force else top1
        return outputs

Цикл обучения с расчетом времени и тп

In [10]:
def train_epoch(model, dataloader, optimizer, criterion, clip):
    model.train()
    epoch_loss = 0
    pbar = tqdm(dataloader, desc="Обучение", leave=False)
    for i, (src, src_len, trg, trg_len) in enumerate(pbar):
        optimizer.zero_grad()
        output = model(src, src_len, trg, TEACHER_FORCING_RATIO)
        output_dim = output.shape[-1]
        output_flat = output[:, 1:].reshape(-1, output_dim)
        trg_flat = trg[:, 1:].reshape(-1)
        loss = criterion(output_flat, trg_flat)
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item()
        pbar.set_postfix(loss=f"{loss.item():.3f}")
    return epoch_loss / len(dataloader)
def format_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

Данная ячейка запускает обучение модели, выводятся данные по эпохам. PPL здесь - мера "запутанности" модели, сколько кандидатов-токенов она видит для конкретного одного

In [12]:
model = None
input_vocab = None
output_vocab = None
params = {}
if os.path.exists(MODEL_PATH):
    print(f"Найден сохраненный файл модели: {MODEL_PATH}")
    print("Загрузка модели и словарей...")
    try:
        checkpoint = torch.load(MODEL_PATH, map_location=DEVICE, weights_only=False)
        input_vocab = checkpoint['input_vocab']
        output_vocab = checkpoint['output_vocab']
        params = checkpoint['params']
        print("Словари загружены.")
        print(f"Параметры модели: {params}")
        attn = Attention(params['HID_DIM'])
        enc = Encoder(
            input_vocab.n_words,
            params['EMB_DIM'],
            params['HID_DIM'],
            params['N_LAYERS'],
            params['ENC_DROPOUT']
        )
        dec = Decoder(
            output_vocab.n_words,
            params['EMB_DIM'],
            params['HID_DIM'],
            params['N_LAYERS'],
            params['DEC_DROPOUT'],
            attn
        )
        model = Seq2Seq(enc, dec, DEVICE).to(DEVICE)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()
        print("Модель успешно загружена.")
    except Exception as e:
        print(f"Ошибка при загрузке модели: {e}. Начинаем обучение с нуля.")
        model = None
else:
    print(f"Файл модели {MODEL_PATH} не найден. Начинаем обучение...")
if model is None:
    all_pairs = load_and_parse_data(DATA_FILE)
    random.shuffle(all_pairs)
    train_pairs = all_pairs
    input_vocab, output_vocab = build_vocabs(train_pairs, tokenize_ru)
    train_dataset = PromptDataset(train_pairs)
    collate_with_vocabs = lambda batch: collate_fn(
        batch, input_vocab, output_vocab, tokenize_ru, DEVICE
    )
    train_dataloader = DataLoader(
        train_dataset,
        batch_size=BATCH_SIZE,
        shuffle=True,
        collate_fn=collate_with_vocabs
    )
    print("DataLoader'ы созданы.")
    params = {
        'INPUT_DIM': input_vocab.n_words,
        'OUTPUT_DIM': output_vocab.n_words,
        'EMB_DIM': EMB_DIM,
        'HID_DIM': HID_DIM,
        'N_LAYERS': N_LAYERS,
        'ENC_DROPOUT': ENC_DROPOUT,
        'DEC_DROPOUT': DEC_DROPOUT
    }
    attn = Attention(HID_DIM)
    enc = Encoder(
        params['INPUT_DIM'], EMB_DIM, HID_DIM, N_LAYERS, ENC_DROPOUT
    )
    dec = Decoder(
        params['OUTPUT_DIM'], EMB_DIM, HID_DIM, N_LAYERS, DEC_DROPOUT, attn
    )
    model = Seq2Seq(enc, dec, DEVICE).to(DEVICE)
    print(f"Модель инициализирована и перемещена на {DEVICE}.")
    optimizer = optim.Adam(model.parameters(), lr=LEARNING_RATE)
    criterion = nn.CrossEntropyLoss(ignore_index=PAD_IDX)
    best_loss = float('inf')
    for epoch in range(N_EPOCHS):
        start_time = time.time()
        train_loss = train_epoch(model, train_dataloader, optimizer, criterion, CLIP)
        end_time = time.time()
        epoch_mins, epoch_secs = format_time(start_time, end_time)
        print(f'Эпоха: {epoch+1:02} | Время: {epoch_mins}м {epoch_secs}с')
        print(f'\tLoss Обучения: {train_loss:.3f} | PPL: {math.exp(train_loss):7.3f}')
        if train_loss < best_loss:
            best_loss = train_loss
            print("Новая лучшая модель. Сохранение...")
            torch.save({
                'model_state_dict': model.state_dict(),
                'input_vocab': input_vocab,
                'output_vocab': output_vocab,
                'params': params,
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': best_loss,
            }, MODEL_PATH)
    print("Обучение завершено.")
    model.eval()

Найден сохраненный файл модели: seq2seq_model.pth
Загрузка модели и словарей...
Словари загружены.
Параметры модели: {'INPUT_DIM': 887, 'OUTPUT_DIM': 2766, 'EMB_DIM': 256, 'HID_DIM': 512, 'N_LAYERS': 2, 'ENC_DROPOUT': 0.5, 'DEC_DROPOUT': 0.5}
Модель успешно загружена.


Код функций для инференса, сам инференс в следующей ячейке

In [13]:
from textwrap import wrap
def predict_expansion(
    sentence,
    model,
    input_vocab,
    output_vocab,
    tokenizer,
    device,
    max_len=250
):
    model.eval()
    normalized_sentence = normalize_string(sentence)
    tokens = [SOS_TOKEN] + tokenizer(normalized_sentence) + [EOS_TOKEN]
    indices = [input_vocab.word2index.get(token, UNK_IDX) for token in tokens]
    src_tensor = torch.LongTensor(indices).unsqueeze(0).to(device)
    src_len = torch.LongTensor([len(indices)]).to("cpu")
    decoded_words = []
    with torch.no_grad():
        encoder_outputs, hidden = model.encoder(src_tensor, src_len)
        trg_input = torch.LongTensor([SOS_IDX]).to(device)
        for _ in range(max_len):
            output, hidden = model.decoder(trg_input, hidden, encoder_outputs)
            topv, topi = output.topk(1)
            if topi.item() == EOS_IDX:
                break
            word = output_vocab.index2word.get(topi.item(), UNK_TOKEN)
            decoded_words.append(word)
            trg_input = topi.squeeze(1).detach()
    return " ".join(decoded_words)

Запускаете в подобном ollama режиме - появляется текстовое окно для ввода запроса (краткаий промпт) - после ввода и нажатия Enter получаете ответ (развернутый промпт), появляется новое поле ввода до тех пор, пока не введете exit или выход и не подтвердите Enter

In [14]:
if model is None or input_vocab is None or output_vocab is None:
    print("Ошибка: Модель не была загружена или обучена.")
    print("Пожалуйста, сначала запустите Ячейку 11.")
else:
    print("\n" + "="*50)
    print("Модель готова к работе.")
    print("Введите ваш краткий запрос (например, 'Как открыть вклад')")
    print("Для выхода введите 'выход' или 'exit'.")
    print("="*50)
    while True:
        try:
            input_prompt = input("\n[ВЫ]: ")
            if input_prompt.lower() in ['выход', 'exit', 'quit']:
                print("Работа завершена.")
                break
            if not input_prompt:
                continue
            predicted_text = predict_expansion(
                input_prompt,
                model,
                input_vocab,
                output_vocab,
                tokenize_ru,
                DEVICE
            )
            print("\n[МОДЕЛЬ]:")
            lines = predicted_text.split('\n')
            for line in lines:
                print('\n'.join(wrap(line, width=100)))
        except KeyboardInterrupt:
            print("\nРабота прервана. Выход...")
            break
        except Exception as e:
            print(f"\nПроизошла ошибка: {e}")
            break


Модель готова к работе.
Введите ваш краткий запрос (например, 'Как открыть вклад')
Для выхода введите 'выход' или 'exit'.

[МОДЕЛЬ]:
узнать стоимость вложения в

[МОДЕЛЬ]:
узнать стоимость вложения в

[МОДЕЛЬ]:
наити в личном кабинете

[МОДЕЛЬ]:
проблема не может в пересчитать в праздничные дни
Работа завершена.
