# Данные

In [1]:
!pip install --quiet datasets

In [2]:
from datasets import Dataset
import json
from collections import Counter
from sklearn.model_selection import train_test_split

# Читаем датасет

In [None]:
with open('dataset.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# Преобразуем классы

In [None]:
for d in data:
    if d["label"] == "ANSWER":
        d["label"] = "CHAT"
    elif d["label"] == "SUPPORT":
        d["label"] = "QUESTION"

# Некоторая инфа по таргет рейту данных

In [None]:
def calc_target_rates(data):
  target_rates = {
    "SPAM": 0,
    "QUESTION": 0,
    "FEEDBACK": 0,
    "CHAT": 0,
    "IMPORTANT": 0
  }
  for dct in data:
    target_rates[dct['label']] += 1
  return target_rates
print(calc_target_rates(data))

{'SPAM': 178, 'QUESTION': 247, 'FEEDBACK': 167, 'CHAT': 298, 'IMPORTANT': 11}


In [None]:
train_data, test_data = train_test_split(data, test_size=0.2, random_state=42, shuffle=True)

In [None]:
print(calc_target_rates(train_data))

{'SPAM': 141, 'QUESTION': 202, 'FEEDBACK': 134, 'CHAT': 236, 'IMPORTANT': 7}


In [None]:
print(calc_target_rates(test_data))

{'SPAM': 37, 'QUESTION': 45, 'FEEDBACK': 33, 'CHAT': 62, 'IMPORTANT': 4}


In [None]:
import random

dataset_train = Dataset.from_list(train_data)

test_data_arr = [t["text"] for t in test_data]
dataset_test = Dataset.from_list(test_data)

In [None]:
dataset_train

Dataset({
    features: ['text', 'label'],
    num_rows: 720
})

In [None]:
dataset_test

Dataset({
    features: ['text', 'label'],
    num_rows: 181
})

# Параметры

In [None]:
id2label = {
    0: "SPAM",
    1: "QUESTION",
    2: "FEEDBACK",
    3: "CHAT",
    4: "IMPORTANT"
}

label2id = {v: k for k, v in id2label.items()}

# Модель

In [None]:
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from transformers import TrainingArguments, Trainer
from transformers import pipeline

In [None]:
from google.colab import userdata
from huggingface_hub import login

hf_token = userdata.get('HF_TOKEN')
login(token=hf_token)

In [None]:
import torch
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [None]:
from torch import nn
from transformers import AutoModel, AutoModelForSequenceClassification

class CustomClassificationHead(nn.Module):
    def __init__(self, hidden_size, num_labels):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, num_labels)
        )

    def forward(self, x):
        return self.layers(x)

class CustomModel(AutoModelForSequenceClassification):
    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels
        self.bert = AutoModel.from_pretrained(config._name_or_path, config=config)
        hidden_size = config.hidden_size

        self.classifier = CustomClassificationHead(hidden_size, config.num_labels)

    def forward(self, **kwargs):
        outputs = self.bert(**kwargs)
        pooled_output = outputs.pooler_output  # [CLS] токен
        logits = self.classifier(pooled_output)
        return {'logits': logits}

In [None]:
from transformers import AutoTokenizer, AutoConfig

model_name = "blanchefort/rubert-base-cased-sentiment"

tokenizer = AutoTokenizer.from_pretrained(model_name)
config = AutoConfig.from_pretrained(
    model_name,
    num_labels=len(id2label),
    id2label=id2label,
    label2id=label2id
)

model = CustomModel.from_pretrained(model_name, config=config, ignore_mismatched_sizes=True).to(device)

Some weights of BertForSequenceClassification were not initialized from the model checkpoint at blanchefort/rubert-base-cased-sentiment and are newly initialized because the shapes did not match:
- classifier.bias: found shape torch.Size([3]) in the checkpoint and torch.Size([5]) in the model instantiated
- classifier.weight: found shape torch.Size([3, 768]) in the checkpoint and torch.Size([5, 768]) in the model instantiated
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


# Обучение

In [None]:
def tokenize_function(examples):
    tokenized = tokenizer(examples["text"], padding="max_length", truncation=True, max_length=128)
    tokenized["label"] = [label2id[label] for label in examples["label"]]
    return tokenized

tokenized_datasets_train = dataset_train.map(
    tokenize_function,
    batched=True,
    remove_columns=["text", "label"]
)

tokenized_datasets_test = dataset_test.map(
    tokenize_function,
    batched=True,
    remove_columns=["text", "label"]
)

Map:   0%|          | 0/720 [00:00<?, ? examples/s]

Map:   0%|          | 0/181 [00:00<?, ? examples/s]

In [None]:
from sklearn.metrics import precision_score, recall_score, f1_score
import numpy as np
from transformers import Trainer, TrainingArguments, EarlyStoppingCallback

# Определяем функцию для расчета метрик
def compute_metrics(p):
    predictions, labels = p
    predictions = np.argmax(predictions, axis=1)

    precision = precision_score(labels, predictions, average='macro', labels=np.unique(labels))
    recall = recall_score(labels, predictions, average='macro', labels=np.unique(labels))
    f1 = f1_score(labels, predictions, average='macro', labels=np.unique(labels))

    return {
        'precision': precision,
        'recall': recall,
        'f1': f1
    }

training_args = TrainingArguments(
    output_dir="./results",
    evaluation_strategy="steps",
    save_strategy="steps",
    save_steps=1000,
    save_total_limit=3,
    load_best_model_at_end=True,
    metric_for_best_model="f1",
    greater_is_better=True,
    num_train_epochs=50,
    logging_dir="./logs",
    logging_steps=1000,
)

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets_train,
    eval_dataset=tokenized_datasets_test,
    compute_metrics=compute_metrics,
    callbacks=[EarlyStoppingCallback(early_stopping_patience=2)],
)

trainer.train()

eval_results = trainer.evaluate()

print(f"Evaluation Results: {eval_results}")



Step,Training Loss,Validation Loss,Precision,Recall,F1
1000,0.8817,0.937925,0.601466,0.561201,0.570623
2000,0.4914,1.090685,0.635075,0.581383,0.592738
3000,0.4277,0.827385,0.665753,0.671399,0.665272
4000,0.3152,0.910851,0.665613,0.672187,0.667732


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


Evaluation Results: {'eval_loss': 0.910851240158081, 'eval_precision': 0.6656126669029895, 'eval_recall': 0.6721874367035657, 'eval_f1': 0.6677317869415808, 'eval_runtime': 1.3594, 'eval_samples_per_second': 133.144, 'eval_steps_per_second': 16.919, 'epoch': 50.0}


  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))


# Проверка

In [None]:
def convert_labels_to_vector(data, label2id):
    return [label2id[item['label']] for item in data]

In [None]:
test_data_arr

['этот продукт изменит твою жизнь — жми сюда',
 'И это тоже. В зависимости от стиля программирования concurrency подобные "императивные" проблемы могут возникать реже.',
 'мне очень понравился интерфейс — всё просто',
 "Кнопка 'отправить' неактивна",
 'что-нибудь делал сегодня?',
 'Если бы можно было односложно, то я бы не рассказывал 3 часа (',
 'По каким критериям отбираются студенты на курс, если порог превышен?',
 'Свой стд лок не будем писать для произвольного числа аргументов?)',
 'Привет, давай поболтаем!',
 'Слишком мелкий шрифт в мобильной версии сайта.',
 'Вопрос, есть предметы по выбору, они очные (пешочком ходить надо)?',
 'Какой адрес у нового офиса?',
 'Где оставить документы?',
 'https://spam.link',
 'Заработай на просмотре рекламы — плати за интернет просмотром',
 'Ваш номер выбран для получения приза. Подробности по ссылке.',
 'Почему мой профиль не обновляется?',
 'не получается привязать почту',
 'думаю, стоит взять выходной',
 'прикинь, вчера свет вырубили',
 'Когда

In [None]:
y_test = convert_labels_to_vector(test_data, label2id)

In [None]:
from sklearn.metrics import accuracy_score, precision_score, recall_score

In [None]:
label2id

{'SPAM': 0, 'QUESTION': 1, 'FEEDBACK': 2, 'CHAT': 3, 'IMPORTANT': 4}

In [None]:
classifier = pipeline(
    "text-classification",
    model=model,
    tokenizer=tokenizer
)

y_pred = []
for text in test_data_arr:
    result = classifier(text)
    y_pred.append(label2id[result[0]['label']])
    #print(f"Текст: {text[:30]}... | Класс: {result[0]['label']}")

Device set to use cuda:0


In [None]:
accuracy_score(y_test, y_pred)

0.7828947368421053

In [None]:
precision_class_SPAM = precision_score(y_test, y_pred, labels=[0], average='micro')
recall_class_SPAM = recall_score(y_test, y_pred, labels=[0], average='micro')
precision_class_SPAM, recall_class_SPAM

(0.9, 0.782608695652174)

In [None]:
precision_class_QUESTION = precision_score(y_test, y_pred, labels=[1], average='micro')
recall_class_QUESTION = recall_score(y_test, y_pred, labels=[1], average='micro')
precision_class_QUESTION, recall_class_QUESTION

(0.7391304347826086, 0.8095238095238095)

In [None]:
precision_class_FEEDBACK = precision_score(y_test, y_pred, labels=[2], average='micro')
recall_class_FEEDBACK = recall_score(y_test, y_pred, labels=[2], average='micro')
precision_class_FEEDBACK, recall_class_FEEDBACK

(0.8636363636363636, 0.76)

In [None]:
precision_class_CHAT = precision_score(y_test, y_pred, labels=[3], average='micro')
recall_class_CHAT = recall_score(y_test, y_pred, labels=[3], average='micro')
precision_class_CHAT, recall_class_CHAT

(0.7796610169491526, 0.7796610169491526)

In [None]:
precision_class_IMPORTANT = precision_score(y_test, y_pred, labels=[4], average='micro')
recall_class_IMPORTANT = recall_score(y_test, y_pred, labels=[4], average='micro')
precision_class_IMPORTANT, recall_class_IMPORTANT

(0.4, 0.6666666666666666)

In [None]:
precision = precision_score(y_test, y_pred, average='macro', labels=[0,1,2,3,4])
recall = recall_score(y_test, y_pred, average='macro', labels=[0,1,2,3,4])
f1 = f1_score(y_test, y_pred, average='macro', labels=[0,1,2,3,4])
precision, recall, f1

(0.736485563073625, 0.7596920377583606, 0.7396216460599758)