In [1]:
import random
import string

import pandas as pd
import pytorch_lightning as pl
import torch
import torchmetrics
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from torch import nn
from torch.utils.data import DataLoader, Dataset
from tqdm.notebook import tqdm
from transformers import AutoModel, AutoTokenizer

tqdm.pandas()


MODEL_NAME = 'cointegrated/rubert-tiny2'

In [2]:
class TextDataset(Dataset):
    def __init__(
        self,
        texts: list[str],
        labels: list[str],
        tokenizer: AutoTokenizer,
        max_length: int = 512,
    ) -> None:
        self.texts = texts
        self.labels = labels
        self.tokenizer = tokenizer
        self.max_length = max_length

    def __len__(self) -> int:
        return len(self.texts)

    def __getitem__(self, idx: int) -> dict:
        text = self.texts[idx]
        label = self.labels[idx]
        encoding = self.tokenizer(
            text,
            padding='max_length',
            truncation=True,
            max_length=self.max_length,
            return_tensors='pt',
        )
        input_ids = encoding['input_ids'].squeeze(0)
        attention_mask = encoding['attention_mask'].squeeze(0)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': torch.tensor(label, dtype=torch.long),
        }

In [3]:
class BERTClassifier(pl.LightningModule):
    def __init__(
        self,
        num_classes: int,
        steps_per_epoch=None,
        max_epochs=None,
        lr=2e-5,
    ) -> None:
        super().__init__()
        self.bert = AutoModel.from_pretrained(MODEL_NAME).train()
        hidden_size = self.bert.config.hidden_size
        self.pre_classifier = nn.Linear(hidden_size, hidden_size, bias=True)
        self.classifier = nn.Sequential(
            nn.ReLU(),
            nn.Dropout(0.1),
            nn.Linear(hidden_size, num_classes, bias=True),
        )

        self.loss_fn = nn.CrossEntropyLoss()
        self.steps_per_epoch = steps_per_epoch
        self.max_epochs = max_epochs
        self.lr = lr

        self.train_accuracy = torchmetrics.Accuracy(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )
        self.val_accuracy = torchmetrics.Accuracy(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )

        self.train_precision = torchmetrics.Precision(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )
        self.train_recall = torchmetrics.Recall(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )
        self.train_f1 = torchmetrics.F1Score(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )

        self.val_precision = torchmetrics.Precision(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )
        self.val_recall = torchmetrics.Recall(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )
        self.val_f1 = torchmetrics.F1Score(
            task='multiclass',
            num_classes=num_classes,
            average='macro',
        )

    def forward(self, input_ids, attention_mask):
        outputs = self.bert(input_ids=input_ids, attention_mask=attention_mask)[1]
        return self.classifier(outputs)

    def training_step(self, batch):
        outputs = self(batch['input_ids'], batch['attention_mask'])
        labels = batch['labels']
        loss = self.loss_fn(outputs, labels)

        preds = outputs.softmax(dim=-1)
        self.train_accuracy.update(preds, labels)
        self.train_precision.update(preds, labels)
        self.train_recall.update(preds, labels)
        self.train_f1.update(preds, labels)

        self.log('train_loss', loss)
        return loss

    def on_train_epoch_end(self):
        accuracy = self.train_accuracy.compute()
        precision = self.train_precision.compute()
        recall = self.train_recall.compute()
        f1 = self.train_f1.compute()

        self.log('train_accuracy', accuracy, on_epoch=True)
        self.log('train_precision', precision, on_epoch=True)
        self.log('train_recall', recall, on_epoch=True)
        self.log('train_f1', f1, on_epoch=True)

        print(
            f'Epoch: {self.current_epoch}, Train Accuracy: {accuracy:.4f}, '
            f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}'
        )

        self.train_accuracy.reset()
        self.train_precision.reset()
        self.train_recall.reset()
        self.train_f1.reset()

    def validation_step(self, batch, batch_idx):
        outputs = self(batch['input_ids'], batch['attention_mask'])
        labels = batch['labels']
        loss = self.loss_fn(outputs, labels)

        preds = outputs.softmax(dim=-1)
        self.val_accuracy.update(preds, labels)
        self.val_precision.update(preds, labels)
        self.val_recall.update(preds, labels)
        self.val_f1.update(preds, labels)

        self.log('val_loss', loss)
        return loss

    def on_validation_epoch_end(self):
        accuracy = self.val_accuracy.compute()
        precision = self.val_precision.compute()
        recall = self.val_recall.compute()
        f1 = self.val_f1.compute()

        self.log('val_accuracy', accuracy, on_epoch=True)
        self.log('val_precision', precision, on_epoch=True)
        self.log('val_recall', recall, on_epoch=True)
        self.log('val_f1', f1, on_epoch=True)

        print(
            f'Epoch: {self.current_epoch}, Validation Accuracy: {accuracy:.4f}, '
            f'Precision: {precision:.4f}, Recall: {recall:.4f}, F1: {f1:.4f}'
        )

        self.val_accuracy.reset()
        self.val_precision.reset()
        self.val_recall.reset()
        self.val_f1.reset()

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.parameters(), lr=self.lr)
        return optimizer

In [4]:
def add_or_remove_punctuation(text: str) -> str:
    punctuations = [',', '.', '!', '?']
    words = text.split()

    if random.random() < 0.5:
        position = random.randint(0, len(words) - 1)
        punct = random.choice(punctuations)
        words[position] = words[position] + punct
    else:
        text = text.translate(str.maketrans('', '', string.punctuation))
        words = text.split()

    return ' '.join(words)


def introduce_typo(text: str) -> str:
    words = text.split()
    index = random.randint(0, len(words) - 1)
    word = words[index]

    typo_type = random.choice(['swap', 'remove', 'duplicate'])

    if typo_type == 'swap' and len(word) > 1:
        pos = random.randint(0, len(word) - 2)
        word = list(word)
        word[pos], word[pos + 1] = word[pos + 1], word[pos]
        words[index] = ''.join(word)

    elif typo_type == 'remove' and len(word) > 1:
        pos = random.randint(0, len(word) - 1)
        words[index] = word[:pos] + word[pos + 1 :]

    elif typo_type == 'duplicate':
        pos = random.randint(0, len(word) - 1)
        words[index] = word[:pos] + word[pos] + word[pos:]

    return ' '.join(words)


def shuffle_words(text: str) -> str:
    words = text.split()
    if len(words) > 1:
        random.shuffle(words)
    return ' '.join(words)


AUG_NUM = 30


def balance_dataset(texts: list[str], labels: list[int]):
    df = pd.DataFrame({'question': texts, 'content': labels})

    max_count = df['content'].value_counts().max()
    augmented_data = []

    for _, group in tqdm(df.groupby('content')):
        count = len(group)
        augmented_data.extend(group.to_dict('records'))

        for _ in range(min(AUG_NUM, max_count - count)):
            row = group.sample(1).iloc[0].to_dict()
            question = row['question']

            augmented_question = add_or_remove_punctuation(question)
            augmented_question = introduce_typo(augmented_question)
            augmented_question = shuffle_words(augmented_question)

            new_row = row.copy()
            new_row['question'] = augmented_question
            augmented_data.append(new_row)

    augmented_df = pd.DataFrame(augmented_data)

    balanced_texts = augmented_df['question'].tolist()
    balanced_labels = augmented_df['content'].tolist()

    return balanced_texts, balanced_labels

In [5]:
def preprocess_data(
    texts: list[str],
    labels: list[str],
    max_length: int,
    batch_size: int,
):
    le = LabelEncoder()
    labels = le.fit_transform(labels)

    train_texts, val_texts, train_labels, val_labels = train_test_split(
        texts,
        labels,
        test_size=0.15,
        random_state=42,
        stratify=labels,
    )

    train_texts, train_labels = balance_dataset(train_texts, train_labels)
    print(f'Train size: {len(train_labels)}, Valid size: {len(val_labels)}')

    tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

    train_dataset = TextDataset(train_texts, train_labels, tokenizer, max_length)
    val_dataset = TextDataset(val_texts, val_labels, tokenizer, max_length)

    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=batch_size)

    return train_loader, val_loader, le

In [6]:
def train_model(
    texts,
    labels,
    num_classes,
    max_length=512,
    batch_size=16,
    max_epochs=5,
    lr=2e-5,
):
    train_loader, val_loader, label_encoder = preprocess_data(
        texts, labels, max_length, batch_size
    )

    steps_per_epoch = len(train_loader)
    model = BERTClassifier(
        num_classes=num_classes,
        steps_per_epoch=steps_per_epoch,
        max_epochs=max_epochs,
        lr=lr,
    )

    trainer = pl.Trainer(
        max_epochs=max_epochs,
        devices=1 if torch.cuda.is_available() else None,
        accelerator='gpu' if torch.cuda.is_available() else 'cpu',
    )

    trainer.fit(model, train_loader, val_loader)

    return model, label_encoder

In [7]:
df = pd.read_csv('LK_modified.xlsx - Вопрос ответ.csv')
df = df[df['content'].duplicated(keep=False)]

texts = df['question'].to_list()
labels = df['content'].to_list()

In [8]:
num_classes = len(set(labels))
model, label_encoder = train_model(texts, labels, num_classes, max_epochs=15)

  0%|          | 0/117 [00:00<?, ?it/s]

Train size: 4805, Valid size: 238


GPU available: True (cuda), used: True
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs
d:\GitHub\research\.venv\Lib\site-packages\pytorch_lightning\trainer\connectors\logger_connector\logger_connector.py:75: Starting from v1.9.0, `tensorboardX` has been removed as a dependency of the `pytorch_lightning` package, due to potential conflicts with other packages in the ML ecosystem. For this reason, `logger=True` will use `CSVLogger` as the default logger, unless the `tensorboard` or `tensorboardX` packages are found. Please `pip install lightning[extra]` or one of them to enable TensorBoard support by default
You are using a CUDA device ('NVIDIA GeForce RTX 4060 Laptop GPU') that has Tensor Cores. To properly utilize them, you should set `torch.set_float32_matmul_precision('medium' | 'high')` which will trade-off precision for performance. For more details, read https://pytorch.org/docs/stable/generated/torch.set_float32_matmul_precision.html#torch.set_float32

Sanity Checking: |          | 0/? [00:00<?, ?it/s]

d:\GitHub\research\.venv\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:424: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.
  attn_output = torch.nn.functional.scaled_dot_product_attention(


Epoch: 0, Validation Accuracy: 0.0000, Precision: 0.0000, Recall: 0.0000, F1: 0.0000


d:\GitHub\research\.venv\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=15` in the `DataLoader` to improve performance.


Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 0, Validation Accuracy: 0.0269, Precision: 0.0179, Recall: 0.0269, F1: 0.0199
Epoch: 0, Train Accuracy: 0.0257, Precision: 0.0588, Recall: 0.0257, F1: 0.0194


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 1, Validation Accuracy: 0.1862, Precision: 0.1635, Recall: 0.1862, F1: 0.1678
Epoch: 1, Train Accuracy: 0.1405, Precision: 0.2885, Recall: 0.1405, F1: 0.1478


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 2, Validation Accuracy: 0.2161, Precision: 0.2176, Recall: 0.2161, F1: 0.2094
Epoch: 2, Train Accuracy: 0.2510, Precision: 0.5089, Recall: 0.2510, F1: 0.2707


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 3, Validation Accuracy: 0.3233, Precision: 0.3274, Recall: 0.3233, F1: 0.3180
Epoch: 3, Train Accuracy: 0.3526, Precision: 0.6039, Recall: 0.3526, F1: 0.3817


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 4, Validation Accuracy: 0.3411, Precision: 0.3463, Recall: 0.3411, F1: 0.3381
Epoch: 4, Train Accuracy: 0.4651, Precision: 0.7344, Recall: 0.4651, F1: 0.4997


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 5, Validation Accuracy: 0.5081, Precision: 0.5117, Recall: 0.5081, F1: 0.5044
Epoch: 5, Train Accuracy: 0.5513, Precision: 0.8211, Recall: 0.5513, F1: 0.5840


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 6, Validation Accuracy: 0.5249, Precision: 0.5515, Recall: 0.5249, F1: 0.5252
Epoch: 6, Train Accuracy: 0.6463, Precision: 0.8625, Recall: 0.6463, F1: 0.6800


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 7, Validation Accuracy: 0.5249, Precision: 0.5439, Recall: 0.5249, F1: 0.5240
Epoch: 7, Train Accuracy: 0.7386, Precision: 0.8841, Recall: 0.7386, F1: 0.7670


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 8, Validation Accuracy: 0.5613, Precision: 0.5728, Recall: 0.5613, F1: 0.5597
Epoch: 8, Train Accuracy: 0.7956, Precision: 0.9147, Recall: 0.7956, F1: 0.8167


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 9, Validation Accuracy: 0.5848, Precision: 0.5813, Recall: 0.5848, F1: 0.5755
Epoch: 9, Train Accuracy: 0.8541, Precision: 0.9382, Recall: 0.8541, F1: 0.8708


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 10, Validation Accuracy: 0.5841, Precision: 0.5807, Recall: 0.5841, F1: 0.5749
Epoch: 10, Train Accuracy: 0.8874, Precision: 0.9432, Recall: 0.8874, F1: 0.8980


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 11, Validation Accuracy: 0.5953, Precision: 0.6127, Recall: 0.5953, F1: 0.5921
Epoch: 11, Train Accuracy: 0.9196, Precision: 0.9523, Recall: 0.9196, F1: 0.9264


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 12, Validation Accuracy: 0.5917, Precision: 0.6046, Recall: 0.5917, F1: 0.5839
Epoch: 12, Train Accuracy: 0.9400, Precision: 0.9597, Recall: 0.9400, F1: 0.9447


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 13, Validation Accuracy: 0.6046, Precision: 0.6157, Recall: 0.6046, F1: 0.5961
Epoch: 13, Train Accuracy: 0.9501, Precision: 0.9623, Recall: 0.9501, F1: 0.9529


Validation: |          | 0/? [00:00<?, ?it/s]

Epoch: 14, Validation Accuracy: 0.6051, Precision: 0.6154, Recall: 0.6051, F1: 0.5972
Epoch: 14, Train Accuracy: 0.9589, Precision: 0.9670, Recall: 0.9589, F1: 0.9606


`Trainer.fit` stopped: `max_epochs=15` reached.


In [9]:
def predict_class(
    model: BERTClassifier,
    tokenizer: AutoTokenizer,
    text: str,
    label_encoder: LabelEncoder,
    device: str = 'cpu',
) -> str:
    inputs = tokenizer(
        text, padding='max_length', truncation=True, max_length=64, return_tensors='pt'
    )
    input_ids = inputs['input_ids'].to(device)
    attention_mask = inputs['attention_mask'].to(device)

    model.eval()

    with torch.no_grad():
        outputs = model(input_ids, attention_mask)
        predictions = torch.argmax(outputs, dim=-1)

    predicted_class_id = predictions.item()
    predicted_class_text = label_encoder.inverse_transform([predicted_class_id])[0]

    return predicted_class_text


model.eval()
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model.to(device)
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

text = 'Не вижу свою команду в личном кабинете!'
predicted_class_text = predict_class(model, tokenizer, text, label_encoder, device)
print(f'Predicted class: {predicted_class_text}')

Predicted class: Если в "команде" нет подчиненных сотрудников просьба обратиться в поддержку для корректировки
