In [None]:
import torch
from torch.utils.data import Dataset

class NERDataset(Dataset):

    def __init__(self, texts, tags, tokenizer, tag2id, max_len=512, label_all_tokens=False):
        self.texts = texts  # Список токенизированных текстов (списки слов)
        self.tags = tags  # Список последовательностей тегов, соответствующих текстам
        self.tokenizer = tokenizer
        self.tag2id = tag2id  # Словарь, отображающий строки тегов в их ID
        self.max_len = max_len
        self.label_all_tokens = label_all_tokens  # Отмечать ли все подсловные токены метками

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        words = self.texts[idx]
        labels = self.tags[idx]

        encoding = self.tokenizer(
            words,
            is_split_into_words=True,
            return_offsets_mapping=True,
            padding='max_length',
            truncation=True,
            max_length=self.max_len,
            return_tensors='pt'
        )

        input_ids = encoding['input_ids'].squeeze()
        attention_mask = encoding['attenti  on_mask'].squeeze()
        offset_mapping = encoding['offset_mapping'].squeeze()

        # Получаем идентификаторы слов для каждого токена во входной последовательности
        word_ids = encoding.word_ids(batch_index=0)

        # Выравниваем метки с токенами
        aligned_labels = []
        previous_word_idx = None
        for word_idx in word_ids:
            if word_idx is None:
                # Специальные токены (например, [CLS], [SEP]) имеют word_id равный None
                aligned_labels.append(-100)
            elif word_idx != previous_word_idx:
                # Метка для первого токена слова
                aligned_labels.append(self.tag2id[labels[word_idx]])
            else:
                # Для подсловных токенов (частей слова после разбиения)
                if self.label_all_tokens:
                    aligned_labels.append(self.tag2id[labels[word_idx]])
                else:
                    # Игнорируем метки для остальных частей слова
                    aligned_labels.append(-100)
            previous_word_idx = word_idx

        labels = torch.tensor(aligned_labels, dtype=torch.long)

        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'labels': labels
        }

In [None]:
import torch
from torch.utils.data import DataLoader
from transformers import BertTokenizer, BertForTokenClassification, AdamW, get_linear_schedule_with_warmup

class BertNERTrainer:

    def __init__(self, model_path, tokenizer_path, tag2id, epochs=1, model_save_path='bert_ner.pt'):
        self.model = BertForTokenClassification.from_pretrained(
            model_path,
            num_labels=len(tag2id)
        )
        self.tokenizer = BertTokenizer.from_pretrained(tokenizer_path)
        self.device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
        self.model_save_path = model_save_path
        self.max_len = 512
        self.epochs = epochs
        self.tag2id = tag2id
        self.model.to(self.device)

    def preparation(self, X_train, y_train, X_valid, y_valid, label_all_tokens=False):
        # Создаем датасеты
        self.train_set = NERDataset(X_train, y_train, self.tokenizer, self.tag2id, self.max_len, label_all_tokens)
        self.valid_set = NERDataset(X_valid, y_valid, self.tokenizer, self.tag2id, self.max_len, label_all_tokens)

        # Создаем загрузчики данных
        self.train_loader = DataLoader(self.train_set, batch_size=8, shuffle=True)
        self.valid_loader = DataLoader(self.valid_set, batch_size=8, shuffle=False)

        # Инициализация оптимизатора и шедулера
        self.optimizer = AdamW(self.model.parameters(), lr=2e-5, correct_bias=False)
        self.scheduler = get_linear_schedule_with_warmup(
            self.optimizer,
            num_warmup_steps=0,
            num_training_steps=len(self.train_loader) * self.epochs
        )
        self.loss_fn = torch.nn.CrossEntropyLoss().to(self.device)
        
        
    def fit(self):
    self.model.train()
    losses = []
    correct_predictions = 0
    total_predictions = 0

    for data in self.train_loader:
        input_ids = data["input_ids"].to(self.device)
        attention_mask = data["attention_mask"].to(self.device)
        targets = data["targets"].to(self.device)

        # Обнуляем градиенты оптимизатора
        self.optimizer.zero_grad()

        # Прямой проход модели с передачей меток (targets)
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=targets
        )
        
        loss = outputs.loss  # Потери, вычисленные моделью
        logits = outputs.logits  # Логиты размерностью [batch_size, seq_len, num_labels]

        # Получаем предсказания, выбирая метку с наибольшей вероятностью для каждого токена
        preds = torch.argmax(logits, dim=2)  # Размерность [batch_size, seq_len]

        # Сглаживаем тензоры для удобства вычислений
        preds_flat = preds.view(-1)
        targets_flat = targets.view(-1)

        # Создаем маску для игнорирования паддингов и специальных токенов с меткой -100
        active_indices = targets_flat != -100

        # Применяем маску к предсказаниям и целевым меткам
        active_preds = preds_flat[active_indices]
        active_targets = targets_flat[active_indices]

        # Вычисляем количество правильных предсказаний и общее количество предсказаний
        correct_predictions += torch.sum(active_preds == active_targets).item()
        total_predictions += active_targets.shape[0]

        # Добавляем текущую потерю в список потерь
        losses.append(loss.item())

        # Обратный проход и обновление весов
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.model.parameters(), max_norm=1.0)
        self.optimizer.step()
        self.scheduler.step()

    # Вычисляем среднюю точность и потерю за эпоху
    train_acc = correct_predictions / total_predictions
    train_loss = np.mean(losses)
    return train_acc, train_loss



    from seqeval.metrics import classification_report, f1_score

    def eval(self):
        self.model.eval()
        losses = []
        true_labels = []
        pred_labels = []

        with torch.no_grad():
            for data in self.valid_loader:
                input_ids = data["input_ids"].to(self.device)
                attention_mask = data["attention_mask"].to(self.device)
                targets = data["targets"].to(self.device)

                # Прямой проход модели с передачей меток (targets)
                outputs = self.model(
                    input_ids=input_ids,
                    attention_mask=attention_mask,
                    labels=targets
                )

                loss = outputs.loss  # Потери, вычисленные моделью
                logits = outputs.logits  # Логиты размерностью [batch_size, seq_len, num_labels]

                # Получаем предсказания, выбирая метку с наибольшей вероятностью для каждого токена
                preds = torch.argmax(logits, dim=2)  # Размерность [batch_size, seq_len]

                # Переводим тензоры на CPU и в numpy для последующей обработки
                preds = preds.detach().cpu().numpy()
                targets = targets.detach().cpu().numpy()

                # Добавляем текущую потерю в список потерь
                losses.append(loss.item())

                # Конвертируем предсказания и целевые метки в списки токенов
                for i in range(len(targets)):
                    pred_tags = []
                    true_tags = []
                    for j in range(len(targets[i])):
                        if targets[i][j] != -100:
                            pred_tags.append(self.id2tag[preds[i][j]])
                            true_tags.append(self.id2tag[targets[i][j]])
                    pred_labels.append(pred_tags)
                    true_labels.append(true_tags)

        avg_loss = np.mean(losses)

        # Вычисляем F1-score и выводим отчет классификации
        f1 = f1_score(true_labels, pred_labels)
        report = classification_report(true_labels, pred_labels)
        print("Validation Loss: {:.4f}".format(avg_loss))
        print("Validation F1-Score: {:.4f}".format(f1))
        print(report)

        return f1, avg_loss
    
    
    def train(self):
    best_f1 = 0.0
    for epoch in range(self.epochs):
        print(f'Epoch {epoch + 1}/{self.epochs}')
        
        train_f1, train_loss = self.fit()
        print(f'Train Loss: {train_loss:.4f} F1-Score: {train_f1:.4f}')

        val_f1, val_loss = self.eval()
        print(f'Validation Loss: {val_loss:.4f} F1-Score: {val_f1:.4f}')
        print('-' * 50)

        if val_f1 > best_f1:
            torch.save(self.model.state_dict(), self.model_save_path)
            best_f1 = val_f1

    # Загрузка наилучшей модели после обучения
    self.model.load_state_dict(torch.load(self.model_save_path))
    self.model.to(self.device)
    
    def predict(self, text):
    # Шаг 1: Токенизация входного текста
    encoding = self.tokenizer.encode_plus(
        text,
        add_special_tokens=True,
        max_length=self.max_len,
        return_token_type_ids=False,
        truncation=True,
        padding='max_length',
        return_attention_mask=True,
        return_tensors='pt',
    )

    input_ids = encoding['input_ids'].to(self.device)
    attention_mask = encoding['attention_mask'].to(self.device)

    # Шаг 2: Получение предсказаний модели
    with torch.no_grad():
        outputs = self.model(
            input_ids=input_ids,
            attention_mask=attention_mask
        )

    # Шаг 3: Преобразование логитов в предсказанные метки
    logits = outputs.logits  # или outputs[0], в зависимости от модели
    predictions = torch.argmax(logits, dim=2)

    # Перенос на CPU и преобразование в numpy
    predictions = predictions.cpu().numpy()[0]
    input_ids = input_ids.cpu().numpy()[0]

    # Шаг 4: Преобразование идентификаторов токенов в сами токены
    tokens = self.tokenizer.convert_ids_to_tokens(input_ids)

    # Шаг 5: Преобразование идентификаторов меток в названия меток
    predicted_labels = [self.id2tag[pred] for pred in predictions]

    # Шаг 6: Обработка токенов и меток, исключая специальные токены
    final_tokens = []
    final_labels = []

    for token, label in zip(tokens, predicted_labels):
        if token not in self.tokenizer.all_special_tokens:
            final_tokens.append(token)
            final_labels.append(label)

    # Шаг 7: Возвращение токенов и соответствующих им предсказанных меток
    return list(zip(final_tokens, final_labels))

In [None]:
# Предполагается, что вы уже обучили модель и у вас есть объект trainer
# trainer = Trainer(...)

# Получаем предсказания на валидационном наборе
predictions, labels, _ = trainer.predict(valid_dataset)

# Преобразуем логиты в предсказанные метки
predictions = np.argmax(predictions, axis=2)

# Список меток в вашей задаче NER
label_list = [...]  # Замените на ваш список меток

In [None]:
from seqeval.metrics import classification_report, f1_score, accuracy_score

# Генерируем отчет по метрикам
report = classification_report(true_labels, true_predictions)
print(report)

# Вычисляем F1-score
f1 = f1_score(true_labels, true_predictions)
print(f"F1-score: {f1:.4f}")

# Вычисляем точность
accuracy = accuracy_score(true_labels, true_predictions)
print(f"Accuracy: {accuracy:.4f}")

In [None]:
import pandas as pd

# Собираем данные для сохранения
data = []
for i, (tokens, preds, labels) in enumerate(zip(X_valid, true_predictions, true_labels)):
    for token, pred, label in zip(tokens, preds, labels):
        data.append({
            'sentence_id': i,
            'token': token,
            'true_label': label,
            'predicted_label': pred
        })

# Создаем DataFrame и сохраняем в CSV
df = pd.DataFrame(data)
df.to_csv(f"ner_predictions_fold_{i}.csv", index=False)