In [None]:
from transformers import AutoTokenizer, AutoModel
from sklearn.cluster import KMeans
from sklearn.metrics.pairwise import pairwise_distances
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
import torch
from tqdm import tqdm
import datasets
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report

# Раскоментить если вызапускаете код впервые: СЧИТАЕТ ЧАСА 2!!!

# Загружаем дата-сет банкинг-77
dataset = datasets.load_dataset('banking77')

# Загружаем токенайзер и берт
tokenizer = AutoTokenizer.from_pretrained('bert-base-uncased')
model = AutoModel.from_pretrained('bert-base-uncased')

# Токенизируем текст в данных (энкодим)
train_encodings = tokenizer(dataset['train']['text'], truncation=True, padding=True)
validation_encodings = tokenizer(dataset['test']['text'], truncation=True, padding=True)

# Создаем эмбединги для тренировочной части данных
train_embeddings = []
for i in tqdm(range(len(dataset['train']['text']))):
    input_ids = torch.tensor(train_encodings['input_ids'][i]).unsqueeze(0)
    attention_mask = torch.tensor(train_encodings['attention_mask'][i]).unsqueeze(0)
    with torch.no_grad():
        output = model(input_ids, attention_mask=attention_mask)
    embeddings = output.last_hidden_state.mean(dim=1).squeeze().numpy()
    train_embeddings.append(embeddings)
train_embeddings = np.array(train_embeddings)

# То же самое, но для валидационной части
validation_embeddings = []
for i in tqdm(range(len(dataset['test']['text']))):
    input_ids = torch.tensor(validation_encodings['input_ids'][i]).unsqueeze(0)
    attention_mask = torch.tensor(validation_encodings['attention_mask'][i]).unsqueeze(0)
    with torch.no_grad():
        output = model(input_ids, attention_mask=attention_mask)
    embeddings = output.last_hidden_state.mean(dim=1).squeeze().numpy()
    validation_embeddings.append(embeddings)
validation_embeddings = np.array(validation_embeddings)

In [None]:
train_labels = dataset['train']['label']
# Группируем обучающие вложения(дальше эмбединги) с помощью K-средних (кнн)
num_classes = len(np.unique(train_labels))
kmeans = KMeans(n_clusters=num_classes, random_state=42)
kmeans.fit(train_embeddings)
train_distances = pairwise_distances(train_embeddings, kmeans.cluster_centers_)
threshold = train_distances.max(axis=1).mean()

# Создаем сет валидации для нахождения OOD (out of domain)
validation_distances = pairwise_distances(validation_embeddings, kmeans.cluster_centers_)

In [None]:
# Определяем тренировочные данные для out-of-domain классификатора
ood_train_features = np.concatenate([train_embeddings[train_distances.max(axis=1) > threshold], train_embeddings], axis=0)
ood_train_labels = ['in_domain'] * len(train_embeddings[train_distances.max(axis=1) > threshold]) + ['out_of_domain'] * len(train_embeddings)

# Подставляем в наш классификатор данные (в нашем случае в Логистическую Регрессию) i.e.(обучаем классификатор)
ood_classifier = LogisticRegression(random_state=42)
ood_classifier.fit(ood_train_features, ood_train_labels)

In [None]:
# Размечаем данные сета валидации используя OOD детекшн (находим грубо говоря, что лежит очень далеко от центра и кидаем бан в виде лейбла out_of_domain)
validation_labels_pred = []
for i in range(len(validation_embeddings)):
    if validation_distances[i].min() > threshold:
        validation_labels_pred.append('out_of_domain')
    else:
        validation_logits = model(torch.tensor(validation_encodings['input_ids'][i]).unsqueeze(0), attention_mask=torch.tensor(validation_encodings['attention_mask'][i]).unsqueeze(0))[0]
        if ood_classifier.predict_proba(validation_embeddings[i].reshape(1, -1))[0][1] > 0.5:
            validation_labels_pred.append('out_of_domain')
        else:
            validation_labels_pred.append(dataset['test'][i]['label'])

In [None]:
validation_labels_pred = [-1 if x == 'out_of_domain' else x for x in validation_labels_pred]

In [None]:
# Смотрим перфоманс обученной на этот момент модели
print("Accuracy with out-of-domain detection:", accuracy_score(dataset['test']['label'], validation_labels_pred))
print("F1 score with out-of-domain detection:", f1_score(dataset['test']['label'], validation_labels_pred, average='weighted'))

# Вычисляем расстояния между обучающими вложениями и центроидами кластеров
train_distances = pairwise_distances(train_embeddings, kmeans.cluster_centers_)

# Еще раз определяем трейнинг сет чтобы дальше его допилить с ин домэйн сэмплами
ood_train_features = train_embeddings[train_distances.max(axis=1) > threshold]
ood_train_labels = ['out_of_domain'] * len(ood_train_features)

In [None]:
# Добавляем немного in-domain экземпляров в ood_train_features и ood_train_labels чтобы создать сбалансированный тренировочный сет
num_in_domain_samples = min(len(train_embeddings), len(ood_train_features))
idx = np.random.choice(len(train_embeddings), num_in_domain_samples, replace=False)
in_domain_samples = train_embeddings[idx]
ood_train_features = np.concatenate([ood_train_features, in_domain_samples])
ood_train_labels = np.concatenate([ood_train_labels, ['in_domain'] * num_in_domain_samples])

# Еще раз обучаем на новых данных, смотрим где результаты лучше(в тот или этот раз)
ood_classifier = LogisticRegression(random_state=42)
ood_classifier.fit(ood_train_features, ood_train_labels)

# Тестируем out-of-domain классификатор
ood_test_features = np.concatenate([train_embeddings[train_distances.max(axis=1) <= threshold], validation_embeddings])
ood_test_labels = ['in_domain'] * len(train_embeddings[train_distances.max(axis=1) <= threshold]) + ['out_of_domain'] * len(validation_embeddings)
ood_test_labels_pred = ood_classifier.predict(ood_test_features)
print("Classification report for out-of-domain detection:")
print(classification_report(ood_test_labels, ood_test_labels_pred))

# Получаем новый ф1 и аккураси скор (и немного других метрик)

In [None]:
# Сохраняем модель
model_path = "/path/to/model.pt"
torch.save(model.state_dict(), model_path)

In [None]:
# Загружаем модель
model_path = "/path/to/model.pt"
state_dict = torch.load(model_path)

In [None]:
# # Прикручиваем логрег к нашему берту
class SentenceClassifier:
    def __init__(self, model_path, tokenizer_path, ood_model_path):
        # Грузим берт и токинайзер
        self.tokenizer = BertTokenizer.from_pretrained(tokenizer_path)
        self.model = BertForSequenceClassification.from_pretrained(model_path)

        # Грузим нашу ООД классификатор - линрег
        self.ood_model = torch.load(ood_model_path)

    def predict(self, sentence):
        # Токенизируем полученный вход
        inputs = self.tokenizer(sentence, return_tensors="pt")

        # пропускаем через берта (нашего нафайнтьюненного) наше входное предложение
        outputs = self.model(**inputs)

        # Получаем предсказаный лейбл для нашего предложения и его вероятность отношения к ин домейн или наоборот оод
        predicted_label = torch.argmax(outputs[0]).item()
        in_domain_score = torch.softmax(outputs[0], dim=1)[0][predicted_label].item()

        # пропускаем через оод классификатор
        ood_label = self.ood_model.predict(sentence)

        # ну и ретерним результат
        return {"in_domain_label": predicted_label, 
                "in_domain_score": in_domain_score, 
                "ood_label": ood_label} 

In [None]:
# указываем путь от нашей заранее сохраненной модели (в нашем случае путь до нафайнтьюненного берта) и загружаем ее
model_path = "path/to/fine-tuned/bert/model"
# тоукнайзер у нас стандартный
tokenizer_path = "bert-base-uncased"
# указываем путь от нашей заранее сохраненной модели (теперь от лин-регрессии) и загружаем ее
ood_model_path = "path/to/ood_classifier/model"
# созаем экземпляр класса и передаем данные о наших моделях
classifier = SentenceClassifier(model_path, tokenizer_path, ood_model_path)

In [None]:
# Наше предложение на вход
sentence = "I want to open a checking account"
# отдельная переменная для предсказания о экземпляре класса given the input
prediction = classifier.predict(sentence)
print(prediction)