## Использование алгоритма Ахо-Корасик для определения сущностей

In [None]:
def find_word_indices(texts: list[str], words: list[str], name_to_id: dict) -> tuple[list[list], list[list[str]]]:
    """Find which issuers are mentioned in each message via an Aho-Corasick automaton.

    Every message is lower-cased before it is searched; the synonyms are added
    to the automaton exactly as given, so they are expected to be lower-case.

    :param texts: messages to scan
    :param words: synonyms whose occurrences are searched for in the messages
    :param name_to_id: mapping from every synonym to its issuer_id
    :return: a pair ``(id_list, word_list)`` with one entry per message.
        ``id_list[k]`` lists the distinct issuer_ids found in message ``k``
        (in order of first occurrence) and ``word_list[k]`` lists, at the same
        positions, the first synonym that was matched for each of those ids.
    """
    # Nothing to look for: skip building the automaton altogether.
    # NOTE(review): presumably Automaton is ahocorasick.Automaton, which
    # refuses to search when no keys were added - confirm.
    if not words:
        return [[] for _ in texts], [[] for _ in texts]

    # Build the automaton; the stored value is the index of the synonym in `words`.
    automaton = Automaton()
    for idx, word in enumerate(words):
        automaton.add_word(word, idx)
    automaton.make_automaton()

    id_list = []
    word_list = []
    for text in texts:
        # issuer_id -> first synonym that matched it. A single dict (instead of
        # two independent sets) keeps ids and synonyms aligned position by position.
        first_match_by_id = {}
        for _end_index, word_index in automaton.iter(text.lower()):
            word = words[word_index]
            first_match_by_id.setdefault(name_to_id[word], word)

        id_list.append(list(first_match_by_id))
        word_list.append(list(first_match_by_id.values()))

    return id_list, word_list

## Автогенерация синонимов

In [None]:
def add_new_synonyms(synonyms):
    """Generate inflected variants (by case and number) of each issuer's base phrase.

    For every row of ``synonyms`` a base phrase is picked (element 0 when the
    row has at most two entries, element 2 otherwise), split on single spaces,
    and every word is inflected with the module-level ``morph`` analyzer into
    12 number/case combinations. If the inflection attempt raises
    ``AttributeError`` the word is kept unchanged.

    Returns an object array of shape ``(synonyms.shape[0], 12)``. Each cell is
    the inflected phrase in which every word is followed by one space, so a
    non-empty cell ends with a trailing space. Columns 0-5 use the 'plur'
    grammeme and columns 6-11 use 'sing'; within each half the cases go in the
    order nomn, gent, datv, accs, ablt, loct.
    """
    cases = ('nomn', 'gent', 'datv', 'accs', 'ablt', 'loct')
    numbers = ('sing', 'plur')
    # (number, case) pair for each of the 12 output columns; `col < 6` is a
    # bool used as an index, so the first six columns select 'plur'.
    column_grammemes = [(numbers[col < 6], cases[col % 6]) for col in range(12)]

    inflected = np.full((synonyms.shape[0], 12), fill_value='', dtype=object)
    for row in range(len(synonyms)):
        entry = synonyms[row]
        base_phrase = entry[0] if len(entry) <= 2 else entry[2]
        # Inflect the phrase word by word.
        for token in base_phrase.split(sep=' '):
            best_parse = morph.parse(token)[0]
            for col, (number, case) in enumerate(column_grammemes):
                try:
                    form = best_parse.inflect({number, case}).word
                except AttributeError:
                    # No usable inflection result: fall back to the original word.
                    form = token
                inflected[row][col] += form + " "
    return inflected

## Аугментация данных

In [None]:
def get_synonym(word):
    """Return a randomly chosen member of the first synonym group containing ``word``.

    Groups are taken from the module-level ``synonyms`` collection. Returns
    ``None`` when no group contains the word.
    """
    for group in synonyms:
        if word not in group:
            continue
        pick = randint(0, len(group) - 1)
        return group[pick]
    return None

def get_syn_len(word):
    """Return the size of the first group in ``synonyms`` that contains ``word``, or 0 if none does."""
    return next((len(group) for group in synonyms if word in group), 0)


def augmentate(df):
    """Extend the dataset with extra examples whose aspect is replaced by a synonym.

    For every row of ``df``, ``min(weights[label], get_syn_len(aspect) - 5)``
    copies of the row are appended (none if that number is not positive), each
    with its 'aspect' replaced by the result of ``get_synonym(aspect)``.
    ``weights`` is a module-level per-label lookup; presumably it holds larger
    values for rare labels - verify where it is defined.

    :param df: data frame with at least the columns 'aspect' and 'label'
    :return: a new data frame: a copy of ``df`` followed by the generated rows;
        ``df`` itself is not modified
    """
    new_df = df.copy()
    # Iterate over the frame that was passed in (the original referenced a
    # misspelled, undefined name here).
    for _, row in df.iterrows():
        aspect = row['aspect']
        label = row['label']
        n_copies = min(weights[label], get_syn_len(aspect) - 5)
        for _ in range(n_copies):
            # Copy so the row being iterated is never mutated.
            new_row = row.copy()
            new_row['aspect'] = get_synonym(aspect)
            # NOTE(review): appending at label len(new_df) assumes the index is
            # the default 0..n-1 range; with another index this label could
            # already exist and the row would be overwritten - confirm callers.
            new_df.loc[len(new_df)] = new_row

    return new_df

## Классы для работы с моделью

#### Определение архитектуры модели

In [None]:
class TransformerClassificationModel(nn.Module):
    """Pretrained transformer backbone topped with a classification head of linear layers."""

    def __init__(self, base_transformer_model: str, num_classes: int, num_dense_layers: int):
        """Load the backbone and build the classification head.

        :param base_transformer_model: name or path passed to ``from_pretrained``
        :param num_classes: size of the final linear layer's output
        :param num_dense_layers: total number of linear layers in the head; all
            but the last keep the backbone's hidden size and are followed by ReLU
        """
        super().__init__()
        self.backbone = AutoModel.from_pretrained(
            base_transformer_model,
            config=AutoConfig.from_pretrained(base_transformer_model),
        )

        hidden_width = self.backbone.config.hidden_size
        head = []
        for _ in range(num_dense_layers - 1):
            head += [nn.Linear(hidden_width, hidden_width), nn.ReLU()]
        head.append(nn.Linear(hidden_width, num_classes))
        self.classifier = nn.Sequential(*head)

        # Softmax over dim=1 of the logits.
        self.softmax = nn.Softmax(dim=1)

    def forward(self, input_ids, attention_mask):
        """Run the backbone and the head.

        The backbone output must expose ``pooler_output``, which is what the
        head consumes. Returns a dict with the keys 'logits', 'probabilities'
        (softmax of the logits) and 'backbone outputs' (the raw backbone result).
        """
        backbone_out = self.backbone(input_ids=input_ids, attention_mask=attention_mask)
        logits = self.classifier(backbone_out.pooler_output)
        return {
            'logits': logits,
            'probabilities': self.softmax(logits),
            'backbone outputs': backbone_out,
        }


def freeze_backbone_function(model: TransformerClassificationModel, freeze=True):
    """Freeze or unfreeze every parameter of ``model.backbone`` and return the model.

    With ``freeze=True`` the backbone parameters get ``requires_grad = False``;
    with ``freeze=False`` they get ``requires_grad = True``. Parameters outside
    ``model.backbone`` are not touched.
    """
    trainable = not freeze
    for backbone_param in model.backbone.parameters():
        backbone_param.requires_grad = trainable
    return model


def preprocess_data(tokenizer, texts, aspects, max_length=256):
    """Tokenize (text, aspect) pairs into a padded, truncated batch.

    Each pair is first combined into a single model input by the module-level
    ``wrap`` helper; the resulting list is tokenized in one call.

    :param tokenizer: callable tokenizer accepting a list of inputs and the
        keyword arguments ``max_length``, ``truncation``, ``padding`` and
        ``return_tensors``
    :param texts: iterable of texts
    :param aspects: iterable of aspects, paired with ``texts`` via ``zip``
    :param max_length: truncation limit handed to the tokenizer; defaults to
        256, the value that used to be hard-coded
    :return: whatever the tokenizer returns for ``return_tensors="pt"``
    """
    wrapped = [wrap(text, aspect) for text, aspect in zip(texts, aspects)]
    return tokenizer(wrapped, max_length=max_length, truncation=True,
                     padding=True, return_tensors="pt")

def evaluate(model, tokenizer, dataset, device):
    """Predict a class index for every example of ``dataset``.

    The "text" and "aspect" entries of ``dataset`` are tokenized with
    ``preprocess_data`` and fed to the model in batches of 32, in order and
    without gradient tracking. Batches are moved to ``device``; the model
    itself is not moved here.

    :return: list with the argmax over the model's 'logits' for each example
    """
    model.eval()
    encoded = preprocess_data(tokenizer, dataset["text"], dataset["aspect"])
    tensors = torch.utils.data.TensorDataset(encoded['input_ids'], encoded['attention_mask'])
    loader = DataLoader(tensors, batch_size=32, shuffle=False)

    predictions = []
    with torch.no_grad():
        for input_ids, attention_mask in loader:
            input_ids = input_ids.to(device)
            attention_mask = attention_mask.to(device)
            batch_logits = model(input_ids, attention_mask)['logits']
            predictions.extend(torch.argmax(batch_logits, dim=1).cpu().numpy())

    return predictions

#### Класс для тренировки и вычисления предсказаний

In [None]:
class TransformerClassificationTrainer:
    """Trains and evaluates a text/aspect classification model.

    Relies on names defined elsewhere in the file: ``wrap`` (combines a text
    and an aspect into one model input), ``device`` (where the model and the
    batches are placed) and the constants used as default hyper-parameters.
    """

    def __init__(self, model, tokenizer, max_length=MAX_LENGTH, batch_size=BATCH_SIZE, lr=LR,
                 num_epochs=NUM_EPOCHS, freeze_backbone=FREEZE_BACKBONE):
        """Store the hyper-parameters and freeze/unfreeze the model's backbone.

        :param model: model exposing ``backbone`` and returning a dict with 'logits'
        :param tokenizer: tokenizer used to encode the wrapped inputs
        :param max_length: truncation limit passed to the tokenizer
        :param batch_size: batch size for both training and evaluation
        :param lr: learning rate of the optimizer
        :param num_epochs: number of passes over the training data
        :param freeze_backbone: whether backbone parameters are excluded from training
        """
        self.model = freeze_backbone_function(model, freeze_backbone)
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.batch_size = batch_size
        self.lr = lr
        self.num_epochs = num_epochs

    def preprocess_data(self, texts, aspects, labels):
        """Tokenize the wrapped (text, aspect) pairs and turn the labels into a tensor.

        :return: ``(inputs, labels)`` - the tokenizer output and ``torch.tensor(labels)``
        """
        inputs = self.tokenizer([wrap(text, aspect) for text, aspect in zip(texts, aspects)],
                                max_length=self.max_length, truncation=True, padding=True, return_tensors="pt")
        labels = torch.tensor(labels)
        return inputs, labels

    def train(self, train_texts, train_aspects, train_labels):
        """Train the model with AdamW and cross-entropy loss, printing the mean loss per epoch.

        :return: the trained model (the same object as ``self.model``)
        """
        train_inputs, train_labels = self.preprocess_data(train_texts, train_aspects, train_labels)
        train_data = torch.utils.data.TensorDataset(train_inputs['input_ids'], train_inputs['attention_mask'], train_labels)
        train_loader = DataLoader(train_data, batch_size=self.batch_size, shuffle=True)

        optimizer = AdamW(self.model.parameters(), lr=self.lr)
        loss_fn = nn.CrossEntropyLoss()

        self.model.to(device)
        self.model.train()

        for epoch in range(self.num_epochs):
            total_loss = 0
            for batch in train_loader:
                batch = tuple(t.to(device) for t in batch)
                input_ids, attention_mask, labels = batch

                optimizer.zero_grad()
                outputs = self.model(input_ids, attention_mask)
                logits = outputs['logits']
                loss = loss_fn(logits, labels)
                total_loss += loss.item()

                loss.backward()
                # Clip the total gradient norm to 1.0 before the optimizer step.
                torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
                optimizer.step()

            avg_train_loss = total_loss / len(train_loader)
            print(f"Epoch {epoch+1}/{self.num_epochs}, Train Loss: {avg_train_loss:.4f}")

        return self.model

    def evaluate(self, test_texts, test_aspects, test_labels):
        """Print accuracy and macro-averaged F1 on the given data and return them.

        :return: dict with the keys 'accuracy' and 'f1'
        """
        # The batches are moved to `device`, so the model has to be there too;
        # without this, evaluating a model that was never passed through
        # train() fails with a device mismatch.
        self.model.to(device)
        self.model.eval()
        test_inputs, test_labels = self.preprocess_data(test_texts, test_aspects, test_labels)
        test_data = torch.utils.data.TensorDataset(test_inputs['input_ids'], test_inputs['attention_mask'], test_labels)
        test_loader = DataLoader(test_data, batch_size=self.batch_size, shuffle=False)

        predictions = []
        true_labels = []

        with torch.no_grad():
            for batch in test_loader:
                batch = tuple(t.to(device) for t in batch)
                input_ids, attention_mask, labels = batch
                outputs = self.model(input_ids, attention_mask)
                logits = outputs['logits']
                preds = torch.argmax(logits, dim=1).cpu().numpy()
                predictions.extend(preds)
                true_labels.extend(labels.cpu().numpy())

        accuracy = accuracy_score(true_labels, predictions)
        print(f"Accuracy: {accuracy:.4f}")
        f1 = f1_score(true_labels, predictions, average='macro')
        print(f"F1 score: {f1:.4f}")
        return {'accuracy': accuracy, 'f1': f1}

def freeze_backbone_function(model: TransformerClassificationModel, freeze=True):
    """Set ``requires_grad`` of every ``model.backbone`` parameter to ``not freeze``; return the model.

    This repeats the definition of the same name that appears earlier in the file.
    """
    needs_grad = not freeze
    for weight in model.backbone.parameters():
        weight.requires_grad = needs_grad
    return model