# Тренировка и конвертация нейронной сети bert-tiny для решения задачи классификации текстовых запросов.


## Импорт необходимых модулей

In [1]:
import os
import re
import torch
from torch import nn
from torch.optim import Adam
from torch.optim.lr_scheduler import ReduceLROnPlateau

from torch.utils.data import DataLoader
from tqdm import tqdm
import wandb

import transformers
from transformers import AutoTokenizer, AutoModel, AutoModelForSequenceClassification
from transformers.onnx import FeaturesManager

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, balanced_accuracy_score, f1_score


  from .autonotebook import tqdm as notebook_tqdm


## Описание необходимых классов и функций

In [2]:
class BertDataset():
    def __init__(self, texts, labels, \
                 tokenizer_path="cointegrated/rubert-tiny2",\
                 alphabet = "абвгдежзийклмнопрстуфхцчшщъыьэюя "):
        
        self.alphabet = alphabet
        self.tokenizer = AutoTokenizer.from_pretrained(tokenizer_path,)
        self.texts = texts
        self.labels = labels
    
    def __getitem__(self, idx: int):
        text = self.texts[idx]
        label = self.labels[idx]
        
        text = self.preprocess(text)
        input_ids, token_type_ids, attention_mask = self.tokenize(text)
        
        return input_ids, token_type_ids, attention_mask, label
    
    def __len__(self,):
        return len(self.texts)
        
    def preprocess(self, text: str):
        """
            Метод подготавливает текст 
            перед использованием в процессе обучения.
            
            Args:
                text (str): текст запроса
            
            Returns:
                new_text (str): подготовленный текст
        """
        text = self.lowercase(text)
        text = self.remove_html(text)
        text = text.replace('ё', 'е')
        text = self.filter_symbols(text) 
        return text
    
    def tokenize(self, text: str):
        """
            Метод разбивает текст на токены, и возвращает три тензора.
            
            Args:
                text (str): текст запроса
            
            Returns:
                input_ids (torch.Tensor): id токенов для переданного в функцию текста
                token_type_ids (torch.Tensor): индексы, обозначающие тип этих токенов
                attention_mask (torch.Tensor): бинарная маска, указывающая на окончание последовательности
                
        """
        t = self.tokenizer(text, padding='max_length', truncation=True, return_tensors='pt')
        input_ids = t['input_ids']
        token_type_ids = t['token_type_ids']
        attention_mask = t['attention_mask']
        return input_ids.squeeze(), token_type_ids.squeeze(), attention_mask.squeeze()
    
    def remove_html(self, text: str):
        """
            Метод ищет все подстроки типа "</p>" (html разметка)
            и удаляет из текста.
            
            Args:
                text (str): текст запроса
            
            Returns:
                text (str): искомый текст
        """
        html_code_pattern = "<\S{1,}>"
        substrings = re.findall(html_code_pattern, text)
        for substring in substrings:
            text = text.replace(substring, '')
        return text
    
    def filter_symbols(self, text: str):
        """
            Метод убирает из текста все символы, 
            которые не входят в словарь (self.alphabet).
            
            Args:
                text (str): текст запроса
            
            Returns:
                new_text (str): отфильтрованный текст
        """
        new_text = ""
        for char in text:
            if char in self.alphabet:
                new_text += char
        return new_text
    
    def lowercase(self, text: str):
        return text.lower()        

In [3]:
class BertClassifier(nn.Module):
    """
        Класс, описывающий нашу модель.
        Нейронная сеть представляет из себя 
        transformer-encoder + линейный классификатор.
    """
    def __init__(self, \
                 num_classes=1, \
                 dropout=0.5, \
                 embedding_size=312, \
                 model_path="cointegrated/rubert-tiny2"):

        super(BertClassifier, self).__init__()

        self.bert = AutoModelForSequenceClassification.from_pretrained(model_path, num_labels=num_classes)
#         self.bert = AutoModel.from_pretrained(model_path, num_labels=num_classes)
        self.dropout = nn.Dropout(dropout)
        self.linear = nn.Linear(embedding_size, num_classes)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax()

    def forward(self, input_ids, token_type_ids, attention_mask):
#         _, pooled_output = self.bert(input_ids=input_ids, attention_mask=attention_mask,return_dict=False)
        output = self.bert(input_ids=input_ids, attention_mask=attention_mask,return_dict=False)[0]
        
#         dropout_output = self.dropout(pooled_output)
#         linear_output = self.linear(dropout_output)
#         final_layer = self.relu(linear_output)

        return output

In [4]:
def adjust_optim(step, optimizer, scheduler, loss, warmup_steps, start_lr):
    warmup_shift = start_lr / warmup_steps
    if (step <= warmup_steps): #(optimizer.param_groups[0]['lr'] < start_lr)
        optimizer.param_groups[0]['lr'] += warmup_shift
    else:  
        scheduler.step(loss)

def train(model, train_dataloader, criterion, optimizer, scheduler, warmup_steps, lr, epoch):
    """
            Функция обучения нейронной сети.
            
            Args:
                model (torch.nn.Module): объект модели
                train_dataloader (torch.utils.data.dataloader.DataLoader): тренировочный загрузчик данных
                criterion (torch.nn.modules.loss.CrossEntropyLoss): функция ошибки
                optimizer (torch.optim.adam.Adam): оптимизатор
                
            Returns:
                total_loss_train (float): среднее значение функции ошибки за эпоху
    """
    total_loss_train = 0

    for i, batch in enumerate(tqdm(train_dataloader)):
        input_ids, token_type_ids, attention_mask, labels = (t.cuda() for t in batch)

        output = model(input_ids, token_type_ids, attention_mask)

        batch_loss = criterion(output, labels.long())
        total_loss_train += batch_loss.item()
        
        wandb.log({"loss":batch_loss.item(),
                  "lr":optimizer.param_groups[0]['lr']})        
        
        model.zero_grad()
        batch_loss.backward()
        optimizer.step()
        if not epoch:
            adjust_optim(i, optimizer, scheduler, batch_loss.item(), warmup_steps, lr)

    total_loss_train = total_loss_train / len(train_dataloader)
    return total_loss_train
                
def test(model, test_dataloader, criterion):
    """
            Функция тестирования нейронной сети.
            
            Args:
                model (torch.nn.Module): объект модели
                test_dataloader (torch.utils.data.dataloader.DataLoader): тестовый загрузчик данных
                criterion (torch.nn.modules.loss.CrossEntropyLoss): функция ошибки
                
            Returns:
                metrics (dict): словарь с метриками
    """
    total_loss_test = 0
    preds = []
    gtrue = []
    
    with torch.no_grad():
        for batch in tqdm(test_dataloader):
            input_ids, token_type_ids, attention_mask, labels = (t.cuda() for t in batch)

            output = model(input_ids, token_type_ids, attention_mask)
    
            preds.extend(output.argmax(-1).tolist())
            gtrue.extend(labels.tolist())
            batch_loss = criterion(output, labels.long())
            total_loss_test += batch_loss.item()
            
    total_loss_test = total_loss_test / len(test_dataloader)
    
    accuracy = accuracy_score(gtrue, preds)
    balanced_accuracy = balanced_accuracy_score(gtrue, preds)
    f1 = f1_score(gtrue, preds, average='weighted')
    
    metrics = {"Accuracy": accuracy, \
              "Balanced_accuracy": balanced_accuracy,\
              "F1-score": f1,\
              "Test loss": total_loss_test,}
    
    return metrics
                  

## Подготовка данных

In [5]:
data_path = 'data.xlsx'
#Читаем табличку и удаляем записи с пропущенными значениями
df = pd.read_excel(data_path).dropna() 

In [6]:
class_counts = df['name'].value_counts() 
#Считаем встречаемость классов в датасете
name2count = {name:count for name,count in zip(class_counts.index, class_counts.values)}

In [7]:
#Добавляем в табличку столбец вхождений класса
df['class_count'] = df['name'].apply(lambda x: name2count[x]) 

In [8]:
#Оставляем записи только тех классов, встречаемость которых выше единицы
data = df[df['class_count'] > 1][['description', 'name']]
name2class_id = {name:idx for idx, name in enumerate(data['name'].unique())}
#Добавляем в табличку столбец с индексами классов
data['class_id'] = data['name'].apply(lambda x: name2class_id[x])
data.drop(['name'], axis=1, inplace=True)

In [9]:
X = data['description'].values
y = data['class_id'].values
#Делим датасет на тренировочную и тестовую выборки, учитывая распределение классов
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

## Обучение нейросети

In [10]:
batch_size = 4                              #Размер батча
learning_rate = 0.00005                     #Начальный шаг обучения
epochs = 20                                 #Итерации обучения
factor = 0.99                               #Коэффициент уменьшения шага обучения
patience = 2000                             #Количество шагов перед уменьшением шага обучения
min_lr = 0.0000005                          #Минимальный шаг обучения
warmup_steps = 1500                         #Количество warmup шагов
n_classes = len(data['class_id'].unique())  #Количество классов в датасете
wandb_project = "infolabs"                  #Проект в wandb для визуализации метрик
wandb_user = "outerspaceguy"                #Username wandb

In [11]:
#Создаем объекты тренировочного и тестового датасетов
train_dataset = BertDataset(X_train, y_train)
test_dataset = BertDataset(X_test, y_test)

#Оборачиваем в dataloader-ы
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

In [12]:
model = BertClassifier(n_classes).cuda()
criterion = nn.CrossEntropyLoss().cuda()
optimizer = Adam(model.parameters(), lr=0)
scheduler = ReduceLROnPlateau(optimizer, 'min', factor=factor, \
         patience=patience, verbose=True, min_lr=min_lr)

Some weights of the model checkpoint at cointegrated/rubert-tiny2 were not used when initializing BertForSequenceClassification: ['cls.predictions.transform.dense.weight', 'cls.predictions.transform.LayerNorm.weight', 'cls.seq_relationship.weight', 'cls.seq_relationship.bias', 'cls.predictions.bias', 'cls.predictions.decoder.bias', 'cls.predictions.transform.dense.bias', 'cls.predictions.decoder.weight', 'cls.predictions.transform.LayerNorm.bias']
- This IS expected if you are initializing BertForSequenceClassification from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing BertForSequenceClassification from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of BertForSequenceClassification were not 

In [13]:
wandb.init(project=wandb_project, entity=wandb_user, name=wandb_project, settings=wandb.Settings(code_dir="."))

[34m[1mwandb[0m: Currently logged in as: [33mouterspaceguy[0m. Use [1m`wandb login --relogin`[0m to force relogin


In [14]:
for epoch in range(epochs):
    train_loss = train(model, train_dataloader, criterion, optimizer, scheduler, warmup_steps, learning_rate, epoch)
    metrics = test(model, test_dataloader, criterion)
    wandb.log(metrics)

100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 1651/1651 [03:26<00:00,  8.00it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 708/708 [00:31<00:00, 22.26it/s]
100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 1651/1651 [03:20<00:00,  8.25it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 708/708 [00:31<00:00, 22.25it/s]
100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 1651/1651 [03:18<00:00,  8.31it/s]
100%|████████████████████████████████████████████████████████████████████████████████████████████████████| 708/708 [00:32<00:00, 21.49it/s]
100%|██████████████████████████████████████████████████████████████████████████████████████████████████| 1651/1651 [03:21<00:00,  8.18it/s]
100%|███████████████

KeyboardInterrupt: 

## Экспорт обученной нейросети в формат ONNX


In [16]:
out_file = "bert-tiny.onnx"
tokenizer_path = "cointegrated/rubert-tiny2"
tokenizer = AutoTokenizer.from_pretrained(tokenizer_path,)

export_model = model.bert.cpu()
dummy_model_input = tokenizer("Это пример", return_tensors="pt", padding='max_length', max_length=2048)

# Экспортируем в формат ONNX
torch.onnx.export(
    model, 
    tuple(dummy_model_input.values()),
    f=out_file,  
    input_names=['input_ids', 'attention_mask'], 
    output_names=['logits'], 
    do_constant_folding=True, 
    opset_version=13, 
)



In [17]:
# Сохраняем токенизатор для последующего использования
tokenizer.save_pretrained('bert_tokenizer')

('bert_tokenizer/tokenizer_config.json',
 'bert_tokenizer/special_tokens_map.json',
 'bert_tokenizer/vocab.txt',
 'bert_tokenizer/added_tokens.json',
 'bert_tokenizer/tokenizer.json')

In [24]:
# Сохраняем набор меток
import json
class_id2name = {name2class_id[name]:name for name in name2class_id}
with open('labels.json', 'w', encoding='utf-8') as file:
    json.dump(class_id2name, file,  ensure_ascii=False)