# Необходимые библиотеки

In [None]:
%%capture
!pip install transformers datasets evaluate torch pandas scikit-learn

In [None]:
import pandas as pd
import numpy as np
import torch
import json
from sklearn.model_selection import train_test_split
from datasets import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    TrainingArguments,
    Trainer,
    EarlyStoppingCallback
)
import evaluate
from sklearn.metrics import f1_score, roc_auc_score
from typing import List, Dict

In [None]:
from transformers import GPT2Config, GPT2LMHeadModel, GPT2Tokenizer, TrainingArguments, Trainer
import torch
import pandas as pd
from datasets import Dataset
from sklearn.model_selection import train_test_split
from sklearn.metrics import f1_score, roc_auc_score
import evaluate
import numpy as np
import os
import json
from typing import Dict, List
from torch.utils.data import DataLoader

# Подключение диска

In [None]:
# Mount Google Drive at /content/drive so files under it can be read and written
# from this Colab session (the classifier below saves its artifacts under ./drive/MyDrive).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Обучение различных классификаторов

In [None]:
class TopicClassifier:
    """Topic classifier built on a small GPT-2 language model trained from scratch.

    Every training row is rendered as ``"<text> [SEP] <label>"``. The model is
    trained as a causal LM whose loss is restricted to the label part, and
    prediction generates the tokens that follow ``[SEP]`` and maps them back
    to a topic.

    Args:
        data_path: Path to an Excel file; it must contain a ``topic`` column
            and every column listed in ``columns``.
        columns: Text columns that are joined (space-separated, missing values
            skipped) into the model input.
        maximum_sequence_length: Maximum number of tokens per sequence.
        output_dir: Directory for checkpoints, the final model, the tokenizer
            and the JSON label mappings.
        model_name: Hub name used to load the tokenizer only; the model
            weights are randomly initialised.

    Raises:
        ValueError: If ``data_path`` does not exist.
    """

    def __init__(
        self,
        data_path: str,
        columns: List[str],
        maximum_sequence_length: int = 200,
        output_dir: str = "./model",
        model_name: str = "sshleifer/tiny-gpt2"
    ):
        try:
            self.data = pd.read_excel(data_path)
        except FileNotFoundError as err:
            # Keep the original exception as the cause for easier debugging.
            raise ValueError(f"File {data_path} not found!") from err

        self.model_name = model_name
        self.columns = columns
        self.maximum_sequence_length = maximum_sequence_length
        self.output_dir = output_dir
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        self.topic2id: Dict[str, int] = {}
        self.id2topic: Dict[int, str] = {}
        self.topic_to_label_str: Dict[str, str] = {}
        self.label_str_to_topic: Dict[str, str] = {}
        self.tokenizer = None
        self.model = None
        self.trainer = None
        self.evaluation_results: Dict[str, float] = {}

    @staticmethod
    def _to_native(value):
        """Return numpy scalars as built-in Python scalars; pass anything else through.

        pandas hands back numpy scalars (e.g. ``np.int64``) for numeric
        columns, and ``json`` can serialise neither those values nor dict keys
        of that type.
        """
        return value.item() if isinstance(value, np.generic) else value

    def __prepare_data__(self):
        """Build the label mappings and the ``text`` / ``formatted_text`` columns.

        Raises:
            ValueError: If the ``topic`` column has fewer than two distinct values.
        """
        # Native Python values keep the mappings JSON-serialisable later on.
        unique_topics = [self._to_native(topic) for topic in self.data['topic'].unique()]
        # Validate before doing any per-row work.
        if len(unique_topics) < 2:
            raise ValueError("At least 2 classes required for classification")

        self.data['text'] = self.data[self.columns].apply(
            lambda x: ' '.join(x.dropna().astype(str)), axis=1
        )

        self.topic2id = {topic: i for i, topic in enumerate(unique_topics)}
        self.id2topic = {i: topic for i, topic in enumerate(unique_topics)}

        # Labels are emitted as a single whitespace-free word so that the first
        # generated word can be matched back to a topic in predict().
        self.topic_to_label_str = {
            topic: str(topic).strip().lower().replace(' ', '_')
            for topic in unique_topics
        }
        self.label_str_to_topic = {v: k for k, v in self.topic_to_label_str.items()}

        self.data['formatted_text'] = [
            f"{text} [SEP] {self.topic_to_label_str[topic]}"
            for text, topic in zip(self.data['text'], self.data['topic'])
        ]

    def __load_model__(self):
        """Load the tokenizer and create a randomly initialised GPT-2 LM on ``self.device``."""
        self.tokenizer = GPT2Tokenizer.from_pretrained(self.model_name)
        self.tokenizer.add_special_tokens({'sep_token': '[SEP]'})
        # GPT-2 has no pad token; EOS doubles as padding.
        self.tokenizer.pad_token = self.tokenizer.eos_token

        config = GPT2Config(
            vocab_size=len(self.tokenizer),
            n_embd=256,
            n_layer=4,
            n_head=4,
            pad_token_id=self.tokenizer.pad_token_id
        )
        self.model = GPT2LMHeadModel(config)
        self.model.resize_token_embeddings(len(self.tokenizer))
        self.model.to(self.device)

    def __tokenize_data__(self, df: pd.DataFrame) -> "Dataset":
        """Tokenize ``df['formatted_text']`` into fixed-length sequences.

        The text and the ``"[SEP] <label>"`` suffix are encoded as a pair with
        ``truncation="only_first"``, so over-long texts are shortened while the
        separator and the label are always kept. Truncating the whole string
        from the right would cut the label off long rows.
        """
        dataset = Dataset.from_pandas(df[['formatted_text']])

        def tokenize_function(examples):
            texts, suffixes = [], []
            for formatted in examples["formatted_text"]:
                # Split on the last separator: the label never contains spaces.
                text, _, label = formatted.rpartition(" [SEP] ")
                texts.append(text)
                suffixes.append(f"[SEP] {label}")
            return self.tokenizer(
                texts,
                suffixes,
                padding="max_length",
                truncation="only_first",
                max_length=self.maximum_sequence_length
            )

        return dataset.map(tokenize_function, batched=True)

    def __collate_fn(self, batch):
        """Stack a batch and build LM labels that cover only the answer part.

        Positions up to and including the first ``[SEP]`` are set to -100, and
        so is padding, except for the first padding position: the pad token is
        EOS, so keeping one lets the model learn to stop after the label.
        Assumes right-side padding, i.e. the attention mask is a run of ones
        followed by zeros.

        Returns:
            Dict with ``input_ids``, ``attention_mask`` and ``labels`` tensors.
        """
        input_ids = torch.stack(
            [torch.as_tensor(item['input_ids'], dtype=torch.long) for item in batch]
        )
        masks = torch.stack(
            [torch.as_tensor(item['attention_mask'], dtype=torch.long) for item in batch]
        )
        labels = input_ids.clone()

        sep_id = self.tokenizer.sep_token_id
        lengths = masks.sum(dim=1)
        for i, seq in enumerate(input_ids):
            # Ignore padding, but keep the first pad (EOS) as a target.
            labels[i, int(lengths[i]) + 1:] = -100
            sep_pos = (seq == sep_id).nonzero(as_tuple=True)[0]
            if len(sep_pos) > 0:
                # The prompt and the separator itself are given, not predicted.
                labels[i, :int(sep_pos[0]) + 1] = -100

        return {
            "input_ids": input_ids,
            "attention_mask": masks,
            "labels": labels
        }

    def train_model(self):
        """Prepare the data, train with a stratified 80/20 split and save the result."""
        self.__prepare_data__()
        train_df, val_df = train_test_split(
            self.data,
            test_size=0.2,
            random_state=42,
            stratify=self.data['topic']
        )

        self.__load_model__()

        train_dataset = self.__tokenize_data__(train_df)
        val_dataset = self.__tokenize_data__(val_df)

        training_args_kwargs = {
            "output_dir": self.output_dir,
            "learning_rate": 5e-4,
            "per_device_train_batch_size": 8,
            "per_device_eval_batch_size": 8,
            "num_train_epochs": 10,
            "weight_decay": 0.01,
            "load_best_model_at_end": True,
            "metric_for_best_model": "loss",
            "logging_dir": './logs',
            "logging_steps": 50,
            "report_to": "none",
            "save_total_limit": 1
        }

        # Prefer the current argument name; `evaluation_strategy` is the
        # deprecated spelling and is only used when the new one is missing.
        if hasattr(TrainingArguments, "eval_strategy"):
            training_args_kwargs["eval_strategy"] = "epoch"
            training_args_kwargs["save_strategy"] = "epoch"
        elif hasattr(TrainingArguments, "evaluation_strategy"):
            training_args_kwargs["evaluation_strategy"] = "epoch"
            training_args_kwargs["save_strategy"] = "epoch"
        else:
            training_args_kwargs["evaluate_during_training"] = True
            training_args_kwargs["save_steps"] = 10000

        training_args = TrainingArguments(**training_args_kwargs)

        self.trainer = Trainer(
            model=self.model,
            args=training_args,
            train_dataset=train_dataset,
            eval_dataset=val_dataset,
            data_collator=self.__collate_fn
        )

        self.trainer.train()
        self.__save_model__()

    def __save_model__(self):
        """Write the model, the tokenizer and the label mappings to ``output_dir``."""
        os.makedirs(self.output_dir, exist_ok=True)

        self.model.save_pretrained(self.output_dir)
        self.tokenizer.save_pretrained(self.output_dir)

        with open(f"{self.output_dir}/id2topic.json", "w", encoding="utf-8") as f:
            # JSON keys must be strings; values are converted from numpy
            # scalars because json cannot serialise e.g. np.int64.
            json.dump(
                {str(int(k)): self._to_native(v) for k, v in self.id2topic.items()},
                f
            )

        with open(f"{self.output_dir}/label_mappings.json", "w", encoding="utf-8") as f:
            json.dump({
                "topic_to_label_str": {
                    self._to_native(k): v for k, v in self.topic_to_label_str.items()
                },
                "label_str_to_topic": {
                    k: self._to_native(v) for k, v in self.label_str_to_topic.items()
                }
            }, f)

    def load_trained_model(self, model_path: str):
        """Load the model, the tokenizer and the label mappings saved in ``model_path``."""
        self.tokenizer = GPT2Tokenizer.from_pretrained(model_path)
        self.model = GPT2LMHeadModel.from_pretrained(model_path).to(self.device)

        with open(f"{model_path}/id2topic.json", "r", encoding="utf-8") as f:
            # Keys were stored as strings; turn them back into ints.
            loaded_data = json.load(f)
            self.id2topic = {int(k): v for k, v in loaded_data.items()}

        with open(f"{model_path}/label_mappings.json", "r", encoding="utf-8") as f:
            mappings = json.load(f)
            self.topic_to_label_str = mappings["topic_to_label_str"]
            self.label_str_to_topic = mappings["label_str_to_topic"]

    def predict(self, text: str) -> str:
        """Predict the topic of ``text``.

        The prompt is ``text`` followed by ``[SEP]``; only the text is
        truncated, so the separator is always present. The first word the
        model generates after the prompt is looked up in
        ``label_str_to_topic``.

        Returns:
            The matching topic value as stored in the mapping, or the first
            known topic when the generated text is empty or not a known label.
        """
        self.model.eval()
        inputs = self.tokenizer(
            text,
            "[SEP]",
            return_tensors="pt",
            truncation="only_first",
            max_length=self.maximum_sequence_length
        ).to(self.device)
        prompt_length = inputs["input_ids"].shape[1]

        # Leave room for the longest known label (plus EOS), never below the
        # original budget of 5 tokens.
        longest_label = max(
            (
                len(self.tokenizer.encode(f" {label}", add_special_tokens=False))
                for label in self.label_str_to_topic
            ),
            default=0
        )

        output = self.model.generate(
            **inputs,
            max_new_tokens=max(5, longest_label + 1),
            num_beams=3,
            early_stopping=True,
            pad_token_id=self.tokenizer.eos_token_id
        )

        fallback = list(self.label_str_to_topic.values())[0]

        # Everything after the prompt is the generated answer.
        label_tokens = output[0][prompt_length:]
        words = self.tokenizer.decode(
            label_tokens,
            skip_special_tokens=True
        ).strip().lower().split()

        # The model may emit only special tokens or whitespace.
        if not words:
            return fallback
        return self.label_str_to_topic.get(words[0], fallback)

## С TF-IDF стоп-словами и с дополнительными стоп-словами

In [None]:
# Train a classifier on labeled_add_news.xlsx, using the title, summary and
# content columns as input text; artifacts are written to the Drive folder below.
classifier_add_tfidf_10 = TopicClassifier(
    "labeled_add_news.xlsx",
    ["title", "summary", "content"],
    output_dir="./drive/MyDrive/classificator_add_tfidf_10",
)
classifier_add_tfidf_10.train_model()

Map:   0%|          | 0/13943 [00:00<?, ? examples/s]

Map:   0%|          | 0/3486 [00:00<?, ? examples/s]

`loss_type=None` was set in the config but it is unrecognised.Using the default loss: `ForCausalLMLoss`.


Epoch,Training Loss,Validation Loss
1,1.7183,1.5655
2,1.3632,1.27964
3,1.2265,1.166211
4,1.134,1.105257
5,1.098,1.060534
6,1.05,1.027831
7,1.0299,1.001984
8,0.9934,0.977912
9,0.9359,0.966658
10,0.9609,0.957848


There were missing keys in the checkpoint model loaded: ['lm_head.weight'].


TypeError: Object of type int64 is not JSON serializable

In [None]:
# Manual prediction check on a news snippet that has a run of random Latin
# characters inserted in the middle (presumably to see how noise affects the result).
classifier_add_tfidf_10.predict("Пройти тест можно на сайте факультета довузовской подготовки Вышки. На основе результатов тестирования школьникам предложат подходящие программы подготовки к поступлению, а всем желающим вне зависимости asmdkmsakdmaskmfkjsandfjk mkasm m lasmlkc alk m alksm cka kmsamks kal kla sa lkk akl la lsa as las ka assamdjksamdjsamdjkaismd maskldmklsamdkas от возраста — разбор от психолога по типу личности, а также рекомендации по подходящим сферам обучения или работы.")

np.int64(7)