In [None]:
import re
import string
import pandas as pd
import numpy as np
import emoji
from sklearn.model_selection import train_test_split
import evaluate
import transformers
import torch
import matplotlib.pyplot as plt
from sklearn.metrics import f1_score
from sklearn.metrics import confusion_matrix
from transformers import EarlyStoppingCallback
from peft import LoraConfig, get_peft_model
import os

from torch.utils.data import Dataset, DataLoader
from datasets import load_dataset
from transformers import DataCollatorWithPadding
from transformers import (
    AutoTokenizer,
    TrainingArguments,
    Trainer,
    AutoTokenizer, AutoModelForSequenceClassification
)

In [None]:
# Seed every RNG this notebook draws from. NumPy alone is not enough: the
# shuffled DataLoader and the model weights are driven by torch's generator.
np.random.seed(0)
torch.manual_seed(0)

In [None]:
# Labelled training messages; the CSV lives under /content/drive
# (presumably a mounted Google Drive in Colab - confirm before running elsewhere).
train_csv_path = '/content/drive/MyDrive/Znatno_personal/Отчетность ВКР/Артефакты/Parsing_avito/Model for benchmark/data/train - train (3).csv'
data_train = pd.read_csv(train_csv_path)

In [None]:
# Held-out test messages, read from the same data directory as the training CSV.
test_csv_path = '/content/drive/MyDrive/Znatno_personal/Отчетность ВКР/Артефакты/Parsing_avito/Model for benchmark/data/test_data.csv'
data_test = pd.read_csv(test_csv_path)

In [None]:
# Both frames share the same schema: the message text is the model input and
# the 'Оценка' column is the classification target.
TEXT_COLUMN, LABEL_COLUMN = 'message.text', 'Оценка'
X, y = data_train[TEXT_COLUMN], data_train[LABEL_COLUMN]
X_test, y_test = data_test[TEXT_COLUMN], data_test[LABEL_COLUMN]

# Предобработка

In [None]:
def convert_emojis_to_words(text):
    """Blank out emojis and underscores in ``text``.

    Despite the name, nothing is converted to words: every emoji is replaced
    by a single space through ``emoji.replace_emoji`` and every ``_`` is then
    replaced by a space as well.

    Args:
        text: Input string.

    Returns:
        The string with emojis and underscores replaced by spaces.
    """
    without_emoji = emoji.replace_emoji(text, replace=" ")
    return without_emoji.replace("_", " ")

In [None]:
# Special symbols stripped from the messages (one or more in a row).
# Raw strings keep the regex escapes (`\|`, `\s`) away from Python's own
# string-escape processing, which otherwise flags them as invalid escapes.
symbols_pattern = re.compile(
    pattern=r"[@_!#$%^&*()<>?/\|}{~√•—]+",
    flags=re.UNICODE,
)
# Any run of whitespace (spaces, tabs, newlines).
space_pattern = re.compile(r'\s+')

In [None]:
def clear_text(text):
    """Strip special symbols, emojis and links, then squeeze whitespace.

    The steps run in this order:
      1. remove every run of characters matched by ``symbols_pattern``;
      2. replace emojis and underscores with spaces;
      3. drop anything that starts with ``http`` up to the next whitespace;
      4. collapse each whitespace run into one space.

    The result is not stripped, so a single leading or trailing space can
    remain.

    Args:
        text: Input string.

    Returns:
        The cleaned string.
    """
    without_symbols = symbols_pattern.sub('', text)
    without_emoji = convert_emojis_to_words(without_symbols)
    without_links = re.sub(r'http\S+', '', without_emoji)
    return space_pattern.sub(' ', without_links)

In [None]:
def preprocess_text(text):
    """Normalise one raw message before it is tokenized.

    The value is converted to ``str``, stripped of surrounding whitespace,
    lower-cased and passed through ``clear_text``. No punctuation is removed
    here beyond what ``clear_text`` strips.

    Args:
        text: Raw cell value; may be a string, ``None`` or a float NaN
            (how pandas represents an empty CSV cell).

    Returns:
        The cleaned, lower-cased string, or ``''`` for a missing value.
    """
    # A missing message must not become the literal text "nan" / "none".
    if text is None or (isinstance(text, float) and np.isnan(text)):
        return ''

    normalized = str(text).strip().lower()
    return clear_text(normalized)

In [None]:
# Clean every training message element-wise.
# NOTE(review): X is overwritten in place, so re-running this cell feeds
# already-cleaned text through the pipeline again.
X = X.map(preprocess_text)

In [None]:
# Clean every test message with the same pipeline as the training data.
X_test = X_test.map(preprocess_text)

In [None]:
# Hold out 20% of the training data for validation; random_state pins the split.
# NOTE(review): the split is not stratified by label - confirm this is intended.
X_train, X_val, y_train, y_val = train_test_split(X, y,test_size=0.2, random_state = 42)

# Загрузка базовой модели

In [None]:
# Tokenizer and classifier are loaded from the same checkpoint.
base_checkpoint = "DeepPavlov/rubert-base-cased"
tokenizer = AutoTokenizer.from_pretrained(base_checkpoint)
# num_labels=9 sizes the classification head for nine target classes.
model = AutoModelForSequenceClassification.from_pretrained(base_checkpoint, num_labels=9)

In [None]:
# BERT tokenizers normally ship their own padding token, so one is only defined
# here when it is genuinely missing. Aliasing padding to '[SEP]' would give
# padding positions the same token id as real separators.
if tokenizer.pad_token is None:
    tokenizer.add_special_tokens({'pad_token': '[PAD]'})

In [None]:
# Resize the model's token-embedding matrix to the tokenizer's current size so
# that any token added to the tokenizer has an embedding row.
model.resize_token_embeddings(len(tokenizer))

In [None]:
# Keep the model config's padding id in sync with the tokenizer's.
model.config.pad_token_id = tokenizer.pad_token_id

In [None]:
class CustomDataset(Dataset):
    """Map-style dataset that tokenizes one text/label pair per lookup.

    Tokenization happens lazily inside ``__getitem__``; nothing is cached.

    Args:
        X: Texts; must support ``len()`` and positional ``.iloc[idx]`` access.
        y: Labels aligned with ``X``; must support ``.iloc[idx]``.
        tokenizer: Callable invoked as
            ``tokenizer(text, max_length=512, padding='max_length', truncation=True)``
            that returns a mutable mapping.
    """

    def __init__(self, X, y, tokenizer):
        super().__init__()
        self.X, self.y = X, y
        self.tokenizer = tokenizer

    def __len__(self):
        """Return the number of samples, taken from the text container."""
        return len(self.X)

    def __getitem__(self, idx):
        """Tokenize sample ``idx`` and attach its label.

        Every sample is padded/truncated to exactly 512 tokens.

        Returns:
            The tokenizer output with an extra ``"label"`` entry holding the
            label as a ``torch`` tensor.
        """
        text, label = self.X.iloc[idx], self.y.iloc[idx]
        encoded = self.tokenizer(
            text,
            max_length=512,
            padding='max_length',
            truncation=True,
        )
        encoded["label"] = torch.tensor(label)
        return encoded

In [None]:
# Dataset wrappers for the training split and the held-out test set.
# Despite the names, nothing is tokenized yet: CustomDataset tokenizes on access.
tokenized_train = CustomDataset(X=X_train, y=y_train, tokenizer=tokenizer)
tokenized_test = CustomDataset(X=X_test, y=y_test, tokenizer=tokenizer)

In [None]:
from transformers import DataCollatorWithPadding

# NOTE(review): this import repeats the one in the notebook's first cell.

# Batch collator built around the notebook's tokenizer; it is passed to the
# DataLoader and to the first Trainer below.
data_collator = DataCollatorWithPadding(tokenizer=tokenizer)

In [None]:
# Small shuffled loader over the training split, used to inspect one collated batch.
train_dataloader = DataLoader(
    CustomDataset(X_train, y_train, tokenizer),
    batch_size=2,
    shuffle=True,
    collate_fn=data_collator,
)

In [None]:
# Sanity check: pull one batch and let the notebook display it.
next(iter(train_dataloader))

In [None]:
# Metric objects used by compute_metrics.
f1_metric = evaluate.load("f1")
accuracy_metric = evaluate.load("accuracy")
# NOTE(review): loads the same "accuracy" metric a second time; no code in this
# notebook calls it.
one_balance_accuracy = evaluate.load("accuracy")

In [None]:
# CUDA debugging switches, set through the process environment.
# NOTE(review): presumably left over from chasing a device-side error -
# confirm they are still wanted for a normal training run.
os.environ.update({
    'CUDA_LAUNCH_BLOCKING': '1',
    'TORCH_USE_CUDA_DSA': '1',
})

In [None]:
def compute_metrics(eval_pred):
    """Compute accuracy and weighted F1 for one evaluation pass.

    Args:
        eval_pred: Pair ``(logits, labels)``. ``logits`` is array-like with the
            class scores on the last axis; ``labels`` holds the reference
            class ids.

    Returns:
        dict with keys ``"accuracy"`` and ``"f1"`` (weighted average F1).

    A warning is printed, and evaluation continues, if any logit is NaN or
    infinite.
    """
    logits, labels = eval_pred
    # Stay in NumPy throughout: the metrics and argmax need no torch tensors.
    logits = np.asarray(logits)
    labels = np.asarray(labels)

    if not np.isfinite(logits).all():
        print("Warning: NaN or inf values detected in logits.")

    predictions = np.argmax(logits, axis=-1)
    f1 = f1_metric.compute(
        predictions=predictions, references=labels, average='weighted'
    )['f1']
    accuracy = accuracy_metric.compute(
        predictions=predictions, references=labels
    )['accuracy']
    return {"accuracy": accuracy, "f1": f1}


# Обучение c LoRA

In [None]:
# Hyper-parameters for the LoRA fine-tuning run.
training_args = TrainingArguments(
    output_dir="//content/drive/MyDrive/finetuning_rubert_LoRA3/",
    # Batching: 64 samples per device, gradients accumulated over 2 steps.
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    gradient_accumulation_steps=2,
    eval_accumulation_steps=100,
    # Optimisation.
    optim="adamw_torch",
    learning_rate=1e-3,
    weight_decay=0.01,
    warmup_ratio=0.1,
    num_train_epochs=25,
    bf16=True,  # bfloat16 training
    # Logging, evaluation and checkpointing are all step-based.
    logging_strategy="steps",
    logging_steps=20,
    evaluation_strategy="steps",
    save_strategy="steps",
    save_total_limit=2,
    # The best checkpoint (by validation accuracy) is restored after training.
    load_best_model_at_end=True,
    metric_for_best_model="accuracy",
)

In [None]:
peft_config = LoraConfig(
    r=16,  # Rank of the LoRA update matrices
    lora_alpha=16,  # Scaling factor for the LoRA update matrices
    lora_dropout=0.5,  # Dropout probability for the LoRA update matrices
    bias="lora_only",  # Which bias terms are trained
    # Extra module kept trainable and saved with the adapter. The head of a BERT
    # sequence-classification model is named "classifier"; "decode_head" is the
    # head of segmentation models and matches nothing in this model.
    modules_to_save=["classifier"],
    task_type="SEQ_CLS",  # Task type for sequence classification
)

# NOTE: `model` is rebound to the PEFT-wrapped model, so re-running this cell
# would wrap an already wrapped model.
model = get_peft_model(model, peft_config)
model.print_trainable_parameters()

In [None]:
# Trainer for the LoRA run: validates on the 20% hold-out and stops early after
# two evaluations without improvement.
trainer = Trainer(
    model=model,
    args=training_args,
    data_collator=data_collator,
    train_dataset=CustomDataset(X_train, y_train, tokenizer),
    eval_dataset=CustomDataset(X_val, y_val, tokenizer),
    compute_metrics=compute_metrics,
    callbacks=[
        EarlyStoppingCallback(early_stopping_patience=2, early_stopping_threshold=0.0)
    ],
)

In [None]:
# Release cached GPU memory held by torch before training starts.
torch.cuda.empty_cache()

In [None]:
# Run the LoRA fine-tuning; the returned training summary is shown as cell output.
trainer.train()

In [None]:
# Save the trained PEFT model next to the training checkpoints.
model_id = "//content/drive/MyDrive/finetuning_rubert_LoRA3/deepPavlov_with_lora"
model.save_pretrained(model_id)

In [None]:
# Held-out test set wrapped for Trainer.predict.
test_dataset = CustomDataset(X=X_test, y=y_test, tokenizer=tokenizer)

In [None]:
# Run the trained model over the test set; indexed below as
# [0] = raw predictions and [2] = metrics.
test = trainer.predict(test_dataset)

In [None]:
# Predicted class = index of the highest score in each row.
y_pred = np.argmax(test.predictions, axis=1)

In [None]:
# Metrics reported by Trainer.predict for the test set.
print(test.metrics)

In [None]:
# Confusion matrix restricted to the classes that occur in the test labels.
class_values = np.unique(y_test)
cm = confusion_matrix(y_test, y_pred, labels=class_values)

fig, ax = plt.subplots(figsize=(6,6))
im = ax.imshow(cm, interpolation='nearest', aspect='auto')
unique_labels = class_values.astype(int)
# One tick per matrix row/column. The count is derived from the labels rather
# than hard-coded, so the plot also works when the test set does not contain
# exactly nine classes.
tick_positions = range(len(unique_labels))
ax.set_xticks(tick_positions)
ax.set_yticks(tick_positions)
ax.set_xticklabels(unique_labels, rotation=45)
ax.set_yticklabels(unique_labels)
ax.set_ylabel('Истинный класс')
ax.set_xlabel('Предсказанный класс')
fig.colorbar(im, ax=ax)
plt.tight_layout()
plt.show()

In [None]:
# Classes present in the test labels.
classes = np.unique(y_test)

# 1) Per-class F1 score.
f1_per_class = f1_score(y_test, y_pred, labels=classes, average=None, zero_division=0)

# 2) "Per-class accuracy": share of correctly predicted samples among all
#    samples of that class (equivalent to the recall of the class).
accuracy_per_class = np.array(
    [(y_pred[y_test == cls] == cls).mean() for cls in classes]
)

# 3) Collect both metrics in one table indexed by class.
df = pd.DataFrame(
    {'accuracy': accuracy_per_class, 'f1_score': f1_per_class},
    index=pd.Index(classes.astype(str), name='class'),
)

print(df)


# Частичное обучение

In [None]:
# Train only the last two encoder layers: parameters of every earlier encoder
# layer are frozen. Parameters outside 'encoder.layer.*' are left untouched.
# NOTE(review): if the cells run top to bottom, `model` is the LoRA-wrapped,
# already fine-tuned model from the previous section - confirm that this
# experiment is not meant to start from a freshly loaded base model.
first_trainable_layer = model.config.num_hidden_layers - 2
for name, param in model.bert.named_parameters():
    print(name)
    if 'encoder.layer.' not in name:
        continue
    # Names look like 'encoder.layer.<index>....'; the third field is the layer index.
    layer_num = int(name.split('.')[2])
    param.requires_grad = layer_num >= first_trainable_layer

In [None]:
# Hyper-parameters for the partial fine-tuning run.
training_args1 = TrainingArguments(
    output_dir="checkpoints/",
    # Batching.
    per_device_train_batch_size=64,
    per_device_eval_batch_size=64,
    eval_accumulation_steps=20,
    # Optimisation.
    learning_rate=1e-4,
    num_train_epochs=11,
    # Logging, evaluation and checkpointing are all step-based.
    logging_strategy="steps",
    logging_steps=100,
    evaluation_strategy="steps",
    save_strategy="steps",
    save_total_limit=2,
    report_to="tensorboard",
    # The best checkpoint (by validation F1) is restored after training.
    load_best_model_at_end=True,
    metric_for_best_model="f1",
)

In [None]:
# Trainer for the partial fine-tuning run. Unlike the LoRA run, no data collator
# and no early-stopping callback are passed. `trainer` is rebound here, so the
# LoRA trainer is no longer reachable afterwards.
partial_train_dataset = CustomDataset(X_train, y_train, tokenizer=tokenizer)
partial_eval_dataset = CustomDataset(X_val, y_val, tokenizer=tokenizer)
trainer = Trainer(
    model=model,
    args=training_args1,
    train_dataset=partial_train_dataset,
    eval_dataset=partial_eval_dataset,
    compute_metrics=compute_metrics,
)

In [None]:
# Run the partial fine-tuning; the returned training summary is shown as cell output.
trainer.train()

In [None]:
# Pull the logging history recorded by the trainer.
log_history = trainer.state.log_history

# Steps and values for the training and validation loss curves.
train_steps, train_losses = [], []
eval_steps, eval_losses = [], []

for entry in log_history:
    # Training loss entry.
    if "loss" in entry:
        # Use the logged step when present, otherwise the running position.
        train_steps.append(entry.get("step", len(train_steps) + 1))
        train_losses.append(entry["loss"])
    # Validation loss entry.
    if "eval_loss" in entry:
        eval_steps.append(entry.get("step", len(eval_steps) + 1))
        eval_losses.append(entry["eval_loss"])

# Plot both curves on one axis.
fig, ax = plt.subplots(figsize=(10, 5))
ax.plot(train_steps, train_losses, marker="o", label="Training Loss")
ax.plot(eval_steps, eval_losses, marker="o", label="Validation Loss")
ax.set_xlabel("Шаг обучения")
ax.set_ylabel("Потеря")
ax.set_title("График тренировочной и валидационной потери")
ax.legend()
ax.grid(True)
# Save BEFORE showing: once the figure has been shown, a later savefig writes
# an empty image.
# NOTE(review): the file name mentions "mistral" although this run trains
# RuBERT - confirm the intended name.
fig.savefig('mistral_train_val.png')
plt.show()

In [None]:
# Held-out test set wrapped for the partially fine-tuned model.
test_dataset1 = CustomDataset(X=X_test, y=y_test, tokenizer=tokenizer)

In [None]:
# Run the partially fine-tuned model over the test set; indexed below as
# [0] = raw predictions and [2] = metrics.
test1 = trainer.predict(test_dataset1)

In [None]:
# Predicted class = index of the highest score in each row.
y_pred1 = np.argmax(test1.predictions, axis=1)

In [None]:
# Metrics reported by Trainer.predict for the test set.
print(test1.metrics)

In [None]:
# Classes present in the test labels.
classes = np.unique(y_test)

# 1) Per-class F1 score for the partially fine-tuned model.
f1_per_class = f1_score(y_test, y_pred1, labels=classes, average=None, zero_division=0)

# 2) "Per-class accuracy": share of correctly predicted samples among all
#    samples of that class (equivalent to the recall of the class).
accuracy_per_class = np.array(
    [(y_pred1[y_test == cls] == cls).mean() for cls in classes]
)

# 3) Collect both metrics in one table indexed by class.
df = pd.DataFrame(
    {'accuracy': accuracy_per_class, 'f1_score': f1_per_class},
    index=pd.Index(classes.astype(str), name='class'),
)

print(df)