In [None]:
import os
import random
from pathlib import Path

import pandas as pd
import torch 

import matplotlib.pyplot as plt

## Общие утилиты

In [None]:
IMAGES_PATH = Path('imgs/finetune_gpt/')
DATA_PATH = Path('data/finetune_gpt/')

IMAGES_PATH.mkdir(parents=True, exist_ok=True)
DATA_PATH.mkdir(parents=True, exist_ok=True)
    
def seed_all(seed: int) -> None:
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False
    random.seed(seed)

SEED = 42

seed_all(SEED)

# Загрузка данных

[Huggingface](https://huggingface.co/datasets/d0rj/geo-reviews-dataset-2023?row=1)

In [None]:
from datasets import load_dataset

# Загрузка датасета
dataset = load_dataset("d0rj/geo-reviews-dataset-2023", cache_dir=DATA_PATH / 'model_cache')

In [None]:
# Преобразование данных в DataFrame
data_df = pd.DataFrame(dataset['train'])

print("Number of rows and columns in the data set:", data_df.shape)

In [None]:
data_df.head(5)

In [None]:
data_df.info()

## Препроцессинг

In [None]:
work_data = data_df.dropna(subset=['text', 'name_ru', 'rating'])
work_data = work_data.drop_duplicates(subset=['text']).reset_index(drop=True)
work_data['text'] = work_data['text'].str.replace('\\n', ' ')
work_data = work_data[:15000]
work_data['text'][0]

In [None]:
work_data.info()

In [None]:
unique_name_ru = work_data['name_ru'].unique().tolist()

unique_rubrics = work_data['rubrics'].unique().tolist()

# Модель

In [None]:
import pandas as pd
from transformers import GPT2LMHeadModel, GPT2Tokenizer, TextDataset, DataCollatorForLanguageModeling, Trainer, TrainingArguments
from pathlib import Path

class FineTuner:
    def __init__(self, 
                 model_name='ai-forever/rugpt3small_based_on_gpt2', 
                 cache_dir='model_cache',
                 data_path=DATA_PATH):
        self.data_path = Path(data_path)
        
        # Инициализация токенизатора и модели
        self.tokenizer = GPT2Tokenizer.from_pretrained(model_name, cache_dir=str(self.data_path / cache_dir))
        self.model = GPT2LMHeadModel.from_pretrained(model_name, cache_dir=str(self.data_path / cache_dir))

    def prepare_data(self, df):
        """
        Подготовка данных для обучения
        """
        # Объединение типа проблемы и исходного текста в одну строку входных данных
        df['input'] = df.apply(
            lambda row: f"<name_ru> {row['name_ru']} <rubrics> {row['rubrics']} <rating> {row['rating']} {self.tokenizer.eos_token}", axis=1
            )
        
        # Добавление к целевому тексту токена окончания строки
        df['output'] = df.apply(lambda row: f" <text> {row['text']} {self.tokenizer.eos_token}", axis=1)
        
        # Подготовка пути для сохранения данных
        dataset_path = self.data_path / 'train_dataset.txt'
        # Запись данных в файл
        with dataset_path.open('w', encoding='utf-8') as file:
            for input_text, target_text in zip(df['input'], df['output']):
                file.write(input_text + ' ' + target_text + '\n')
        return dataset_path

    def fine_tune(self, 
                  dataset_path, 
                  output_name='fine_tuned_model', 
                  num_train_epochs=4, 
                  per_device_train_batch_size=4, 
                  learning_rate=5e-5, 
                  save_steps=10_000):
        """
        Дообучение модели на заданном датасете.
        """
        train_dataset = TextDataset(
            tokenizer=self.tokenizer,
            file_path=str(dataset_path),
            block_size=256
        )

        data_collator = DataCollatorForLanguageModeling(
            tokenizer=self.tokenizer, mlm=False
        )

        training_args = TrainingArguments(
            output_dir=str(self.data_path / output_name),
            overwrite_output_dir=True,
            num_train_epochs=num_train_epochs,
            per_device_train_batch_size=per_device_train_batch_size,
            save_steps=save_steps,
            learning_rate=learning_rate,
            save_total_limit=2,
            logging_dir=str(self.data_path / 'logs'),
        )

        trainer = Trainer(
            model=self.model,
            args=training_args,
            data_collator=data_collator,
            train_dataset=train_dataset,
        )

        trainer.train()
        # Сохранение обученной модели и токенизатора
        self.model.save_pretrained(str(self.data_path / output_name))
        self.tokenizer.save_pretrained(str(self.data_path / output_name))

In [None]:
from transformers import GPT2LMHeadModel, GPT2Tokenizer
from pathlib import Path

class TextGenerator:
    def __init__(self, model_name='fine_tuned_model', data_path=DATA_PATH):
        """
        Инициализация модели и токенизатора.
        Загружаем модель и токенизатор из указанного пути.
        """
        model_path = Path(data_path) / model_name
        self.tokenizer = GPT2Tokenizer.from_pretrained(str(model_path))
        self.model = GPT2LMHeadModel.from_pretrained(str(model_path))
        self.model.eval()

    def generate_text(self, 
                    name_ru: str, 
                    rubrics: str, 
                    rating: int,
                    max_length=100, 
                    num_return_sequences=1, 
                    temperature=1.0, 
                    top_k=0, 
                    top_p=1.0, 
                    do_sample=False):
        """
        Генерация текста на основе заданного начального текста (prompt) и параметров.
        
        Параметры:
        - name_ru: Название организации.
        - rubrics: Список рубрик, к которым относится организация.
        - rating: Оценка пользователя.
        - max_length: Максимальная длина сгенерированного текста.
        - num_return_sequences: Количество возвращаемых последовательностей.
        - temperature: Контролирует разнообразие вывода.
        - top_k: Если больше 0, ограничивает количество слов для выборки только k наиболее вероятными словами.
        - top_p: Если меньше 1.0, применяется nucleus sampling.
        - do_sample: Если True, включает случайную выборку для увеличения разнообразия.
        """
        # Формирование prompt
        prompt_text = f"<name_ru> {name_ru} <rubrics> {rubrics} <rating> {rating} {self.tokenizer.eos_token} <text> "
        
        # Кодирование текста в формате, пригодном для модели
        encoded_input = self.tokenizer.encode(prompt_text, return_tensors='pt')
        
        # Генерация текстов
        outputs = self.model.generate(
            encoded_input,
            max_length=max_length + len(encoded_input[0]),
            num_return_sequences=num_return_sequences,
            temperature=temperature,
            top_k=top_k,
            top_p=top_p,
            do_sample=do_sample,
            no_repeat_ngram_size=2
        )
        
        # Декодирование результатов
        all_texts = [self.tokenizer.decode(output, skip_special_tokens=True) for output in outputs]
        
        # Удаление входных данных из текстов
        prompt_length = len(self.tokenizer.decode(encoded_input[0], skip_special_tokens=True))
        trimmed_texts = [text[prompt_length:] for text in all_texts]
        
        # Возврат результатов в виде словаря
        return {
            "full_texts": all_texts,
            "generated_texts": trimmed_texts
        }

# Обучение

In [None]:
finetuner = FineTuner()
dataset_path = finetuner.prepare_data(work_data)
finetuner.fine_tune(dataset_path, output_name='fine_tuned_model_gpt_2')


# Предикт

In [None]:
unique_name_ru[:10]

In [None]:
unique_rubrics[:10]

In [None]:
name_ru = unique_name_ru[1]
rubrics = unique_rubrics[1]
rating = 1

generator = TextGenerator(
    model_name='fine_tuned_model_gpt_2',
    data_path=DATA_PATH
)
generated_texts = generator.generate_text(
    name_ru=name_ru,
    rubrics=rubrics,
    rating=rating,
    max_length=100,
    # num_beams=3 # если несколько последовательностей 
    num_return_sequences=3,
    do_sample=True,
    temperature=0.95,  # Слегка уменьшаем уверенность
    top_k=10,         # Уменьшаем количество рассматриваемых верхних k слов
    top_p=0.95        # Уменьшаем "ядерность" распределения
)
for i, text in enumerate(generated_texts['generated_texts']):
    print(f"Generated Text {i+1}: {text}")